diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 6a31534c..2e15b23d 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -56,6 +56,7 @@ humantime = "2.1.0" displaydoc = "0.2.3" static_assertions = "1.1.0" thiserror = "1.0.31" +tracing = "0.1.31" # Serialization hex = { version = "0.4.3", features = ["serde"] } diff --git a/zebra-chain/src/diagnostic.rs b/zebra-chain/src/diagnostic.rs new file mode 100644 index 00000000..2b41f647 --- /dev/null +++ b/zebra-chain/src/diagnostic.rs @@ -0,0 +1,120 @@ +//! Tracing the execution time of functions. +//! +//! TODO: also trace polling time for futures, using a `Future` wrapper + +use std::time::{Duration, Instant}; + +use crate::fmt::duration_short; + +/// The default minimum info-level message time. +pub const DEFAULT_MIN_INFO_TIME: Duration = Duration::from_secs(5); + +/// The default minimum warning message time. +pub const DEFAULT_MIN_WARN_TIME: Duration = Duration::from_secs(20); + +/// A guard that logs code execution time when dropped. +#[derive(Debug)] +pub struct CodeTimer { + /// The time that the code started executing. + start: Instant, + + /// The minimum duration for info-level messages. + min_info_time: Duration, + + /// The minimum duration for warning messages. + min_warn_time: Duration, + + /// `true` if this timer has finished. + has_finished: bool, +} + +impl CodeTimer { + /// Start timing the execution of a function, method, or other code region. + /// + /// Returns a guard that finishes timing the code when dropped, + /// or when [`CodeTimer::finish()`] is called. + #[track_caller] + pub fn start() -> Self { + let start = Instant::now(); + trace!(?start, "starting code timer"); + + Self { + start, + min_info_time: DEFAULT_MIN_INFO_TIME, + min_warn_time: DEFAULT_MIN_WARN_TIME, + has_finished: false, + } + } + + /// Finish timing the execution of a function, method, or other code region. 
+ #[track_caller] + pub fn finish<S>( + mut self, + module_path: &'static str, + line: u32, + description: impl Into<Option<S>>, + ) where + S: ToString, + { + self.finish_inner(Some(module_path), Some(line), description); + } + + /// Finish timing the execution of a function, method, or other code region. + /// + /// This private method can be called from [`CodeTimer::finish()`] or `drop()`. + #[track_caller] + fn finish_inner<S>( + &mut self, + module_path: impl Into<Option<&'static str>>, + line: impl Into<Option<u32>>, + description: impl Into<Option<S>>, + ) where + S: ToString, + { + if self.has_finished { + return; + } + + self.has_finished = true; + + let execution = self.start.elapsed(); + let execution_time = duration_short(execution); + + let module_path = module_path.into(); + let line = line.into(); + let description = description + .into() + .map(|desc| desc.to_string() + " ") + .unwrap_or_default(); + + if execution >= self.min_warn_time { + warn!( + ?execution_time, + ?module_path, + ?line, + "{description}code took a long time to execute", + ); + } else if execution >= self.min_info_time { + info!( + ?execution_time, + ?module_path, + ?line, + "{description}code took longer than expected to execute", + ); + } else { + trace!( + ?execution_time, + ?module_path, + ?line, + "{description}code timer finished", + ); + } + } +} + +impl Drop for CodeTimer { + #[track_caller] + fn drop(&mut self) { + self.finish_inner(None, None, "(dropped, cancelled, or aborted)") + } +} diff --git a/zebra-chain/src/fmt.rs b/zebra-chain/src/fmt.rs index e5fa646c..aaacf973 100644 --- a/zebra-chain/src/fmt.rs +++ b/zebra-chain/src/fmt.rs @@ -8,7 +8,8 @@ use proptest::prelude::*; use proptest_derive::Arbitrary; pub mod time; -pub use time::{humantime_milliseconds, humantime_seconds}; + +pub use time::{duration_short, humantime_milliseconds, humantime_seconds}; /// Wrapper to override `Debug`, redirecting it to only output the type's name. 
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/zebra-chain/src/fmt/time.rs b/zebra-chain/src/fmt/time.rs index 63a0b0ba..4e8a3c34 100644 --- a/zebra-chain/src/fmt/time.rs +++ b/zebra-chain/src/fmt/time.rs @@ -2,6 +2,22 @@ use std::time::Duration; +/// The minimum amount of time displayed with only seconds (no milliseconds). +pub const MIN_SECONDS_ONLY_TIME: Duration = Duration::from_secs(5); + +/// Returns a human-friendly formatted string for `duration`: durations of at least [`MIN_SECONDS_ONLY_TIME`] are formatted by [`humantime_seconds`], shorter durations by [`humantime_milliseconds`]. +pub fn duration_short(duration: impl Into<Duration>) -> String { + let duration = duration.into(); + + if duration >= MIN_SECONDS_ONLY_TIME { + humantime_seconds(duration) + } else { + humantime_milliseconds(duration) + } +} + +// TODO: rename these functions to duration_* + /// Returns a human-friendly formatted string for the whole number of seconds in `duration`. pub fn humantime_seconds(duration: impl Into<Duration>) -> String { let duration = duration.into(); diff --git a/zebra-chain/src/lib.rs b/zebra-chain/src/lib.rs index a0bd3da1..a9d20dc2 100644 --- a/zebra-chain/src/lib.rs +++ b/zebra-chain/src/lib.rs @@ -9,15 +9,19 @@ // Required by bitvec! 
macro #![recursion_limit = "256"] +#[macro_use] +extern crate bitflags; + #[macro_use] extern crate serde; #[macro_use] -extern crate bitflags; +extern crate tracing; pub mod amount; pub mod block; pub mod chain_tip; +pub mod diagnostic; pub mod fmt; pub mod history_tree; pub mod orchard; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index d7e478ac..8d440a25 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -27,13 +27,14 @@ use std::{ use futures::future::FutureExt; use tokio::sync::{oneshot, watch}; use tower::{util::BoxService, Service}; -use tracing::instrument; +use tracing::{instrument, Instrument, Span}; #[cfg(any(test, feature = "proptest-impl"))] use tower::buffer::Buffer; use zebra_chain::{ block, + diagnostic::CodeTimer, parameters::{Network, NetworkUpgrade}, transparent, }; @@ -166,12 +167,19 @@ impl StateService { config: Config, network: Network, ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { + let timer = CodeTimer::start(); let disk = FinalizedState::new(&config, network); + timer.finish(module_path!(), line!(), "opening finalized state database"); + + let timer = CodeTimer::start(); let initial_tip = disk .db() .tip_block() .map(FinalizedBlock::from) .map(ChainTipBlock::from); + timer.finish(module_path!(), line!(), "fetching database tip"); + + let timer = CodeTimer::start(); let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(initial_tip, network); @@ -192,8 +200,11 @@ impl StateService { chain_tip_sender, best_chain_sender, }; + timer.finish(module_path!(), line!(), "initializing state service"); tracing::info!("starting legacy chain check"); + let timer = CodeTimer::start(); + if let Some(tip) = state.best_tip() { if let Some(nu5_activation_height) = NetworkUpgrade::Nu5.activation_height(network) { if check::legacy_chain( @@ -216,6 +227,7 @@ impl StateService { } } tracing::info!("no legacy chain found"); + timer.finish(module_path!(), line!(), "legacy 
chain check"); (state, read_only_service, latest_chain_tip, chain_tip_change) } @@ -754,6 +766,8 @@ impl Service for StateService { "type" => "commit_block", ); + let timer = CodeTimer::start(); + self.assert_block_can_be_validated(&prepared); self.pending_utxos @@ -768,10 +782,16 @@ impl Service for StateService { // Since each block is spawned into its own task, // there shouldn't be any other code running in the same task, // so we don't need to worry about blocking it: - // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html# - let rsp_rx = - tokio::task::block_in_place(|| self.queue_and_commit_non_finalized(prepared)); + // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + let span = Span::current(); + let rsp_rx = tokio::task::block_in_place(move || { + span.in_scope(|| self.queue_and_commit_non_finalized(prepared)) + }); + // The work is all done, the future just waits on a channel for the result + timer.finish(module_path!(), line!(), "CommitBlock"); + + let span = Span::current(); async move { rsp_rx .await @@ -784,6 +804,7 @@ impl Service for StateService { .map(Response::Committed) .map_err(Into::into) } + .instrument(span) .boxed() } Request::CommitFinalizedBlock(finalized) => { @@ -794,6 +815,8 @@ impl Service for StateService { "type" => "commit_finalized_block", ); + let timer = CodeTimer::start(); + self.pending_utxos.check_against(&finalized.new_outputs); // # Performance @@ -802,9 +825,15 @@ impl Service for StateService { // and written to disk. // // See the note in `CommitBlock` for more details. 
- let rsp_rx = - tokio::task::block_in_place(|| self.queue_and_commit_finalized(finalized)); + let span = Span::current(); + let rsp_rx = tokio::task::block_in_place(move || { + span.in_scope(|| self.queue_and_commit_finalized(finalized)) + }); + // The work is all done, the future just waits on a channel for the result + timer.finish(module_path!(), line!(), "CommitFinalizedBlock"); + + let span = Span::current(); async move { rsp_rx .await @@ -819,6 +848,7 @@ impl Service for StateService { .map(Response::Committed) .map_err(Into::into) } + .instrument(span) .boxed() } Request::Depth(hash) => { @@ -829,7 +859,14 @@ impl Service for StateService { "type" => "depth", ); + let timer = CodeTimer::start(); + + // TODO: move this work into the future, like Block and Transaction? let rsp = Ok(Response::Depth(self.best_depth(hash))); + + // The work is all done, the future just returns the result. + timer.finish(module_path!(), line!(), "Depth"); + async move { rsp }.boxed() } // TODO: consider spawning small reads into blocking tasks, @@ -842,7 +879,14 @@ impl Service for StateService { "type" => "tip", ); + let timer = CodeTimer::start(); + + // TODO: move this work into the future, like Block and Transaction? let rsp = Ok(Response::Tip(self.best_tip())); + + // The work is all done, the future just returns the result. + timer.finish(module_path!(), line!(), "Tip"); + async move { rsp }.boxed() } Request::BlockLocator => { @@ -853,9 +897,16 @@ impl Service for StateService { "type" => "block_locator", ); + let timer = CodeTimer::start(); + + // TODO: move this work into the future, like Block and Transaction? let rsp = Ok(Response::BlockLocator( self.block_locator().unwrap_or_default(), )); + + // The work is all done, the future just returns the result. 
+ timer.finish(module_path!(), line!(), "BlockLocator"); + async move { rsp }.boxed() } Request::Transaction(hash) => { @@ -866,6 +917,8 @@ impl Service for StateService { "type" => "transaction", ); + let timer = CodeTimer::start(); + // Prepare data for concurrent execution let best_chain = self.mem.best_chain().cloned(); let db = self.disk.db().clone(); @@ -873,10 +926,16 @@ impl Service for StateService { // # Performance // // Allow other async tasks to make progress while the transaction is being read from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let rsp = read::transaction(best_chain, &db, hash); + span.in_scope(|| { + let rsp = read::transaction(best_chain, &db, hash); - Ok(Response::Transaction(rsp.map(|(tx, _height)| tx))) + // The work is done in the future. + timer.finish(module_path!(), line!(), "Transaction"); + + Ok(Response::Transaction(rsp.map(|(tx, _height)| tx))) + }) }) .map(|join_result| join_result.expect("panic in Request::Transaction")) .boxed() @@ -889,6 +948,8 @@ impl Service for StateService { "type" => "block", ); + let timer = CodeTimer::start(); + // Prepare data for concurrent execution let best_chain = self.mem.best_chain().cloned(); let db = self.disk.db().clone(); @@ -896,10 +957,16 @@ impl Service for StateService { // # Performance // // Allow other async tasks to make progress while the block is being read from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let rsp = read::block(best_chain, &db, hash_or_height); + span.in_scope(move || { + let rsp = read::block(best_chain, &db, hash_or_height); - Ok(Response::Block(rsp)) + // The work is done in the future. 
+ timer.finish(module_path!(), line!(), "Block"); + + Ok(Response::Block(rsp)) + }) }) .map(|join_result| join_result.expect("panic in Request::Block")) .boxed() @@ -912,13 +979,19 @@ impl Service for StateService { "type" => "await_utxo", ); + let timer = CodeTimer::start(); + let span = Span::current(); + let fut = self.pending_utxos.queue(outpoint); if let Some(utxo) = self.any_utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); } - fut.boxed() + // The future waits on a channel for a response. + timer.finish(module_path!(), line!(), "AwaitUtxo"); + + fut.instrument(span).boxed() } Request::FindBlockHashes { known_blocks, stop } => { metrics::counter!( @@ -929,8 +1002,16 @@ impl Service for StateService { ); const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500; + + let timer = CodeTimer::start(); + + // TODO: move this work into the future, like Block and Transaction? let res = self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS); + + // The work is all done, the future just returns the result. + timer.finish(module_path!(), line!(), "FindBlockHashes"); + async move { Ok(Response::BlockHashes(res)) }.boxed() } Request::FindBlockHeaders { known_blocks, stop } => { @@ -951,6 +1032,11 @@ impl Service for StateService { // // https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905 let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2; + + let timer = CodeTimer::start(); + + // TODO: move this work into the future, like Block and Transaction? + // return heights instead, to improve lookup performance? let res = self.find_best_chain_hashes(known_blocks, stop, count); // And prepare data for concurrent execution @@ -961,18 +1047,25 @@ impl Service for StateService { // // Now we have the chain hashes, we can read the headers concurrently, // which allows other async tasks to make progress while data is being read from disk. 
+ let span = Span::current(); tokio::task::spawn_blocking(move || { - let res = res - .iter() - .map(|&hash| { - let header = read::block_header(best_chain.clone(), &db, hash.into()) - .expect("block header for found hash is in the best chain"); + span.in_scope(move || { + let res = res + .iter() + .map(|&hash| { + let header = + read::block_header(best_chain.clone(), &db, hash.into()) + .expect("block header for found hash is in the best chain"); - block::CountedHeader { header } - }) - .collect(); + block::CountedHeader { header } + }) + .collect(); - Ok(Response::BlockHeaders(res)) + // Some of the work is done in the future. + timer.finish(module_path!(), line!(), "FindBlockHeaders"); + + Ok(Response::BlockHeaders(res)) + }) }) .map(|join_result| join_result.expect("panic in Request::FindBlockHeaders")) .boxed() @@ -1003,17 +1096,25 @@ impl Service for ReadStateService { "type" => "block", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading blocks from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let block = state.best_chain_receiver.with_watch_data(|best_chain| { - read::block(best_chain, &state.db, hash_or_height) - }); + span.in_scope(move || { + let block = state.best_chain_receiver.with_watch_data(|best_chain| { + read::block(best_chain, &state.db, hash_or_height) + }); - Ok(ReadResponse::Block(block)) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::Block"); + + Ok(ReadResponse::Block(block)) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::Block")) .boxed() @@ -1028,18 +1129,26 @@ impl Service for ReadStateService { "type" => "transaction", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading transactions from disk. 
+ let span = Span::current(); tokio::task::spawn_blocking(move || { - let transaction_and_height = - state.best_chain_receiver.with_watch_data(|best_chain| { - read::transaction(best_chain, &state.db, hash) - }); + span.in_scope(move || { + let transaction_and_height = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::transaction(best_chain, &state.db, hash) + }); - Ok(ReadResponse::Transaction(transaction_and_height)) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::Transaction"); + + Ok(ReadResponse::Transaction(transaction_and_height)) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::Transaction")) .boxed() @@ -1053,17 +1162,26 @@ impl Service for ReadStateService { "type" => "sapling_tree", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading trees from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let sapling_tree = state.best_chain_receiver.with_watch_data(|best_chain| { - read::sapling_tree(best_chain, &state.db, hash_or_height) - }); + span.in_scope(move || { + let sapling_tree = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::sapling_tree(best_chain, &state.db, hash_or_height) + }); - Ok(ReadResponse::SaplingTree(sapling_tree)) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree"); + + Ok(ReadResponse::SaplingTree(sapling_tree)) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::SaplingTree")) .boxed() @@ -1077,17 +1195,26 @@ impl Service for ReadStateService { "type" => "orchard_tree", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading trees from disk. 
+ let span = Span::current(); tokio::task::spawn_blocking(move || { - let orchard_tree = state.best_chain_receiver.with_watch_data(|best_chain| { - read::orchard_tree(best_chain, &state.db, hash_or_height) - }); + span.in_scope(move || { + let orchard_tree = + state.best_chain_receiver.with_watch_data(|best_chain| { + read::orchard_tree(best_chain, &state.db, hash_or_height) + }); - Ok(ReadResponse::OrchardTree(orchard_tree)) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree"); + + Ok(ReadResponse::OrchardTree(orchard_tree)) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::OrchardTree")) .boxed() @@ -1105,17 +1232,29 @@ impl Service for ReadStateService { "type" => "transaction_ids_by_addresses", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading transaction IDs from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_tx_ids(best_chain, &state.db, addresses, height_range) - }); + span.in_scope(move || { + let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| { + read::transparent_tx_ids(best_chain, &state.db, addresses, height_range) + }); - tx_ids.map(ReadResponse::AddressesTransactionIds) + // The work is done in the future. + timer.finish( + module_path!(), + line!(), + "ReadRequest::TransactionIdsByAddresses", + ); + + tx_ids.map(ReadResponse::AddressesTransactionIds) + }) }) .map(|join_result| { join_result.expect("panic in ReadRequest::TransactionIdsByAddresses") @@ -1132,17 +1271,25 @@ impl Service for ReadStateService { "type" => "address_balance", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading balances from disk. 
+ let span = Span::current(); tokio::task::spawn_blocking(move || { - let balance = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_balance(best_chain, &state.db, addresses) - })?; + span.in_scope(move || { + let balance = state.best_chain_receiver.with_watch_data(|best_chain| { + read::transparent_balance(best_chain, &state.db, addresses) + })?; - Ok(ReadResponse::AddressBalance(balance)) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance"); + + Ok(ReadResponse::AddressBalance(balance)) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance")) .boxed() @@ -1157,17 +1304,25 @@ impl Service for ReadStateService { "type" => "utxos_by_addresses", ); + let timer = CodeTimer::start(); + let state = self.clone(); // # Performance // // Allow other async tasks to make progress while concurrently reading UTXOs from disk. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_utxos(state.network, best_chain, &state.db, addresses) - }); + span.in_scope(move || { + let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { + read::transparent_utxos(state.network, best_chain, &state.db, addresses) + }); - utxos.map(ReadResponse::Utxos) + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses"); + + utxos.map(ReadResponse::Utxos) + }) }) .map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses")) .boxed()