diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs
index 3e3722ae..06dea6e6 100644
--- a/zebra-consensus/src/chain.rs
+++ b/zebra-consensus/src/chain.rs
@@ -87,10 +87,18 @@ where
     }
 }
 
-/// Return a block verification service, using `config`, `network` and
-/// `state_service`.
+/// Initialize a block verification service.
+///
+/// The consensus configuration is specified by `config`, and the Zcash network
+/// to verify blocks for is specified by `network`.
+///
+/// The block verification service asynchronously performs semantic verification
+/// checks. Blocks that pass semantic verification are submitted to the supplied
+/// `state_service` for contextual verification before being committed to the chain.
 ///
 /// This function should only be called once for a particular state service.
+///
+/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
 #[instrument(skip(state_service))]
 pub async fn init(
     config: Config,
diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs
index c9a407ef..be8b1285 100644
--- a/zebra-consensus/src/checkpoint.rs
+++ b/zebra-consensus/src/checkpoint.rs
@@ -98,8 +98,6 @@ where
     S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
     S::Future: Send + 'static,
 {
-    // Inputs
-    //
     /// The checkpoint list for this verifier.
     checkpoint_list: CheckpointList,
 
@@ -109,8 +107,6 @@ where
     /// The underlying state service, possibly wrapped in other services.
     state_service: S,
 
-    // Queued Blocks
-    //
     /// A queue of unverified blocks.
     ///
     /// Contains a list of unverified blocks at each block height. In most cases,
@@ -127,9 +123,6 @@ where
     verifier_progress: Progress<block::Height>,
 }
 
-/// The CheckpointVerifier implementation.
-///
-/// Contains non-service utility functions for CheckpointVerifiers.
 impl<S> CheckpointVerifier<S>
 where
     S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@@ -386,8 +379,7 @@ where
                     height,
                     verified_height: previous_height,
                 });
-                // TODO: reduce to trace level once the AlreadyVerified bug is fixed
-                tracing::info!(?e);
+                tracing::trace!(?e);
                 e?;
             }
             InitialTip(_) | PreviousCheckpoint(_) => {}
@@ -580,7 +572,7 @@ where
         valid_qblock
     }
 
-    /// Check all the blocks in the current checkpoint range.
+    /// Try to verify from the previous checkpoint to a target checkpoint.
     ///
     /// Send `Ok` for the blocks that are in the chain, and `Err` for side-chain
     /// blocks.
@@ -830,34 +822,43 @@ where
             return async { Err(VerifyCheckpointError::Finished) }.boxed();
         }
 
-        // Queue the block for verification, until we receive all the blocks for
-        // the current checkpoint range.
         let rx = self.queue_block(block.clone());
-
-        // Try to verify from the previous checkpoint to a target checkpoint.
-        //
-        // If there are multiple checkpoints in the target range, and one of
-        // the ranges is invalid, we'll try again with a smaller target range
-        // on the next call(). Failures always reject a block, so we know
-        // there will be at least one more call().
-        //
-        // We don't retry with a smaller range on failure, because failures
-        // should be rare.
         self.process_checkpoint_range();
 
+        metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64);
+
+        // Because the checkpoint verifier duplicates state from the state
+        // service (it tracks which checkpoints have been verified), we must
+        // commit blocks transactionally on a per-checkpoint basis. Otherwise,
+        // the checkpoint verifier's state could desync from the underlying
+        // state service. Among other problems, this could cause the checkpoint
+        // verifier to reject blocks not already in the state as
+        // already-verified.
+        //
+        // To commit blocks transactionally on a per-checkpoint basis, we must
+        // commit all verified blocks in a checkpoint range, regardless of
+        // whether or not the response futures for each block were dropped.
+        //
+        // We accomplish this by spawning a new task containing the
+        // commit-if-verified logic. This task will always execute, except if
+        // the program is interrupted, in which case there is no longer a
+        // checkpoint verifier to keep in sync with the state.
         let mut state_service = self.state_service.clone();
-        async move {
+        let commit_finalized_block = tokio::spawn(async move {
             let hash = rx
                 .await
                 .expect("CheckpointVerifier does not leave dangling receivers")?;
+            // Once we get a verified hash, we must commit it to the chain state
+            // as a finalized block, or exit the program, so .expect rather than
+            // propagate errors from the state service.
             match state_service
                 .ready_and()
                 .await
-                .map_err(VerifyCheckpointError::CommitFinalized)?
+                .expect("Verified checkpoints must be committed transactionally")
                 .call(zs::Request::CommitFinalizedBlock { block })
                 .await
-                .map_err(VerifyCheckpointError::CommitFinalized)?
+                .expect("Verified checkpoints must be committed transactionally")
             {
                 zs::Response::Committed(committed_hash) => {
                     assert_eq!(committed_hash, hash, "state must commit correct hash");
@@ -865,6 +866,12 @@ where
                 }
                 _ => unreachable!("wrong response for CommitFinalizedBlock"),
             }
+        });
+
+        async move {
+            commit_finalized_block
+                .await
+                .expect("commit_finalized_block should not panic")
         }
         .boxed()
     }
diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs
index f0f4cc87..39fc436f 100644
--- a/zebra-network/src/protocol/internal/request.rs
+++ b/zebra-network/src/protocol/internal/request.rs
@@ -14,6 +14,18 @@ use super::super::types::Nonce;
 /// possible [`Response`s](super::Response) it can generate; it is fine (and
 /// recommended!) to match on the expected responses and treat the others as
 /// `unreachable!()`, since their return indicates a bug in the network code.
+///
+/// # Cancellations
+///
+/// The peer set handles cancelled requests (i.e., requests where the future
+/// returned by `Service::call` is dropped before it resolves) on a best-effort
+/// basis. Requests are routed to a particular peer connection, and then
+/// translated into Zcash protocol messages and sent over the network. If a
+/// request is cancelled after it is submitted but before it is processed by a
+/// peer connection, no messages will be sent. Otherwise, if it is cancelled
+/// while waiting for a response, the peer connection resets its state and makes
+/// a best-effort attempt to ignore any messages responsive to the cancelled
+/// request, subject to limitations in the underlying Zcash protocol.
 #[derive(Clone, Debug)]
 pub enum Request {
     /// Requests additional peers from the server.
diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs
index 6a0da75c..76167735 100644
--- a/zebra-state/src/request.rs
+++ b/zebra-state/src/request.rs
@@ -53,11 +53,18 @@ pub enum Request {
     /// Performs contextual validation of the given block, committing it to the
     /// state if successful.
     ///
-    /// Returns [`Response::Committed`] with the hash of the newly
-    /// committed block, or an error.
+    /// It is the caller's responsibility to perform semantic validation. This
+    /// request can be made out-of-order; the state service will queue it until
+    /// its parent is ready.
     ///
-    /// This request can be made out-of-order; the state service will buffer it
-    /// until its parent is ready.
+    /// Returns [`Response::Committed`] with the hash of the block when it is
+    /// committed to the state, or an error if the block fails contextual
+    /// validation or has already been committed to the state.
+    ///
+    /// This request cannot be cancelled once submitted; dropping the response
+    /// future will have no effect on whether it is eventually processed. A
+    /// request to commit a block which has been queued internally but not yet
+    /// committed will fail the older request and replace it with the newer request.
     CommitBlock {
         /// The block to commit to the state.
         block: Arc<Block>,
@@ -66,15 +73,20 @@ pub enum Request {
         // sapling_anchor: sapling::tree::Root,
     },
 
-    /// Commit a finalized block to the state, skipping contextual validation.
+    /// Commit a finalized block to the state, skipping all validation.
+    ///
     /// This is exposed for use in checkpointing, which produces finalized
-    /// blocks.
+    /// blocks. It is the caller's responsibility to ensure that the block is
+    /// valid and final. This request can be made out-of-order; the state service
+    /// will queue it until its parent is ready.
     ///
-    /// Returns [`Response::Committed`] with the hash of the newly
-    /// committed block, or an error.
+    /// Returns [`Response::Committed`] with the hash of the newly committed
+    /// block, or an error.
     ///
-    /// This request can be made out-of-order; the state service will buffer it
-    /// until its parent is ready.
+    /// This request cannot be cancelled once submitted; dropping the response
+    /// future will have no effect on whether it is eventually processed.
+    /// Duplicate requests should not be made, because it is the caller's
+    /// responsibility to ensure that each block is valid and final.
     CommitFinalizedBlock {
         /// The block to commit to the state.
         block: Arc<Block>,
@@ -124,8 +136,14 @@ pub enum Request {
     /// [`block::Height`] using `.into()`.
     Block(HashOrHeight),
 
-    /// Request a UTXO identified by the given Outpoint in any chain.
+    /// Request a UTXO identified by the given Outpoint, waiting until it becomes
+    /// available if it is unknown.
     ///
-    /// Returns UTXOs fron any chain, including side-chains.
+    /// This request is purely informational, and there are no guarantees about
+    /// whether the UTXO remains unspent or is on the best chain. Its purpose is
+    /// to allow asynchronous script verification.
+    ///
+    /// Code making this request should apply a timeout layer to the service to
+    /// handle missing UTXOs.
     AwaitUtxo(transparent::OutPoint),
 }
diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs
index 6ca069e9..a100eaf9 100644
--- a/zebra-state/src/service.rs
+++ b/zebra-state/src/service.rs
@@ -92,19 +92,17 @@ impl StateService {
 
         if self.contains_committed_block(&block) {
             let (rsp_tx, rsp_rx) = oneshot::channel();
-            let _ = rsp_tx.send(Err("duplicate block".into()));
+            let _ = rsp_tx.send(Err("block is already committed to the state".into()));
             return rsp_rx;
         }
 
-        // The queue of blocks maintained by this service acts as a pipeline for
-        // blocks waiting for contextual verification. We lazily flush the
-        // pipeline here by handling duplicate requests to verify an existing
-        // queued block. We handle those duplicate requests by replacing the old
-        // channel with the new one and sending an error over the old channel.
+        // Request::CommitBlock contract: a request to commit a block which has
+        // been queued but not yet committed to the state fails the older
+        // request and replaces it with the newer request.
        let rsp_rx = if let Some(queued_block) = self.queued_blocks.get_mut(&hash) {
             let (mut rsp_tx, rsp_rx) = oneshot::channel();
             std::mem::swap(&mut queued_block.rsp_tx, &mut rsp_tx);
-            let _ = rsp_tx.send(Err("duplicate block".into()));
+            let _ = rsp_tx.send(Err("replaced by newer request".into()));
             rsp_rx
         } else {
             let (rsp_tx, rsp_rx) = oneshot::channel();
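The patch above documents two caller-facing contracts: commit requests cannot be cancelled once submitted, and AwaitUtxo callers should add their own timeout. As an illustration of the second point, here is a minimal sketch (not part of the patch) that wraps a zebra-state service in tower's timeout middleware before awaiting a UTXO. The helper name `lookup_utxo`, the local `BoxError` alias, and the 10-second deadline are illustrative assumptions, not values taken from Zebra.

use std::time::Duration;

use tower::{timeout::Timeout, Service, ServiceExt};
use zebra_chain::transparent;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

// Hypothetical helper: wrap the state service so that an AwaitUtxo request
// fails after a deadline instead of waiting forever for a UTXO that may
// never become available. The 10-second deadline is an arbitrary example.
async fn lookup_utxo<S>(
    state_service: S,
    outpoint: transparent::OutPoint,
) -> Result<zebra_state::Response, BoxError>
where
    S: Service<zebra_state::Request, Response = zebra_state::Response, Error = BoxError>,
{
    let mut state_service = Timeout::new(state_service, Duration::from_secs(10));

    // A timeout error here means the UTXO was not seen before the deadline;
    // the caller decides whether to retry or fail script verification.
    state_service
        .ready_and()
        .await?
        .call(zebra_state::Request::AwaitUtxo(outpoint))
        .await
}

Because the new docs make no guarantee that the returned UTXO is unspent or on the best chain, a caller would treat a successful response purely as "this output exists somewhere", and a timeout as a missing input.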