diff --git a/Cargo.lock b/Cargo.lock index e0ddc83b..9a37eafb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3257,6 +3257,7 @@ version = "3.0.0-alpha.0" dependencies = [ "color-eyre", "dirs", + "displaydoc", "futures", "hex", "lazy_static", diff --git a/book/src/dev/rfcs/0005-state-updates.md b/book/src/dev/rfcs/0005-state-updates.md index 425e9b03..d052e6ae 100644 --- a/book/src/dev/rfcs/0005-state-updates.md +++ b/book/src/dev/rfcs/0005-state-updates.md @@ -291,7 +291,6 @@ Push a block into a chain as the new tip - Add key: `transaction.hash` and value: `(height, tx_index)` to `tx_by_hash` - Add created utxos to `self.created_utxos` - Add spent utxos to `self.spent_utxos` - - Add anchors to the appropriate `self._anchors` - Add nullifiers to the appropriate `self._nullifiers` 2. Add block to `self.blocks` @@ -309,7 +308,6 @@ Remove the lowest height block of the non-finalized portion of a chain. - Remove `transaction.hash` from `tx_by_hash` - Remove created utxos from `self.created_utxos` - Remove spent utxos from `self.spent_utxos` - - Remove the anchors from the appropriate `self._anchors` - Remove the nullifiers from the appropriate `self._nullifiers` 3. Return the block @@ -340,7 +338,6 @@ Remove the highest height block of the non-finalized portion of a chain. - remove `transaction.hash` from `tx_by_hash` - Remove created utxos from `self.created_utxos` - Remove spent utxos from `self.spent_utxos` - - Remove anchors from the appropriate `self._anchors` - Remove the nullifiers from the appropriate `self._nullifiers` #### `Ord` @@ -390,7 +387,70 @@ pub struct NonFinalizedState { /// Blocks awaiting their parent blocks for contextual verification. contextual_queue: QueuedBlocks, } +``` +#### `pub fn finalize(&mut self) -> Arc` + +Finalize the lowest height block in the non-finalized portion of the best +chain and updates all side chains to match. + +1. Extract the best chain from `self.chain_set` into `best_chain` + +2. Extract the rest of the chains into a `side_chains` temporary variable, so + they can be mutated + +3. Remove the lowest height block from the best chain with + `let finalized_block = best_chain.pop_root();` + +4. Add `best_chain` back to `self.chain_set` + +5. For each remaining `chain` in `side_chains` + - remove the lowest height block from `chain` + - If that block is equal to `finalized_block` add `chain` back to `self.chain_set` + - Else, drop `chain` + +6. Return `finalized_block` + +#### `fn commit_block(&mut self, block: Arc)` + +Commit `block` to the non-finalized state. + +1. If the block is a pre-Sapling block, panic. + +2. If any chains tip hash equal `block.header.previous_block_hash` remove that chain from `self.chain_set` + +3. Else Find the first chain that contains `block.parent` and fork it with + `block.parent` as the new tip + - `let fork = self.chain_set.iter().find_map(|chain| chain.fork(block.parent));` + +4. Else panic, this should be unreachable because `commit_block` is only + called when `block` is ready to be committed. + +5. Push `block` into `parent_chain` + +6. Insert `parent_chain` into `self.chain_set` + +### `pub(super) fn commit_new_chain(&mut self, block: Arc)` + +Construct a new chain starting with `block`. + +1. Construct a new empty chain + +2. `push` `block` into that new chain + +3. Insert the new chain into `self.chain_set` + +### The `QueuedBlocks` type + +The queued blocks type represents the non-finalized blocks that were commited +before their parent blocks were. It is responsible for tracking which blocks +are queued by their parent so they can be commited immediately after the +parent is commited. It also tracks blocks by their height so they can be +discarded if they ever end up below the reorg limit. + +`NonFinalizedState` is defined by the following structure and API: + +```rust /// A queue of blocks, awaiting the arrival of parent blocks. #[derive(Debug, Default)] struct QueuedBlocks { @@ -403,149 +463,110 @@ struct QueuedBlocks { } ``` -#### `pub fn finalize(&mut self) -> Arc` +#### `pub fn queue(&mut self, new: QueuedBlock)` -Finalize the lowest height block in the non-finalized portion of the best -chain and updates all side chains to match. +Add a block to the queue of blocks waiting for their requisite context to +become available. -1. Extract the best chain from `self.chains` into `best_chain` +1. extract the `parent_hash`, `new_hash`, and `new_height` from `new.block` -2. Extract the rest of the chains into a `side_chains` temporary variable, so - they can be mutated +2. Add `new` to `self.blocks` using `new_hash` as the key -3. Remove the lowest height block from the best chain with - `let block = best_chain.pop_root();` +3. Add `new_hash` to the set of hashes in + `self.by_parent.entry(parent_hash).or_default()` -4. Add `best_chain` back to `self.chains` +4. Add `new_hash` to the set of hashes in + `self.by_height.entry(new_height).or_default()` -5. For each remaining `chain` in `side_chains` - - If `chain` starts with `block`, remove `block` and add `chain` back to - `self.chains` - - Else, drop `chain` +#### `pub fn dequeue_children(&mut self, parent: block::Hash) -> Vec` -6. calculate the new finalized tip height from the new `best_chain` +Dequeue the set of blocks waiting on `parent`. -7. for each `height` in `self.queued_by_height` where the height is lower than the - new reorg limit - - for each `hash` in `self.queued_by_height.remove(height)` - - Remove the key `hash` from `self.queued_blocks` and store the removed `block` - - Find and remove `hash` from `self.queued_by_parent` using `block.parent`'s hash +1. Remove the set of hashes waiting on `parent` from `self.by_parent` -8. Return `block` +2. Remove and collect each block in that set of hashes from `self.blocks` as + `queued_children` -### `pub fn queue(&mut self, block: QueuedBlock)` +3. For each `block` in `queued_children` remove the associated `block.hash` + from `self.by_height` -Queue a non-finalized block to be committed to the state. +4. Return `queued_children` -After queueing a non-finalized block, this method checks whether the newly -queued block (and any of its descendants) can be committed to the state +#### `pub fn prune_by_height(&mut self, finalized_height: block::Height)` -1. If the block itself exists in any current chain, it has already been successfully verified: - - broadcast `Ok(block.hash())` via `block.rsp_tx`, and return +Prune all queued blocks whose height are less than or equal to +`finalized_height`. -2. If the parent block exists in any current chain: - - Call `let hash = self.commit_block(block)` - - Call `self.process_queued(hash)` +1. Split the `by_height` list at the finalized height, removing all heights + that are below `finalized_height` -3. Else Add `block` to `self.queued_blocks` and related members and return +2. for each hash in the removed values of `by_height` + - remove the corresponding block from `self.blocks` + - remove the block's hash from the list of blocks waiting on + `block.header.previous_block_hash` from `self.by_parent` -### `fn process_queued(&mut self, new_parent: block::Hash)` - -1. Create a list of `new_parents` and populate it with `new_parent` - -2. While let Some(parent) = new_parents.pop() - - for each `hash` in `self.queued_by_parent.remove(&parent.hash)` - - lookup the `block` for `hash` - - remove `block` from `self.queued_blocks` - - remove `hash` from `self.queued_by_height` - - let hash = `self.commit_block(block)`; - - add `hash` to `new_parents` - -### `fn commit_block(&mut self, block: QueuedBlock) -> block::Hash` - -Try to commit `block` to the non-finalized state. Must succeed, because -`commit_block` is only called when `block` is ready to be committed. - -1. If the block is a pre-Sapling block, panic. - -2. Search for the first chain where `block.parent` == `chain.tip`. If it exists: - - return `self.push_block_on_chain(block, chain)` - -3. Find the first chain that contains `block.parent` and fork it with - `block.parent` as the new tip - - `let fork = self.chains.iter().find_map(|chain| chain.fork(block.parent));` - -4. If `fork` is `Some` - - call `let hash = self.push_block_on_chain(block, fork)` - - add `fork` to `self.chains` - - return `hash` - -5. Else panic, this should be unreachable because `commit_block` is only - called when `block` is ready to be committed. - -### `pub(super) fn push_block_on_chain(&mut self, block: QueuedBlock, &mut chain: Chain) -> block::Hash` - -Try to commit `block` to `chain`. Must succeed, because -`push_block_on_chain` is only called when `block` is ready to be committed. - -1. push `block` onto `chain` - -2. broadcast `result` via `block.rsp_tx` - -3. return `block.hash` if `result.is_ok()` - -4. Else panic, this should be unreachable because `push_block_on_chain` is only - called when `block` is ready to be committed. ### Summary - `Chain` represents the non-finalized portion of a single chain -- `NonFinalizedState` represents the non-finalized portion of all chains and all - unverified blocks that are waiting for context to be available. +- `NonFinalizedState` represents the non-finalized portion of all chains -- `NonFinalizedState::queue` handles queueing and or committing blocks and - reorganizing chains (via `commit_block`) but not finalizing them +- `QueuedBlocks` represents all unverified blocks that are waiting for + context to be available. -- Finalized blocks are returned from `finalize` and must still be committed - to disk afterwards +The state service uses the following entry points: +- `commit_block` when it receives new blocks. -- `finalize` handles pruning queued blocks that are past the reorg limit +- `finalize` to prevent chains in `NonFinalizedState` from growing beyond the reorg limit. + +- [FinalizedState.queue_and_commit_finalized_blocks](#committing-finalized-blocks) on the blocks returned by `finalize`, to commit those finalized blocks to disk. ## Committing non-finalized blocks Given the above structures for manipulating the non-finalized state new -`non-finalized` blocks are committed in two steps. First we commit the block -to the in memory state, then we finalize all lowest height blocks that are -past the reorg limit, finally we process any queued blocks and prune any that -are now past the reorg limit. +`non-finalized` blocks are commited as follows: -1. If the block itself exists in the finalized chain, it has already been successfully verified: +### `fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock)` + +1. If the block itself exists in the finalized chain, it has already been + successfully verified: - broadcast `Ok(block.hash())` via `block.rsp_tx`, and return -2. Run contextual validation on `block` against the finalized and non - finalized state +2. Add `block` to `self.queued_blocks` -3. If `block.parent` == `finalized_tip.hash` - - Construct a new `chain` with `Chain::default` - - call `let hash = chain_set.push_block_on_chain(block, chain)` - - add `fork` to `chain_set.chains` - - return `hash` +3. If `block.header.previous_block_hash` is not present in the finalized or + non-finalized state return early -4. Otherwise, commit or queue the block to the non-finalized state with - `chain_set.queue(block);` +4. Else iteratively attempt to process queued blocks by their parent hash + starting with `block.header.previous_block_hash` -5. If the best chain is longer than the reorg limit - - Finalize all lowest height blocks in the best chain, and commit them to - disk with `CommitFinalizedBlock`: +5. While there are recently commited parent hashes to process + - Dequeue all blocks waiting on `parent` with `let queued_children = + self.queued_blocks.dequeue_children(parent);` + - for each queued `block` + - **Run contextual validation** on `block` + - If the block fails contextual validation return the result over the + associated channel + - Else if the block's previous hash is the finalized tip add to the + non-finalized state with `self.mem.commit_new_chain(block)` + - Else add the new block to an existing non-finalized chain or new fork + with `self.mem.commit_block(block);` + - Return `Ok(hash)` over the associated channel to indicate the block + was successfully commited + - Add `block.hash` to the set of recently commited parent hashes to + process + +6. While the length of the non-finalized portion of the best chain is greater + than the reorg limit + - Remove the lowest height block from the non-finalized state with + `self.mem.finalize();` + - Commit that block to the finalized state with + `self.sled.commit_finalized_direct(finalized);` + +7. Prune orphaned blocks from `self.queued_blocks` with + `self.queued_blocks.prune_by_height(finalized_height);` - ``` - while self.best_chain().len() > reorg_limit { - let finalized = chain_set.finalize()?; - let request = CommitFinalizedBlock { finalized }; - sled_state.ready_and().await?.call(request).await?; - }; - ``` ## Sled data structures [sled]: #sled @@ -604,6 +625,8 @@ Committing a block to the sled state should be implemented as a wrapper around a function also called by [`Request::CommitBlock`](#request-commit-block), which should: +### `pub fn queue_and_commit_finalized_blocks(&mut self, queued_block: QueuedBlock)` + 1. Obtain the highest entry of `hash_by_height` as `(old_height, old_tip)`. Check that `block`'s parent hash is `old_tip` and its height is `old_height+1`, or panic. This check is performed as defense-in-depth to diff --git a/zebra-chain/src/block.rs b/zebra-chain/src/block.rs index 51cacb44..8786695a 100644 --- a/zebra-chain/src/block.rs +++ b/zebra-chain/src/block.rs @@ -14,6 +14,8 @@ mod arbitrary; #[cfg(test)] mod tests; +use std::fmt; + pub use hash::Hash; pub use header::Header; pub use height::Height; @@ -36,6 +38,28 @@ pub struct Block { pub transactions: Vec>, } +impl fmt::Display for Block { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut fmter = f.debug_struct("Block"); + if let Some(height) = self.coinbase_height() { + fmter.field("height", &height); + } + + fmter.field("hash", &DisplayToDebug(self.hash())).finish() + } +} + +struct DisplayToDebug(T); + +impl fmt::Debug for DisplayToDebug +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + impl Block { /// Return the block height reported in the coinbase transaction, if any. pub fn coinbase_height(&self) -> Option { diff --git a/zebra-chain/src/block/arbitrary.rs b/zebra-chain/src/block/arbitrary.rs index fbe7335a..19e661fd 100644 --- a/zebra-chain/src/block/arbitrary.rs +++ b/zebra-chain/src/block/arbitrary.rs @@ -30,6 +30,8 @@ impl Arbitrary for Block { } impl Block { + /// Returns a strategy for creating Vecs of blocks with increasing height of + /// the given length. pub fn partial_chain_strategy( init: LedgerState, count: usize, diff --git a/zebra-chain/src/lib.rs b/zebra-chain/src/lib.rs index ca4bd0c6..3c504a26 100644 --- a/zebra-chain/src/lib.rs +++ b/zebra-chain/src/lib.rs @@ -25,12 +25,28 @@ pub mod work; #[derive(Debug, Clone, Copy)] #[cfg(any(test, feature = "proptest-impl"))] +#[non_exhaustive] +/// The configuration data for proptest when generating arbitrary chains pub struct LedgerState { + /// The tip height of the block or start of the chain pub tip_height: block::Height, - pub is_coinbase: bool, + is_coinbase: bool, + /// The network to generate fake blocks for pub network: parameters::Network, } +#[cfg(any(test, feature = "proptest-impl"))] +impl LedgerState { + /// Construct a new ledger state for generating arbitrary chains via proptest + pub fn new(tip_height: block::Height, network: parameters::Network) -> Self { + Self { + tip_height, + is_coinbase: true, + network, + } + } +} + #[cfg(any(test, feature = "proptest-impl"))] impl Default for LedgerState { fn default() -> Self { diff --git a/zebra-chain/src/sapling/commitment.rs b/zebra-chain/src/sapling/commitment.rs index 80afa558..cd702d32 100644 --- a/zebra-chain/src/sapling/commitment.rs +++ b/zebra-chain/src/sapling/commitment.rs @@ -1,7 +1,5 @@ //! Note and value commitments. -#[cfg(any(test, feature = "proptest-impl"))] -mod arbitrary; #[cfg(test)] mod test_vectors; diff --git a/zebra-chain/src/sapling/commitment/arbitrary.rs b/zebra-chain/src/sapling/commitment/arbitrary.rs deleted file mode 100644 index 8b137891..00000000 --- a/zebra-chain/src/sapling/commitment/arbitrary.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/zebra-chain/src/sapling/keys.rs b/zebra-chain/src/sapling/keys.rs index 640123be..9f83634d 100644 --- a/zebra-chain/src/sapling/keys.rs +++ b/zebra-chain/src/sapling/keys.rs @@ -9,8 +9,6 @@ //! [3.1]: https://zips.z.cash/protocol/protocol.pdf#addressesandkeys #![allow(clippy::unit_arg)] -#[cfg(any(test, feature = "proptest-impl"))] -mod arbitrary; #[cfg(test)] mod test_vectors; #[cfg(test)] @@ -26,9 +24,6 @@ use std::{ use bech32::{self, FromBase32, ToBase32}; use rand_core::{CryptoRng, RngCore}; -#[cfg(any(test, feature = "proptest-impl"))] -use proptest_derive::Arbitrary; - use crate::{ parameters::Network, primitives::redjubjub::{self, SpendAuth}, @@ -183,7 +178,10 @@ mod sk_hrp { /// /// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents #[derive(Copy, Clone, Debug, Eq, PartialEq)] -#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +#[cfg_attr( + any(test, feature = "proptest-impl"), + derive(proptest_derive::Arbitrary) +)] pub struct SpendingKey { network: Network, bytes: [u8; 32], @@ -610,7 +608,10 @@ impl PartialEq<[u8; 32]> for IncomingViewingKey { /// /// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents #[derive(Copy, Clone, Eq, PartialEq)] -#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +#[cfg_attr( + any(test, feature = "proptest-impl"), + derive(proptest_derive::Arbitrary) +)] pub struct Diversifier(pub [u8; 11]); impl fmt::Debug for Diversifier { diff --git a/zebra-chain/src/sapling/keys/arbitrary.rs b/zebra-chain/src/sapling/keys/arbitrary.rs deleted file mode 100644 index 8b137891..00000000 --- a/zebra-chain/src/sapling/keys/arbitrary.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/zebra-chain/src/transaction/arbitrary.rs b/zebra-chain/src/transaction/arbitrary.rs index db91007b..c21b3cfa 100644 --- a/zebra-chain/src/transaction/arbitrary.rs +++ b/zebra-chain/src/transaction/arbitrary.rs @@ -105,6 +105,8 @@ impl Transaction { .boxed() } + /// Proptest Strategy for creating a Vector of transactions where the first + /// transaction is always the only coinbase transaction pub fn vec_strategy( mut ledger_state: LedgerState, len: usize, diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index 4f1690f8..57bb2d36 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -24,6 +24,7 @@ tracing = "0.1" tracing-error = "0.1.2" thiserror = "1.0.21" tokio = { version = "0.2.22", features = ["sync"] } +displaydoc = "0.1.7" [dev-dependencies] zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 682fdd79..aa076fd8 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -8,7 +8,6 @@ mod config; mod constants; -mod memory_state; mod request; mod response; mod service; @@ -19,7 +18,6 @@ mod util; #[cfg(test)] mod tests; -use memory_state::NonFinalizedState; use service::QueuedBlock; use sled_state::FinalizedState; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 2dfd664f..0eb48751 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -6,14 +6,19 @@ use std::{ }; use futures::future::{FutureExt, TryFutureExt}; +use memory_state::{NonFinalizedState, QueuedBlocks}; +use thiserror::Error; use tokio::sync::oneshot; use tower::{buffer::Buffer, util::BoxService, Service}; +use tracing::instrument; use zebra_chain::{ block::{self, Block}, parameters::Network, }; -use crate::{BoxError, Config, FinalizedState, NonFinalizedState, Request, Response}; +use crate::{BoxError, Config, FinalizedState, Request, Response}; + +mod memory_state; // todo: put this somewhere #[derive(Debug)] @@ -29,14 +34,124 @@ struct StateService { /// Holds data relating to finalized chain state. sled: FinalizedState, /// Holds data relating to non-finalized chain state. - _mem: NonFinalizedState, + mem: NonFinalizedState, + /// Blocks awaiting their parent blocks for contextual verification. + queued_blocks: QueuedBlocks, +} + +#[derive(Debug, Error)] +#[error("block is not contextually valid")] +struct CommitError(#[from] ValidateContextError); + +#[derive(displaydoc::Display, Debug, Error)] +enum ValidateContextError { + /// block.height is lower than the current finalized height + OrphanedBlock, } impl StateService { pub fn new(config: Config, network: Network) -> Self { let sled = FinalizedState::new(&config, network); - let _mem = NonFinalizedState::default(); - Self { sled, _mem } + let mem = NonFinalizedState::default(); + let queued_blocks = QueuedBlocks::default(); + + Self { + sled, + mem, + queued_blocks, + } + } + + /// Queue a non finalized block for verification and check if any queued + /// blocks are ready to be verified and committed to the state. + /// + /// This function encodes the logic for [committing non-finalized blocks][1] + /// in RFC0005. + /// + /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks + #[instrument(skip(self, new))] + fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) { + let parent_hash = new.block.header.previous_block_hash; + + self.queued_blocks.queue(new); + + if !self.can_fork_chain_at(&parent_hash) { + return; + } + + self.process_queued(parent_hash); + + while self.mem.best_chain_len() > crate::constants::MAX_BLOCK_REORG_HEIGHT { + let finalized = self.mem.finalize(); + self.sled + .commit_finalized_direct(finalized) + .expect("expected that sled errors would not occur"); + } + + self.queued_blocks + .prune_by_height(self.sled.finalized_tip_height().expect( + "Finalized state must have at least one block before committing non-finalized state", + )); + } + + /// Run contextual validation on `block` and add it to the non-finalized + /// state if it is contextually valid. + fn validate_and_commit(&mut self, block: Arc) -> Result<(), CommitError> { + self.check_contextual_validity(&block)?; + let parent_hash = block.header.previous_block_hash; + + if self.sled.finalized_tip_hash() == parent_hash { + self.mem.commit_new_chain(block); + } else { + self.mem.commit_block(block); + } + + Ok(()) + } + + /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks. + fn can_fork_chain_at(&self, hash: &block::Hash) -> bool { + self.mem.any_chain_contains(hash) || &self.sled.finalized_tip_hash() == hash + } + + /// Attempt to validate and commit all queued blocks whose parents have + /// recently arrived starting from `new_parent`, in breadth-first ordering. + #[instrument(skip(self))] + fn process_queued(&mut self, new_parent: block::Hash) { + let mut new_parents = vec![new_parent]; + + while let Some(parent) = new_parents.pop() { + let queued_children = self.queued_blocks.dequeue_children(parent); + + for QueuedBlock { block, rsp_tx } in queued_children { + let hash = block.hash(); + let result = self + .validate_and_commit(block) + .map(|()| hash) + .map_err(Into::into); + let _ = rsp_tx.send(result); + new_parents.push(hash); + } + } + } + + /// Check that `block` is contextually valid based on the committed finalized + /// and non-finalized state. + fn check_contextual_validity(&mut self, block: &Block) -> Result<(), ValidateContextError> { + use ValidateContextError::*; + + if block + .coinbase_height() + .expect("valid blocks have a coinbase height") + <= self.sled.finalized_tip_height().expect( + "finalized state must contain at least one block to use the non-finalized state", + ) + { + Err(OrphanedBlock)?; + } + + // TODO: contextual validation design and implementation + Ok(()) } } @@ -52,11 +167,24 @@ impl Service for StateService { fn call(&mut self, req: Request) -> Self::Future { match req { - Request::CommitBlock { .. } => unimplemented!(), + Request::CommitBlock { block } => { + let (rsp_tx, rsp_rx) = oneshot::channel(); + + self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx }); + + async move { + rsp_rx + .await + .expect("sender oneshot is not dropped") + .map(Response::Committed) + } + .boxed() + } Request::CommitFinalizedBlock { block } => { let (rsp_tx, rsp_rx) = oneshot::channel(); - self.sled.queue(QueuedBlock { block, rsp_tx }); + self.sled + .queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx }); async move { rsp_rx diff --git a/zebra-state/src/memory_state.rs b/zebra-state/src/service/memory_state.rs similarity index 69% rename from zebra-state/src/memory_state.rs rename to zebra-state/src/service/memory_state.rs index e9c37bc1..d5a85fcf 100644 --- a/zebra-state/src/memory_state.rs +++ b/zebra-state/src/service/memory_state.rs @@ -5,11 +5,12 @@ use std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, - fmt, + mem, ops::Deref, sync::Arc, }; +use tracing::instrument; use zebra_chain::{ block::{self, Block}, primitives::Groth16Proof, @@ -19,44 +20,6 @@ use zebra_chain::{ use crate::service::QueuedBlock; -/// The state of the chains in memory, incuding queued blocks. -#[derive(Default)] -pub struct NonFinalizedState { - /// Verified, non-finalized chains. - chain_set: BTreeSet, - /// Blocks awaiting their parent blocks for contextual verification. - contextual_queue: QueuedBlocks, -} - -/// A queue of blocks, awaiting the arrival of parent blocks. -#[derive(Default)] -struct QueuedBlocks { - /// Blocks awaiting their parent blocks for contextual verification. - blocks: HashMap, - /// Hashes from `queued_blocks`, indexed by parent hash. - by_parent: HashMap>, - /// Hashes from `queued_blocks`, indexed by block height. - by_height: BTreeMap>, -} - -impl NonFinalizedState { - pub fn finalize(&mut self) -> Arc { - todo!() - } - - pub fn queue(&mut self, _block: QueuedBlock) { - todo!() - } - - fn process_queued(&mut self, _new_parent: block::Hash) { - todo!() - } - - fn commit_block(&mut self, _block: QueuedBlock) -> Option { - todo!() - } -} - #[derive(Default, Clone)] struct Chain { blocks: BTreeMap>, @@ -342,16 +305,13 @@ impl UpdateWith> for Chain { } impl UpdateWith>> for Chain { + #[instrument(skip(self, joinsplit_data))] fn update_chain_state_with( &mut self, joinsplit_data: &Option>, ) { if let Some(joinsplit_data) = joinsplit_data { - for sprout::JoinSplit { - anchor, nullifiers, .. - } in joinsplit_data.joinsplits() - { - self.sprout_anchors.insert(*anchor); + for sprout::JoinSplit { nullifiers, .. } in joinsplit_data.joinsplits() { self.sprout_nullifiers.insert(nullifiers[0]); self.sprout_nullifiers.insert(nullifiers[1]); } @@ -363,14 +323,7 @@ impl UpdateWith>> for Chain { joinsplit_data: &Option>, ) { if let Some(joinsplit_data) = joinsplit_data { - for sprout::JoinSplit { - anchor, nullifiers, .. - } in joinsplit_data.joinsplits() - { - assert!( - self.sprout_anchors.remove(anchor), - "anchor must be present if block was" - ); + for sprout::JoinSplit { nullifiers, .. } in joinsplit_data.joinsplits() { assert!( self.sprout_nullifiers.remove(&nullifiers[0]), "nullifiers must be present if block was" @@ -387,11 +340,7 @@ impl UpdateWith>> for Chain { impl UpdateWith> for Chain { fn update_chain_state_with(&mut self, shielded_data: &Option) { if let Some(shielded_data) = shielded_data { - for sapling::Spend { - anchor, nullifier, .. - } in shielded_data.spends() - { - self.sapling_anchors.insert(*anchor); + for sapling::Spend { nullifier, .. } in shielded_data.spends() { self.sapling_nullifiers.insert(*nullifier); } } @@ -399,14 +348,7 @@ impl UpdateWith> for Chain { fn revert_chain_state_with(&mut self, shielded_data: &Option) { if let Some(shielded_data) = shielded_data { - for sapling::Spend { - anchor, nullifier, .. - } in shielded_data.spends() - { - assert!( - self.sapling_anchors.remove(anchor), - "anchor must be present if block was" - ); + for sapling::Spend { nullifier, .. } in shielded_data.spends() { assert!( self.sapling_nullifiers.remove(nullifier), "nullifier must be present if block was" @@ -460,11 +402,223 @@ impl Ord for Chain { } } +/// The state of the chains in memory, incuding queued blocks. +#[derive(Default)] +pub struct NonFinalizedState { + /// Verified, non-finalized chains, in ascending order. + /// + /// The best chain is `chain_set.last()` or `chain_set.iter().next_back()`. + chain_set: BTreeSet>, +} + +impl NonFinalizedState { + /// Finalize the lowest height block in the non-finalized portion of the best + /// chain and update all side-chains to match. + pub fn finalize(&mut self) -> Arc { + let chains = mem::take(&mut self.chain_set); + let mut chains = chains.into_iter(); + + // extract best chain + let mut best_chain = chains.next_back().expect("there's at least one chain"); + // extract the rest into side_chains so they can be mutated + let side_chains = chains; + + // remove the lowest height block from the best_chain as finalized_block + let finalized_block = best_chain.pop_root(); + // add best_chain back to `self.chain_set` + self.chain_set.insert(best_chain); + + // for each remaining chain in side_chains + for mut chain in side_chains { + // remove the first block from `chain` + let chain_start = chain.pop_root(); + // if block equals finalized_block + if chain_start == finalized_block { + // add the chain back to `self.chain_set` + self.chain_set.insert(chain); + } else { + // else discard `chain` + drop(chain); + } + } + + // return the finalized block + finalized_block + } + + /// Commit block to the non-finalize state. + pub fn commit_block(&mut self, block: Arc) { + let parent_hash = block.header.previous_block_hash; + + let mut parent_chain = self + .take_chain_if(|chain| chain.non_finalized_tip_hash() == parent_hash) + .or_else(|| { + self.chain_set + .iter() + .find_map(|chain| chain.fork(parent_hash)) + .map(Box::new) + }) + .expect("commit_block is only called with blocks that are ready to be commited"); + + parent_chain.push(block); + self.chain_set.insert(parent_chain); + } + + /// Commit block to the non-finalized state as a new chain where its parent + /// is the finalized tip. + pub fn commit_new_chain(&mut self, block: Arc) { + let mut chain = Chain::default(); + chain.push(block); + self.chain_set.insert(Box::new(chain)); + } + + /// Returns the length of the non-finalized portion of the current best chain. + pub fn best_chain_len(&self) -> block::Height { + block::Height( + self.chain_set + .iter() + .next_back() + .expect("only called after inserting a block") + .blocks + .len() as u32, + ) + } + + /// Returns `true` if `hash` is contained in the non-finalized portion of any + /// known chain. + pub fn any_chain_contains(&self, hash: &block::Hash) -> bool { + self.chain_set + .iter() + .any(|chain| chain.height_by_hash.contains_key(hash)) + } + + /// Remove and return the first chain satisfying the given predicate. + fn take_chain_if(&mut self, predicate: F) -> Option> + where + F: Fn(&Chain) -> bool, + { + let chains = mem::take(&mut self.chain_set); + let mut best_chain_iter = chains.into_iter().rev(); + + while let Some(next_best_chain) = best_chain_iter.next() { + // if the predicate says we should remove it + if predicate(&next_best_chain) { + // add back the remaining chains + for remaining_chain in best_chain_iter { + self.chain_set.insert(remaining_chain); + } + + // and return the chain + return Some(next_best_chain); + } else { + // add the chain back to the set and continue + self.chain_set.insert(next_best_chain); + } + } + + None + } +} + +/// A queue of blocks, awaiting the arrival of parent blocks. +#[derive(Default)] +pub struct QueuedBlocks { + /// Blocks awaiting their parent blocks for contextual verification. + blocks: HashMap, + /// Hashes from `queued_blocks`, indexed by parent hash. + by_parent: HashMap>, + /// Hashes from `queued_blocks`, indexed by block height. + by_height: BTreeMap>, +} + +impl QueuedBlocks { + /// Queue a block for eventual verification and commit. + /// + /// # Panics + /// + /// - if a block with the same `block::Hash` has already been queued. + pub fn queue(&mut self, new: QueuedBlock) { + let new_hash = new.block.hash(); + let new_height = new + .block + .coinbase_height() + .expect("validated non-finalized blocks have a coinbase height"); + let parent_hash = new.block.header.previous_block_hash; + + let replaced = self.blocks.insert(new_hash, new); + assert!(replaced.is_none(), "hashes must be unique"); + let inserted = self + .by_height + .entry(new_height) + .or_default() + .insert(new_hash); + assert!(inserted, "hashes must be unique"); + let inserted = self + .by_parent + .entry(parent_hash) + .or_default() + .insert(new_hash); + assert!(inserted, "hashes must be unique"); + + tracing::trace!(num_blocks = %self.blocks.len(), %parent_hash, ?new_height, "Finished queueing a new block"); + } + + /// Dequeue and return all blocks that were waiting for the arrival of + /// `parent`. + #[instrument(skip(self))] + pub fn dequeue_children(&mut self, parent: block::Hash) -> Vec { + let queued_children = self + .by_parent + .remove(&parent) + .unwrap_or_default() + .into_iter() + .map(|hash| { + self.blocks + .remove(&hash) + .expect("block is present if its hash is in by_parent") + }) + .collect::>(); + + for queued in &queued_children { + let height = queued.block.coinbase_height().unwrap(); + self.by_height.remove(&height); + } + + tracing::trace!(num_blocks = %self.blocks.len(), "Finished dequeuing blocks waiting for parent hash",); + + queued_children + } + + /// Remove all queued blocks whose height is less than or equal to the given + /// `finalized_tip_height`. + pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) { + // split_off returns the values _greater than or equal to_ the key. What + // we need is the keys that are less than or equal to + // `finalized_tip_height`. To get this we have split at + // `finalized_tip_height + 1` and swap the removed portion of the list + // with the remainder. + let split_height = finalized_tip_height + 1; + let split_height = + split_height.expect("height after finalized tip won't exceed max height"); + let mut by_height = self.by_height.split_off(&split_height); + mem::swap(&mut self.by_height, &mut by_height); + + for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) { + let expired = self.blocks.remove(&hash).expect("block is present"); + let parent_hash = &expired.block.header.previous_block_hash; + self.by_parent + .get_mut(parent_hash) + .expect("parent is present") + .remove(&hash); + } + } +} + #[cfg(test)] mod tests { use transaction::Transaction; - use std::{env, mem}; + use std::{env, fmt, mem}; use zebra_chain::serialization::ZcashDeserializeInto; use zebra_chain::{ @@ -476,6 +630,14 @@ mod tests { use self::assert_eq; use super::*; + struct SummaryDebug(T); + + impl fmt::Debug for SummaryDebug> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}, len={}", std::any::type_name::(), self.0.len()) + } + } + /// Helper trait for constructing "valid" looking chains of blocks trait FakeChainHelper { fn make_fake_child(&self) -> Arc; @@ -551,23 +713,16 @@ mod tests { Ok(()) } - fn arbitrary_chain(height: block::Height) -> BoxedStrategy>> { - Block::partial_chain_strategy( - LedgerState { - tip_height: height, - is_coinbase: true, - network: Network::Mainnet, - }, - 100, - ) + fn arbitrary_chain(tip_height: block::Height) -> BoxedStrategy>> { + Block::partial_chain_strategy(LedgerState::new(tip_height, Network::Mainnet), 100) } prop_compose! { fn arbitrary_chain_and_count() (chain in arbitrary_chain(NetworkUpgrade::Blossom.activation_height(Network::Mainnet).unwrap())) - (count in 1..chain.len(), chain in Just(chain)) -> (NoDebug>>, usize) + (count in 1..chain.len(), chain in Just(chain)) -> (SummaryDebug>>, usize) { - (NoDebug(chain), count) + (SummaryDebug(chain), count) } } @@ -635,11 +790,3 @@ mod tests { Ok(()) } } - -struct NoDebug(T); - -impl fmt::Debug for NoDebug> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}, len={}", std::any::type_name::(), self.0.len()) - } -} diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index 665452dc..a1ba1e67 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc}; +use tracing::trace; use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; use zebra_chain::{ block::{self, Block}, @@ -62,33 +63,34 @@ impl FinalizedState { /// /// After queueing a finalized block, this method checks whether the newly /// queued block (and any of its descendants) can be committed to the state. - pub fn queue(&mut self, queued_block: QueuedBlock) { + pub fn queue_and_commit_finalized_blocks(&mut self, queued_block: QueuedBlock) { let prev_hash = queued_block.block.header.previous_block_hash; self.queued_by_prev_hash.insert(prev_hash, queued_block); - // Cloning means the closure doesn't hold a borrow of &self, - // conflicting with mutable access in the loop below. - let hash_by_height = self.hash_by_height.clone(); - let tip_hash = || { - read_tip(&hash_by_height) - .expect("inability to look up tip is unrecoverable") - .map(|(_height, hash)| hash) - .unwrap_or(block::Hash([0; 32])) - }; - - while let Some(queued_block) = self.queued_by_prev_hash.remove(&tip_hash()) { + while let Some(queued_block) = self.queued_by_prev_hash.remove(&self.finalized_tip_hash()) { self.commit_finalized(queued_block) } } - /// Commit a finalized block to the state. - /// - /// It's the caller's responsibility to ensure that blocks are committed in - /// order. This function is called by [`process_queue`], which ensures order. - /// It is intentionally not exposed as part of the public API of the - /// [`FinalizedState`]. - fn commit_finalized(&mut self, queued_block: QueuedBlock) { - let QueuedBlock { block, rsp_tx } = queued_block; + /// Returns the hash of the current finalized tip block. + pub fn finalized_tip_hash(&self) -> block::Hash { + read_tip(&self.hash_by_height) + .expect("inability to look up tip is unrecoverable") + .map(|(_, hash)| hash) + // if the state is empty, return the genesis previous block hash + .unwrap_or(block::Hash([0; 32])) + } + + /// Returns the height of the current finalized tip block. + pub fn finalized_tip_height(&self) -> Option { + read_tip(&self.hash_by_height) + .expect("inability to look up tip is unrecoverable") + .map(|(height, _)| height) + } + + /// Immediately commit `block` to the finalized state. + pub fn commit_finalized_direct(&mut self, block: Arc) -> Result { + use sled::Transactional; let height = block .coinbase_height() @@ -96,8 +98,9 @@ impl FinalizedState { let height_bytes = height.0.to_be_bytes(); let hash = block.hash(); - use sled::Transactional; - let transaction_result = ( + trace!(?height, "Finalized block"); + + ( &self.hash_by_height, &self.height_by_hash, &self.block_by_height, @@ -116,10 +119,21 @@ impl FinalizedState { block_by_height.insert(&height_bytes, block_bytes)?; // for some reason type inference fails here - Ok::<_, sled::transaction::ConflictableTransactionError>(()) - }); + Ok::<_, sled::transaction::ConflictableTransactionError>(hash) + }) + .map_err(Into::into) + } - let _ = rsp_tx.send(transaction_result.map(|_| hash).map_err(Into::into)); + /// Commit a finalized block to the state. + /// + /// It's the caller's responsibility to ensure that blocks are committed in + /// order. This function is called by [`queue`], which ensures order. + /// It is intentionally not exposed as part of the public API of the + /// [`FinalizedState`]. + fn commit_finalized(&mut self, queued_block: QueuedBlock) { + let QueuedBlock { block, rsp_tx } = queued_block; + let result = self.commit_finalized_direct(block); + let _ = rsp_tx.send(result); } // TODO: this impl works only during checkpointing, it needs to be rewritten diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index ee06e5dc..ed178f73 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -612,7 +612,7 @@ where tracing::trace!(?hash, "requested block"); // This span is used to help diagnose sync warnings - let span = tracing::warn_span!("block_fetch_verify", ?hash); + let span = tracing::warn_span!("block_fetch_verify", %hash); let mut verifier = self.verifier.clone(); let task = tokio::spawn( async move {