#[cfg(test)] mod tests; use displaydoc::Display; use futures::{FutureExt, TryFutureExt}; use std::{ future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; use thiserror::Error; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use tracing::instrument; use zebra_chain::{ block::{self, Block}, parameters::{Network, NetworkUpgrade::Sapling}, }; use zebra_state as zs; use crate::{ block::BlockVerifier, block::VerifyBlockError, checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError}, BoxError, Config, }; /// The maximum expected gap between blocks. /// /// Used to identify unexpected out of order blocks. const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000; /// The chain verifier routes requests to either the checkpoint verifier or the /// block verifier, depending on the maximum checkpoint height. struct ChainVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { block: BlockVerifier, checkpoint: CheckpointVerifier, max_checkpoint_height: block::Height, last_block_height: Option, } #[derive(Debug, Display, Error)] pub enum VerifyChainError { /// block could not be checkpointed Checkpoint(VerifyCheckpointError), /// block could not be verified Block(VerifyBlockError), } impl Service> for ChainVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { type Response = block::Hash; type Error = VerifyChainError; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) { // First, fail if either service fails. (Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))), (_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))), // Second, we're unready if either service is unready. (Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending, // Finally, we're ready if both services are ready and OK. (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), } } fn call(&mut self, block: Arc) -> Self::Future { let height = block.coinbase_height(); // TODO: do we still need this logging? // Log an info-level message on unexpected out of order blocks let is_unexpected_gap = match (height, self.last_block_height) { (Some(block::Height(height)), Some(block::Height(last_block_height))) if (height > last_block_height + MAX_EXPECTED_BLOCK_GAP) || (height + MAX_EXPECTED_BLOCK_GAP < last_block_height) => { self.last_block_height = Some(block::Height(height)); true } (Some(height), _) => { self.last_block_height = Some(height); false } // The other cases are covered by the verifiers _ => false, }; // Log a message on unexpected out of order blocks. // // The sync service rejects most of these blocks, but we // still want to know if a large number get through. if is_unexpected_gap { tracing::debug!( "large block height gap: this block or the previous block is out of order" ); } self.last_block_height = height; if let Some(height) = height { if height <= self.max_checkpoint_height { return self .checkpoint .call(block) .map_err(VerifyChainError::Checkpoint) .boxed(); } } // For the purposes of routing requests, we can send blocks // with no height to the block verifier, which will reject them. // // We choose the block verifier because it doesn't have any // internal state, so it will always return the same error for a // block with no height. self.block .call(block) .map_err(VerifyChainError::Block) .boxed() } } /// Return a block verification service, using `config`, `network` and /// `state_service`. /// /// This function should only be called once for a particular state service. #[instrument(skip(state_service))] pub async fn init( config: Config, network: Network, mut state_service: S, ) -> Buffer, block::Hash, VerifyChainError>, Arc> where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { let list = CheckpointList::new(network); let max_checkpoint_height = if config.checkpoint_sync { list.max_height() } else { list.min_height_in_range(Sapling.activation_height(network).unwrap()..) .expect("hardcoded checkpoint list extends past sapling activation") }; let tip = match state_service .ready_and() .await .unwrap() .call(zs::Request::Tip) .await .unwrap() { zs::Response::Tip(tip) => tip, _ => unreachable!("wrong response to Request::Tip"), }; tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); let block = BlockVerifier::new(state_service.clone()); let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); Buffer::new( BoxService::new(ChainVerifier { block, checkpoint, max_checkpoint_height, last_block_height: None, }), 3, ) }