diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs
index 956d6300..93f384e8 100644
--- a/zebra-state/src/service.rs
+++ b/zebra-state/src/service.rs
@@ -42,6 +42,7 @@ use crate::{
finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
pending_utxos::PendingUtxos,
+ watch_receiver::WatchReceiver,
},
BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request,
Response, ValidateContextError,
@@ -49,6 +50,7 @@ use crate::{
pub mod block_iter;
pub mod chain_tip;
+pub mod watch_receiver;
pub(crate) mod check;
@@ -144,7 +146,7 @@ pub struct ReadStateService {
///
/// This chain is only updated between requests,
/// so it might include some block data that is also on `disk`.
- best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
+ best_chain_receiver: WatchReceiver<Option<Arc<Chain>>>,
/// The configured Zcash network.
network: Network,
@@ -290,27 +292,40 @@ impl StateService {
);
self.queued_blocks.prune_by_height(finalized_tip_height);
- let best_chain = self.mem.best_chain();
- let tip_block = best_chain
- .and_then(|chain| chain.tip_block())
- .cloned()
- .map(ChainTipBlock::from);
+ let tip_block_height = self.update_latest_chain_channels();
// update metrics using the best non-finalized tip
- if let Some(tip_block) = tip_block.as_ref() {
+ if let Some(tip_block_height) = tip_block_height {
metrics::gauge!(
"state.full_verifier.committed.block.height",
- tip_block.height.0 as _
+ tip_block_height.0 as _
);
// This height gauge is updated for both fully verified and checkpoint blocks.
// These updates can't conflict, because the state makes sure that blocks
// are committed in order.
- metrics::gauge!("zcash.chain.verified.block.height", tip_block.height.0 as _);
+ metrics::gauge!("zcash.chain.verified.block.height", tip_block_height.0 as _);
}
- // update the chain watch channels
+ tracing::trace!("finished processing queued block");
+ rsp_rx
+ }
+ /// Update the [`LatestChainTip`], [`ChainTipChange`], and [`LatestChain`] channels
+ /// with the latest non-finalized [`ChainTipBlock`] and [`Chain`].
+ ///
+ /// Returns the latest non-finalized chain tip height,
+ /// or `None` if the non-finalized state is empty.
+ #[instrument(level = "debug", skip(self))]
+ fn update_latest_chain_channels(&mut self) -> Option<block::Height> {
+ let best_chain = self.mem.best_chain();
+ let tip_block = best_chain
+ .and_then(|chain| chain.tip_block())
+ .cloned()
+ .map(ChainTipBlock::from);
+ let tip_block_height = tip_block.as_ref().map(|block| block.height);
+
+ // The RPC service uses the ReadStateService, but it is not turned on by default.
if self.best_chain_sender.receiver_count() > 0 {
// If the final receiver was just dropped, ignore the error.
let _ = self.best_chain_sender.send(best_chain.cloned());
@@ -318,8 +333,7 @@ impl StateService {
self.chain_tip_sender.set_best_non_finalized_tip(tip_block);
- tracing::trace!("finished processing queued block");
- rsp_rx
+ tip_block_height
}
/// Run contextual validation on the prepared block and add it to the
@@ -444,8 +458,8 @@ impl StateService {
Some(tip.0 - height.0)
}
- /// Return the block identified by either its `height` or `hash`,
- /// if it exists in the current best chain.
+ /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+ /// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
}
@@ -675,7 +689,7 @@ impl ReadStateService {
let read_only_service = Self {
db: disk.db().clone(),
- best_chain_receiver,
+ best_chain_receiver: WatchReceiver::new(best_chain_receiver),
network: disk.network(),
};
@@ -849,12 +863,11 @@ impl Service for ReadStateService {
let state = self.clone();
async move {
- Ok(read::block(
- state.best_chain_receiver.borrow().clone().as_ref(),
- &state.db,
- hash_or_height,
- ))
- .map(Response::Block)
+ let block = state.best_chain_receiver.with_watch_data(|best_chain| {
+ read::block(best_chain, &state.db, hash_or_height)
+ });
+
+ Ok(Response::Block(block))
}
.boxed()
}
diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs
index 68ff60c4..7ed7d32a 100644
--- a/zebra-state/src/service/chain_tip.rs
+++ b/zebra-state/src/service/chain_tip.rs
@@ -23,7 +23,9 @@ use zebra_chain::{
transaction,
};
-use crate::{request::ContextuallyValidBlock, FinalizedBlock};
+use crate::{
+ request::ContextuallyValidBlock, service::watch_receiver::WatchReceiver, FinalizedBlock,
+};
use TipAction::*;
@@ -154,6 +156,9 @@ impl ChainTipSender {
/// Update the latest finalized tip.
///
/// May trigger an update to the best tip.
+ //
+ // TODO: when we replace active_value with `watch::Sender::borrow`,
+ // refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
@@ -175,6 +180,9 @@ impl ChainTipSender {
/// Update the latest non-finalized tip.
///
/// May trigger an update to the best tip.
+ //
+ // TODO: when we replace active_value with `watch::Sender::borrow`,
+ // refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
@@ -250,65 +258,67 @@ impl ChainTipSender {
#[derive(Clone, Debug)]
pub struct LatestChainTip {
/// The receiver for the current chain tip's data.
- receiver: watch::Receiver<ChainTipData>,
+ receiver: WatchReceiver<ChainTipData>,
}
impl LatestChainTip {
/// Create a new [`LatestChainTip`] from a watch channel receiver.
fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
- Self { receiver }
+ Self {
+ receiver: WatchReceiver::new(receiver),
+ }
}
- /// Retrieve a result `R` from the current [`ChainTipBlock`], if it's available.
+ /// Maps the current data `ChainTipData` to `Option<U>`
+ /// by applying a function to the watched value,
+ /// while holding the receiver lock as briefly as possible.
///
/// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
/// extract some information from it, while also adding the current chain tip block's fields as
/// records to the current span.
///
- /// A single read lock is kept during the execution of the method, and it is dropped at the end
- /// of it.
+ /// A single read lock is acquired to clone `T`, and then released after the clone.
+ /// See the performance note on [`WatchReceiver::with_watch_data`].
///
/// # Correctness
///
- /// To prevent deadlocks:
- ///
- /// - `receiver.borrow()` should not be called before this method while in the same scope.
- /// - `receiver.borrow()` should not be called inside the `action` closure.
- ///
- /// It is important to avoid calling `borrow` more than once in the same scope, which
- /// effectively tries to acquire two read locks to the shared data in the watch channel. If
- /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
- /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
- /// the update is finished.
- ///
- /// What can happen in that scenario is:
- ///
- /// 1. The receiver manages to acquire a read-lock for the first `borrow`
- /// 2. The sender starts acquiring the write-lock
- /// 3. The receiver fails to acquire a read-lock for the second `borrow`
- ///
- /// Now both the sender and the receivers hang, because the sender won't release the lock until
- /// it can update the value, and the receiver won't release its first read-lock until it
- /// acquires the second read-lock and finishes what it's doing.
- fn with_chain_tip_block<R>(&self, action: impl FnOnce(&ChainTipBlock) -> R) -> Option<R> {
+ /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+ fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
+ where
+ F: FnOnce(&ChainTipBlock) -> U,
+ {
let span = tracing::Span::current();
- let borrow_guard = self.receiver.borrow();
- let chain_tip_block = borrow_guard.as_ref();
- span.record(
- "height",
- &tracing::field::debug(chain_tip_block.map(|block| block.height)),
- );
- span.record(
- "hash",
- &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
- );
- span.record(
- "transaction_count",
- &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
- );
+ let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
+ span.record(
+ "height",
+ &tracing::field::debug(chain_tip_block.map(|block| block.height)),
+ );
+ span.record(
+ "hash",
+ &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
+ );
+ span.record(
+ "time",
+ &tracing::field::debug(chain_tip_block.map(|block| block.time)),
+ );
+ span.record(
+ "previous_hash",
+ &tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
+ );
+ span.record(
+ "transaction_count",
+ &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
+ );
+ };
- chain_tip_block.map(action)
+ self.receiver.with_watch_data(|chain_tip_block| {
+ // TODO: replace with Option::inspect when it stabilises
+ // https://github.com/rust-lang/rust/issues/91345
+ register_span_fields(chain_tip_block.as_ref());
+
+ chain_tip_block.as_ref().map(f)
+ })
}
}
diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs
index 4a50957a..958b3fa7 100644
--- a/zebra-state/src/service/finalized_state/zebra_db/block.rs
+++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs
@@ -69,7 +69,8 @@ impl ZebraDb {
self.db.zs_get(height_by_hash, &hash)
}
- /// Returns the given block if it exists.
+ /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+ /// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
let block_by_height = self.db.cf_handle("block_by_height").unwrap();
diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs
index 63ba91b1..89f873c4 100644
--- a/zebra-state/src/service/non_finalized_state/chain.rs
+++ b/zebra-state/src/service/non_finalized_state/chain.rs
@@ -318,7 +318,8 @@ impl Chain {
Ok(Some(forked))
}
- /// Returns the [`ContextuallyValidBlock`] at a given height or hash in this chain.
+ /// Returns the [`ContextuallyValidBlock`] with [`Hash`](zebra_chain::block::Hash) or
+ /// [`Height`](zebra_chain::block::Height), if it exists in this chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> {
let height =
hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;
diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs
index 069cf198..43e3dd59 100644
--- a/zebra-state/src/service/read.rs
+++ b/zebra-state/src/service/read.rs
@@ -13,15 +13,28 @@ use crate::{
HashOrHeight,
};
-/// Return the block identified by either its `height` or `hash` if it exists
-/// in the non-finalized `chain` or finalized `db`.
-pub(crate) fn block(
- chain: Option<&Arc<Chain>>,
+/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+/// [`Height`](zebra_chain::block::Height),
+/// if it exists in the non-finalized `chain` or finalized `db`.
+pub(crate) fn block<C>(
+ chain: Option<C>,
db: &ZebraDb,
hash_or_height: HashOrHeight,
-) -> Option<Arc<Block>> {
+) -> Option<Arc<Block>>
+where
+ C: AsRef<Chain>,
+{
+ // # Correctness
+ //
+ // The StateService commits blocks to the finalized state before updating the latest chain,
+ // and it can commit additional blocks after we've cloned this `chain` variable.
+ //
+ // Since blocks are the same in the finalized and non-finalized state,
+ // we check the most efficient alternative first.
+ // (`chain` is always in memory, but `db` stores blocks on disk, with a memory cache.)
chain
- .and_then(|chain| chain.block(hash_or_height))
+ .as_ref()
+ .and_then(|chain| chain.as_ref().block(hash_or_height))
.map(|contextual| contextual.block.clone())
.or_else(|| db.block(hash_or_height))
}
diff --git a/zebra-state/src/service/watch_receiver.rs b/zebra-state/src/service/watch_receiver.rs
new file mode 100644
index 00000000..ee40eebc
--- /dev/null
+++ b/zebra-state/src/service/watch_receiver.rs
@@ -0,0 +1,99 @@
+//! Shared [`tokio::sync::watch`] channel wrappers.
+//!
+//! These wrappers help use watch channels correctly.
+
+use tokio::sync::watch;
+
+/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
+/// while avoiding deadlocks.
+///
+/// Returns data from the most recent state,
+/// regardless of how many times you call its methods.
+///
+/// Cloned instances provide identical state data.
+///
+/// # Correctness
+///
+/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+///
+/// # Note
+///
+/// If a lot of blocks are committed at the same time,
+/// the watch channel will skip some block updates,
+/// even though those updates were committed to the state.
+#[derive(Clone, Debug)]
+pub struct WatchReceiver<T> {
+ /// The receiver for the current state data.
+ receiver: watch::Receiver<T>,
+}
+
+impl<T> WatchReceiver<T> {
+ /// Create a new [`WatchReceiver`] from a watch channel receiver.
+ pub fn new(receiver: watch::Receiver<T>) -> Self {
+ Self { receiver }
+ }
+}
+
+impl<T> WatchReceiver<T>
+where
+ T: Clone,
+{
+ /// Maps the current data `T` to `U` by applying a function to the watched value,
+ /// while holding the receiver lock as briefly as possible.
+ ///
+ /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
+ /// extract some information from it.
+ ///
+ /// # Performance
+ ///
+ /// A single read lock is acquired to clone `T`, and then released after the clone.
+ /// To make this clone efficient, large or expensive `T` can be wrapped in an [`Arc`].
+ /// (Or individual fields can be wrapped in an `Arc`.)
+ ///
+ /// # Correctness
+ ///
+ /// To prevent deadlocks:
+ ///
+ /// - `receiver.borrow()` should not be called before this method while in the same scope.
+ ///
+ /// It is important to avoid calling `borrow` more than once in the same scope, which
+ /// effectively tries to acquire two read locks to the shared data in the watch channel. If
+ /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
+ /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
+ /// the update is finished.
+ ///
+ /// What can happen in that scenario is:
+ ///
+ /// 1. The receiver manages to acquire a read-lock for the first `borrow`
+ /// 2. The sender starts acquiring the write-lock
+ /// 3. The receiver fails to acquire a read-lock for the second `borrow`
+ ///
+ /// Now both the sender and the receivers hang, because the sender won't release the lock until
+ /// it can update the value, and the receiver won't release its first read-lock until it
+ /// acquires the second read-lock and finishes what it's doing.
+ pub fn with_watch_data<U, F>(&self, f: F) -> U
+ where
+ F: FnOnce(T) -> U,
+ {
+ // Make sure that the borrow's watch channel read lock
+ // is dropped before the closure is executed.
+ //
+ // Without this change, an eager reader can repeatedly block the channel writer.
+ // This seems to happen easily in RPC & ReadStateService futures.
+ // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
+ let cloned_data = {
+ let borrow_guard = self.receiver.borrow();
+ let cloned_data = borrow_guard.clone();
+ std::mem::drop(borrow_guard);
+
+ cloned_data
+ };
+
+ f(cloned_data)
+ }
+
+ /// Calls [`watch::Receiver::changed`] and returns the result.
+ pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
+ self.receiver.changed().await
+ }
+}