rename(state): do additional renaming for clarification purposes (#6967)

* do renames by script

```
git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/CommitFinalized/CommitCheckpointVerified/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/commit_finalized_block/commit_checkpoint_verified/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/drain_queue_and_commit_finalized/drain_finalized_queue_and_commit/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queue_and_commit_finalized/queue_and_commit_to_finalized_state/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queued_finalized_blocks/finalized_state_queued_blocks/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/max_queued_finalized_height/max_finalized_queue_height/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/send_finalized_block_error/send_checkpoint_verified_block_error/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/QueuedFinalized/QueuedCheckpointVerified/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queue_and_commit_non_finalized/queue_and_commit_to_non_finalized_state/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queued_non_finalized_blocks/non_finalized_state_queued_blocks/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/send_non_finalized_block_error/send_semantically_verified_block_error/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/QueuedNonFinalized/QueuedSemanticallyVerified/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/last_sent_finalized_block_hash/finalized_block_write_last_sent_hash/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/sent_non_finalized_block_hashes/non_finalized_block_write_sent_hashes/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/invalid_block_reset_receiver/invalid_block_write_reset_receiver/g'

cargo fmt --all
```

* add missing log renames by script

```
git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queued finalized block/finalized state queue block/g'

git ls-tree --full-tree -r --name-only HEAD | \
grep -v 'book' | grep -v 'CHANGELOG.md' | \
xargs sed -i 's/queued non-finalized block/non-finalized state queue block/g'

cargo fmt --all
```
This commit is contained in:
Alfredo Garcia 2023-06-15 02:12:45 -03:00 committed by GitHub
parent e748d9a833
commit 8a7c871480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 120 additions and 106 deletions

View File

@ -970,7 +970,7 @@ pub enum VerifyCheckpointError {
#[error("checkpoint verifier was dropped")] #[error("checkpoint verifier was dropped")]
Dropped, Dropped,
#[error(transparent)] #[error(transparent)]
CommitFinalized(BoxError), CommitCheckpointVerified(BoxError),
#[error(transparent)] #[error(transparent)]
Tip(BoxError), Tip(BoxError),
#[error(transparent)] #[error(transparent)]
@ -1084,19 +1084,19 @@ where
// we don't reject the entire checkpoint. // we don't reject the entire checkpoint.
// Instead, we reset the verifier to the successfully committed state tip. // Instead, we reset the verifier to the successfully committed state tip.
let state_service = self.state_service.clone(); let state_service = self.state_service.clone();
let commit_finalized_block = tokio::spawn(async move { let commit_checkpoint_verified = tokio::spawn(async move {
let hash = req_block let hash = req_block
.rx .rx
.await .await
.map_err(Into::into) .map_err(Into::into)
.map_err(VerifyCheckpointError::CommitFinalized) .map_err(VerifyCheckpointError::CommitCheckpointVerified)
.expect("CheckpointVerifier does not leave dangling receivers")?; .expect("CheckpointVerifier does not leave dangling receivers")?;
// We use a `ServiceExt::oneshot`, so that every state service // We use a `ServiceExt::oneshot`, so that every state service
// `poll_ready` has a corresponding `call`. See #1593. // `poll_ready` has a corresponding `call`. See #1593.
match state_service match state_service
.oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block)) .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
.map_err(VerifyCheckpointError::CommitFinalized) .map_err(VerifyCheckpointError::CommitCheckpointVerified)
.await? .await?
{ {
zs::Response::Committed(committed_hash) => { zs::Response::Committed(committed_hash) => {
@ -1110,10 +1110,10 @@ where
let state_service = self.state_service.clone(); let state_service = self.state_service.clone();
let reset_sender = self.reset_sender.clone(); let reset_sender = self.reset_sender.clone();
async move { async move {
let result = commit_finalized_block.await; let result = commit_checkpoint_verified.await;
// Avoid a panic on shutdown // Avoid a panic on shutdown
// //
// When `zebrad` is terminated using Ctrl-C, the `commit_finalized_block` task // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task
// can return a `JoinError::Cancelled`. We expect task cancellation on shutdown, // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown,
// so we don't need to panic here. The persistent state is correct even when the // so we don't need to panic here. The persistent state is correct even when the
// task is cancelled, because block data is committed inside transactions, in // task is cancelled, because block data is committed inside transactions, in
@ -1121,7 +1121,7 @@ where
let result = if zebra_chain::shutdown::is_shutting_down() { let result = if zebra_chain::shutdown::is_shutting_down() {
Err(VerifyCheckpointError::ShuttingDown) Err(VerifyCheckpointError::ShuttingDown)
} else { } else {
result.expect("commit_finalized_block should not panic") result.expect("commit_checkpoint_verified should not panic")
}; };
if result.is_err() { if result.is_err() {
// If there was an error committing the block, then this verifier // If there was an error committing the block, then this verifier

View File

@ -86,7 +86,7 @@ mod tests;
pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};
use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized, SentHashes}; use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
/// A read-write service for Zebra's cached blockchain state. /// A read-write service for Zebra's cached blockchain state.
/// ///
@ -124,25 +124,26 @@ pub(crate) struct StateService {
// //
/// Queued blocks for the [`NonFinalizedState`] that arrived out of order. /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
/// These blocks are awaiting their parent blocks before they can do contextual verification. /// These blocks are awaiting their parent blocks before they can do contextual verification.
queued_non_finalized_blocks: QueuedBlocks, non_finalized_state_queued_blocks: QueuedBlocks,
/// Queued blocks for the [`FinalizedState`] that arrived out of order. /// Queued blocks for the [`FinalizedState`] that arrived out of order.
/// These blocks are awaiting their parent blocks before they can do contextual verification. /// These blocks are awaiting their parent blocks before they can do contextual verification.
/// ///
/// Indexed by their parent block hash. /// Indexed by their parent block hash.
queued_finalized_blocks: HashMap<block::Hash, QueuedFinalized>, finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
/// A channel to send blocks to the `block_write_task`, /// A channel to send blocks to the `block_write_task`,
/// so they can be written to the [`NonFinalizedState`]. /// so they can be written to the [`NonFinalizedState`].
non_finalized_block_write_sender: non_finalized_block_write_sender:
Option<tokio::sync::mpsc::UnboundedSender<QueuedNonFinalized>>, Option<tokio::sync::mpsc::UnboundedSender<QueuedSemanticallyVerified>>,
/// A channel to send blocks to the `block_write_task`, /// A channel to send blocks to the `block_write_task`,
/// so they can be written to the [`FinalizedState`]. /// so they can be written to the [`FinalizedState`].
/// ///
/// This sender is dropped after the state has finished sending all the checkpointed blocks, /// This sender is dropped after the state has finished sending all the checkpointed blocks,
/// and the lowest non-finalized block arrives. /// and the lowest non-finalized block arrives.
finalized_block_write_sender: Option<tokio::sync::mpsc::UnboundedSender<QueuedFinalized>>, finalized_block_write_sender:
Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
/// The [`block::Hash`] of the most recent block sent on /// The [`block::Hash`] of the most recent block sent on
/// `finalized_block_write_sender` or `non_finalized_block_write_sender`. /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
@ -151,25 +152,25 @@ pub(crate) struct StateService {
/// - the finalized tip, if there are stored blocks, or /// - the finalized tip, if there are stored blocks, or
/// - the genesis block's parent hash, if the database is empty. /// - the genesis block's parent hash, if the database is empty.
/// ///
/// If `invalid_block_reset_receiver` gets a reset, this is: /// If `invalid_block_write_reset_receiver` gets a reset, this is:
/// - the hash of the last valid committed block (the parent of the invalid block). /// - the hash of the last valid committed block (the parent of the invalid block).
// //
// TODO: // TODO:
// - turn this into an IndexMap containing recent non-finalized block hashes and heights // - turn this into an IndexMap containing recent non-finalized block hashes and heights
// (they are all potential tips) // (they are all potential tips)
// - remove block hashes once their heights are strictly less than the finalized tip // - remove block hashes once their heights are strictly less than the finalized tip
last_sent_finalized_block_hash: block::Hash, finalized_block_write_last_sent_hash: block::Hash,
/// A set of block hashes that have been sent to the block write task. /// A set of block hashes that have been sent to the block write task.
/// Hashes of blocks below the finalized tip height are periodically pruned. /// Hashes of blocks below the finalized tip height are periodically pruned.
sent_non_finalized_block_hashes: SentHashes, non_finalized_block_write_sent_hashes: SentHashes,
/// If an invalid block is sent on `finalized_block_write_sender` /// If an invalid block is sent on `finalized_block_write_sender`
/// or `non_finalized_block_write_sender`, /// or `non_finalized_block_write_sender`,
/// this channel gets the [`block::Hash`] of the valid tip. /// this channel gets the [`block::Hash`] of the valid tip.
// //
// TODO: add tests for finalized and non-finalized resets (#2654) // TODO: add tests for finalized and non-finalized resets (#2654)
invalid_block_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>, invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
// Pending UTXO Request Tracking // Pending UTXO Request Tracking
// //
@ -188,11 +189,11 @@ pub(crate) struct StateService {
// Metrics // Metrics
// //
/// A metric tracking the maximum height that's currently in `queued_finalized_blocks` /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
/// ///
/// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
/// as a break in the graph. /// as a break in the graph.
max_queued_finalized_height: f64, max_finalized_queue_height: f64,
} }
/// A read-only service for accessing Zebra's cached blockchain state. /// A read-only service for accessing Zebra's cached blockchain state.
@ -245,16 +246,16 @@ impl Drop for StateService {
// Close the channels (non-blocking) // Close the channels (non-blocking)
// This makes the block write thread exit the next time it checks the channels. // This makes the block write thread exit the next time it checks the channels.
// We want to do this here so we get any errors or panics from the block write task before it shuts down. // We want to do this here so we get any errors or panics from the block write task before it shuts down.
self.invalid_block_reset_receiver.close(); self.invalid_block_write_reset_receiver.close();
std::mem::drop(self.finalized_block_write_sender.take()); std::mem::drop(self.finalized_block_write_sender.take());
std::mem::drop(self.non_finalized_block_write_sender.take()); std::mem::drop(self.non_finalized_block_write_sender.take());
self.clear_finalized_block_queue( self.clear_finalized_block_queue(
"dropping the state: dropped unused queued finalized block", "dropping the state: dropped unused finalized state queue block",
); );
self.clear_non_finalized_block_queue( self.clear_non_finalized_block_queue(
"dropping the state: dropped unused queued non-finalized block", "dropping the state: dropped unused non-finalized state queue block",
); );
// Then drop self.read_service, which checks the block write task for panics, // Then drop self.read_service, which checks the block write task for panics,
@ -364,7 +365,7 @@ impl StateService {
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let (finalized_block_write_sender, finalized_block_write_receiver) = let (finalized_block_write_sender, finalized_block_write_receiver) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let (invalid_block_reset_sender, invalid_block_reset_receiver) = let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let finalized_state_for_writing = finalized_state.clone(); let finalized_state_for_writing = finalized_state.clone();
@ -396,25 +397,25 @@ impl StateService {
let full_verifier_utxo_lookahead = let full_verifier_utxo_lookahead =
full_verifier_utxo_lookahead.expect("unexpected negative height"); full_verifier_utxo_lookahead.expect("unexpected negative height");
let queued_non_finalized_blocks = QueuedBlocks::default(); let non_finalized_state_queued_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default(); let pending_utxos = PendingUtxos::default();
let last_sent_finalized_block_hash = finalized_state.db.finalized_tip_hash(); let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();
let state = Self { let state = Self {
network, network,
full_verifier_utxo_lookahead, full_verifier_utxo_lookahead,
queued_non_finalized_blocks, non_finalized_state_queued_blocks,
queued_finalized_blocks: HashMap::new(), finalized_state_queued_blocks: HashMap::new(),
non_finalized_block_write_sender: Some(non_finalized_block_write_sender), non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
finalized_block_write_sender: Some(finalized_block_write_sender), finalized_block_write_sender: Some(finalized_block_write_sender),
last_sent_finalized_block_hash, finalized_block_write_last_sent_hash,
sent_non_finalized_block_hashes: SentHashes::default(), non_finalized_block_write_sent_hashes: SentHashes::default(),
invalid_block_reset_receiver, invalid_block_write_reset_receiver,
pending_utxos, pending_utxos,
last_prune: Instant::now(), last_prune: Instant::now(),
read_service: read_service.clone(), read_service: read_service.clone(),
max_queued_finalized_height: f64::NAN, max_finalized_queue_height: f64::NAN,
}; };
timer.finish(module_path!(), line!(), "initializing state service"); timer.finish(module_path!(), line!(), "initializing state service");
@ -457,7 +458,7 @@ impl StateService {
/// Queue a finalized block for verification and storage in the finalized state. /// Queue a finalized block for verification and storage in the finalized state.
/// ///
/// Returns a channel receiver that provides the result of the block commit. /// Returns a channel receiver that provides the result of the block commit.
fn queue_and_commit_finalized( fn queue_and_commit_to_finalized_state(
&mut self, &mut self,
checkpoint_verified: CheckpointVerifiedBlock, checkpoint_verified: CheckpointVerifiedBlock,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> { ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
@ -472,7 +473,7 @@ impl StateService {
// If we're close to the final checkpoint, make the block's UTXOs available for // If we're close to the final checkpoint, make the block's UTXOs available for
// full verification of non-finalized blocks, even when it is in the channel. // full verification of non-finalized blocks, even when it is in the channel.
if self.is_close_to_final_checkpoint(queued_height) { if self.is_close_to_final_checkpoint(queued_height) {
self.sent_non_finalized_block_hashes self.non_finalized_block_write_sent_hashes
.add_finalized(&checkpoint_verified) .add_finalized(&checkpoint_verified)
} }
@ -482,23 +483,23 @@ impl StateService {
if self.finalized_block_write_sender.is_some() { if self.finalized_block_write_sender.is_some() {
// We're still committing finalized blocks // We're still committing finalized blocks
if let Some(duplicate_queued) = self if let Some(duplicate_queued) = self
.queued_finalized_blocks .finalized_state_queued_blocks
.insert(queued_prev_hash, queued) .insert(queued_prev_hash, queued)
{ {
Self::send_finalized_block_error( Self::send_checkpoint_verified_block_error(
duplicate_queued, duplicate_queued,
"dropping older finalized block: got newer duplicate block", "dropping older finalized block: got newer duplicate block",
); );
} }
self.drain_queue_and_commit_finalized(); self.drain_finalized_queue_and_commit();
} else { } else {
// We've finished committing finalized blocks, so drop any repeated queued blocks, // We've finished committing finalized blocks, so drop any repeated queued blocks,
// and return an error. // and return an error.
// //
// TODO: track the latest sent height, and drop any blocks under that height // TODO: track the latest sent height, and drop any blocks under that height
// every time we send some blocks (like QueuedNonFinalizedBlocks) // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
Self::send_finalized_block_error( Self::send_checkpoint_verified_block_error(
queued, queued,
"already finished committing finalized blocks: dropped duplicate block, \ "already finished committing finalized blocks: dropped duplicate block, \
block is already committed to the state", block is already committed to the state",
@ -510,39 +511,39 @@ impl StateService {
); );
} }
if self.queued_finalized_blocks.is_empty() { if self.finalized_state_queued_blocks.is_empty() {
self.max_queued_finalized_height = f64::NAN; self.max_finalized_queue_height = f64::NAN;
} else if self.max_queued_finalized_height.is_nan() } else if self.max_finalized_queue_height.is_nan()
|| self.max_queued_finalized_height < queued_height.0 as f64 || self.max_finalized_queue_height < queued_height.0 as f64
{ {
// if there are still blocks in the queue, then either: // if there are still blocks in the queue, then either:
// - the new block was lower than the old maximum, and there was a gap before it, // - the new block was lower than the old maximum, and there was a gap before it,
// so the maximum is still the same (and we skip this code), or // so the maximum is still the same (and we skip this code), or
// - the new block is higher than the old maximum, and there is at least one gap // - the new block is higher than the old maximum, and there is at least one gap
// between the finalized tip and the new maximum // between the finalized tip and the new maximum
self.max_queued_finalized_height = queued_height.0 as f64; self.max_finalized_queue_height = queued_height.0 as f64;
} }
metrics::gauge!( metrics::gauge!(
"state.checkpoint.queued.max.height", "state.checkpoint.queued.max.height",
self.max_queued_finalized_height, self.max_finalized_queue_height,
); );
metrics::gauge!( metrics::gauge!(
"state.checkpoint.queued.block.count", "state.checkpoint.queued.block.count",
self.queued_finalized_blocks.len() as f64, self.finalized_state_queued_blocks.len() as f64,
); );
rsp_rx rsp_rx
} }
/// Finds queued finalized blocks to be committed to the state in order, /// Finds finalized state queue blocks to be committed to the state in order,
/// removes them from the queue, and sends them to the block commit task. /// removes them from the queue, and sends them to the block commit task.
/// ///
/// After queueing a finalized block, this method checks whether the newly /// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state. /// queued block (and any of its descendants) can be committed to the state.
/// ///
/// Returns an error if the block commit channel has been closed. /// Returns an error if the block commit channel has been closed.
pub fn drain_queue_and_commit_finalized(&mut self) { pub fn drain_finalized_queue_and_commit(&mut self) {
use tokio::sync::mpsc::error::{SendError, TryRecvError}; use tokio::sync::mpsc::error::{SendError, TryRecvError};
// # Correctness & Performance // # Correctness & Performance
@ -551,8 +552,8 @@ impl StateService {
// because it is called directly from the tokio executor's Future threads. // because it is called directly from the tokio executor's Future threads.
// If a block failed, we need to start again from a valid tip. // If a block failed, we need to start again from a valid tip.
match self.invalid_block_reset_receiver.try_recv() { match self.invalid_block_write_reset_receiver.try_recv() {
Ok(reset_tip_hash) => self.last_sent_finalized_block_hash = reset_tip_hash, Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
Err(TryRecvError::Disconnected) => { Err(TryRecvError::Disconnected) => {
info!("Block commit task closed the block reset channel. Is Zebra shutting down?"); info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
return; return;
@ -562,12 +563,12 @@ impl StateService {
} }
while let Some(queued_block) = self while let Some(queued_block) = self
.queued_finalized_blocks .finalized_state_queued_blocks
.remove(&self.last_sent_finalized_block_hash) .remove(&self.finalized_block_write_last_sent_hash)
{ {
let last_sent_finalized_block_height = queued_block.0.height; let last_sent_finalized_block_height = queued_block.0.height;
self.last_sent_finalized_block_hash = queued_block.0.hash; self.finalized_block_write_last_sent_hash = queued_block.0.hash;
// If we've finished sending finalized blocks, ignore any repeated blocks. // If we've finished sending finalized blocks, ignore any repeated blocks.
// (Blocks can be repeated after a syncer reset.) // (Blocks can be repeated after a syncer reset.)
@ -577,7 +578,7 @@ impl StateService {
// If the receiver is closed, we can't send any more blocks. // If the receiver is closed, we can't send any more blocks.
if let Err(SendError(queued)) = send_result { if let Err(SendError(queued)) = send_result {
// If Zebra is shutting down, drop blocks and return an error. // If Zebra is shutting down, drop blocks and return an error.
Self::send_finalized_block_error( Self::send_checkpoint_verified_block_error(
queued, queued,
"block commit task exited. Is Zebra shutting down?", "block commit task exited. Is Zebra shutting down?",
); );
@ -595,15 +596,18 @@ impl StateService {
} }
} }
/// Drops all queued finalized blocks, and sends an error on their result channels. /// Drops all finalized state queue blocks, and sends an error on their result channels.
fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) { fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
for (_hash, queued) in self.queued_finalized_blocks.drain() { for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
Self::send_finalized_block_error(queued, error.clone()); Self::send_checkpoint_verified_block_error(queued, error.clone());
} }
} }
/// Send an error on a `QueuedFinalized` block's result channel, and drop the block /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
fn send_finalized_block_error(queued: QueuedFinalized, error: impl Into<BoxError>) { fn send_checkpoint_verified_block_error(
queued: QueuedCheckpointVerified,
error: impl Into<BoxError>,
) {
let (finalized, rsp_tx) = queued; let (finalized, rsp_tx) = queued;
// The block sender might have already given up on this block, // The block sender might have already given up on this block,
@ -612,15 +616,18 @@ impl StateService {
std::mem::drop(finalized); std::mem::drop(finalized);
} }
/// Drops all queued non-finalized blocks, and sends an error on their result channels. /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
fn clear_non_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) { fn clear_non_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
for (_hash, queued) in self.queued_non_finalized_blocks.drain() { for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
Self::send_non_finalized_block_error(queued, error.clone()); Self::send_semantically_verified_block_error(queued, error.clone());
} }
} }
/// Send an error on a `QueuedNonFinalized` block's result channel, and drop the block /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
fn send_non_finalized_block_error(queued: QueuedNonFinalized, error: impl Into<BoxError>) { fn send_semantically_verified_block_error(
queued: QueuedSemanticallyVerified,
error: impl Into<BoxError>,
) {
let (finalized, rsp_tx) = queued; let (finalized, rsp_tx) = queued;
// The block sender might have already given up on this block, // The block sender might have already given up on this block,
@ -637,7 +644,7 @@ impl StateService {
/// ///
/// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
#[instrument(level = "debug", skip(self, semantically_verrified))] #[instrument(level = "debug", skip(self, semantically_verrified))]
fn queue_and_commit_non_finalized( fn queue_and_commit_to_non_finalized_state(
&mut self, &mut self,
semantically_verrified: SemanticallyVerifiedBlock, semantically_verrified: SemanticallyVerifiedBlock,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> { ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
@ -645,7 +652,7 @@ impl StateService {
let parent_hash = semantically_verrified.block.header.previous_block_hash; let parent_hash = semantically_verrified.block.header.previous_block_hash;
if self if self
.sent_non_finalized_block_hashes .non_finalized_block_write_sent_hashes
.contains(&semantically_verrified.hash) .contains(&semantically_verrified.hash)
{ {
let (rsp_tx, rsp_rx) = oneshot::channel(); let (rsp_tx, rsp_rx) = oneshot::channel();
@ -672,7 +679,7 @@ impl StateService {
// has been queued but not yet committed to the state fails the older request and replaces // has been queued but not yet committed to the state fails the older request and replaces
// it with the newer request. // it with the newer request.
let rsp_rx = if let Some((_, old_rsp_tx)) = self let rsp_rx = if let Some((_, old_rsp_tx)) = self
.queued_non_finalized_blocks .non_finalized_state_queued_blocks
.get_mut(&semantically_verrified.hash) .get_mut(&semantically_verrified.hash)
{ {
tracing::debug!("replacing older queued request with new request"); tracing::debug!("replacing older queued request with new request");
@ -682,7 +689,7 @@ impl StateService {
rsp_rx rsp_rx
} else { } else {
let (rsp_tx, rsp_rx) = oneshot::channel(); let (rsp_tx, rsp_rx) = oneshot::channel();
self.queued_non_finalized_blocks self.non_finalized_state_queued_blocks
.queue((semantically_verrified, rsp_tx)); .queue((semantically_verrified, rsp_tx));
rsp_rx rsp_rx
}; };
@ -697,9 +704,10 @@ impl StateService {
// TODO: configure the state with the last checkpoint hash instead? // TODO: configure the state with the last checkpoint hash instead?
if self.finalized_block_write_sender.is_some() if self.finalized_block_write_sender.is_some()
&& self && self
.queued_non_finalized_blocks .non_finalized_state_queued_blocks
.has_queued_children(self.last_sent_finalized_block_hash) .has_queued_children(self.finalized_block_write_last_sent_hash)
&& self.read_service.db.finalized_tip_hash() == self.last_sent_finalized_block_hash && self.read_service.db.finalized_tip_hash()
== self.finalized_block_write_last_sent_hash
{ {
// Tell the block write task to stop committing finalized blocks, // Tell the block write task to stop committing finalized blocks,
// and move on to committing non-finalized blocks. // and move on to committing non-finalized blocks.
@ -728,10 +736,10 @@ impl StateService {
"Finalized state must have at least one block before committing non-finalized state", "Finalized state must have at least one block before committing non-finalized state",
); );
self.queued_non_finalized_blocks self.non_finalized_state_queued_blocks
.prune_by_height(finalized_tip_height); .prune_by_height(finalized_tip_height);
self.sent_non_finalized_block_hashes self.non_finalized_block_write_sent_hashes
.prune_by_height(finalized_tip_height); .prune_by_height(finalized_tip_height);
} }
@ -740,7 +748,7 @@ impl StateService {
/// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks. /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool { fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
self.sent_non_finalized_block_hashes.contains(hash) self.non_finalized_block_write_sent_hashes.contains(hash)
|| &self.read_service.db.finalized_tip_hash() == hash || &self.read_service.db.finalized_tip_hash() == hash
} }
@ -765,18 +773,19 @@ impl StateService {
while let Some(parent_hash) = new_parents.pop() { while let Some(parent_hash) = new_parents.pop() {
let queued_children = self let queued_children = self
.queued_non_finalized_blocks .non_finalized_state_queued_blocks
.dequeue_children(parent_hash); .dequeue_children(parent_hash);
for queued_child in queued_children { for queued_child in queued_children {
let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child; let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
self.sent_non_finalized_block_hashes.add(&queued_child.0); self.non_finalized_block_write_sent_hashes
.add(&queued_child.0);
let send_result = non_finalized_block_write_sender.send(queued_child); let send_result = non_finalized_block_write_sender.send(queued_child);
if let Err(SendError(queued)) = send_result { if let Err(SendError(queued)) = send_result {
// If Zebra is shutting down, drop blocks and return an error. // If Zebra is shutting down, drop blocks and return an error.
Self::send_non_finalized_block_error( Self::send_semantically_verified_block_error(
queued, queued,
"block commit task exited. Is Zebra shutting down?", "block commit task exited. Is Zebra shutting down?",
); );
@ -792,7 +801,7 @@ impl StateService {
} }
} }
self.sent_non_finalized_block_hashes.finish_batch(); self.non_finalized_block_write_sent_hashes.finish_batch();
}; };
} }
@ -905,7 +914,7 @@ impl Service<Request> for StateService {
let span = Span::current(); let span = Span::current();
match req { match req {
// Uses queued_non_finalized_blocks and pending_utxos in the StateService // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
// Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
Request::CommitSemanticallyVerifiedBlock(semantically_verified) => { Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
self.assert_block_can_be_validated(&semantically_verified); self.assert_block_can_be_validated(&semantically_verified);
@ -925,7 +934,9 @@ impl Service<Request> for StateService {
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
let rsp_rx = tokio::task::block_in_place(move || { let rsp_rx = tokio::task::block_in_place(move || {
span.in_scope(|| self.queue_and_commit_non_finalized(semantically_verified)) span.in_scope(|| {
self.queue_and_commit_to_non_finalized_state(semantically_verified)
})
}); });
// TODO: // TODO:
@ -954,7 +965,7 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
// Uses queued_finalized_blocks and pending_utxos in the StateService. // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService. // Accesses shared writeable state in the StateService.
Request::CommitCheckpointVerifiedBlock(finalized) => { Request::CommitCheckpointVerifiedBlock(finalized) => {
// # Consensus // # Consensus
@ -971,7 +982,7 @@ impl Service<Request> for StateService {
// //
// This method doesn't block, access the database, or perform CPU-intensive tasks, // This method doesn't block, access the database, or perform CPU-intensive tasks,
// so we can run it directly in the tokio executor's Future threads. // so we can run it directly in the tokio executor's Future threads.
let rsp_rx = self.queue_and_commit_finalized(finalized); let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
// TODO: // TODO:
// - check for panics in the block write task here, // - check for panics in the block write task here,
@ -996,7 +1007,7 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
// Uses pending_utxos and queued_non_finalized_blocks in the StateService. // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => { Request::AwaitUtxo(outpoint) => {
// Prepare the AwaitUtxo future from PendingUxtos. // Prepare the AwaitUtxo future from PendingUxtos.
@ -1008,7 +1019,7 @@ impl Service<Request> for StateService {
// Check the non-finalized block queue outside the returned future, // Check the non-finalized block queue outside the returned future,
// so we can access mutable state fields. // so we can access mutable state fields.
if let Some(utxo) = self.queued_non_finalized_blocks.utxo(&outpoint) { if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo); self.pending_utxos.respond(&outpoint, utxo);
// We're finished, the returned future gets the UTXO from the respond() channel. // We're finished, the returned future gets the UTXO from the respond() channel.
@ -1018,7 +1029,7 @@ impl Service<Request> for StateService {
} }
// Check the sent non-finalized blocks // Check the sent non-finalized blocks
if let Some(utxo) = self.sent_non_finalized_block_hashes.utxo(&outpoint) { if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo); self.pending_utxos.respond(&outpoint, utxo);
// We're finished, the returned future gets the UTXO from the respond() channel. // We're finished, the returned future gets the UTXO from the respond() channel.
@ -1027,7 +1038,7 @@ impl Service<Request> for StateService {
return response_fut; return response_fut;
} }
// We ignore any UTXOs in FinalizedState.queued_finalized_blocks, // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
// because it is only used during checkpoint verification. // because it is only used during checkpoint verification.
// //
// This creates a rare race condition, but it doesn't seem to happen much in practice. // This creates a rare race condition, but it doesn't seem to happen much in practice.

View File

@ -24,7 +24,7 @@ use zebra_chain::{block, parameters::Network};
use crate::{ use crate::{
request::ContextuallyVerifiedBlockWithTrees, request::ContextuallyVerifiedBlockWithTrees,
service::{check, QueuedFinalized}, service::{check, QueuedCheckpointVerified},
BoxError, CheckpointVerifiedBlock, CloneError, Config, BoxError, CheckpointVerifiedBlock, CloneError, Config,
}; };
@ -167,7 +167,7 @@ impl FinalizedState {
/// order. /// order.
pub fn commit_finalized( pub fn commit_finalized(
&mut self, &mut self,
ordered_block: QueuedFinalized, ordered_block: QueuedCheckpointVerified,
) -> Result<CheckpointVerifiedBlock, BoxError> { ) -> Result<CheckpointVerifiedBlock, BoxError> {
let (checkpoint_verified, rsp_tx) = ordered_block; let (checkpoint_verified, rsp_tx) = ordered_block;
let result = self.commit_finalized_direct( let result = self.commit_finalized_direct(

View File

@ -15,14 +15,14 @@ use crate::{BoxError, CheckpointVerifiedBlock, SemanticallyVerifiedBlock};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// A queued finalized block, and its corresponding [`Result`] channel. /// A finalized state queue block, and its corresponding [`Result`] channel.
pub type QueuedFinalized = ( pub type QueuedCheckpointVerified = (
CheckpointVerifiedBlock, CheckpointVerifiedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>, oneshot::Sender<Result<block::Hash, BoxError>>,
); );
/// A queued non-finalized block, and its corresponding [`Result`] channel. /// A non-finalized state queue block, and its corresponding [`Result`] channel.
pub type QueuedNonFinalized = ( pub type QueuedSemanticallyVerified = (
SemanticallyVerifiedBlock, SemanticallyVerifiedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>, oneshot::Sender<Result<block::Hash, BoxError>>,
); );
@ -31,7 +31,7 @@ pub type QueuedNonFinalized = (
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct QueuedBlocks { pub struct QueuedBlocks {
/// Blocks awaiting their parent blocks for contextual verification. /// Blocks awaiting their parent blocks for contextual verification.
blocks: HashMap<block::Hash, QueuedNonFinalized>, blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
/// Hashes from `queued_blocks`, indexed by parent hash. /// Hashes from `queued_blocks`, indexed by parent hash.
by_parent: HashMap<block::Hash, HashSet<block::Hash>>, by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
/// Hashes from `queued_blocks`, indexed by block height. /// Hashes from `queued_blocks`, indexed by block height.
@ -47,7 +47,7 @@ impl QueuedBlocks {
/// ///
/// - if a block with the same `block::Hash` has already been queued. /// - if a block with the same `block::Hash` has already been queued.
#[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))] #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
pub fn queue(&mut self, new: QueuedNonFinalized) { pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
let new_hash = new.0.hash; let new_hash = new.0.hash;
let new_height = new.0.height; let new_height = new.0.height;
let parent_hash = new.0.block.header.previous_block_hash; let parent_hash = new.0.block.header.previous_block_hash;
@ -86,7 +86,10 @@ impl QueuedBlocks {
/// Dequeue and return all blocks that were waiting for the arrival of /// Dequeue and return all blocks that were waiting for the arrival of
/// `parent`. /// `parent`.
#[instrument(skip(self), fields(%parent_hash))] #[instrument(skip(self), fields(%parent_hash))]
pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec<QueuedNonFinalized> { pub fn dequeue_children(
&mut self,
parent_hash: block::Hash,
) -> Vec<QueuedSemanticallyVerified> {
let queued_children = self let queued_children = self
.by_parent .by_parent
.remove(&parent_hash) .remove(&parent_hash)
@ -176,7 +179,7 @@ impl QueuedBlocks {
} }
/// Return the queued block if it has already been registered /// Return the queued block if it has already been registered
pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedNonFinalized> { pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
self.blocks.get_mut(hash) self.blocks.get_mut(hash)
} }
@ -208,7 +211,7 @@ impl QueuedBlocks {
/// Returns all key-value pairs of blocks as an iterator. /// Returns all key-value pairs of blocks as an iterator.
/// ///
/// Doesn't update the metrics, because it is only used when the state is being dropped. /// Doesn't update the metrics, because it is only used when the state is being dropped.
pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedNonFinalized> { pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
self.known_utxos.clear(); self.known_utxos.clear();
self.known_utxos.shrink_to_fit(); self.known_utxos.shrink_to_fit();
self.by_parent.clear(); self.by_parent.clear();

View File

@ -9,17 +9,17 @@ use zebra_test::prelude::*;
use crate::{ use crate::{
arbitrary::Prepare, arbitrary::Prepare,
service::queued_blocks::{QueuedBlocks, QueuedNonFinalized}, service::queued_blocks::{QueuedBlocks, QueuedSemanticallyVerified},
tests::FakeChainHelper, tests::FakeChainHelper,
}; };
// Quick helper trait for making queued blocks with throw away channels // Quick helper trait for making queued blocks with throw away channels
trait IntoQueued { trait IntoQueued {
fn into_queued(self) -> QueuedNonFinalized; fn into_queued(self) -> QueuedSemanticallyVerified;
} }
impl IntoQueued for Arc<Block> { impl IntoQueued for Arc<Block> {
fn into_queued(self) -> QueuedNonFinalized { fn into_queued(self) -> QueuedSemanticallyVerified {
let (rsp_tx, _) = oneshot::channel(); let (rsp_tx, _) = oneshot::channel();
(self.prepare(), rsp_tx) (self.prepare(), rsp_tx)
} }

View File

@ -424,7 +424,7 @@ proptest! {
expected_finalized_value_pool += *block_value_pool; expected_finalized_value_pool += *block_value_pool;
} }
let result_receiver = state_service.queue_and_commit_finalized(block.clone()); let result_receiver = state_service.queue_and_commit_to_finalized_state(block.clone());
let result = result_receiver.blocking_recv(); let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result);
@ -450,7 +450,7 @@ proptest! {
let block_value_pool = &block.block.chain_value_pool_change(&transparent::utxos_from_ordered_utxos(utxos))?; let block_value_pool = &block.block.chain_value_pool_change(&transparent::utxos_from_ordered_utxos(utxos))?;
expected_non_finalized_value_pool += *block_value_pool; expected_non_finalized_value_pool += *block_value_pool;
let result_receiver = state_service.queue_and_commit_non_finalized(block.clone()); let result_receiver = state_service.queue_and_commit_to_non_finalized_state(block.clone());
let result = result_receiver.blocking_recv(); let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result);
@ -509,7 +509,7 @@ proptest! {
TipAction::grow_with(expected_block.clone().into()) TipAction::grow_with(expected_block.clone().into())
}; };
let result_receiver = state_service.queue_and_commit_finalized(block); let result_receiver = state_service.queue_and_commit_to_finalized_state(block);
let result = result_receiver.blocking_recv(); let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result);
@ -532,7 +532,7 @@ proptest! {
TipAction::grow_with(expected_block.clone().into()) TipAction::grow_with(expected_block.clone().into())
}; };
let result_receiver = state_service.queue_and_commit_non_finalized(block); let result_receiver = state_service.queue_and_commit_to_non_finalized_state(block);
let result = result_receiver.blocking_recv(); let result = result_receiver.blocking_recv();
prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result);

View File

@ -17,7 +17,7 @@ use crate::{
check, check,
finalized_state::{FinalizedState, ZebraDb}, finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::NonFinalizedState, non_finalized_state::NonFinalizedState,
queued_blocks::{QueuedFinalized, QueuedNonFinalized}, queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
BoxError, ChainTipBlock, ChainTipSender, CloneError, BoxError, ChainTipBlock, ChainTipSender, CloneError,
}, },
CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock, CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
@ -131,8 +131,8 @@ fn update_latest_chain_channels(
) )
)] )]
pub fn write_blocks_from_channels( pub fn write_blocks_from_channels(
mut finalized_block_write_receiver: UnboundedReceiver<QueuedFinalized>, mut finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
mut non_finalized_block_write_receiver: UnboundedReceiver<QueuedNonFinalized>, mut non_finalized_block_write_receiver: UnboundedReceiver<QueuedSemanticallyVerified>,
mut finalized_state: FinalizedState, mut finalized_state: FinalizedState,
mut non_finalized_state: NonFinalizedState, mut non_finalized_state: NonFinalizedState,
invalid_block_reset_sender: UnboundedSender<block::Hash>, invalid_block_reset_sender: UnboundedSender<block::Hash>,