//! Chain state updates for Zebra. //! //! Chain state updates occur in multiple stages: //! - verify blocks (using `BlockVerifier` or `CheckpointVerifier`) //! - update the list of verified blocks on disk //! - create the chain state needed to verify child blocks //! - choose the best tip from all the available chain tips //! - update the mature chain state on disk //! - prune orphaned side-chains //! //! Chain state updates are provided via a `tower::Service`, to support //! backpressure and batch verification. #[cfg(test)] mod tests; use crate::checkpoint::CheckpointVerifier; use futures_util::FutureExt; use std::{ error, future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; use tower::{buffer::Buffer, Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::block::{Block, BlockHeaderHash}; use zebra_chain::types::BlockHeight; use zebra_chain::Network; /// The maximum expected gap between blocks. /// /// Used to identify unexpected high blocks. const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000; struct ChainVerifier { /// The underlying `BlockVerifier`, possibly wrapped in other services. block_verifier: BV, /// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can /// clone and share it with futures. checkpoint_verifier: Buffer>, /// The maximum checkpoint height for `checkpoint_verifier`. max_checkpoint_height: BlockHeight, /// The underlying `ZebraState`, possibly wrapped in other services. state_service: S, /// The last block height. Used for debugging. /// /// Not updated for unexpected high blocks. last_block_height: BlockHeight, } /// The error type for the ChainVerifier Service. // TODO(jlusby): Error = Report ? type Error = Box; /// The ChainVerifier service implementation. /// /// After verification, blocks are added to the underlying state service. impl Service> for ChainVerifier where BV: Service, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, BV::Future: Send + 'static, S: Service + Send + Clone + 'static, S::Future: Send + 'static, { type Response = BlockHeaderHash; type Error = Error; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { // We don't expect the state or verifiers to exert backpressure on our // users, so we don't need to call `state_service.poll_ready()` here. // (And we don't know which verifier to choose at this point, anyway.) Poll::Ready(Ok(())) } fn call(&mut self, block: Arc) -> Self::Future { // TODO(jlusby): Error = Report, handle errors from state_service. let mut block_verifier = self.block_verifier.clone(); let mut checkpoint_verifier = self.checkpoint_verifier.clone(); let mut state_service = self.state_service.clone(); let max_checkpoint_height = self.max_checkpoint_height; let span = tracing::debug_span!( "block_verify", height = ?block.coinbase_height(), hash = ?BlockHeaderHash::from(block.as_ref()) ); let height = block.coinbase_height(); // Log an info-level message on unexpected high blocks let is_unexpected_high_block = match height { Some(BlockHeight(height)) if (height > self.last_block_height.0 + MAX_EXPECTED_BLOCK_GAP) => { true } Some(height) => { // Update the last height if the block height was expected self.last_block_height = height; false } _ => false, }; async move { // TODO(teor): in the post-sapling checkpoint range, allow callers // to use BlockVerifier, CheckpointVerifier, or both. // Call a verifier based on the block height and checkpoints. match height { Some(height) if (height <= max_checkpoint_height) => { checkpoint_verifier .ready_and() .await? .call(block.clone()) .await? } _ => { // Log an info-level message on early high blocks. // The sync service rejects most of these blocks, but we // still want to know if a large number get through. if is_unexpected_high_block { tracing::info!(?height, "unexpected high block"); } block_verifier .ready_and() .await? .call(block.clone()) .await? } }; // TODO(teor): // - handle chain reorgs // - adjust state_service "unique block height" conditions let add_block = state_service .ready_and() .await? .call(zebra_state::Request::AddBlock { block }); match add_block.await? { zebra_state::Response::Added { hash } => Ok(hash), _ => Err("adding block to zebra-state failed".into()), } } .instrument(span) .boxed() } } /// Return a chain verification service, using `network` and `state_service`. /// /// Gets the initial tip from the state service, and uses it to create a block /// verifier and checkpoint verifier. /// /// This function should only be called once for a particular state service. If /// you need shared block or checkpoint verfiers, create them yourself, and pass /// them to `init_from_verifiers`. // // TODO: revise this interface when we generate our own blocks, or validate // mempool transactions. We might want to share the BlockVerifier, and we // might not want to add generated blocks to the state. pub async fn init( network: Network, state_service: S, ) -> impl Service< Arc, Response = BlockHeaderHash, Error = Error, Future = impl Future>, > + Send + Clone + 'static where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { let initial_tip = zebra_state::initial_tip(state_service.clone()) .await .expect("State service poll_ready is Ok"); let height = initial_tip.clone().map(|b| b.coinbase_height()).flatten(); let hash = initial_tip.clone().map(|b| b.hash()); tracing::debug!( ?network, ?height, ?hash, "initialising ChainVerifier with network and initial tip" ); let block_verifier = crate::block::init(state_service.clone()); let checkpoint_verifier = CheckpointVerifier::new(network, initial_tip); init_from_verifiers(block_verifier, checkpoint_verifier, state_service, height) } /// Return a chain verification service, using the provided verifier, state /// services, and initial tip height. /// /// The chain verifier holds a state service of type `S`, used as context for /// block validation and to which newly verified blocks will be committed. This /// state is pluggable to allow for testing or instrumentation. /// /// The returned type is opaque to allow instrumentation or other wrappers, but /// can be boxed for storage. It is also `Clone` to allow sharing of a /// verification service. /// /// This function should only be called once for a particular state service and /// verifiers (and the result be shared, cloning if needed). Constructing /// multiple services from the same underlying state might cause synchronisation /// bugs. pub fn init_from_verifiers( block_verifier: BV, checkpoint_verifier: CheckpointVerifier, state_service: S, initial_tip_height: Option, ) -> impl Service< Arc, Response = BlockHeaderHash, Error = Error, Future = impl Future>, > + Send + Clone + 'static where BV: Service, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, BV::Future: Send + 'static, S: Service + Send + Clone + 'static, S::Future: Send + 'static, { let max_checkpoint_height = checkpoint_verifier.list().max_height(); tracing::debug!( ?max_checkpoint_height, "initialising ChainVerifier with max checkpoint height" ); // Wrap the checkpoint verifier in a buffer, so we can share it let checkpoint_verifier = Buffer::new(checkpoint_verifier, 1); Buffer::new( ChainVerifier { block_verifier, checkpoint_verifier, max_checkpoint_height, state_service, // We might not have the genesis block yet, but that's ok, // because this field is only used for debugging unexpected high // blocks. last_block_height: initial_tip_height.unwrap_or(BlockHeight(0)), }, 1, ) }