From 3af57ece7ae5d43cfbcb6a9215433705aad70b80 Mon Sep 17 00:00:00 2001 From: teor Date: Sat, 20 Feb 2021 10:43:38 +1000 Subject: [PATCH] Partly revert "Fix poll_ready usage in ChainVerifier" (#1735) * Revert "Fix poll_ready usage in ChainVerifier (#1700)" This reverts commit 0723ac5be17ba7bf208a46c59b36cdc933011f45. * Keep the VERIFIER_BUFFER_BOUND change * Correctly implement multiple readiness --- zebra-consensus/src/chain.rs | 73 +++++++++++++++++------------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 5c7e1c6a..e23d78c2 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -22,11 +22,12 @@ use zebra_state as zs; use crate::{ block::BlockVerifier, - checkpoint::{CheckpointList, CheckpointVerifier}, + block::VerifyBlockError, + checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError}, BoxError, Config, }; -/// The bound for each verifier's buffer. +/// The bound for the chain verifier's buffer. /// /// We choose the verifier buffer bound based on the maximum number of /// concurrent verifier users, to avoid contention: @@ -46,20 +47,17 @@ where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { - // Normally, we erase the types on buffer-wrapped services. - // But if we did that here, the block and checkpoint services would be - // type-indistinguishable, risking future substitution errors. - block_verifier: Buffer, Arc>, - checkpoint_verifier: Buffer, Arc>, + block: BlockVerifier, + checkpoint: CheckpointVerifier, max_checkpoint_height: block::Height, } #[derive(Debug, Display, Error)] pub enum VerifyChainError { /// block could not be checkpointed - Checkpoint(#[source] BoxError), + Checkpoint(#[source] VerifyCheckpointError), /// block could not be verified - Block(#[source] BoxError), + Block(#[source] VerifyBlockError), } impl Service> for ChainVerifier @@ -72,34 +70,38 @@ where type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Correctness: // - // We can't call `poll_ready` on the block and checkpoint verifiers here, - // because each `poll_ready` must be followed by a `call`, and we don't - // know which verifier we're going to choose yet. - // See #1593 for details. + // We acquire checkpoint readiness before block readiness, to avoid an unlikely + // hang during the checkpoint to block verifier transition. If the checkpoint and + // block verifiers are contending for the same buffer/batch, we want the checkpoint + // verifier to win, so that checkpoint verification completes, and block verification + // can start. (Buffers and batches have multiple slots, so this contention is unlikely.) + use futures::ready; + // The chain verifier holds one slot in each verifier, for each concurrent task. + // Therefore, any shared buffers or batches polled by these verifiers should double + // their bounds. (For example, the state service buffer.) + ready!(self + .checkpoint + .poll_ready(cx) + .map_err(VerifyChainError::Checkpoint))?; + ready!(self.block.poll_ready(cx).map_err(VerifyChainError::Block))?; Poll::Ready(Ok(())) } fn call(&mut self, block: Arc) -> Self::Future { match block.coinbase_height() { - // Correctness: - // - // We use `ServiceExt::oneshot` to make sure every `poll_ready` has - // a matching `call`. See #1593 for details. Some(height) if height <= self.max_checkpoint_height => self - .checkpoint_verifier - .clone() - .oneshot(block) + .checkpoint + .call(block) .map_err(VerifyChainError::Checkpoint) .boxed(), // This also covers blocks with no height, which the block verifier // will reject immediately. _ => self - .block_verifier - .clone() - .oneshot(block) + .block + .call(block) .map_err(VerifyChainError::Block) .boxed(), } @@ -122,7 +124,7 @@ where pub async fn init( config: Config, network: Network, - state_service: S, + mut state_service: S, ) -> Buffer, block::Hash, VerifyChainError>, Arc> where S: Service + Send + Clone + 'static, @@ -137,13 +139,11 @@ where .expect("hardcoded checkpoint list extends past sapling activation") }; - // Correctness: - // - // We use `ServiceExt::oneshot` to make sure every `poll_ready` has a - // matching `call`. See #1593 for details. let tip = match state_service - .clone() - .oneshot(zs::Request::Tip) + .ready_and() + .await + .unwrap() + .call(zs::Request::Tip) .await .unwrap() { @@ -152,16 +152,13 @@ where }; tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); - let block_verifier = BlockVerifier::new(network, state_service.clone()); - let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); - - let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND); - let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND); + let block = BlockVerifier::new(network, state_service.clone()); + let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); Buffer::new( BoxService::new(ChainVerifier { - block_verifier, - checkpoint_verifier, + block, + checkpoint, max_checkpoint_height, }), VERIFIER_BUFFER_BOUND,