fix(state): prevent watch channel deadlocks in the state (#3870)
* Revert "Drop the Chain watch channel lock before accessing the finalized state"

  This reverts commit 8870944d1323fc31e2d009b9938623de6fcbe50f.

* Add a WatchReceiver wrapper that always clones the borrowed watch data

  This avoids deadlocks, by holding the read lock for as short a time as possible.

* Drop the shared Arc<Chain>s as quickly as possible

  This reduces memory usage.

* Make read::block more flexible, by accepting any AsRef<Chain>

* Make the block method docs consistent

* Avoid livelocks by explicitly dropping the borrow after the clone
parent 413f7fbb1d
commit 67b367929c
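The heart of the fix is how long the watch channel's read lock is held. Below is a minimal, self-contained sketch of the before/after shapes, assuming `tokio` with the `sync` feature; the slow call in the comment is a hypothetical stand-in for work like `read::block`.

```rust
use tokio::sync::watch;

fn main() {
    let (_sender, receiver) = watch::channel(String::from("tip data"));

    // The shape this commit removes: the temporary read guard returned by
    // `borrow()` lives until the end of the whole expression, so the lock is
    // still held while the rest of the call runs:
    //
    //     do_something_slow(receiver.borrow().clone().as_str());
    //
    // The shape this commit adopts: clone the data in its own scope and
    // explicitly drop the guard before doing anything else, as
    // `WatchReceiver::with_watch_data` does.
    let data = {
        let guard = receiver.borrow();
        let data = guard.clone();
        std::mem::drop(guard);
        data
    };

    assert_eq!(data, "tip data");
}
```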
@@ -42,6 +42,7 @@ use crate::{
         finalized_state::{FinalizedState, ZebraDb},
         non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
         pending_utxos::PendingUtxos,
+        watch_receiver::WatchReceiver,
     },
     BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request,
     Response, ValidateContextError,
@@ -49,6 +50,7 @@ use crate::{
 pub mod block_iter;
 pub mod chain_tip;
+pub mod watch_receiver;

 pub(crate) mod check;

@@ -144,7 +146,7 @@ pub struct ReadStateService {
     ///
     /// This chain is only updated between requests,
     /// so it might include some block data that is also on `disk`.
-    best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
+    best_chain_receiver: WatchReceiver<Option<Arc<Chain>>>,

     /// The configured Zcash network.
     network: Network,
@@ -290,27 +292,40 @@ impl StateService {
         );
         self.queued_blocks.prune_by_height(finalized_tip_height);

-        let best_chain = self.mem.best_chain();
-        let tip_block = best_chain
-            .and_then(|chain| chain.tip_block())
-            .cloned()
-            .map(ChainTipBlock::from);
+        let tip_block_height = self.update_latest_chain_channels();

         // update metrics using the best non-finalized tip
-        if let Some(tip_block) = tip_block.as_ref() {
+        if let Some(tip_block_height) = tip_block_height {
             metrics::gauge!(
                 "state.full_verifier.committed.block.height",
-                tip_block.height.0 as _
+                tip_block_height.0 as _
             );

             // This height gauge is updated for both fully verified and checkpoint blocks.
             // These updates can't conflict, because the state makes sure that blocks
             // are committed in order.
-            metrics::gauge!("zcash.chain.verified.block.height", tip_block.height.0 as _);
+            metrics::gauge!("zcash.chain.verified.block.height", tip_block_height.0 as _);
         }

-        // update the chain watch channels
+        tracing::trace!("finished processing queued block");
+        rsp_rx
+    }
+
+    /// Update the [`LatestChainTip`], [`ChainTipChange`], and [`LatestChain`] channels
+    /// with the latest non-finalized [`ChainTipBlock`] and [`Chain`].
+    ///
+    /// Returns the latest non-finalized chain tip height,
+    /// or `None` if the non-finalized state is empty.
+    #[instrument(level = "debug", skip(self))]
+    fn update_latest_chain_channels(&mut self) -> Option<block::Height> {
+        let best_chain = self.mem.best_chain();
+        let tip_block = best_chain
+            .and_then(|chain| chain.tip_block())
+            .cloned()
+            .map(ChainTipBlock::from);
+        let tip_block_height = tip_block.as_ref().map(|block| block.height);
+
         // The RPC service uses the ReadStateService, but it is not turned on by default.
         if self.best_chain_sender.receiver_count() > 0 {
             // If the final receiver was just dropped, ignore the error.
             let _ = self.best_chain_sender.send(best_chain.cloned());
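The new `update_latest_chain_channels` method only publishes to the best-chain channel when a receiver exists, and ignores the send error if the last receiver was just dropped. A self-contained sketch of that send pattern, with a plain vector standing in for the real `Chain` data:

```rust
use std::sync::Arc;
use tokio::sync::watch;

fn main() {
    let (best_chain_sender, best_chain_receiver) =
        watch::channel::<Option<Arc<Vec<u32>>>>(None);

    let best_chain = Some(Arc::new(vec![1_u32, 2, 3]));

    // The RPC-facing ReadStateService is optional, so only publish when it
    // actually holds a receiver.
    if best_chain_sender.receiver_count() > 0 {
        // If the final receiver was just dropped, ignore the error.
        let _ = best_chain_sender.send(best_chain.clone());
    }

    assert_eq!(*best_chain_receiver.borrow(), best_chain);
}
```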
@@ -318,8 +333,7 @@ impl StateService {

         self.chain_tip_sender.set_best_non_finalized_tip(tip_block);

-        tracing::trace!("finished processing queued block");
-        rsp_rx
+        tip_block_height
     }

     /// Run contextual validation on the prepared block and add it to the
@@ -444,8 +458,8 @@ impl StateService {
         Some(tip.0 - height.0)
     }

-    /// Return the block identified by either its `height` or `hash`,
-    /// if it exists in the current best chain.
+    /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
     pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
         read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
     }
@@ -675,7 +689,7 @@ impl ReadStateService {

         let read_only_service = Self {
             db: disk.db().clone(),
-            best_chain_receiver,
+            best_chain_receiver: WatchReceiver::new(best_chain_receiver),
             network: disk.network(),
         };

@@ -849,12 +863,11 @@ impl Service<Request> for ReadStateService {
                 let state = self.clone();

                 async move {
-                    Ok(read::block(
-                        state.best_chain_receiver.borrow().clone().as_ref(),
-                        &state.db,
-                        hash_or_height,
-                    ))
-                    .map(Response::Block)
+                    let block = state.best_chain_receiver.with_watch_data(|best_chain| {
+                        read::block(best_chain, &state.db, hash_or_height)
+                    });
+
+                    Ok(Response::Block(block))
                 }
                 .boxed()
             }
@@ -23,7 +23,9 @@ use zebra_chain::{
     transaction,
 };

-use crate::{request::ContextuallyValidBlock, FinalizedBlock};
+use crate::{
+    request::ContextuallyValidBlock, service::watch_receiver::WatchReceiver, FinalizedBlock,
+};

 use TipAction::*;

@@ -154,6 +156,9 @@ impl ChainTipSender {
     /// Update the latest finalized tip.
     ///
     /// May trigger an update to the best tip.
+    //
+    // TODO: when we replace active_value with `watch::Sender::borrow`,
+    // refactor instrument to avoid multiple borrows, to prevent deadlocks
     #[instrument(
         skip(self, new_tip),
         fields(
@@ -175,6 +180,9 @@ impl ChainTipSender {
     /// Update the latest non-finalized tip.
     ///
     /// May trigger an update to the best tip.
+    //
+    // TODO: when we replace active_value with `watch::Sender::borrow`,
+    // refactor instrument to avoid multiple borrows, to prevent deadlocks
     #[instrument(
         skip(self, new_tip),
         fields(
@@ -250,65 +258,67 @@ impl ChainTipSender {
 #[derive(Clone, Debug)]
 pub struct LatestChainTip {
     /// The receiver for the current chain tip's data.
-    receiver: watch::Receiver<ChainTipData>,
+    receiver: WatchReceiver<ChainTipData>,
 }

 impl LatestChainTip {
     /// Create a new [`LatestChainTip`] from a watch channel receiver.
     fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
-        Self { receiver }
+        Self {
+            receiver: WatchReceiver::new(receiver),
+        }
     }

-    /// Retrieve a result `R` from the current [`ChainTipBlock`], if it's available.
+    /// Maps the current data `ChainTipData` to `Option<U>`
+    /// by applying a function to the watched value,
+    /// while holding the receiver lock as briefly as possible.
     ///
     /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
     /// extract some information from it, while also adding the current chain tip block's fields as
     /// records to the current span.
     ///
-    /// A single read lock is kept during the execution of the method, and it is dropped at the end
-    /// of it.
+    /// A single read lock is acquired to clone `T`, and then released after the clone.
+    /// See the performance note on [`WatchReceiver::with_watch_data`].
     ///
     /// # Correctness
     ///
-    /// To prevent deadlocks:
-    ///
-    /// - `receiver.borrow()` should not be called before this method while in the same scope.
-    /// - `receiver.borrow()` should not be called inside the `action` closure.
-    ///
-    /// It is important to avoid calling `borrow` more than once in the same scope, which
-    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
-    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
-    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
-    /// the update is finished.
-    ///
-    /// What can happen in that scenario is:
-    ///
-    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
-    /// 2. The sender starts acquiring the write-lock
-    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
-    ///
-    /// Now both the sender and the receivers hang, because the sender won't release the lock until
-    /// it can update the value, and the receiver won't release its first read-lock until it
-    /// acquires the second read-lock and finishes what it's doing.
-    fn with_chain_tip_block<R>(&self, action: impl FnOnce(&ChainTipBlock) -> R) -> Option<R> {
+    /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+    fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
+    where
+        F: FnOnce(&ChainTipBlock) -> U,
+    {
         let span = tracing::Span::current();
-        let borrow_guard = self.receiver.borrow();
-        let chain_tip_block = borrow_guard.as_ref();
-
-        span.record(
-            "height",
-            &tracing::field::debug(chain_tip_block.map(|block| block.height)),
-        );
-        span.record(
-            "hash",
-            &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
-        );
-        span.record(
-            "transaction_count",
-            &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
-        );
+        let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
+            span.record(
+                "height",
+                &tracing::field::debug(chain_tip_block.map(|block| block.height)),
+            );
+            span.record(
+                "hash",
+                &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
+            );
+            span.record(
+                "time",
+                &tracing::field::debug(chain_tip_block.map(|block| block.time)),
+            );
+            span.record(
+                "previous_hash",
+                &tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
+            );
+            span.record(
+                "transaction_count",
+                &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
+            );
+        };

-        chain_tip_block.map(action)
+        self.receiver.with_watch_data(|chain_tip_block| {
+            // TODO: replace with Option::inspect when it stabilises
+            // https://github.com/rust-lang/rust/issues/91345
+            register_span_fields(chain_tip_block.as_ref());
+
+            chain_tip_block.as_ref().map(f)
+        })
     }
 }
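The refactored `with_chain_tip_block` keeps its closure-based shape, so callers never see a watch channel guard. A self-contained sketch of that calling pattern, using hypothetical stand-in types rather than the real `ChainTipBlock` and `WatchReceiver`:

```rust
// Stand-in types; the real code wraps the chain tip watch channel
// in a `WatchReceiver<ChainTipData>`.
struct TipData {
    height: u32,
}

struct LatestTip {
    current: Option<TipData>,
}

impl LatestTip {
    /// Run `f` on the current tip, if there is one, without ever exposing
    /// a watch channel guard to the caller.
    fn with_tip_block<U, F>(&self, f: F) -> Option<U>
    where
        F: FnOnce(&TipData) -> U,
    {
        // In the real method this is `self.receiver.with_watch_data(..)`,
        // which clones the channel contents before running the closure.
        self.current.as_ref().map(f)
    }
}

fn main() {
    let tip = LatestTip {
        current: Some(TipData { height: 1_000 }),
    };
    assert_eq!(tip.with_tip_block(|block| block.height), Some(1_000));

    let empty = LatestTip { current: None };
    assert_eq!(empty.with_tip_block(|block| block.height), None);
}
```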
@@ -69,7 +69,8 @@ impl ZebraDb {
         self.db.zs_get(height_by_hash, &hash)
     }

-    /// Returns the given block if it exists.
+    /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain.
     pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
         let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
         let block_by_height = self.db.cf_handle("block_by_height").unwrap();
@@ -318,7 +318,8 @@ impl Chain {
         Ok(Some(forked))
     }

-    /// Returns the [`ContextuallyValidBlock`] at a given height or hash in this chain.
+    /// Returns the [`ContextuallyValidBlock`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in this chain.
     pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> {
         let height =
             hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;
@@ -13,15 +13,28 @@ use crate::{
     HashOrHeight,
 };

-/// Return the block identified by either its `height` or `hash` if it exists
-/// in the non-finalized `chain` or finalized `db`.
-pub(crate) fn block(
-    chain: Option<&Arc<Chain>>,
+/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+/// [`Height`](zebra_chain::block::Height),
+/// if it exists in the non-finalized `chain` or finalized `db`.
+pub(crate) fn block<C>(
+    chain: Option<C>,
     db: &ZebraDb,
     hash_or_height: HashOrHeight,
-) -> Option<Arc<Block>> {
+) -> Option<Arc<Block>>
+where
+    C: AsRef<Chain>,
+{
+    // # Correctness
+    //
+    // The StateService commits blocks to the finalized state before updating the latest chain,
+    // and it can commit additional blocks after we've cloned this `chain` variable.
+    //
+    // Since blocks are the same in the finalized and non-finalized state,
+    // we check the most efficient alternative first.
+    // (`chain` is always in memory, but `db` stores blocks on disk, with a memory cache.)
     chain
-        .and_then(|chain| chain.block(hash_or_height))
+        .as_ref()
+        .and_then(|chain| chain.as_ref().block(hash_or_height))
         .map(|contextual| contextual.block.clone())
         .or_else(|| db.block(hash_or_height))
 }
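The new `C: AsRef<Chain>` bound is what lets `read::block` accept both the `StateService`'s borrowed `&Arc<Chain>` and the owned `Arc<Chain>` that the `ReadStateService` clones out of its watch channel. A self-contained sketch with a stand-in `Chain` type:

```rust
use std::sync::Arc;

// Stand-in for the real non-finalized `Chain` type.
struct Chain {
    tip_height: u32,
}

fn best_tip_height<C>(chain: Option<C>) -> Option<u32>
where
    C: AsRef<Chain>,
{
    chain.as_ref().map(|chain| chain.as_ref().tip_height)
}

fn main() {
    let chain = Arc::new(Chain { tip_height: 42 });

    // A borrowed `&Arc<Chain>`, like the StateService passes...
    assert_eq!(best_tip_height(Some(&chain)), Some(42));

    // ...and an owned `Arc<Chain>` cloned out of a watch channel,
    // like the ReadStateService passes. Both satisfy `AsRef<Chain>`.
    assert_eq!(best_tip_height(Some(chain)), Some(42));

    assert_eq!(best_tip_height(None::<Arc<Chain>>), None);
}
```

Both calls compile because `Arc<T>` implements `AsRef<T>`, and shared references forward their target's `AsRef` implementations.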
@@ -0,0 +1,99 @@
+//! Shared [`tokio::sync::watch`] channel wrappers.
+//!
+//! These wrappers help use watch channels correctly.
+
+use tokio::sync::watch;
+
+/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
+/// while avoiding deadlocks.
+///
+/// Returns data from the most recent state,
+/// regardless of how many times you call its methods.
+///
+/// Cloned instances provide identical state data.
+///
+/// # Correctness
+///
+/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+///
+/// # Note
+///
+/// If a lot of blocks are committed at the same time,
+/// the watch channel will skip some block updates,
+/// even though those updates were committed to the state.
+#[derive(Clone, Debug)]
+pub struct WatchReceiver<T> {
+    /// The receiver for the current state data.
+    receiver: watch::Receiver<T>,
+}
+
+impl<T> WatchReceiver<T> {
+    /// Create a new [`WatchReceiver`] from a watch channel receiver.
+    pub fn new(receiver: watch::Receiver<T>) -> Self {
+        Self { receiver }
+    }
+}
+
+impl<T> WatchReceiver<T>
+where
+    T: Clone,
+{
+    /// Maps the current data `T` to `U` by applying a function to the watched value,
+    /// while holding the receiver lock as briefly as possible.
+    ///
+    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
+    /// extract some information from it.
+    ///
+    /// # Performance
+    ///
+    /// A single read lock is acquired to clone `T`, and then released after the clone.
+    /// To make this clone efficient, large or expensive `T` can be wrapped in an [`Arc`].
+    /// (Or individual fields can be wrapped in an `Arc`.)
+    ///
+    /// # Correctness
+    ///
+    /// To prevent deadlocks:
+    ///
+    /// - `receiver.borrow()` should not be called before this method while in the same scope.
+    ///
+    /// It is important to avoid calling `borrow` more than once in the same scope, which
+    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
+    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
+    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
+    /// the update is finished.
+    ///
+    /// What can happen in that scenario is:
+    ///
+    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
+    /// 2. The sender starts acquiring the write-lock
+    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
+    ///
+    /// Now both the sender and the receivers hang, because the sender won't release the lock until
+    /// it can update the value, and the receiver won't release its first read-lock until it
+    /// acquires the second read-lock and finishes what it's doing.
+    pub fn with_watch_data<U, F>(&self, f: F) -> U
+    where
+        F: FnOnce(T) -> U,
+    {
+        // Make sure that the borrow's watch channel read lock
+        // is dropped before the closure is executed.
+        //
+        // Without this change, an eager reader can repeatedly block the channel writer.
+        // This seems to happen easily in RPC & ReadStateService futures.
+        // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
+        let cloned_data = {
+            let borrow_guard = self.receiver.borrow();
+            let cloned_data = borrow_guard.clone();
+            std::mem::drop(borrow_guard);
+
+            cloned_data
+        };
+
+        f(cloned_data)
+    }
+
+    /// Calls [`watch::Receiver::changed`] and returns the result.
+    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
+        self.receiver.changed().await
+    }
+}
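The performance note in `with_watch_data` depends on the watched value being cheap to clone. A self-contained sketch of the same clone-under-lock shape, with the data behind an `Arc` so only a reference count is copied (assuming `tokio` with the `sync` feature):

```rust
use std::sync::Arc;
use tokio::sync::watch;

fn main() {
    // Wrapping the watched value in an `Arc` keeps the per-read clone cheap:
    // only the reference count changes, not the data behind it.
    let big_chain = Arc::new(vec![0u8; 10_000_000]);
    let (sender, receiver) = watch::channel(Some(big_chain));

    // The same shape as `with_watch_data`: clone under the read lock,
    // drop the guard, then work on the cloned value.
    let cloned = {
        let guard = receiver.borrow();
        guard.clone()
    };
    let length = cloned.map(|chain| chain.len());
    assert_eq!(length, Some(10_000_000));

    // The guard is already gone, so a sender update cannot be blocked here.
    let _ = sender.send(None);
}
```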