Implement MVP of NonFinalizedState and integrate it with the state service (#1101)

* implement most of the chain functions
* implement fork
* fix outpoint handling in Chain struct
* update expect for work
* split utxo into two sets
* update the Chain definition
* remove allow attribute in zebra-state/lib.rs
* merge ChainSet type into MemoryState
* Add error messages to asserts
* export proptest impls for use in downstream crates
* add testjob for disabled feature in zebra-chain
* try to fix github actions syntax
* add module doc comment
* update RFC for utxos
* add missing header
* working proptest for Chain
* propagate back results over channel
* Start updating RFC to match changes
* implement queued block pruning
* and now it syncs wooo!
* remove empty modules
* setup config for proptests
* re-enable missing_docs lint
* update RFC to match changes in impl
* add documentation
* use more explicit variable names
This commit is contained in:
Jane Lusby 2020-10-07 20:07:32 -07:00 committed by GitHub
parent 1b7bf61f96
commit 855f9b5bcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 595 additions and 242 deletions

1
Cargo.lock generated
View File

@ -3257,6 +3257,7 @@ version = "3.0.0-alpha.0"
dependencies = [ dependencies = [
"color-eyre", "color-eyre",
"dirs", "dirs",
"displaydoc",
"futures", "futures",
"hex", "hex",
"lazy_static", "lazy_static",

View File

@ -291,7 +291,6 @@ Push a block into a chain as the new tip
- Add key: `transaction.hash` and value: `(height, tx_index)` to `tx_by_hash` - Add key: `transaction.hash` and value: `(height, tx_index)` to `tx_by_hash`
- Add created utxos to `self.created_utxos` - Add created utxos to `self.created_utxos`
- Add spent utxos to `self.spent_utxos` - Add spent utxos to `self.spent_utxos`
- Add anchors to the appropriate `self.<version>_anchors`
- Add nullifiers to the appropriate `self.<version>_nullifiers` - Add nullifiers to the appropriate `self.<version>_nullifiers`
2. Add block to `self.blocks` 2. Add block to `self.blocks`
@ -309,7 +308,6 @@ Remove the lowest height block of the non-finalized portion of a chain.
- Remove `transaction.hash` from `tx_by_hash` - Remove `transaction.hash` from `tx_by_hash`
- Remove created utxos from `self.created_utxos` - Remove created utxos from `self.created_utxos`
- Remove spent utxos from `self.spent_utxos` - Remove spent utxos from `self.spent_utxos`
- Remove the anchors from the appropriate `self.<version>_anchors`
- Remove the nullifiers from the appropriate `self.<version>_nullifiers` - Remove the nullifiers from the appropriate `self.<version>_nullifiers`
3. Return the block 3. Return the block
@ -340,7 +338,6 @@ Remove the highest height block of the non-finalized portion of a chain.
- remove `transaction.hash` from `tx_by_hash` - remove `transaction.hash` from `tx_by_hash`
- Remove created utxos from `self.created_utxos` - Remove created utxos from `self.created_utxos`
- Remove spent utxos from `self.spent_utxos` - Remove spent utxos from `self.spent_utxos`
- Remove anchors from the appropriate `self.<version>_anchors`
- Remove the nullifiers from the appropriate `self.<version>_nullifiers` - Remove the nullifiers from the appropriate `self.<version>_nullifiers`
#### `Ord` #### `Ord`
@ -390,7 +387,70 @@ pub struct NonFinalizedState {
/// Blocks awaiting their parent blocks for contextual verification. /// Blocks awaiting their parent blocks for contextual verification.
contextual_queue: QueuedBlocks, contextual_queue: QueuedBlocks,
} }
```
#### `pub fn finalize(&mut self) -> Arc<Block>`
Finalize the lowest height block in the non-finalized portion of the best
chain and updates all side chains to match.
1. Extract the best chain from `self.chain_set` into `best_chain`
2. Extract the rest of the chains into a `side_chains` temporary variable, so
they can be mutated
3. Remove the lowest height block from the best chain with
`let finalized_block = best_chain.pop_root();`
4. Add `best_chain` back to `self.chain_set`
5. For each remaining `chain` in `side_chains`
- remove the lowest height block from `chain`
- If that block is equal to `finalized_block` add `chain` back to `self.chain_set`
- Else, drop `chain`
6. Return `finalized_block`
#### `fn commit_block(&mut self, block: Arc<Block>)`
Commit `block` to the non-finalized state.
1. If the block is a pre-Sapling block, panic.
2. If any chain's tip hash equals `block.header.previous_block_hash`, remove that chain from `self.chain_set`
3. Else Find the first chain that contains `block.parent` and fork it with
`block.parent` as the new tip
- `let fork = self.chain_set.iter().find_map(|chain| chain.fork(block.parent));`
4. Else panic, this should be unreachable because `commit_block` is only
called when `block` is ready to be committed.
5. Push `block` into `parent_chain`
6. Insert `parent_chain` into `self.chain_set`
### `pub(super) fn commit_new_chain(&mut self, block: Arc<Block>)`
Construct a new chain starting with `block`.
1. Construct a new empty chain
2. `push` `block` into that new chain
3. Insert the new chain into `self.chain_set`
### The `QueuedBlocks` type
The queued blocks type represents the non-finalized blocks that were committed
before their parent blocks were. It is responsible for tracking which blocks
are queued by their parent so they can be committed immediately after the
parent is committed. It also tracks blocks by their height so they can be
discarded if they ever end up below the reorg limit.
`QueuedBlocks` is defined by the following structure and API:
```rust
/// A queue of blocks, awaiting the arrival of parent blocks. /// A queue of blocks, awaiting the arrival of parent blocks.
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct QueuedBlocks { struct QueuedBlocks {
@ -403,149 +463,110 @@ struct QueuedBlocks {
} }
``` ```
#### `pub fn finalize(&mut self) -> Arc<Block>` #### `pub fn queue(&mut self, new: QueuedBlock)`
Finalize the lowest height block in the non-finalized portion of the best Add a block to the queue of blocks waiting for their requisite context to
chain and updates all side chains to match. become available.
1. Extract the best chain from `self.chains` into `best_chain` 1. extract the `parent_hash`, `new_hash`, and `new_height` from `new.block`
2. Extract the rest of the chains into a `side_chains` temporary variable, so 2. Add `new` to `self.blocks` using `new_hash` as the key
they can be mutated
3. Remove the lowest height block from the best chain with 3. Add `new_hash` to the set of hashes in
`let block = best_chain.pop_root();` `self.by_parent.entry(parent_hash).or_default()`
4. Add `best_chain` back to `self.chains` 4. Add `new_hash` to the set of hashes in
`self.by_height.entry(new_height).or_default()`
5. For each remaining `chain` in `side_chains` #### `pub fn dequeue_children(&mut self, parent: block::Hash) -> Vec<QueuedBlock>`
- If `chain` starts with `block`, remove `block` and add `chain` back to
`self.chains`
- Else, drop `chain`
6. calculate the new finalized tip height from the new `best_chain` Dequeue the set of blocks waiting on `parent`.
7. for each `height` in `self.queued_by_height` where the height is lower than the 1. Remove the set of hashes waiting on `parent` from `self.by_parent`
new reorg limit
- for each `hash` in `self.queued_by_height.remove(height)`
- Remove the key `hash` from `self.queued_blocks` and store the removed `block`
- Find and remove `hash` from `self.queued_by_parent` using `block.parent`'s hash
8. Return `block` 2. Remove and collect each block in that set of hashes from `self.blocks` as
`queued_children`
### `pub fn queue(&mut self, block: QueuedBlock)` 3. For each `block` in `queued_children` remove the associated `block.hash`
from `self.by_height`
Queue a non-finalized block to be committed to the state. 4. Return `queued_children`
After queueing a non-finalized block, this method checks whether the newly #### `pub fn prune_by_height(&mut self, finalized_height: block::Height)`
queued block (and any of its descendants) can be committed to the state
1. If the block itself exists in any current chain, it has already been successfully verified: Prune all queued blocks whose height are less than or equal to
- broadcast `Ok(block.hash())` via `block.rsp_tx`, and return `finalized_height`.
2. If the parent block exists in any current chain: 1. Split the `by_height` list at the finalized height, removing all heights
- Call `let hash = self.commit_block(block)` that are below `finalized_height`
- Call `self.process_queued(hash)`
3. Else Add `block` to `self.queued_blocks` and related members and return 2. for each hash in the removed values of `by_height`
- remove the corresponding block from `self.blocks`
- remove the block's hash from the list of blocks waiting on
`block.header.previous_block_hash` from `self.by_parent`
### `fn process_queued(&mut self, new_parent: block::Hash)`
1. Create a list of `new_parents` and populate it with `new_parent`
2. While let Some(parent) = new_parents.pop()
- for each `hash` in `self.queued_by_parent.remove(&parent.hash)`
- lookup the `block` for `hash`
- remove `block` from `self.queued_blocks`
- remove `hash` from `self.queued_by_height`
- let hash = `self.commit_block(block)`;
- add `hash` to `new_parents`
### `fn commit_block(&mut self, block: QueuedBlock) -> block::Hash`
Try to commit `block` to the non-finalized state. Must succeed, because
`commit_block` is only called when `block` is ready to be committed.
1. If the block is a pre-Sapling block, panic.
2. Search for the first chain where `block.parent` == `chain.tip`. If it exists:
- return `self.push_block_on_chain(block, chain)`
3. Find the first chain that contains `block.parent` and fork it with
`block.parent` as the new tip
- `let fork = self.chains.iter().find_map(|chain| chain.fork(block.parent));`
4. If `fork` is `Some`
- call `let hash = self.push_block_on_chain(block, fork)`
- add `fork` to `self.chains`
- return `hash`
5. Else panic, this should be unreachable because `commit_block` is only
called when `block` is ready to be committed.
### `pub(super) fn push_block_on_chain(&mut self, block: QueuedBlock, &mut chain: Chain) -> block::Hash`
Try to commit `block` to `chain`. Must succeed, because
`push_block_on_chain` is only called when `block` is ready to be committed.
1. push `block` onto `chain`
2. broadcast `result` via `block.rsp_tx`
3. return `block.hash` if `result.is_ok()`
4. Else panic, this should be unreachable because `push_block_on_chain` is only
called when `block` is ready to be committed.
### Summary ### Summary
- `Chain` represents the non-finalized portion of a single chain - `Chain` represents the non-finalized portion of a single chain
- `NonFinalizedState` represents the non-finalized portion of all chains and all - `NonFinalizedState` represents the non-finalized portion of all chains
unverified blocks that are waiting for context to be available.
- `NonFinalizedState::queue` handles queueing and or committing blocks and - `QueuedBlocks` represents all unverified blocks that are waiting for
reorganizing chains (via `commit_block`) but not finalizing them context to be available.
- Finalized blocks are returned from `finalize` and must still be committed The state service uses the following entry points:
to disk afterwards - `commit_block` when it receives new blocks.
- `finalize` handles pruning queued blocks that are past the reorg limit - `finalize` to prevent chains in `NonFinalizedState` from growing beyond the reorg limit.
- [FinalizedState.queue_and_commit_finalized_blocks](#committing-finalized-blocks) on the blocks returned by `finalize`, to commit those finalized blocks to disk.
## Committing non-finalized blocks ## Committing non-finalized blocks
Given the above structures for manipulating the non-finalized state new Given the above structures for manipulating the non-finalized state new
`non-finalized` blocks are committed in two steps. First we commit the block `non-finalized` blocks are commited as follows:
to the in memory state, then we finalize all lowest height blocks that are
past the reorg limit, finally we process any queued blocks and prune any that
are now past the reorg limit.
1. If the block itself exists in the finalized chain, it has already been successfully verified: ### `fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock)`
1. If the block itself exists in the finalized chain, it has already been
successfully verified:
- broadcast `Ok(block.hash())` via `block.rsp_tx`, and return - broadcast `Ok(block.hash())` via `block.rsp_tx`, and return
2. Run contextual validation on `block` against the finalized and non 2. Add `block` to `self.queued_blocks`
finalized state
3. If `block.parent` == `finalized_tip.hash` 3. If `block.header.previous_block_hash` is not present in the finalized or
- Construct a new `chain` with `Chain::default` non-finalized state return early
- call `let hash = chain_set.push_block_on_chain(block, chain)`
- add `fork` to `chain_set.chains`
- return `hash`
4. Otherwise, commit or queue the block to the non-finalized state with 4. Else iteratively attempt to process queued blocks by their parent hash
`chain_set.queue(block);` starting with `block.header.previous_block_hash`
5. If the best chain is longer than the reorg limit 5. While there are recently commited parent hashes to process
- Finalize all lowest height blocks in the best chain, and commit them to - Dequeue all blocks waiting on `parent` with `let queued_children =
disk with `CommitFinalizedBlock`: self.queued_blocks.dequeue_children(parent);`
- for each queued `block`
- **Run contextual validation** on `block`
- If the block fails contextual validation return the result over the
associated channel
- Else if the block's previous hash is the finalized tip add to the
non-finalized state with `self.mem.commit_new_chain(block)`
- Else add the new block to an existing non-finalized chain or new fork
with `self.mem.commit_block(block);`
- Return `Ok(hash)` over the associated channel to indicate the block
was successfully committed
- Add `block.hash` to the set of recently committed parent hashes to
process
6. While the length of the non-finalized portion of the best chain is greater
than the reorg limit
- Remove the lowest height block from the non-finalized state with
`self.mem.finalize();`
- Commit that block to the finalized state with
`self.sled.commit_finalized_direct(finalized);`
7. Prune orphaned blocks from `self.queued_blocks` with
`self.queued_blocks.prune_by_height(finalized_height);`
```
while self.best_chain().len() > reorg_limit {
let finalized = chain_set.finalize()?;
let request = CommitFinalizedBlock { finalized };
sled_state.ready_and().await?.call(request).await?;
};
```
## Sled data structures ## Sled data structures
[sled]: #sled [sled]: #sled
@ -604,6 +625,8 @@ Committing a block to the sled state should be implemented as a wrapper around
a function also called by [`Request::CommitBlock`](#request-commit-block), a function also called by [`Request::CommitBlock`](#request-commit-block),
which should: which should:
### `pub fn queue_and_commit_finalized_blocks(&mut self, queued_block: QueuedBlock)`
1. Obtain the highest entry of `hash_by_height` as `(old_height, old_tip)`. 1. Obtain the highest entry of `hash_by_height` as `(old_height, old_tip)`.
Check that `block`'s parent hash is `old_tip` and its height is Check that `block`'s parent hash is `old_tip` and its height is
`old_height+1`, or panic. This check is performed as defense-in-depth to `old_height+1`, or panic. This check is performed as defense-in-depth to

View File

@ -14,6 +14,8 @@ mod arbitrary;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use std::fmt;
pub use hash::Hash; pub use hash::Hash;
pub use header::Header; pub use header::Header;
pub use height::Height; pub use height::Height;
@ -36,6 +38,28 @@ pub struct Block {
pub transactions: Vec<std::sync::Arc<Transaction>>, pub transactions: Vec<std::sync::Arc<Transaction>>,
} }
impl fmt::Display for Block {
    /// Formats the block as a compact `Block { .. }` summary.
    ///
    /// The `height` field is only included when `coinbase_height()` returns
    /// `Some`. The `hash` field is always included and is rendered through the
    /// hash's `Display` implementation (via `DisplayToDebug`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut summary = f.debug_struct("Block");

        if let Some(height) = self.coinbase_height() {
            summary.field("height", &height);
        }

        // `debug_struct` would use the hash's `Debug` output; wrap it so the
        // `Display` output is used instead.
        let hash = DisplayToDebug(self.hash());
        summary.field("hash", &hash);
        summary.finish()
    }
}
/// Adapter that makes a value's `Display` output available through `Debug`.
///
/// Useful with `Formatter::debug_struct`, which only accepts `Debug` fields.
struct DisplayToDebug<T>(T);

impl<T: fmt::Display> fmt::Debug for DisplayToDebug<T> {
    /// Forwards to the wrapped value's `Display` implementation, passing the
    /// formatter through unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let DisplayToDebug(inner) = self;
        fmt::Display::fmt(inner, f)
    }
}
impl Block { impl Block {
/// Return the block height reported in the coinbase transaction, if any. /// Return the block height reported in the coinbase transaction, if any.
pub fn coinbase_height(&self) -> Option<Height> { pub fn coinbase_height(&self) -> Option<Height> {

View File

@ -30,6 +30,8 @@ impl Arbitrary for Block {
} }
impl Block { impl Block {
/// Returns a strategy for creating Vecs of blocks with increasing height of
/// the given length.
pub fn partial_chain_strategy( pub fn partial_chain_strategy(
init: LedgerState, init: LedgerState,
count: usize, count: usize,

View File

@ -25,12 +25,28 @@ pub mod work;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
#[cfg(any(test, feature = "proptest-impl"))] #[cfg(any(test, feature = "proptest-impl"))]
#[non_exhaustive]
/// The configuration data for proptest when generating arbitrary chains
pub struct LedgerState { pub struct LedgerState {
/// The tip height of the block or start of the chain
pub tip_height: block::Height, pub tip_height: block::Height,
pub is_coinbase: bool, is_coinbase: bool,
/// The network to generate fake blocks for
pub network: parameters::Network, pub network: parameters::Network,
} }
#[cfg(any(test, feature = "proptest-impl"))]
impl LedgerState {
    /// Builds the ledger state used to generate arbitrary chains via proptest.
    ///
    /// `tip_height` and `network` are stored as given; `is_coinbase` always
    /// starts out as `true`.
    pub fn new(tip_height: block::Height, network: parameters::Network) -> Self {
        let is_coinbase = true;

        LedgerState {
            tip_height,
            is_coinbase,
            network,
        }
    }
}
#[cfg(any(test, feature = "proptest-impl"))] #[cfg(any(test, feature = "proptest-impl"))]
impl Default for LedgerState { impl Default for LedgerState {
fn default() -> Self { fn default() -> Self {

View File

@ -1,7 +1,5 @@
//! Note and value commitments. //! Note and value commitments.
#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;
#[cfg(test)] #[cfg(test)]
mod test_vectors; mod test_vectors;

View File

@ -9,8 +9,6 @@
//! [3.1]: https://zips.z.cash/protocol/protocol.pdf#addressesandkeys //! [3.1]: https://zips.z.cash/protocol/protocol.pdf#addressesandkeys
#![allow(clippy::unit_arg)] #![allow(clippy::unit_arg)]
#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;
#[cfg(test)] #[cfg(test)]
mod test_vectors; mod test_vectors;
#[cfg(test)] #[cfg(test)]
@ -26,9 +24,6 @@ use std::{
use bech32::{self, FromBase32, ToBase32}; use bech32::{self, FromBase32, ToBase32};
use rand_core::{CryptoRng, RngCore}; use rand_core::{CryptoRng, RngCore};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use crate::{ use crate::{
parameters::Network, parameters::Network,
primitives::redjubjub::{self, SpendAuth}, primitives::redjubjub::{self, SpendAuth},
@ -183,7 +178,10 @@ mod sk_hrp {
/// ///
/// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents /// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] #[cfg_attr(
any(test, feature = "proptest-impl"),
derive(proptest_derive::Arbitrary)
)]
pub struct SpendingKey { pub struct SpendingKey {
network: Network, network: Network,
bytes: [u8; 32], bytes: [u8; 32],
@ -610,7 +608,10 @@ impl PartialEq<[u8; 32]> for IncomingViewingKey {
/// ///
/// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents /// [ps]: https://zips.z.cash/protocol/protocol.pdf#saplingkeycomponents
#[derive(Copy, Clone, Eq, PartialEq)] #[derive(Copy, Clone, Eq, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] #[cfg_attr(
any(test, feature = "proptest-impl"),
derive(proptest_derive::Arbitrary)
)]
pub struct Diversifier(pub [u8; 11]); pub struct Diversifier(pub [u8; 11]);
impl fmt::Debug for Diversifier { impl fmt::Debug for Diversifier {

View File

@ -1 +0,0 @@

View File

@ -105,6 +105,8 @@ impl Transaction {
.boxed() .boxed()
} }
/// Proptest Strategy for creating a Vector of transactions where the first
/// transaction is always the only coinbase transaction
pub fn vec_strategy( pub fn vec_strategy(
mut ledger_state: LedgerState, mut ledger_state: LedgerState,
len: usize, len: usize,

View File

@ -24,6 +24,7 @@ tracing = "0.1"
tracing-error = "0.1.2" tracing-error = "0.1.2"
thiserror = "1.0.21" thiserror = "1.0.21"
tokio = { version = "0.2.22", features = ["sync"] } tokio = { version = "0.2.22", features = ["sync"] }
displaydoc = "0.1.7"
[dev-dependencies] [dev-dependencies]
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }

View File

@ -8,7 +8,6 @@
mod config; mod config;
mod constants; mod constants;
mod memory_state;
mod request; mod request;
mod response; mod response;
mod service; mod service;
@ -19,7 +18,6 @@ mod util;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use memory_state::NonFinalizedState;
use service::QueuedBlock; use service::QueuedBlock;
use sled_state::FinalizedState; use sled_state::FinalizedState;

View File

@ -6,14 +6,19 @@ use std::{
}; };
use futures::future::{FutureExt, TryFutureExt}; use futures::future::{FutureExt, TryFutureExt};
use memory_state::{NonFinalizedState, QueuedBlocks};
use thiserror::Error;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service}; use tower::{buffer::Buffer, util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
parameters::Network, parameters::Network,
}; };
use crate::{BoxError, Config, FinalizedState, NonFinalizedState, Request, Response}; use crate::{BoxError, Config, FinalizedState, Request, Response};
mod memory_state;
// todo: put this somewhere // todo: put this somewhere
#[derive(Debug)] #[derive(Debug)]
@ -29,14 +34,124 @@ struct StateService {
/// Holds data relating to finalized chain state. /// Holds data relating to finalized chain state.
sled: FinalizedState, sled: FinalizedState,
/// Holds data relating to non-finalized chain state. /// Holds data relating to non-finalized chain state.
_mem: NonFinalizedState, mem: NonFinalizedState,
/// Blocks awaiting their parent blocks for contextual verification.
queued_blocks: QueuedBlocks,
}
/// Error returned when a block cannot be committed to the non-finalized state
/// because it failed contextual validation.
///
/// Wraps the underlying [`ValidateContextError`] as its source.
#[derive(Debug, Error)]
#[error("block is not contextually valid")]
struct CommitError(#[from] ValidateContextError);
// Reasons a block can fail contextual validation.
//
// `displaydoc::Display` derives the `Display` text from each variant's doc
// comment, so the variant doc comments below are runtime error messages:
// changing them changes what callers see.
#[derive(displaydoc::Display, Debug, Error)]
enum ValidateContextError {
// NOTE(review): `check_contextual_validity` returns this variant when the
// block height is `<=` the finalized tip height, so "lower than" slightly
// understates the condition — confirm whether the message should change.
/// block.height is lower than the current finalized height
OrphanedBlock,
}
impl StateService { impl StateService {
pub fn new(config: Config, network: Network) -> Self { pub fn new(config: Config, network: Network) -> Self {
let sled = FinalizedState::new(&config, network); let sled = FinalizedState::new(&config, network);
let _mem = NonFinalizedState::default(); let mem = NonFinalizedState::default();
Self { sled, _mem } let queued_blocks = QueuedBlocks::default();
Self {
sled,
mem,
queued_blocks,
}
}
/// Queue a non-finalized block for verification, then verify and commit any
/// queued blocks that have become ready.
///
/// Implements the logic for [committing non-finalized blocks][1] from
/// RFC0005:
///
/// 1. `new` is always added to the queue.
/// 2. Nothing else happens unless the parent of `new` is a valid fork point
///    (see `can_fork_chain_at`).
/// 3. Otherwise queued descendants are processed, the best chain is finalized
///    down to `MAX_BLOCK_REORG_HEIGHT`, and queued blocks are pruned using the
///    finalized tip height.
///
/// # Panics
///
/// Panics if committing a finalized block to sled fails, or if the finalized
/// state has no tip height when pruning.
///
/// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
#[instrument(skip(self, new))]
fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) {
    let parent_hash = new.block.header.previous_block_hash;
    self.queued_blocks.queue(new);

    if self.can_fork_chain_at(&parent_hash) {
        self.process_queued(parent_hash);

        // Move the oldest blocks of the best chain into the finalized state
        // until the non-finalized portion fits within the reorg limit.
        let reorg_limit = crate::constants::MAX_BLOCK_REORG_HEIGHT;
        while self.mem.best_chain_len() > reorg_limit {
            let finalized = self.mem.finalize();
            self.sled
                .commit_finalized_direct(finalized)
                .expect("expected that sled errors would not occur");
        }

        let finalized_height = self.sled.finalized_tip_height().expect(
            "Finalized state must have at least one block before committing non-finalized state",
        );
        self.queued_blocks.prune_by_height(finalized_height);
    }
}
/// Contextually validate `block` and, if it is valid, add it to the
/// non-finalized state.
///
/// A block whose parent is the finalized tip starts a new chain; any other
/// block is committed onto an existing chain or a fork of one.
///
/// # Errors
///
/// Returns a `CommitError` if `check_contextual_validity` rejects the block.
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitError> {
    self.check_contextual_validity(&block)?;

    let extends_finalized_tip =
        self.sled.finalized_tip_hash() == block.header.previous_block_hash;

    if extends_finalized_tip {
        self.mem.commit_new_chain(block);
    } else {
        self.mem.commit_block(block);
    }

    Ok(())
}
/// Returns `true` if `hash` is a valid previous block hash for new
/// non-finalized blocks.
///
/// That is the case when some non-finalized chain contains `hash`, or when
/// `hash` is the finalized tip hash.
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
    if self.mem.any_chain_contains(hash) {
        return true;
    }

    self.sled.finalized_tip_hash() == *hash
}
/// Attempt to validate and commit all queued blocks whose parents have
/// recently arrived, starting from `new_parent`.
///
/// Newly committed blocks are kept on a stack and popped from the end, so
/// descendants are visited in depth-first order (most recently committed
/// parent first).
///
/// The result of every processed block is sent over its response channel.
/// Only blocks that were committed successfully become parents for further
/// processing: the children of a rejected block stay queued, because
/// committing them would require a parent that is not in any chain.
#[instrument(skip(self))]
fn process_queued(&mut self, new_parent: block::Hash) {
    let mut new_parents = vec![new_parent];

    while let Some(parent) = new_parents.pop() {
        let queued_children = self.queued_blocks.dequeue_children(parent);

        for QueuedBlock { block, rsp_tx } in queued_children {
            let hash = block.hash();
            let outcome = self.validate_and_commit(block);
            let committed = outcome.is_ok();

            // A failed send is deliberately ignored: the block has already
            // been handled whether or not anyone is still listening.
            let _ = rsp_tx.send(outcome.map(|()| hash).map_err(Into::into));

            // A rejected block was never added to a chain, so it must not be
            // treated as an available parent for its queued children.
            if committed {
                new_parents.push(hash);
            }
        }
    }
}
/// Check that `block` is contextually valid based on the committed finalized
/// and non-finalized state.
fn check_contextual_validity(&mut self, block: &Block) -> Result<(), ValidateContextError> {
use ValidateContextError::*;
if block
.coinbase_height()
.expect("valid blocks have a coinbase height")
<= self.sled.finalized_tip_height().expect(
"finalized state must contain at least one block to use the non-finalized state",
)
{
Err(OrphanedBlock)?;
}
// TODO: contextual validation design and implementation
Ok(())
} }
} }
@ -52,11 +167,24 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
match req { match req {
Request::CommitBlock { .. } => unimplemented!(), Request::CommitBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
async move {
rsp_rx
.await
.expect("sender oneshot is not dropped")
.map(Response::Committed)
}
.boxed()
}
Request::CommitFinalizedBlock { block } => { Request::CommitFinalizedBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel(); let (rsp_tx, rsp_rx) = oneshot::channel();
self.sled.queue(QueuedBlock { block, rsp_tx }); self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });
async move { async move {
rsp_rx rsp_rx

View File

@ -5,11 +5,12 @@
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{BTreeMap, BTreeSet, HashMap, HashSet}, collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fmt, mem,
ops::Deref, ops::Deref,
sync::Arc, sync::Arc,
}; };
use tracing::instrument;
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
primitives::Groth16Proof, primitives::Groth16Proof,
@ -19,44 +20,6 @@ use zebra_chain::{
use crate::service::QueuedBlock; use crate::service::QueuedBlock;
/// The state of the chains in memory, incuding queued blocks.
#[derive(Default)]
pub struct NonFinalizedState {
/// Verified, non-finalized chains.
chain_set: BTreeSet<Chain>,
/// Blocks awaiting their parent blocks for contextual verification.
contextual_queue: QueuedBlocks,
}
/// A queue of blocks, awaiting the arrival of parent blocks.
#[derive(Default)]
struct QueuedBlocks {
/// Blocks awaiting their parent blocks for contextual verification.
blocks: HashMap<block::Hash, QueuedBlock>,
/// Hashes from `queued_blocks`, indexed by parent hash.
by_parent: HashMap<block::Hash, Vec<block::Hash>>,
/// Hashes from `queued_blocks`, indexed by block height.
by_height: BTreeMap<block::Height, Vec<block::Hash>>,
}
impl NonFinalizedState {
pub fn finalize(&mut self) -> Arc<Block> {
todo!()
}
pub fn queue(&mut self, _block: QueuedBlock) {
todo!()
}
fn process_queued(&mut self, _new_parent: block::Hash) {
todo!()
}
fn commit_block(&mut self, _block: QueuedBlock) -> Option<block::Hash> {
todo!()
}
}
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct Chain { struct Chain {
blocks: BTreeMap<block::Height, Arc<Block>>, blocks: BTreeMap<block::Height, Arc<Block>>,
@ -342,16 +305,13 @@ impl UpdateWith<Vec<transparent::Input>> for Chain {
} }
impl UpdateWith<Option<transaction::JoinSplitData<Groth16Proof>>> for Chain { impl UpdateWith<Option<transaction::JoinSplitData<Groth16Proof>>> for Chain {
#[instrument(skip(self, joinsplit_data))]
fn update_chain_state_with( fn update_chain_state_with(
&mut self, &mut self,
joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>, joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>,
) { ) {
if let Some(joinsplit_data) = joinsplit_data { if let Some(joinsplit_data) = joinsplit_data {
for sprout::JoinSplit { for sprout::JoinSplit { nullifiers, .. } in joinsplit_data.joinsplits() {
anchor, nullifiers, ..
} in joinsplit_data.joinsplits()
{
self.sprout_anchors.insert(*anchor);
self.sprout_nullifiers.insert(nullifiers[0]); self.sprout_nullifiers.insert(nullifiers[0]);
self.sprout_nullifiers.insert(nullifiers[1]); self.sprout_nullifiers.insert(nullifiers[1]);
} }
@ -363,14 +323,7 @@ impl UpdateWith<Option<transaction::JoinSplitData<Groth16Proof>>> for Chain {
joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>, joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>,
) { ) {
if let Some(joinsplit_data) = joinsplit_data { if let Some(joinsplit_data) = joinsplit_data {
for sprout::JoinSplit { for sprout::JoinSplit { nullifiers, .. } in joinsplit_data.joinsplits() {
anchor, nullifiers, ..
} in joinsplit_data.joinsplits()
{
assert!(
self.sprout_anchors.remove(anchor),
"anchor must be present if block was"
);
assert!( assert!(
self.sprout_nullifiers.remove(&nullifiers[0]), self.sprout_nullifiers.remove(&nullifiers[0]),
"nullifiers must be present if block was" "nullifiers must be present if block was"
@ -387,11 +340,7 @@ impl UpdateWith<Option<transaction::JoinSplitData<Groth16Proof>>> for Chain {
impl UpdateWith<Option<transaction::ShieldedData>> for Chain { impl UpdateWith<Option<transaction::ShieldedData>> for Chain {
fn update_chain_state_with(&mut self, shielded_data: &Option<transaction::ShieldedData>) { fn update_chain_state_with(&mut self, shielded_data: &Option<transaction::ShieldedData>) {
if let Some(shielded_data) = shielded_data { if let Some(shielded_data) = shielded_data {
for sapling::Spend { for sapling::Spend { nullifier, .. } in shielded_data.spends() {
anchor, nullifier, ..
} in shielded_data.spends()
{
self.sapling_anchors.insert(*anchor);
self.sapling_nullifiers.insert(*nullifier); self.sapling_nullifiers.insert(*nullifier);
} }
} }
@ -399,14 +348,7 @@ impl UpdateWith<Option<transaction::ShieldedData>> for Chain {
fn revert_chain_state_with(&mut self, shielded_data: &Option<transaction::ShieldedData>) { fn revert_chain_state_with(&mut self, shielded_data: &Option<transaction::ShieldedData>) {
if let Some(shielded_data) = shielded_data { if let Some(shielded_data) = shielded_data {
for sapling::Spend { for sapling::Spend { nullifier, .. } in shielded_data.spends() {
anchor, nullifier, ..
} in shielded_data.spends()
{
assert!(
self.sapling_anchors.remove(anchor),
"anchor must be present if block was"
);
assert!( assert!(
self.sapling_nullifiers.remove(nullifier), self.sapling_nullifiers.remove(nullifier),
"nullifier must be present if block was" "nullifier must be present if block was"
@ -460,11 +402,223 @@ impl Ord for Chain {
} }
} }
/// The verified, non-finalized chains that are currently held in memory.
#[derive(Default)]
pub struct NonFinalizedState {
    /// Verified, non-finalized chains, sorted in ascending order.
    ///
    /// The best chain is the last element: `chain_set.iter().next_back()`.
    chain_set: BTreeSet<Box<Chain>>,
}

impl NonFinalizedState {
    /// Pops the lowest height block off the best chain and returns it, keeping
    /// only the side chains that start with that same block.
    ///
    /// # Panics
    ///
    /// - if there are no chains in the non-finalized state.
    pub fn finalize(&mut self) -> Arc<Block> {
        // Take ownership of every chain so each one can be mutated before it
        // is put back into the (ordered) set.
        let mut remaining_chains = mem::take(&mut self.chain_set).into_iter();

        // The set is in ascending order, so the best chain comes out last.
        let mut best = remaining_chains
            .next_back()
            .expect("there's at least one chain");
        let finalized_block = best.pop_root();
        self.chain_set.insert(best);

        // A side chain survives only if its root is the block that was just
        // finalized; any other side chain is dropped at the end of its
        // iteration.
        for mut side_chain in remaining_chains {
            if side_chain.pop_root() == finalized_block {
                self.chain_set.insert(side_chain);
            }
        }

        finalized_block
    }

    /// Commit `block` to the non-finalized state.
    ///
    /// The block is pushed onto the chain whose tip is the block's parent. If
    /// no chain has that tip, the first chain that can be forked at the parent
    /// hash is forked and the block is pushed onto the fork.
    ///
    /// # Panics
    ///
    /// - if no chain has the parent as its tip and no chain can be forked at
    ///   the parent hash.
    pub fn commit_block(&mut self, block: Arc<Block>) {
        let parent_hash = block.header.previous_block_hash;

        let mut extended_chain =
            match self.take_chain_if(|chain| chain.non_finalized_tip_hash() == parent_hash) {
                Some(tip_chain) => tip_chain,
                None => {
                    let forked_chain = self
                        .chain_set
                        .iter()
                        .find_map(|chain| chain.fork(parent_hash));

                    Box::new(forked_chain.expect(
                        "commit_block is only called with blocks that are ready to be commited",
                    ))
                }
            };

        extended_chain.push(block);
        self.chain_set.insert(extended_chain);
    }

    /// Commit `block` to the non-finalized state as the first block of a new
    /// chain, for blocks whose parent is the finalized tip.
    pub fn commit_new_chain(&mut self, block: Arc<Block>) {
        let mut new_chain = Box::new(Chain::default());
        new_chain.push(block);
        self.chain_set.insert(new_chain);
    }

    /// Returns the number of blocks in the non-finalized portion of the
    /// current best chain, expressed as a `block::Height`.
    ///
    /// # Panics
    ///
    /// - if there are no chains in the non-finalized state.
    pub fn best_chain_len(&self) -> block::Height {
        let best_chain = self
            .chain_set
            .iter()
            .next_back()
            .expect("only called after inserting a block");

        block::Height(best_chain.blocks.len() as u32)
    }

    /// Returns `true` if any known chain has `hash` in its non-finalized
    /// portion.
    pub fn any_chain_contains(&self, hash: &block::Hash) -> bool {
        self.chain_set
            .iter()
            .map(|chain| &chain.height_by_hash)
            .any(|height_by_hash| height_by_hash.contains_key(hash))
    }

    /// Removes and returns the first chain that satisfies `predicate`,
    /// checking the chains from best to worst.
    ///
    /// Every other chain is put back into `chain_set`.
    fn take_chain_if<F>(&mut self, predicate: F) -> Option<Box<Chain>>
    where
        F: Fn(&Chain) -> bool,
    {
        // Reverse the ascending set so the best chain is tested first.
        let mut candidates = mem::take(&mut self.chain_set).into_iter().rev();
        let mut taken = None;

        while let Some(candidate) = candidates.next() {
            if predicate(&candidate) {
                taken = Some(candidate);
                break;
            }
            // not a match: this chain stays in the set
            self.chain_set.insert(candidate);
        }

        // Put back the chains that were never tested (none when nothing matched).
        self.chain_set.extend(candidates);

        taken
    }
}
/// A queue of blocks, awaiting the arrival of parent blocks.
///
/// The three maps are kept in sync: every hash stored in `by_parent` or
/// `by_height` is a key of `blocks`, and neither index keeps empty hash sets
/// around once their last block has been removed.
#[derive(Default)]
pub struct QueuedBlocks {
    /// Blocks awaiting their parent blocks for contextual verification.
    blocks: HashMap<block::Hash, QueuedBlock>,
    /// Hashes of the blocks in `blocks`, indexed by parent hash.
    by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
    /// Hashes of the blocks in `blocks`, indexed by block height.
    by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
}

impl QueuedBlocks {
    /// Queue a block for eventual verification and commit.
    ///
    /// # Panics
    ///
    /// - if a block with the same `block::Hash` has already been queued.
    /// - if the block has no coinbase height.
    pub fn queue(&mut self, new: QueuedBlock) {
        let new_hash = new.block.hash();
        let new_height = new
            .block
            .coinbase_height()
            .expect("validated non-finalized blocks have a coinbase height");
        let parent_hash = new.block.header.previous_block_hash;

        let replaced = self.blocks.insert(new_hash, new);
        assert!(replaced.is_none(), "hashes must be unique");

        let inserted = self
            .by_height
            .entry(new_height)
            .or_default()
            .insert(new_hash);
        assert!(inserted, "hashes must be unique");

        let inserted = self
            .by_parent
            .entry(parent_hash)
            .or_default()
            .insert(new_hash);
        assert!(inserted, "hashes must be unique");

        tracing::trace!(
            num_blocks = %self.blocks.len(),
            %parent_hash,
            ?new_height,
            "Finished queueing a new block"
        );
    }

    /// Dequeue and return all blocks that were waiting for the arrival of
    /// `parent`.
    ///
    /// Returns an empty `Vec` if no queued block has `parent` as its parent.
    ///
    /// # Panics
    ///
    /// - if a hash indexed under `parent` is missing from the queue.
    /// - if a dequeued block has no coinbase height.
    #[instrument(skip(self))]
    pub fn dequeue_children(&mut self, parent: block::Hash) -> Vec<QueuedBlock> {
        let child_hashes = self.by_parent.remove(&parent).unwrap_or_default();
        let mut queued_children = Vec::with_capacity(child_hashes.len());

        for hash in child_hashes {
            let queued = self
                .blocks
                .remove(&hash)
                .expect("block is present if its hash is in by_parent");
            let height = queued
                .block
                .coinbase_height()
                .expect("queued blocks have a coinbase height");

            // Only remove this block's hash from the height index. Other
            // blocks queued at the same height (with a different parent) must
            // stay indexed, otherwise `prune_by_height` can never find them.
            if let Some(hashes_at_height) = self.by_height.get_mut(&height) {
                hashes_at_height.remove(&hash);
                if hashes_at_height.is_empty() {
                    self.by_height.remove(&height);
                }
            }

            queued_children.push(queued);
        }

        tracing::trace!(
            num_blocks = %self.blocks.len(),
            "Finished dequeuing blocks waiting for parent hash",
        );

        queued_children
    }

    /// Remove all queued blocks whose height is less than or equal to the given
    /// `finalized_tip_height`.
    ///
    /// # Panics
    ///
    /// - if `finalized_tip_height + 1` is not a valid height.
    /// - if the indexes are out of sync with the queued blocks.
    pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
        // `split_off` returns the entries _greater than or equal to_ the key
        // and leaves the smaller ones behind. We want to keep the greater
        // ones, so split at `finalized_tip_height + 1` and swap the returned
        // map back into place, which hands us the expired entries.
        let split_height = finalized_tip_height + 1;
        let split_height =
            split_height.expect("height after finalized tip won't exceed max height");
        let retained = self.by_height.split_off(&split_height);
        let expired_by_height = mem::replace(&mut self.by_height, retained);

        for hash in expired_by_height.into_iter().flat_map(|(_, hashes)| hashes) {
            let expired = self.blocks.remove(&hash).expect("block is present");
            let parent_hash = &expired.block.header.previous_block_hash;

            let siblings = self
                .by_parent
                .get_mut(parent_hash)
                .expect("parent is present");
            siblings.remove(&hash);

            // Drop the parent entry with its last child so `by_parent` does
            // not accumulate empty sets for parents that will never arrive.
            if siblings.is_empty() {
                self.by_parent.remove(parent_hash);
            }
        }
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use transaction::Transaction; use transaction::Transaction;
use std::{env, mem}; use std::{env, fmt, mem};
use zebra_chain::serialization::ZcashDeserializeInto; use zebra_chain::serialization::ZcashDeserializeInto;
use zebra_chain::{ use zebra_chain::{
@ -476,6 +630,14 @@ mod tests {
use self::assert_eq; use self::assert_eq;
use super::*; use super::*;
/// Wrapper whose `Debug` output is a short summary instead of the full
/// contents of the wrapped value.
struct SummaryDebug<T>(T);

impl<T> fmt::Debug for SummaryDebug<Vec<T>> {
    /// Writes the element type name followed by the number of elements.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let element_type = std::any::type_name::<T>();
        let element_count = self.0.len();

        write!(f, "{}, len={}", element_type, element_count)
    }
}
/// Helper trait for constructing "valid" looking chains of blocks /// Helper trait for constructing "valid" looking chains of blocks
trait FakeChainHelper { trait FakeChainHelper {
fn make_fake_child(&self) -> Arc<Block>; fn make_fake_child(&self) -> Arc<Block>;
@ -551,23 +713,16 @@ mod tests {
Ok(()) Ok(())
} }
fn arbitrary_chain(height: block::Height) -> BoxedStrategy<Vec<Arc<Block>>> { fn arbitrary_chain(tip_height: block::Height) -> BoxedStrategy<Vec<Arc<Block>>> {
Block::partial_chain_strategy( Block::partial_chain_strategy(LedgerState::new(tip_height, Network::Mainnet), 100)
LedgerState {
tip_height: height,
is_coinbase: true,
network: Network::Mainnet,
},
100,
)
} }
prop_compose! { prop_compose! {
fn arbitrary_chain_and_count() fn arbitrary_chain_and_count()
(chain in arbitrary_chain(NetworkUpgrade::Blossom.activation_height(Network::Mainnet).unwrap())) (chain in arbitrary_chain(NetworkUpgrade::Blossom.activation_height(Network::Mainnet).unwrap()))
(count in 1..chain.len(), chain in Just(chain)) -> (NoDebug<Vec<Arc<Block>>>, usize) (count in 1..chain.len(), chain in Just(chain)) -> (SummaryDebug<Vec<Arc<Block>>>, usize)
{ {
(NoDebug(chain), count) (SummaryDebug(chain), count)
} }
} }
@ -635,11 +790,3 @@ mod tests {
Ok(()) Ok(())
} }
} }
/// Wrapper that hides the contents of the wrapped value from `Debug` output.
struct NoDebug<T>(T);

impl<T> fmt::Debug for NoDebug<Vec<T>> {
    /// Prints only the element type name and the length of the vector.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let NoDebug(items) = self;

        write!(f, "{}, len={}", std::any::type_name::<T>(), items.len())
    }
}

View File

@ -2,6 +2,7 @@
use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc}; use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc};
use tracing::trace;
use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize};
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
@ -62,33 +63,34 @@ impl FinalizedState {
/// ///
/// After queueing a finalized block, this method checks whether the newly /// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state. /// queued block (and any of its descendants) can be committed to the state.
pub fn queue(&mut self, queued_block: QueuedBlock) { pub fn queue_and_commit_finalized_blocks(&mut self, queued_block: QueuedBlock) {
let prev_hash = queued_block.block.header.previous_block_hash; let prev_hash = queued_block.block.header.previous_block_hash;
self.queued_by_prev_hash.insert(prev_hash, queued_block); self.queued_by_prev_hash.insert(prev_hash, queued_block);
// Cloning means the closure doesn't hold a borrow of &self, while let Some(queued_block) = self.queued_by_prev_hash.remove(&self.finalized_tip_hash()) {
// conflicting with mutable access in the loop below.
let hash_by_height = self.hash_by_height.clone();
let tip_hash = || {
read_tip(&hash_by_height)
.expect("inability to look up tip is unrecoverable")
.map(|(_height, hash)| hash)
.unwrap_or(block::Hash([0; 32]))
};
while let Some(queued_block) = self.queued_by_prev_hash.remove(&tip_hash()) {
self.commit_finalized(queued_block) self.commit_finalized(queued_block)
} }
} }
/// Commit a finalized block to the state. /// Returns the hash of the current finalized tip block.
/// pub fn finalized_tip_hash(&self) -> block::Hash {
/// It's the caller's responsibility to ensure that blocks are committed in read_tip(&self.hash_by_height)
/// order. This function is called by [`process_queue`], which ensures order. .expect("inability to look up tip is unrecoverable")
/// It is intentionally not exposed as part of the public API of the .map(|(_, hash)| hash)
/// [`FinalizedState`]. // if the state is empty, return the genesis previous block hash
fn commit_finalized(&mut self, queued_block: QueuedBlock) { .unwrap_or(block::Hash([0; 32]))
let QueuedBlock { block, rsp_tx } = queued_block; }
/// Returns the height of the current finalized tip block.
pub fn finalized_tip_height(&self) -> Option<block::Height> {
read_tip(&self.hash_by_height)
.expect("inability to look up tip is unrecoverable")
.map(|(height, _)| height)
}
/// Immediately commit `block` to the finalized state.
pub fn commit_finalized_direct(&mut self, block: Arc<Block>) -> Result<block::Hash, BoxError> {
use sled::Transactional;
let height = block let height = block
.coinbase_height() .coinbase_height()
@ -96,8 +98,9 @@ impl FinalizedState {
let height_bytes = height.0.to_be_bytes(); let height_bytes = height.0.to_be_bytes();
let hash = block.hash(); let hash = block.hash();
use sled::Transactional; trace!(?height, "Finalized block");
let transaction_result = (
(
&self.hash_by_height, &self.hash_by_height,
&self.height_by_hash, &self.height_by_hash,
&self.block_by_height, &self.block_by_height,
@ -116,10 +119,21 @@ impl FinalizedState {
block_by_height.insert(&height_bytes, block_bytes)?; block_by_height.insert(&height_bytes, block_bytes)?;
// for some reason type inference fails here // for some reason type inference fails here
Ok::<_, sled::transaction::ConflictableTransactionError>(()) Ok::<_, sled::transaction::ConflictableTransactionError>(hash)
}); })
.map_err(Into::into)
}
let _ = rsp_tx.send(transaction_result.map(|_| hash).map_err(Into::into)); /// Commit a finalized block to the state.
///
/// It's the caller's responsibility to ensure that blocks are committed in
/// order. This function is called by [`queue`], which ensures order.
/// It is intentionally not exposed as part of the public API of the
/// [`FinalizedState`].
fn commit_finalized(&mut self, queued_block: QueuedBlock) {
let QueuedBlock { block, rsp_tx } = queued_block;
let result = self.commit_finalized_direct(block);
let _ = rsp_tx.send(result);
} }
// TODO: this impl works only during checkpointing, it needs to be rewritten // TODO: this impl works only during checkpointing, it needs to be rewritten

View File

@ -612,7 +612,7 @@ where
tracing::trace!(?hash, "requested block"); tracing::trace!(?hash, "requested block");
// This span is used to help diagnose sync warnings // This span is used to help diagnose sync warnings
let span = tracing::warn_span!("block_fetch_verify", ?hash); let span = tracing::warn_span!("block_fetch_verify", %hash);
let mut verifier = self.verifier.clone(); let mut verifier = self.verifier.clone();
let task = tokio::spawn( let task = tokio::spawn(
async move { async move {