3. change(state): Move the finalized queue to the StateService (#5152)

* Move the finalized block queue into the StateService

* Move the queued_blocks module to the state service

* Move QueuedFinalized into queued_blocks

* Move the queued_blocks tests into their own module

* Make the FinalizedState cloneable
This commit is contained in:
teor 2022-09-16 23:53:40 +10:00 committed by GitHub
parent 20d80adfba
commit bfdb29b757
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 329 additions and 266 deletions

View File

@ -15,6 +15,7 @@
//! chain tip changes. //! chain tip changes.
use std::{ use std::{
collections::HashMap,
convert, convert,
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -41,8 +42,9 @@ use crate::{
service::{ service::{
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::{FinalizedState, ZebraDb}, finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::{NonFinalizedState, QueuedBlocks}, non_finalized_state::NonFinalizedState,
pending_utxos::PendingUtxos, pending_utxos::PendingUtxos,
queued_blocks::QueuedBlocks,
watch_receiver::WatchReceiver, watch_receiver::WatchReceiver,
}, },
BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, ReadRequest, BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, ReadRequest,
@ -58,6 +60,7 @@ pub(crate) mod check;
mod finalized_state; mod finalized_state;
mod non_finalized_state; mod non_finalized_state;
mod pending_utxos; mod pending_utxos;
mod queued_blocks;
pub(crate) mod read; pub(crate) mod read;
#[cfg(any(test, feature = "proptest-impl"))] #[cfg(any(test, feature = "proptest-impl"))]
@ -68,10 +71,7 @@ mod tests;
pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};
pub type QueuedFinalized = ( use self::queued_blocks::QueuedFinalized;
FinalizedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A read-write service for Zebra's cached blockchain state. /// A read-write service for Zebra's cached blockchain state.
/// ///
@ -98,19 +98,30 @@ pub(crate) struct StateService {
/// The configured Zcash network. /// The configured Zcash network.
network: Network, network: Network,
// Exclusively Writeable State
//
/// The finalized chain state, including its on-disk database.
pub(crate) disk: FinalizedState,
/// The non-finalized chain state, including its in-memory chain forks.
mem: NonFinalizedState,
// Queued Blocks // Queued Blocks
// //
/// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
/// before they can do contextual verification. /// These blocks are awaiting their parent blocks before they can do contextual verification.
queued_blocks: QueuedBlocks, queued_non_finalized_blocks: QueuedBlocks,
/// Queued blocks for the [`FinalizedState`] that arrived out of order.
/// These blocks are awaiting their parent blocks before they can do contextual verification.
///
/// Indexed by their parent block hash.
queued_finalized_blocks: HashMap<block::Hash, QueuedFinalized>,
// Exclusively Writeable State
//
/// The non-finalized chain state, including its in-memory chain forks.
//
// TODO: get rid of this struct member, and just let the block write task own the NonFinalizedState.
mem: NonFinalizedState,
/// The finalized chain state, including its on-disk database.
//
// TODO: get rid of this struct member, and just let the ReadStateService
// and block write task share ownership of the database.
pub(crate) disk: FinalizedState,
// Pending UTXO Request Tracking // Pending UTXO Request Tracking
// //
@ -133,6 +144,14 @@ pub(crate) struct StateService {
/// ///
/// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`. /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
read_service: ReadStateService, read_service: ReadStateService,
// Metrics
//
/// A metric tracking the maximum height that's currently in `queued_finalized_blocks`
///
/// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs
/// as a break in the graph.
max_queued_height: f64,
} }
/// A read-only service for accessing Zebra's cached blockchain state. /// A read-only service for accessing Zebra's cached blockchain state.
@ -155,6 +174,12 @@ pub struct ReadStateService {
// Shared Concurrently Readable State // Shared Concurrently Readable State
// //
/// A watch channel for a recent [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
/// The shared inner on-disk database for the finalized state. /// The shared inner on-disk database for the finalized state.
/// ///
/// RocksDB allows reads and writes via a shared reference, /// RocksDB allows reads and writes via a shared reference,
@ -163,12 +188,6 @@ pub struct ReadStateService {
/// This chain is updated concurrently with requests, /// This chain is updated concurrently with requests,
/// so it might include some block data that is also in `best_mem`. /// so it might include some block data that is also in `best_mem`.
db: ZebraDb, db: ZebraDb,
/// A watch channel for a recent [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
} }
impl StateService { impl StateService {
@ -182,6 +201,7 @@ impl StateService {
network: Network, network: Network,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start(); let timer = CodeTimer::start();
let disk = FinalizedState::new(&config, network); let disk = FinalizedState::new(&config, network);
timer.finish(module_path!(), line!(), "opening finalized state database"); timer.finish(module_path!(), line!(), "opening finalized state database");
@ -201,19 +221,21 @@ impl StateService {
let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk); let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk);
let queued_blocks = QueuedBlocks::default(); let queued_non_finalized_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default(); let pending_utxos = PendingUtxos::default();
let state = Self { let state = Self {
network, network,
disk, queued_non_finalized_blocks,
queued_finalized_blocks: HashMap::new(),
mem, mem,
queued_blocks, disk,
pending_utxos, pending_utxos,
last_prune: Instant::now(), last_prune: Instant::now(),
chain_tip_sender, chain_tip_sender,
non_finalized_state_sender, non_finalized_state_sender,
read_service: read_service.clone(), read_service: read_service.clone(),
max_queued_height: f64::NAN,
}; };
timer.finish(module_path!(), line!(), "initializing state service"); timer.finish(module_path!(), line!(), "initializing state service");
@ -262,8 +284,7 @@ impl StateService {
// - run the set_finalized_tip() in this function in the state block commit task // - run the set_finalized_tip() in this function in the state block commit task
// - move all that code to the inner service // - move all that code to the inner service
let tip_block = self let tip_block = self
.disk .drain_queue_and_commit_finalized((finalized, rsp_tx))
.queue_and_commit_finalized((finalized, rsp_tx))
.map(ChainTipBlock::from); .map(ChainTipBlock::from);
self.chain_tip_sender.set_finalized_tip(tip_block); self.chain_tip_sender.set_finalized_tip(tip_block);
@ -271,6 +292,56 @@ impl StateService {
rsp_rx rsp_rx
} }
/// Queue a finalized block to be committed to the state.
///
/// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state.
///
/// Returns the highest finalized tip block committed from the queue,
/// or `None` if no blocks were committed in this call.
/// (Use `tip_block` to get the finalized tip, regardless of when it was committed.)
pub fn drain_queue_and_commit_finalized(
&mut self,
queued: QueuedFinalized,
) -> Option<FinalizedBlock> {
let mut highest_queue_commit = None;
let prev_hash = queued.0.block.header.previous_block_hash;
let height = queued.0.height;
self.queued_finalized_blocks.insert(prev_hash, queued);
while let Some(queued_block) = self
.queued_finalized_blocks
.remove(&self.disk.db().finalized_tip_hash())
{
if let Ok(finalized) = self.disk.commit_finalized(queued_block) {
highest_queue_commit = Some(finalized);
} else {
// the last block in the queue failed, so we can't commit the next block
break;
}
}
if self.queued_finalized_blocks.is_empty() {
self.max_queued_height = f64::NAN;
} else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 {
// if there are still blocks in the queue, then either:
// - the new block was lower than the old maximum, and there was a gap before it,
// so the maximum is still the same (and we skip this code), or
// - the new block is higher than the old maximum, and there is at least one gap
// between the finalized tip and the new maximum
self.max_queued_height = height.0 as f64;
}
metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height);
metrics::gauge!(
"state.checkpoint.queued.block.count",
self.queued_finalized_blocks.len() as f64,
);
highest_queue_commit
}
/// Queue a non finalized block for verification and check if any queued /// Queue a non finalized block for verification and check if any queued
/// blocks are ready to be verified and committed to the state. /// blocks are ready to be verified and committed to the state.
/// ///
@ -297,7 +368,9 @@ impl StateService {
// Request::CommitBlock contract: a request to commit a block which has // Request::CommitBlock contract: a request to commit a block which has
// been queued but not yet committed to the state fails the older // been queued but not yet committed to the state fails the older
// request and replaces it with the newer request. // request and replaces it with the newer request.
let rsp_rx = if let Some((_, old_rsp_tx)) = self.queued_blocks.get_mut(&prepared.hash) { let rsp_rx = if let Some((_, old_rsp_tx)) =
self.queued_non_finalized_blocks.get_mut(&prepared.hash)
{
tracing::debug!("replacing older queued request with new request"); tracing::debug!("replacing older queued request with new request");
let (mut rsp_tx, rsp_rx) = oneshot::channel(); let (mut rsp_tx, rsp_rx) = oneshot::channel();
std::mem::swap(old_rsp_tx, &mut rsp_tx); std::mem::swap(old_rsp_tx, &mut rsp_tx);
@ -305,7 +378,7 @@ impl StateService {
rsp_rx rsp_rx
} else { } else {
let (rsp_tx, rsp_rx) = oneshot::channel(); let (rsp_tx, rsp_rx) = oneshot::channel();
self.queued_blocks.queue((prepared, rsp_tx)); self.queued_non_finalized_blocks.queue((prepared, rsp_tx));
rsp_rx rsp_rx
}; };
@ -337,7 +410,8 @@ impl StateService {
let finalized_tip_height = self.disk.db().finalized_tip_height().expect( let finalized_tip_height = self.disk.db().finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state", "Finalized state must have at least one block before committing non-finalized state",
); );
self.queued_blocks.prune_by_height(finalized_tip_height); self.queued_non_finalized_blocks
.prune_by_height(finalized_tip_height);
let tip_block_height = self.update_latest_chain_channels(); let tip_block_height = self.update_latest_chain_channels();
@ -415,7 +489,9 @@ impl StateService {
vec![(new_parent, Ok(()))]; vec![(new_parent, Ok(()))];
while let Some((parent_hash, parent_result)) = new_parents.pop() { while let Some((parent_hash, parent_result)) = new_parents.pop() {
let queued_children = self.queued_blocks.dequeue_children(parent_hash); let queued_children = self
.queued_non_finalized_blocks
.dequeue_children(parent_hash);
for (child, rsp_tx) in queued_children { for (child, rsp_tx) in queued_children {
let child_hash = child.hash; let child_hash = child.hash;
@ -572,7 +648,7 @@ impl Service<Request> for StateService {
#[instrument(name = "state", skip(self, req))] #[instrument(name = "state", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
match req { match req {
// Uses queued_blocks and pending_utxos in the StateService // Uses queued_non_finalized_blocks and pending_utxos in the StateService
// Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
Request::CommitBlock(prepared) => { Request::CommitBlock(prepared) => {
metrics::counter!( metrics::counter!(
@ -624,8 +700,8 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
// Uses queued_by_prev_hash in the FinalizedState and pending_utxos in the StateService. // Uses queued_finalized_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService, FinalizedState, and ZebraDb. // Accesses shared writeable state in the StateService and ZebraDb.
Request::CommitFinalizedBlock(finalized) => { Request::CommitFinalizedBlock(finalized) => {
metrics::counter!( metrics::counter!(
"state.requests", "state.requests",
@ -679,7 +755,7 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
// Uses pending_utxos and queued_blocks in the StateService. // Uses pending_utxos and queued_non_finalized_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => { Request::AwaitUtxo(outpoint) => {
metrics::counter!( metrics::counter!(
@ -700,7 +776,7 @@ impl Service<Request> for StateService {
// Check the non-finalized block queue outside the returned future, // Check the non-finalized block queue outside the returned future,
// so we can access mutable state fields. // so we can access mutable state fields.
if let Some(utxo) = self.queued_blocks.utxo(&outpoint) { if let Some(utxo) = self.queued_non_finalized_blocks.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo); self.pending_utxos.respond(&outpoint, utxo);
// We're finished, the returned future gets the UTXO from the respond() channel. // We're finished, the returned future gets the UTXO from the respond() channel.
@ -709,7 +785,7 @@ impl Service<Request> for StateService {
return response_fut; return response_fut;
} }
// We ignore any UTXOs in FinalizedState.queued_by_prev_hash, // We ignore any UTXOs in FinalizedState.queued_finalized_blocks,
// because it is only used during checkpoint verification. // because it is only used during checkpoint verification.
// //
// This creates a rare race condition, but it doesn't seem to happen much in practice. // This creates a rare race condition, but it doesn't seem to happen much in practice.

View File

@ -16,7 +16,6 @@
//! be incremented each time the database format (column, serialization, etc) changes. //! be incremented each time the database format (column, serialization, etc) changes.
use std::{ use std::{
collections::HashMap,
io::{stderr, stdout, Write}, io::{stderr, stdout, Write},
path::Path, path::Path,
sync::Arc, sync::Arc,
@ -45,39 +44,47 @@ pub use disk_format::{OutputIndex, OutputLocation, TransactionLocation};
pub(super) use zebra_db::ZebraDb; pub(super) use zebra_db::ZebraDb;
/// The finalized part of the chain state, stored in the db. /// The finalized part of the chain state, stored in the db.
#[derive(Debug)]
pub struct FinalizedState {
/// The underlying database.
db: ZebraDb,
/// Queued blocks that arrived out of order, indexed by their parent block hash.
queued_by_prev_hash: HashMap<block::Hash, QueuedFinalized>,
/// A metric tracking the maximum height that's currently in `queued_by_prev_hash`
/// ///
/// Set to `f64::NAN` if `queued_by_prev_hash` is empty, because grafana shows NaNs /// `rocksdb` allows concurrent writes through a shared reference,
/// as a break in the graph. /// so finalized state instances are cloneable. When the final clone is dropped,
max_queued_height: f64, /// the database is closed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FinalizedState {
// Configuration
//
// This configuration cannot be modified after the database is initialized,
// because some clones would have different values.
//
/// The configured network.
network: Network,
/// The configured stop height. /// The configured stop height.
/// ///
/// Commit blocks to the finalized state up to this height, then exit Zebra. /// Commit blocks to the finalized state up to this height, then exit Zebra.
debug_stop_at_height: Option<block::Height>, debug_stop_at_height: Option<block::Height>,
/// The configured network. // Owned State
network: Network, //
// Everything contained in this state must be shared by all clones, or read-only.
//
/// The underlying database.
///
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
db: ZebraDb,
} }
impl FinalizedState { impl FinalizedState {
/// Returns an on-disk database instance for `config` and `network`.
/// If there is no existing database, creates a new database on disk.
pub fn new(config: &Config, network: Network) -> Self { pub fn new(config: &Config, network: Network) -> Self {
let db = ZebraDb::new(config, network); let db = ZebraDb::new(config, network);
let new_state = Self { let new_state = Self {
queued_by_prev_hash: HashMap::new(),
max_queued_height: f64::NAN,
db,
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
network, network,
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
db,
}; };
// TODO: move debug_stop_at_height into a task in the start command (#3442) // TODO: move debug_stop_at_height into a task in the start command (#3442)
@ -137,63 +144,14 @@ impl FinalizedState {
&self.db &self.db
} }
/// Queue a finalized block to be committed to the state.
///
/// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state.
///
/// Returns the highest finalized tip block committed from the queue,
/// or `None` if no blocks were committed in this call.
/// (Use `tip_block` to get the finalized tip, regardless of when it was committed.)
pub fn queue_and_commit_finalized(
&mut self,
queued: QueuedFinalized,
) -> Option<FinalizedBlock> {
let mut highest_queue_commit = None;
let prev_hash = queued.0.block.header.previous_block_hash;
let height = queued.0.height;
self.queued_by_prev_hash.insert(prev_hash, queued);
while let Some(queued_block) = self
.queued_by_prev_hash
.remove(&self.db.finalized_tip_hash())
{
if let Ok(finalized) = self.commit_finalized(queued_block) {
highest_queue_commit = Some(finalized);
} else {
// the last block in the queue failed, so we can't commit the next block
break;
}
}
if self.queued_by_prev_hash.is_empty() {
self.max_queued_height = f64::NAN;
} else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 {
// if there are still blocks in the queue, then either:
// - the new block was lower than the old maximum, and there was a gap before it,
// so the maximum is still the same (and we skip this code), or
// - the new block is higher than the old maximum, and there is at least one gap
// between the finalized tip and the new maximum
self.max_queued_height = height.0 as f64;
}
metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height);
metrics::gauge!(
"state.checkpoint.queued.block.count",
self.queued_by_prev_hash.len() as f64,
);
highest_queue_commit
}
/// Commit a finalized block to the state. /// Commit a finalized block to the state.
/// ///
/// It's the caller's responsibility to ensure that blocks are committed in /// It's the caller's responsibility to ensure that blocks are committed in
/// order. This function is called by [`Self::queue_and_commit_finalized`], /// order.
/// which ensures order. It is intentionally not exposed as part of the pub fn commit_finalized(
/// public API of the [`FinalizedState`]. &mut self,
fn commit_finalized(&mut self, queued_block: QueuedFinalized) -> Result<FinalizedBlock, ()> { queued_block: QueuedFinalized,
) -> Result<FinalizedBlock, ()> {
let (finalized, rsp_tx) = queued_block; let (finalized, rsp_tx) = queued_block;
let result = let result =
self.commit_finalized_direct(finalized.clone().into(), "CommitFinalized request"); self.commit_finalized_direct(finalized.clone().into(), "CommitFinalized request");

View File

@ -34,6 +34,10 @@ pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
/// Wrapper struct to ensure low-level database access goes through the correct API. /// Wrapper struct to ensure low-level database access goes through the correct API.
/// ///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Correctness /// # Correctness
/// ///
/// Reading transactions from the database using RocksDB iterators causes hangs. /// Reading transactions from the database using RocksDB iterators causes hangs.
@ -48,6 +52,20 @@ pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
/// (Or it might be fixed by future RocksDB upgrades.) /// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DiskDb { pub struct DiskDb {
// Configuration
//
// This configuration cannot be modified after the database is initialized,
// because some clones would have different values.
//
/// The configured temporary database setting.
///
/// If true, the database files are deleted on drop.
ephemeral: bool,
// Owned State
//
// Everything contained in this state must be shared by all clones, or read-only.
//
/// The shared inner RocksDB database. /// The shared inner RocksDB database.
/// ///
/// RocksDB allows reads and writes via a shared reference. /// RocksDB allows reads and writes via a shared reference.
@ -58,11 +76,6 @@ pub struct DiskDb {
/// In [`MultiThreaded`](rocksdb::MultiThreaded) mode, /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
/// only [`Drop`] requires exclusive access. /// only [`Drop`] requires exclusive access.
db: Arc<DB>, db: Arc<DB>,
/// The configured temporary database setting.
///
/// If true, the database files are deleted on drop.
ephemeral: bool,
} }
/// Wrapper struct to ensure low-level database writes go through the correct API. /// Wrapper struct to ensure low-level database writes go through the correct API.
@ -434,8 +447,8 @@ impl DiskDb {
info!("Opened Zebra state cache at {}", path.display()); info!("Opened Zebra state cache at {}", path.display());
let db = DiskDb { let db = DiskDb {
db: Arc::new(db),
ephemeral: config.ephemeral, ephemeral: config.ephemeral,
db: Arc::new(db),
}; };
db.assert_default_cf_is_empty(); db.assert_default_cf_is_empty();

View File

@ -28,10 +28,17 @@ pub mod transparent;
pub mod arbitrary; pub mod arbitrary;
/// Wrapper struct to ensure high-level typed database access goes through the correct API. /// Wrapper struct to ensure high-level typed database access goes through the correct API.
#[derive(Clone, Debug)] ///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ZebraDb { pub struct ZebraDb {
// Owned State
//
// Everything contained in this state must be shared by all clones, or read-only.
//
/// The inner low-level database wrapper for the RocksDB database. /// The inner low-level database wrapper for the RocksDB database.
/// This wrapper can be cloned and shared.
db: DiskDb, db: DiskDb,
} }

View File

@ -23,13 +23,10 @@ use crate::{
}; };
mod chain; mod chain;
mod queued_blocks;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub use queued_blocks::QueuedBlocks;
pub(crate) use chain::Chain; pub(crate) use chain::Chain;
/// The state of the chains in memory, including queued blocks. /// The state of the chains in memory, including queued blocks.

View File

@ -10,10 +10,19 @@ use tracing::instrument;
use zebra_chain::{block, transparent}; use zebra_chain::{block, transparent};
use crate::{BoxError, PreparedBlock}; use crate::{BoxError, FinalizedBlock, PreparedBlock};
#[cfg(test)]
mod tests;
/// A queued finalized block, and its corresponding [`Result`] channel.
pub type QueuedFinalized = (
FinalizedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A queued non-finalized block, and its corresponding [`Result`] channel. /// A queued non-finalized block, and its corresponding [`Result`] channel.
pub type QueuedBlock = ( pub type QueuedNonFinalized = (
PreparedBlock, PreparedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>, oneshot::Sender<Result<block::Hash, BoxError>>,
); );
@ -22,7 +31,7 @@ pub type QueuedBlock = (
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct QueuedBlocks { pub struct QueuedBlocks {
/// Blocks awaiting their parent blocks for contextual verification. /// Blocks awaiting their parent blocks for contextual verification.
blocks: HashMap<block::Hash, QueuedBlock>, blocks: HashMap<block::Hash, QueuedNonFinalized>,
/// Hashes from `queued_blocks`, indexed by parent hash. /// Hashes from `queued_blocks`, indexed by parent hash.
by_parent: HashMap<block::Hash, HashSet<block::Hash>>, by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
/// Hashes from `queued_blocks`, indexed by block height. /// Hashes from `queued_blocks`, indexed by block height.
@ -38,7 +47,7 @@ impl QueuedBlocks {
/// ///
/// - if a block with the same `block::Hash` has already been queued. /// - if a block with the same `block::Hash` has already been queued.
#[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))] #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
pub fn queue(&mut self, new: QueuedBlock) { pub fn queue(&mut self, new: QueuedNonFinalized) {
let new_hash = new.0.hash; let new_hash = new.0.hash;
let new_height = new.0.height; let new_height = new.0.height;
let parent_hash = new.0.block.header.previous_block_hash; let parent_hash = new.0.block.header.previous_block_hash;
@ -71,7 +80,7 @@ impl QueuedBlocks {
/// Dequeue and return all blocks that were waiting for the arrival of /// Dequeue and return all blocks that were waiting for the arrival of
/// `parent`. /// `parent`.
#[instrument(skip(self), fields(%parent_hash))] #[instrument(skip(self), fields(%parent_hash))]
pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec<QueuedBlock> { pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec<QueuedNonFinalized> {
let queued_children = self let queued_children = self
.by_parent .by_parent
.remove(&parent_hash) .remove(&parent_hash)
@ -161,7 +170,7 @@ impl QueuedBlocks {
} }
/// Return the queued block if it has already been registered /// Return the queued block if it has already been registered
pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedBlock> { pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedNonFinalized> {
self.blocks.get_mut(hash) self.blocks.get_mut(hash)
} }
@ -182,142 +191,3 @@ impl QueuedBlocks {
self.known_utxos.get(outpoint).cloned() self.known_utxos.get(outpoint).cloned()
} }
} }
// TODO: move these tests into their own `tests/vectors.rs` module
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tokio::sync::oneshot;
use zebra_chain::{block::Block, serialization::ZcashDeserializeInto};
use zebra_test::prelude::*;
use crate::{arbitrary::Prepare, tests::FakeChainHelper};
use super::*;
// Quick helper trait for making queued blocks with throw away channels
trait IntoQueued {
fn into_queued(self) -> QueuedBlock;
}
impl IntoQueued for Arc<Block> {
fn into_queued(self) -> QueuedBlock {
let (rsp_tx, _) = oneshot::channel();
(self.prepare(), rsp_tx)
}
}
#[test]
fn dequeue_gives_right_children() -> Result<()> {
let _init_guard = zebra_test::init();
let block1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?;
let child1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?;
let child2 = block1.make_fake_child();
let parent = block1.header.previous_block_hash;
let mut queue = QueuedBlocks::default();
// Empty to start
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert_eq!(0, queue.known_utxos.len());
// Inserting the first block gives us 1 in each table, and some UTXOs
queue.queue(block1.clone().into_queued());
assert_eq!(1, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert_eq!(2, queue.known_utxos.len());
// The second gives us another in each table because its a child of the first,
// and a lot of UTXOs
queue.queue(child1.clone().into_queued());
assert_eq!(2, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(632, queue.known_utxos.len());
// The 3rd only increments blocks, because it is also a child of the
// first block, so for the second and third tables it gets added to the
// existing HashSet value
queue.queue(child2.clone().into_queued());
assert_eq!(3, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(634, queue.known_utxos.len());
// Dequeueing the first block removes 1 block from each list
let children = queue.dequeue_children(parent);
assert_eq!(1, children.len());
assert_eq!(block1, children[0].0.block);
assert_eq!(2, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert_eq!(632, queue.known_utxos.len());
// Dequeueing the children of the first block removes both of the other
// blocks, and empties all lists
let parent = children[0].0.block.hash();
let children = queue.dequeue_children(parent);
assert_eq!(2, children.len());
assert!(children
.iter()
.any(|(block, _)| block.hash == child1.hash()));
assert!(children
.iter()
.any(|(block, _)| block.hash == child2.hash()));
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert_eq!(0, queue.known_utxos.len());
Ok(())
}
#[test]
fn prune_removes_right_children() -> Result<()> {
let _init_guard = zebra_test::init();
let block1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?;
let child1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?;
let child2 = block1.make_fake_child();
let mut queue = QueuedBlocks::default();
queue.queue(block1.clone().into_queued());
queue.queue(child1.clone().into_queued());
queue.queue(child2.clone().into_queued());
assert_eq!(3, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(634, queue.known_utxos.len());
// Pruning the first height removes only block1
queue.prune_by_height(block1.coinbase_height().unwrap());
assert_eq!(2, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert!(queue.get_mut(&block1.hash()).is_none());
assert!(queue.get_mut(&child1.hash()).is_some());
assert!(queue.get_mut(&child2.hash()).is_some());
assert_eq!(632, queue.known_utxos.len());
// Pruning the children of the first block removes both of the other
// blocks, and empties all lists
queue.prune_by_height(child1.coinbase_height().unwrap());
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert!(queue.get_mut(&child1.hash()).is_none());
assert!(queue.get_mut(&child2.hash()).is_none());
assert_eq!(0, queue.known_utxos.len());
Ok(())
}
}

View File

@ -0,0 +1,3 @@
//! Tests for block queues.
mod vectors;

View File

@ -0,0 +1,139 @@
//! Fixed test vectors for block queues.
use std::sync::Arc;
use tokio::sync::oneshot;
use zebra_chain::{block::Block, serialization::ZcashDeserializeInto};
use zebra_test::prelude::*;
use crate::{
arbitrary::Prepare,
service::queued_blocks::{QueuedBlocks, QueuedNonFinalized},
tests::FakeChainHelper,
};
// Quick helper trait for making queued blocks with throw away channels
trait IntoQueued {
fn into_queued(self) -> QueuedNonFinalized;
}
impl IntoQueued for Arc<Block> {
fn into_queued(self) -> QueuedNonFinalized {
let (rsp_tx, _) = oneshot::channel();
(self.prepare(), rsp_tx)
}
}
#[test]
fn dequeue_gives_right_children() -> Result<()> {
let _init_guard = zebra_test::init();
let block1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?;
let child1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?;
let child2 = block1.make_fake_child();
let parent = block1.header.previous_block_hash;
let mut queue = QueuedBlocks::default();
// Empty to start
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert_eq!(0, queue.known_utxos.len());
// Inserting the first block gives us 1 in each table, and some UTXOs
queue.queue(block1.clone().into_queued());
assert_eq!(1, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert_eq!(2, queue.known_utxos.len());
// The second gives us another in each table because its a child of the first,
// and a lot of UTXOs
queue.queue(child1.clone().into_queued());
assert_eq!(2, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(632, queue.known_utxos.len());
// The 3rd only increments blocks, because it is also a child of the
// first block, so for the second and third tables it gets added to the
// existing HashSet value
queue.queue(child2.clone().into_queued());
assert_eq!(3, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(634, queue.known_utxos.len());
// Dequeueing the first block removes 1 block from each list
let children = queue.dequeue_children(parent);
assert_eq!(1, children.len());
assert_eq!(block1, children[0].0.block);
assert_eq!(2, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert_eq!(632, queue.known_utxos.len());
// Dequeueing the children of the first block removes both of the other
// blocks, and empties all lists
let parent = children[0].0.block.hash();
let children = queue.dequeue_children(parent);
assert_eq!(2, children.len());
assert!(children
.iter()
.any(|(block, _)| block.hash == child1.hash()));
assert!(children
.iter()
.any(|(block, _)| block.hash == child2.hash()));
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert_eq!(0, queue.known_utxos.len());
Ok(())
}
#[test]
fn prune_removes_right_children() -> Result<()> {
let _init_guard = zebra_test::init();
let block1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?;
let child1: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?;
let child2 = block1.make_fake_child();
let mut queue = QueuedBlocks::default();
queue.queue(block1.clone().into_queued());
queue.queue(child1.clone().into_queued());
queue.queue(child2.clone().into_queued());
assert_eq!(3, queue.blocks.len());
assert_eq!(2, queue.by_parent.len());
assert_eq!(2, queue.by_height.len());
assert_eq!(634, queue.known_utxos.len());
// Pruning the first height removes only block1
queue.prune_by_height(block1.coinbase_height().unwrap());
assert_eq!(2, queue.blocks.len());
assert_eq!(1, queue.by_parent.len());
assert_eq!(1, queue.by_height.len());
assert!(queue.get_mut(&block1.hash()).is_none());
assert!(queue.get_mut(&child1.hash()).is_some());
assert!(queue.get_mut(&child2.hash()).is_some());
assert_eq!(632, queue.known_utxos.len());
// Pruning the children of the first block removes both of the other
// blocks, and empties all lists
queue.prune_by_height(child1.coinbase_height().unwrap());
assert_eq!(0, queue.blocks.len());
assert_eq!(0, queue.by_parent.len());
assert_eq!(0, queue.by_height.len());
assert!(queue.get_mut(&child1.hash()).is_none());
assert!(queue.get_mut(&child2.hash()).is_none());
assert_eq!(0, queue.known_utxos.len());
Ok(())
}