Partly revert "Fix poll_ready usage in ChainVerifier" (#1735)

* Revert "Fix poll_ready usage in ChainVerifier (#1700)"

This reverts commit 0723ac5be1.

* Keep the VERIFIER_BUFFER_BOUND change
* Correctly implement multiple readiness
This commit is contained in:
teor 2021-02-20 10:43:38 +10:00 committed by GitHub
parent d4f2f27218
commit 3af57ece7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 38 deletions

View File

@ -22,11 +22,12 @@ use zebra_state as zs;
use crate::{ use crate::{
block::BlockVerifier, block::BlockVerifier,
checkpoint::{CheckpointList, CheckpointVerifier}, block::VerifyBlockError,
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
BoxError, Config, BoxError, Config,
}; };
/// The bound for each verifier's buffer. /// The bound for the chain verifier's buffer.
/// ///
/// We choose the verifier buffer bound based on the maximum number of /// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention: /// concurrent verifier users, to avoid contention:
@ -46,20 +47,17 @@ where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
// Normally, we erase the types on buffer-wrapped services. block: BlockVerifier<S>,
// But if we did that here, the block and checkpoint services would be checkpoint: CheckpointVerifier<S>,
// type-indistinguishable, risking future substitution errors.
block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
max_checkpoint_height: block::Height, max_checkpoint_height: block::Height,
} }
#[derive(Debug, Display, Error)] #[derive(Debug, Display, Error)]
pub enum VerifyChainError { pub enum VerifyChainError {
/// block could not be checkpointed /// block could not be checkpointed
Checkpoint(#[source] BoxError), Checkpoint(#[source] VerifyCheckpointError),
/// block could not be verified /// block could not be verified
Block(#[source] BoxError), Block(#[source] VerifyBlockError),
} }
impl<S> Service<Arc<Block>> for ChainVerifier<S> impl<S> Service<Arc<Block>> for ChainVerifier<S>
@ -72,34 +70,38 @@ where
type Future = type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Correctness: // Correctness:
// //
// We can't call `poll_ready` on the block and checkpoint verifiers here, // We acquire checkpoint readiness before block readiness, to avoid an unlikely
// because each `poll_ready` must be followed by a `call`, and we don't // hang during the checkpoint to block verifier transition. If the checkpoint and
// know which verifier we're going to choose yet. // block verifiers are contending for the same buffer/batch, we want the checkpoint
// See #1593 for details. // verifier to win, so that checkpoint verification completes, and block verification
// can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
use futures::ready;
// The chain verifier holds one slot in each verifier, for each concurrent task.
// Therefore, any shared buffers or batches polled by these verifiers should double
// their bounds. (For example, the state service buffer.)
ready!(self
.checkpoint
.poll_ready(cx)
.map_err(VerifyChainError::Checkpoint))?;
ready!(self.block.poll_ready(cx).map_err(VerifyChainError::Block))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, block: Arc<Block>) -> Self::Future { fn call(&mut self, block: Arc<Block>) -> Self::Future {
match block.coinbase_height() { match block.coinbase_height() {
// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has
// a matching `call`. See #1593 for details.
Some(height) if height <= self.max_checkpoint_height => self Some(height) if height <= self.max_checkpoint_height => self
.checkpoint_verifier .checkpoint
.clone() .call(block)
.oneshot(block)
.map_err(VerifyChainError::Checkpoint) .map_err(VerifyChainError::Checkpoint)
.boxed(), .boxed(),
// This also covers blocks with no height, which the block verifier // This also covers blocks with no height, which the block verifier
// will reject immediately. // will reject immediately.
_ => self _ => self
.block_verifier .block
.clone() .call(block)
.oneshot(block)
.map_err(VerifyChainError::Block) .map_err(VerifyChainError::Block)
.boxed(), .boxed(),
} }
@ -122,7 +124,7 @@ where
pub async fn init<S>( pub async fn init<S>(
config: Config, config: Config,
network: Network, network: Network,
state_service: S, mut state_service: S,
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>> ) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@ -137,13 +139,11 @@ where
.expect("hardcoded checkpoint list extends past sapling activation") .expect("hardcoded checkpoint list extends past sapling activation")
}; };
// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
// matching `call`. See #1593 for details.
let tip = match state_service let tip = match state_service
.clone() .ready_and()
.oneshot(zs::Request::Tip) .await
.unwrap()
.call(zs::Request::Tip)
.await .await
.unwrap() .unwrap()
{ {
@ -152,16 +152,13 @@ where
}; };
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");
let block_verifier = BlockVerifier::new(network, state_service.clone()); let block = BlockVerifier::new(network, state_service.clone());
let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);
let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
Buffer::new( Buffer::new(
BoxService::new(ChainVerifier { BoxService::new(ChainVerifier {
block_verifier, block,
checkpoint_verifier, checkpoint,
max_checkpoint_height, max_checkpoint_height,
}), }),
VERIFIER_BUFFER_BOUND, VERIFIER_BUFFER_BOUND,