diff --git a/book/src/dev/rfcs/0005-state-updates.md b/book/src/dev/rfcs/0005-state-updates.md index bd01e864..49d949c6 100644 --- a/book/src/dev/rfcs/0005-state-updates.md +++ b/book/src/dev/rfcs/0005-state-updates.md @@ -526,32 +526,35 @@ The state service uses the following entry points: New `non-finalized` blocks are commited as follows: -#### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc) -> tokio::sync::broadcast::Receiver` +### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc) -> tokio::sync::oneshot::Receiver` -1. If a duplicate block exists in the queue: - - Find the `QueuedBlock` for that existing duplicate block - - Create an extra receiver for the existing block, using `block.rsp_tx.subscribe`, - - Drop the newly received duplicate block - - Return the extra receiver, so it can be used in the response future for the duplicate block request - -2. Create a `QueuedBlock` for `block`: - - Create a `tokio::sync::broadcast` channel - - Use that channel to create a `QueuedBlock` for `block`. - -3. If a duplicate block exists in a non-finalized chain, or the finalized chain, +1. If a duplicate block hash exists in a non-finalized chain, or the finalized chain, it has already been successfully verified: - - Broadcast `Ok(block.hash())` via `block.rsp_tx`, and return the receiver for the block's channel + - create a new oneshot channel + - immediately send `Err(DuplicateBlockHash)` drop the sender + - return the reciever -4. Add `block` to `self.queued_blocks` +2. If a duplicate block hash exists in the queue: + - Find the `QueuedBlock` for that existing duplicate block + - create a new channel for the new request + - replace the old sender in `queued_block` with the new sender + - send `Err(DuplicateBlockHash)` through the old sender channel + - continue to use the new receiver -5. If `block.header.previous_block_hash` is not present in the finalized or +3. Else create a `QueuedBlock` for `block`: + - Create a `tokio::sync::oneshot` channel + - Use that channel to create a `QueuedBlock` for `block` + - Add `block` to `self.queued_blocks` + - continue to use the new receiver + +4. If `block.header.previous_block_hash` is not present in the finalized or non-finalized state: - Return the receiver for the block's channel -6. Else iteratively attempt to process queued blocks by their parent hash +5. Else iteratively attempt to process queued blocks by their parent hash starting with `block.header.previous_block_hash` -7. While there are recently commited parent hashes to process +6. While there are recently commited parent hashes to process - Dequeue all blocks waiting on `parent` with `let queued_children = self.queued_blocks.dequeue_children(parent);` - for each queued `block` @@ -569,17 +572,17 @@ New `non-finalized` blocks are commited as follows: - Add `block.hash` to the set of recently commited parent hashes to process -8. While the length of the non-finalized portion of the best chain is greater +7. While the length of the non-finalized portion of the best chain is greater than the reorg limit - Remove the lowest height block from the non-finalized state with `self.mem.finalize();` - Commit that block to the finalized state with `self.sled.commit_finalized_direct(finalized);` -9. Prune orphaned blocks from `self.queued_blocks` with +8. Prune orphaned blocks from `self.queued_blocks` with `self.queued_blocks.prune_by_height(finalized_height);` - -10. Return the receiver for the block's channel + +9. Return the receiver for the block's channel ## Sled data structures [sled]: #sled diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 35135675..599cdcd1 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -9,7 +9,7 @@ use std::{ use futures::future::FutureExt; use memory_state::{NonFinalizedState, QueuedBlocks}; -use tokio::sync::broadcast; +use tokio::sync::oneshot; use tower::{util::BoxService, Service}; use tracing::instrument; use zebra_chain::{ @@ -18,8 +18,7 @@ use zebra_chain::{ }; use crate::{ - BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response, - ValidateContextError, + BoxError, CommitBlockError, Config, FinalizedState, Request, Response, ValidateContextError, }; mod memory_state; @@ -32,7 +31,7 @@ pub struct QueuedBlock { // TODO: add these parameters when we can compute anchors. // sprout_anchor: sprout::tree::Root, // sapling_anchor: sapling::tree::Root, - pub rsp_tx: broadcast::Sender>, + pub rsp_tx: oneshot::Sender>, } struct StateService { @@ -73,14 +72,38 @@ impl StateService { /// in RFC0005. /// /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks - #[instrument(skip(self, new))] - fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) { - let parent_hash = new.block.header.previous_block_hash; + #[instrument(skip(self, block))] + fn queue_and_commit_non_finalized_blocks( + &mut self, + block: Arc, + ) -> oneshot::Receiver> { + let hash = block.hash(); + let parent_hash = block.header.previous_block_hash; - self.queued_blocks.queue(new); + if self.contains_committed_block(&block) { + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Err("duplicate block".into())); + return rsp_rx; + } + + // The queue of blocks maintained by this service acts as a pipeline for + // blocks waiting for contextual verification. We lazily flush the + // pipeline here by handling duplicate requests to verify an existing + // queued block. We handle those duplicate requests by replacing the old + // channel with the new one and sending an error over the old channel. + let rsp_rx = if let Some(queued_block) = self.queued_blocks.get_mut(&hash) { + let (mut rsp_tx, rsp_rx) = oneshot::channel(); + std::mem::swap(&mut queued_block.rsp_tx, &mut rsp_tx); + let _ = rsp_tx.send(Err("duplicate block".into())); + rsp_rx + } else { + let (rsp_tx, rsp_rx) = oneshot::channel(); + self.queued_blocks.queue(QueuedBlock { block, rsp_tx }); + rsp_rx + }; if !self.can_fork_chain_at(&parent_hash) { - return; + return rsp_rx; } self.process_queued(parent_hash); @@ -96,6 +119,8 @@ impl StateService { .prune_by_height(self.sled.finalized_tip_height().expect( "Finalized state must have at least one block before committing non-finalized state", )); + + rsp_rx } /// Run contextual validation on `block` and add it to the non-finalized @@ -118,6 +143,17 @@ impl StateService { self.mem.any_chain_contains(hash) || &self.sled.finalized_tip_hash() == hash } + /// Returns true if the given hash has been committed to either the finalized + /// or non-finalized state. + fn contains_committed_block(&self, block: &Block) -> bool { + let hash = block.hash(); + let height = block + .coinbase_height() + .expect("coinbase heights should be valid"); + + self.mem.any_chain_contains(&hash) || self.sled.get_hash(height) == Some(hash) + } + /// Attempt to validate and commit all queued blocks whose parents have /// recently arrived starting from `new_parent`, in breadth-first ordering. #[instrument(skip(self))] @@ -132,7 +168,7 @@ impl StateService { let result = self .validate_and_commit(block) .map(|()| hash) - .map_err(CloneError::from); + .map_err(BoxError::from); let _ = rsp_tx.send(result); new_parents.push(hash); } @@ -179,14 +215,11 @@ impl Service for StateService { fn call(&mut self, req: Request) -> Self::Future { match req { Request::CommitBlock { block } => { - let (rsp_tx, mut rsp_rx) = broadcast::channel(1); - self.pending_utxos.check_block(&block); - self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx }); + let rsp_rx = self.queue_and_commit_non_finalized_blocks(block); async move { rsp_rx - .recv() .await .expect("sender is not dropped") .map(Response::Committed) @@ -195,7 +228,7 @@ impl Service for StateService { .boxed() } Request::CommitFinalizedBlock { block } => { - let (rsp_tx, mut rsp_rx) = broadcast::channel(1); + let (rsp_tx, rsp_rx) = oneshot::channel(); self.pending_utxos.check_block(&block); self.sled @@ -203,7 +236,6 @@ impl Service for StateService { async move { rsp_rx - .recv() .await .expect("sender is not dropped") .map(Response::Committed) diff --git a/zebra-state/src/service/memory_state.rs b/zebra-state/src/service/memory_state.rs index 576c3da8..7f06e3f0 100644 --- a/zebra-state/src/service/memory_state.rs +++ b/zebra-state/src/service/memory_state.rs @@ -641,6 +641,11 @@ impl QueuedBlocks { .remove(&hash); } } + + /// Return the queued block if it has already been registered + pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedBlock> { + self.blocks.get_mut(&hash) + } } #[cfg(test)] diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index 7c4cb2ea..f5baa9d4 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -304,4 +304,12 @@ impl FinalizedState { ) -> Result, BoxError> { self.utxo_by_outpoint.zs_get(outpoint) } + + /// Returns the finalized hash for a given `block::Height` if it is present. + pub fn get_hash(&self, height: block::Height) -> Option { + self.hash_by_height + .get(&height.0.to_be_bytes()) + .expect("expected that sled errors would not occur") + .map(|bytes| block::Hash(bytes.as_ref().try_into().unwrap())) + } }