From 67b367929cb46ccfa8fc378e4f7929c58a4fd437 Mon Sep 17 00:00:00 2001
From: teor
Date: Thu, 17 Mar 2022 10:37:44 +1000
Subject: [PATCH] 3. fix(state): prevent watch channel deadlocks in the state (#3870)

* Revert "Drop the Chain watch channel lock before accessing the finalized state"

This reverts commit 8870944d1323fc31e2d009b9938623de6fcbe50f.

* Add a WatchReceiver wrapper that always clones the borrowed watch data

This avoids deadlocks, by holding the read lock for as short a time as possible.

* Drop the shared Arcs as quickly as possible

This reduces memory usage.

* Make read::block more flexible, by accepting any AsRef<Chain>

* Make the block method docs consistent

* Avoid livelocks by explicitly dropping the borrow after the clone
---
 zebra-state/src/service.rs                    | 55 +++++++----
 zebra-state/src/service/chain_tip.rs          | 94 ++++++++++--------
 .../service/finalized_state/zebra_db/block.rs |  3 +-
 .../src/service/non_finalized_state/chain.rs  |  3 +-
 zebra-state/src/service/read.rs               | 25 +++--
 zebra-state/src/service/watch_receiver.rs     | 99 +++++++++++++++++++
 6 files changed, 208 insertions(+), 71 deletions(-)
 create mode 100644 zebra-state/src/service/watch_receiver.rs

diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs
index 956d6300..93f384e8 100644
--- a/zebra-state/src/service.rs
+++ b/zebra-state/src/service.rs
@@ -42,6 +42,7 @@ use crate::{
         finalized_state::{FinalizedState, ZebraDb},
         non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
         pending_utxos::PendingUtxos,
+        watch_receiver::WatchReceiver,
     },
     BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request,
     Response, ValidateContextError,
@@ -49,6 +50,7 @@ use crate::{
 
 pub mod block_iter;
 pub mod chain_tip;
+pub mod watch_receiver;
 
 pub(crate) mod check;
 
@@ -144,7 +146,7 @@ pub struct ReadStateService {
     ///
     /// This chain is only updated between requests,
     /// so it might include some block data that is also on `disk`.
-    best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
+    best_chain_receiver: WatchReceiver<Option<Arc<Chain>>>,
 
     /// The configured Zcash network.
     network: Network,
@@ -290,27 +292,40 @@ impl StateService {
         );
         self.queued_blocks.prune_by_height(finalized_tip_height);
 
-        let best_chain = self.mem.best_chain();
-        let tip_block = best_chain
-            .and_then(|chain| chain.tip_block())
-            .cloned()
-            .map(ChainTipBlock::from);
+        let tip_block_height = self.update_latest_chain_channels();
 
         // update metrics using the best non-finalized tip
-        if let Some(tip_block) = tip_block.as_ref() {
+        if let Some(tip_block_height) = tip_block_height {
             metrics::gauge!(
                 "state.full_verifier.committed.block.height",
-                tip_block.height.0 as _
+                tip_block_height.0 as _
             );
 
             // This height gauge is updated for both fully verified and checkpoint blocks.
             // These updates can't conflict, because the state makes sure that blocks
             // are committed in order.
-            metrics::gauge!("zcash.chain.verified.block.height", tip_block.height.0 as _);
+            metrics::gauge!("zcash.chain.verified.block.height", tip_block_height.0 as _);
         }
 
-        // update the chain watch channels
+        tracing::trace!("finished processing queued block");
+        rsp_rx
+    }
+
+    /// Update the [`LatestChainTip`], [`ChainTipChange`], and [`LatestChain`] channels
+    /// with the latest non-finalized [`ChainTipBlock`] and [`Chain`].
+    ///
+    /// Returns the latest non-finalized chain tip height,
+    /// or `None` if the non-finalized state is empty.
+    #[instrument(level = "debug", skip(self))]
+    fn update_latest_chain_channels(&mut self) -> Option<block::Height> {
+        let best_chain = self.mem.best_chain();
+        let tip_block = best_chain
+            .and_then(|chain| chain.tip_block())
+            .cloned()
+            .map(ChainTipBlock::from);
+        let tip_block_height = tip_block.as_ref().map(|block| block.height);
+
+        // The RPC service uses the ReadStateService, but it is not turned on by default.
         if self.best_chain_sender.receiver_count() > 0 {
             // If the final receiver was just dropped, ignore the error.
             let _ = self.best_chain_sender.send(best_chain.cloned());
@@ -318,8 +333,7 @@ impl StateService {
 
         self.chain_tip_sender.set_best_non_finalized_tip(tip_block);
 
-        tracing::trace!("finished processing queued block");
-        rsp_rx
+        tip_block_height
     }
 
     /// Run contextual validation on the prepared block and add it to the
@@ -444,8 +458,8 @@ impl StateService {
         Some(tip.0 - height.0)
     }
 
-    /// Return the block identified by either its `height` or `hash`,
-    /// if it exists in the current best chain.
+    /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
     pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
         read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
     }
@@ -675,7 +689,7 @@ impl ReadStateService {
 
         let read_only_service = Self {
             db: disk.db().clone(),
-            best_chain_receiver,
+            best_chain_receiver: WatchReceiver::new(best_chain_receiver),
             network: disk.network(),
         };
 
@@ -849,12 +863,11 @@ impl Service<Request> for ReadStateService {
                 let state = self.clone();
 
                 async move {
-                    Ok(read::block(
-                        state.best_chain_receiver.borrow().clone().as_ref(),
-                        &state.db,
-                        hash_or_height,
-                    ))
-                    .map(Response::Block)
+                    let block = state.best_chain_receiver.with_watch_data(|best_chain| {
+                        read::block(best_chain, &state.db, hash_or_height)
+                    });
+
+                    Ok(Response::Block(block))
                 }
                 .boxed()
             }
diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs
index 68ff60c4..7ed7d32a 100644
--- a/zebra-state/src/service/chain_tip.rs
+++ b/zebra-state/src/service/chain_tip.rs
@@ -23,7 +23,9 @@ use zebra_chain::{
     transaction,
 };
 
-use crate::{request::ContextuallyValidBlock, FinalizedBlock};
+use crate::{
+    request::ContextuallyValidBlock, service::watch_receiver::WatchReceiver, FinalizedBlock,
+};
 
 use TipAction::*;
 
@@ -154,6 +156,9 @@ impl ChainTipSender {
     /// Update the latest finalized tip.
     ///
     /// May trigger an update to the best tip.
+    //
+    // TODO: when we replace active_value with `watch::Sender::borrow`,
+    // refactor instrument to avoid multiple borrows, to prevent deadlocks
     #[instrument(
         skip(self, new_tip),
         fields(
@@ -175,6 +180,9 @@ impl ChainTipSender {
     /// Update the latest non-finalized tip.
     ///
     /// May trigger an update to the best tip.
+    //
+    // TODO: when we replace active_value with `watch::Sender::borrow`,
+    // refactor instrument to avoid multiple borrows, to prevent deadlocks
     #[instrument(
         skip(self, new_tip),
         fields(
@@ -250,65 +258,67 @@ impl ChainTipSender {
 #[derive(Clone, Debug)]
 pub struct LatestChainTip {
     /// The receiver for the current chain tip's data.
-    receiver: watch::Receiver<ChainTipData>,
+    receiver: WatchReceiver<ChainTipData>,
 }
 
 impl LatestChainTip {
     /// Create a new [`LatestChainTip`] from a watch channel receiver.
     fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
-        Self { receiver }
+        Self {
+            receiver: WatchReceiver::new(receiver),
+        }
     }
 
-    /// Retrieve a result `R` from the current [`ChainTipBlock`], if it's available.
+    /// Maps the current data `ChainTipData` to `Option<U>`
+    /// by applying a function to the watched value,
+    /// while holding the receiver lock as briefly as possible.
     ///
     /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
     /// extract some information from it, while also adding the current chain tip block's fields as
     /// records to the current span.
     ///
-    /// A single read lock is kept during the execution of the method, and it is dropped at the end
-    /// of it.
+    /// A single read lock is acquired to clone `T`, and then released after the clone.
+    /// See the performance note on [`WatchReceiver::with_watch_data`].
     ///
     /// # Correctness
     ///
-    /// To prevent deadlocks:
-    ///
-    /// - `receiver.borrow()` should not be called before this method while in the same scope.
-    /// - `receiver.borrow()` should not be called inside the `action` closure.
-    ///
-    /// It is important to avoid calling `borrow` more than once in the same scope, which
-    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
-    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
-    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
-    /// the update is finished.
-    ///
-    /// What can happen in that scenario is:
-    ///
-    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
-    /// 2. The sender starts acquiring the write-lock
-    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
-    ///
-    /// Now both the sender and the receivers hang, because the sender won't release the lock until
-    /// it can update the value, and the receiver won't release its first read-lock until it
-    /// acquires the second read-lock and finishes what it's doing.
-    fn with_chain_tip_block<R>(&self, action: impl FnOnce(&ChainTipBlock) -> R) -> Option<R> {
+    /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+    fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
+    where
+        F: FnOnce(&ChainTipBlock) -> U,
+    {
         let span = tracing::Span::current();
-        let borrow_guard = self.receiver.borrow();
-        let chain_tip_block = borrow_guard.as_ref();
 
-        span.record(
-            "height",
-            &tracing::field::debug(chain_tip_block.map(|block| block.height)),
-        );
-        span.record(
-            "hash",
-            &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
-        );
-        span.record(
-            "transaction_count",
-            &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
-        );
+        let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
+            span.record(
+                "height",
+                &tracing::field::debug(chain_tip_block.map(|block| block.height)),
+            );
+            span.record(
+                "hash",
+                &tracing::field::debug(chain_tip_block.map(|block| block.hash)),
+            );
+            span.record(
+                "time",
+                &tracing::field::debug(chain_tip_block.map(|block| block.time)),
+            );
+            span.record(
+                "previous_hash",
+                &tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
+            );
+            span.record(
+                "transaction_count",
+                &tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
+            );
+        };
 
-        chain_tip_block.map(action)
+        self.receiver.with_watch_data(|chain_tip_block| {
+            // TODO: replace with Option::inspect when it stabilises
+            // https://github.com/rust-lang/rust/issues/91345
+            register_span_fields(chain_tip_block.as_ref());
+
+            chain_tip_block.as_ref().map(f)
+        })
     }
 }
diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs
index 4a50957a..958b3fa7 100644
--- a/zebra-state/src/service/finalized_state/zebra_db/block.rs
+++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs
@@ -69,7 +69,8 @@ impl ZebraDb {
         self.db.zs_get(height_by_hash, &hash)
     }
 
-    /// Returns the given block if it exists.
+    /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain.
     pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
         let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
         let block_by_height = self.db.cf_handle("block_by_height").unwrap();
diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs
index 63ba91b1..89f873c4 100644
--- a/zebra-state/src/service/non_finalized_state/chain.rs
+++ b/zebra-state/src/service/non_finalized_state/chain.rs
@@ -318,7 +318,8 @@ impl Chain {
         Ok(Some(forked))
     }
 
-    /// Returns the [`ContextuallyValidBlock`] at a given height or hash in this chain.
+    /// Returns the [`ContextuallyValidBlock`] with [`Hash`](zebra_chain::block::Hash) or
+    /// [`Height`](zebra_chain::block::Height), if it exists in this chain.
     pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> {
         let height =
             hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;
diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs
index 069cf198..43e3dd59 100644
--- a/zebra-state/src/service/read.rs
+++ b/zebra-state/src/service/read.rs
@@ -13,15 +13,28 @@ use crate::{
     HashOrHeight,
 };
 
-/// Return the block identified by either its `height` or `hash` if it exists
-/// in the non-finalized `chain` or finalized `db`.
-pub(crate) fn block(
-    chain: Option<&Arc<Chain>>,
+/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
+/// [`Height`](zebra_chain::block::Height),
+/// if it exists in the non-finalized `chain` or finalized `db`.
+pub(crate) fn block<C>(
+    chain: Option<C>,
     db: &ZebraDb,
     hash_or_height: HashOrHeight,
-) -> Option<Arc<Block>> {
+) -> Option<Arc<Block>>
+where
+    C: AsRef<Chain>,
+{
+    // # Correctness
+    //
+    // The StateService commits blocks to the finalized state before updating the latest chain,
+    // and it can commit additional blocks after we've cloned this `chain` variable.
+    //
+    // Since blocks are the same in the finalized and non-finalized state,
+    // we check the most efficient alternative first.
+    // (`chain` is always in memory, but `db` stores blocks on disk, with a memory cache.)
     chain
-        .and_then(|chain| chain.block(hash_or_height))
+        .as_ref()
+        .and_then(|chain| chain.as_ref().block(hash_or_height))
         .map(|contextual| contextual.block.clone())
         .or_else(|| db.block(hash_or_height))
 }
diff --git a/zebra-state/src/service/watch_receiver.rs b/zebra-state/src/service/watch_receiver.rs
new file mode 100644
index 00000000..ee40eebc
--- /dev/null
+++ b/zebra-state/src/service/watch_receiver.rs
@@ -0,0 +1,99 @@
+//! Shared [`tokio::sync::watch`] channel wrappers.
+//!
+//! These wrappers help use watch channels correctly.
+
+use tokio::sync::watch;
+
+/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
+/// while avoiding deadlocks.
+///
+/// Returns data from the most recent state,
+/// regardless of how many times you call its methods.
+///
+/// Cloned instances provide identical state data.
+///
+/// # Correctness
+///
+/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
+///
+/// # Note
+///
+/// If a lot of blocks are committed at the same time,
+/// the watch channel will skip some block updates,
+/// even though those updates were committed to the state.
+#[derive(Clone, Debug)]
+pub struct WatchReceiver<T> {
+    /// The receiver for the current state data.
+    receiver: watch::Receiver<T>,
+}
+
+impl<T> WatchReceiver<T> {
+    /// Create a new [`WatchReceiver`] from a watch channel receiver.
+    pub fn new(receiver: watch::Receiver<T>) -> Self {
+        Self { receiver }
+    }
+}
+
+impl<T> WatchReceiver<T>
+where
+    T: Clone,
+{
+    /// Maps the current data `T` to `U` by applying a function to the watched value,
+    /// while holding the receiver lock as briefly as possible.
+    ///
+    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
+    /// extract some information from it.
+    ///
+    /// # Performance
+    ///
+    /// A single read lock is acquired to clone `T`, and then released after the clone.
+    /// To make this clone efficient, large or expensive `T` can be wrapped in an [`Arc`].
+    /// (Or individual fields can be wrapped in an `Arc`.)
+    ///
+    /// # Correctness
+    ///
+    /// To prevent deadlocks:
+    ///
+    /// - `receiver.borrow()` should not be called before this method while in the same scope.
+    ///
+    /// It is important to avoid calling `borrow` more than once in the same scope, which
+    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
+    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
+    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
+    /// the update is finished.
+    ///
+    /// What can happen in that scenario is:
+    ///
+    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
+    /// 2. The sender starts acquiring the write-lock
+    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
+    ///
+    /// Now both the sender and the receivers hang, because the sender won't release the lock until
+    /// it can update the value, and the receiver won't release its first read-lock until it
+    /// acquires the second read-lock and finishes what it's doing.
+    pub fn with_watch_data<U, F>(&self, f: F) -> U
+    where
+        F: FnOnce(T) -> U,
+    {
+        // Make sure that the borrow's watch channel read lock
+        // is dropped before the closure is executed.
+        //
+        // Without this change, an eager reader can repeatedly block the channel writer.
+        // This seems to happen easily in RPC & ReadStateService futures.
+        // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
+        let cloned_data = {
+            let borrow_guard = self.receiver.borrow();
+            let cloned_data = borrow_guard.clone();
+            std::mem::drop(borrow_guard);

+            cloned_data
+        };
+
+        f(cloned_data)
+    }
+
+    /// Calls [`watch::Receiver::changed`] and returns the result.
+    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
+        self.receiver.changed().await
+    }
+}
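
Usage note: the sketch below is not part of the patch; it shows the borrow-clone-drop pattern that `WatchReceiver::with_watch_data` encodes, using a bare `tokio::sync::watch` channel. The `ChainData` alias and the free-standing `with_watch_data` function are illustrative stand-ins for the generic wrapper above, not Zebra APIs:

```rust
use std::sync::Arc;

use tokio::sync::watch;

// Hypothetical stand-in for watched state data: wrapping the expensive part
// in an `Arc` keeps the clone under the read lock cheap.
type ChainData = Option<Arc<Vec<u64>>>;

// Clones the watched value under a short-lived read lock, explicitly drops
// the lock guard, and only then runs the closure on the clone.
// Running `f` while the guard is still alive is the double-borrow hazard
// described in the `with_watch_data` correctness docs.
fn with_watch_data<U, F>(receiver: &watch::Receiver<ChainData>, f: F) -> U
where
    F: FnOnce(ChainData) -> U,
{
    let cloned_data = {
        let borrow_guard = receiver.borrow();
        let cloned_data = borrow_guard.clone();
        std::mem::drop(borrow_guard);

        cloned_data
    };

    f(cloned_data)
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = watch::channel::<ChainData>(None);

    sender
        .send(Some(Arc::new(vec![1, 2, 3])))
        .expect("the receiver is still alive");

    // The closure only sees a private clone, so it can run slow code,
    // or even call `receiver.borrow()` again, without blocking the sender.
    let len = with_watch_data(&receiver, |data| data.map(|blocks| blocks.len()));
    assert_eq!(len, Some(3));
}
```

Because the closure only ever receives a clone, a `send` that starts while the closure runs can take the write lock immediately; the three-step deadlock in the doc comment needs a reader that holds its first read lock while waiting for a second one.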
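
A second sketch, using the same hypothetical `ChainData` alias, illustrates the module docs' note that the channel can skip updates: `watch` conflates rapid sends, so a reader woken by `changed()` is only guaranteed to observe the most recent value:

```rust
use std::sync::Arc;

use tokio::sync::watch;

// The same hypothetical data alias as in the sketch above.
type ChainData = Option<Arc<Vec<u64>>>;

// Reads each wakeup with a short-lived borrow, dropped before any slow work.
async fn log_updates(mut receiver: watch::Receiver<ChainData>) {
    while receiver.changed().await.is_ok() {
        // The temporary `borrow()` guard is dropped at the end of this statement.
        let data = receiver.borrow().clone();

        println!("latest data: {:?}", data.map(|blocks| blocks.len()));
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = watch::channel::<ChainData>(None);
    let reader = tokio::spawn(log_updates(receiver));

    // Rapid sends can be conflated: the reader is only guaranteed to see the
    // last one, like the skipped block updates in the `WatchReceiver` docs.
    for height in 1..=3u64 {
        sender
            .send(Some(Arc::new(vec![height])))
            .expect("the reader task is still alive");
    }

    // Dropping the sender closes the channel, so `changed()` returns an error
    // and the reader's loop ends.
    drop(sender);
    reader.await.expect("the reader task should not panic");
}
```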