diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 6a31534c..2334bade 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -82,6 +82,7 @@ rand_chacha = { version = "0.3.1", optional = true } tokio = { version = "1.20.0", features = ["tracing"], optional = true } zebra-test = { path = "../zebra-test/", optional = true } +rayon = "1.5.3" [dev-dependencies] diff --git a/zebra-consensus/src/chain/tests.rs b/zebra-consensus/src/chain/tests.rs index 9e8345c2..2b659866 100644 --- a/zebra-consensus/src/chain/tests.rs +++ b/zebra-consensus/src/chain/tests.rs @@ -176,7 +176,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_fail_no_coinbase_test() -> Result<(), Report> { verify_fail_no_coinbase().await } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 52d44163..d7e478ac 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -33,15 +33,12 @@ use tracing::instrument; use tower::buffer::Buffer; use zebra_chain::{ - block::{self, Block}, + block, parameters::{Network, NetworkUpgrade}, - transaction, - transaction::Transaction, transparent, }; use crate::{ - request::HashOrHeight, service::{ chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, finalized_state::{FinalizedState, ZebraDb}, @@ -471,24 +468,6 @@ impl StateService { Some(tip.0 - height.0) } - /// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or - /// [`Height`](zebra_chain::block::Height), if it exists in the current best chain. - pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option> { - read::block(self.mem.best_chain(), self.disk.db(), hash_or_height) - } - - /// Returns the [`block::Header`] with [`Hash`](zebra_chain::block::Hash) or - /// [`Height`](zebra_chain::block::Height), if it exists in the current best chain. - pub fn best_block_header(&self, hash_or_height: HashOrHeight) -> Option> { - read::block_header(self.mem.best_chain(), self.disk.db(), hash_or_height) - } - - /// Returns the [`Transaction`] with [`transaction::Hash`], - /// if it exists in the current best chain. - pub fn best_transaction(&self, hash: transaction::Hash) -> Option> { - read::transaction(self.mem.best_chain(), self.disk.db(), hash).map(|(tx, _height)| tx) - } - /// Return the hash for the block at `height` in the current best chain. pub fn best_hash(&self, height: block::Height) -> Option { self.mem @@ -820,8 +799,7 @@ impl Service for StateService { // # Performance // // Allow other async tasks to make progress while blocks are being verified - // and written to disk. But wait for the blocks to finish committing, - // so that `StateService` multi-block queries always observe a consistent state. + // and written to disk. // // See the note in `CommitBlock` for more details. let rsp_rx = @@ -851,9 +829,11 @@ impl Service for StateService { "type" => "depth", ); - let rsp = Ok(self.best_depth(hash)).map(Response::Depth); + let rsp = Ok(Response::Depth(self.best_depth(hash))); async move { rsp }.boxed() } + // TODO: consider spawning small reads into blocking tasks, + // because the database can do large cleanups during small reads. Request::Tip => { metrics::counter!( "state.requests", @@ -862,7 +842,7 @@ impl Service for StateService { "type" => "tip", ); - let rsp = Ok(self.best_tip()).map(Response::Tip); + let rsp = Ok(Response::Tip(self.best_tip())); async move { rsp }.boxed() } Request::BlockLocator => { @@ -873,7 +853,9 @@ impl Service for StateService { "type" => "block_locator", ); - let rsp = Ok(self.block_locator().unwrap_or_default()).map(Response::BlockLocator); + let rsp = Ok(Response::BlockLocator( + self.block_locator().unwrap_or_default(), + )); async move { rsp }.boxed() } Request::Transaction(hash) => { @@ -884,8 +866,20 @@ impl Service for StateService { "type" => "transaction", ); - let rsp = Ok(self.best_transaction(hash)).map(Response::Transaction); - async move { rsp }.boxed() + // Prepare data for concurrent execution + let best_chain = self.mem.best_chain().cloned(); + let db = self.disk.db().clone(); + + // # Performance + // + // Allow other async tasks to make progress while the transaction is being read from disk. + tokio::task::spawn_blocking(move || { + let rsp = read::transaction(best_chain, &db, hash); + + Ok(Response::Transaction(rsp.map(|(tx, _height)| tx))) + }) + .map(|join_result| join_result.expect("panic in Request::Transaction")) + .boxed() } Request::Block(hash_or_height) => { metrics::counter!( @@ -895,8 +889,20 @@ impl Service for StateService { "type" => "block", ); - let rsp = Ok(self.best_block(hash_or_height)).map(Response::Block); - async move { rsp }.boxed() + // Prepare data for concurrent execution + let best_chain = self.mem.best_chain().cloned(); + let db = self.disk.db().clone(); + + // # Performance + // + // Allow other async tasks to make progress while the block is being read from disk. + tokio::task::spawn_blocking(move || { + let rsp = read::block(best_chain, &db, hash_or_height); + + Ok(Response::Block(rsp)) + }) + .map(|join_result| join_result.expect("panic in Request::Block")) + .boxed() } Request::AwaitUtxo(outpoint) => { metrics::counter!( @@ -935,6 +941,8 @@ impl Service for StateService { "type" => "find_block_headers", ); + // Before we spawn the future, get a consistent set of chain hashes from the state. + const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160; // Zcashd will blindly request more block headers as long as it // got 160 block headers in response to a previous query, EVEN @@ -944,16 +952,30 @@ impl Service for StateService { // https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905 let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2; let res = self.find_best_chain_hashes(known_blocks, stop, count); - let res: Vec<_> = res - .iter() - .map(|&hash| { - let header = self - .best_block_header(hash.into()) - .expect("block header for found hash is in the best chain"); - block::CountedHeader { header } - }) - .collect(); - async move { Ok(Response::BlockHeaders(res)) }.boxed() + + // And prepare data for concurrent execution + let best_chain = self.mem.best_chain().cloned(); + let db = self.disk.db().clone(); + + // # Performance + // + // Now we have the chain hashes, we can read the headers concurrently, + // which allows other async tasks to make progress while data is being read from disk. + tokio::task::spawn_blocking(move || { + let res = res + .iter() + .map(|&hash| { + let header = read::block_header(best_chain.clone(), &db, hash.into()) + .expect("block header for found hash is in the best chain"); + + block::CountedHeader { header } + }) + .collect(); + + Ok(Response::BlockHeaders(res)) + }) + .map(|join_result| join_result.expect("panic in Request::FindBlockHeaders")) + .boxed() } } } @@ -983,13 +1005,17 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading blocks from disk. + tokio::task::spawn_blocking(move || { let block = state.best_chain_receiver.with_watch_data(|best_chain| { read::block(best_chain, &state.db, hash_or_height) }); Ok(ReadResponse::Block(block)) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Block")) .boxed() } @@ -1004,14 +1030,18 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading transactions from disk. + tokio::task::spawn_blocking(move || { let transaction_and_height = state.best_chain_receiver.with_watch_data(|best_chain| { read::transaction(best_chain, &state.db, hash) }); Ok(ReadResponse::Transaction(transaction_and_height)) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::Transaction")) .boxed() } @@ -1025,13 +1055,17 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading trees from disk. + tokio::task::spawn_blocking(move || { let sapling_tree = state.best_chain_receiver.with_watch_data(|best_chain| { read::sapling_tree(best_chain, &state.db, hash_or_height) }); Ok(ReadResponse::SaplingTree(sapling_tree)) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::SaplingTree")) .boxed() } @@ -1045,13 +1079,17 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading trees from disk. + tokio::task::spawn_blocking(move || { let orchard_tree = state.best_chain_receiver.with_watch_data(|best_chain| { read::orchard_tree(best_chain, &state.db, hash_or_height) }); Ok(ReadResponse::OrchardTree(orchard_tree)) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::OrchardTree")) .boxed() } @@ -1069,13 +1107,19 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading transaction IDs from disk. + tokio::task::spawn_blocking(move || { let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| { read::transparent_tx_ids(best_chain, &state.db, addresses, height_range) }); tx_ids.map(ReadResponse::AddressesTransactionIds) - } + }) + .map(|join_result| { + join_result.expect("panic in ReadRequest::TransactionIdsByAddresses") + }) .boxed() } @@ -1090,13 +1134,17 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading balances from disk. + tokio::task::spawn_blocking(move || { let balance = state.best_chain_receiver.with_watch_data(|best_chain| { read::transparent_balance(best_chain, &state.db, addresses) })?; Ok(ReadResponse::AddressBalance(balance)) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance")) .boxed() } @@ -1111,13 +1159,17 @@ impl Service for ReadStateService { let state = self.clone(); - async move { + // # Performance + // + // Allow other async tasks to make progress while concurrently reading UTXOs from disk. + tokio::task::spawn_blocking(move || { let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { read::transparent_utxos(state.network, best_chain, &state.db, addresses) }); utxos.map(ReadResponse::Utxos) - } + }) + .map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses")) .boxed() } } diff --git a/zebra-state/src/service/finalized_state/disk_format/block.rs b/zebra-state/src/service/finalized_state/disk_format/block.rs index fafa63d1..939408c7 100644 --- a/zebra-state/src/service/finalized_state/disk_format/block.rs +++ b/zebra-state/src/service/finalized_state/disk_format/block.rs @@ -229,10 +229,23 @@ impl IntoDisk for Transaction { impl FromDisk for Transaction { fn from_bytes(bytes: impl AsRef<[u8]>) -> Self { - bytes - .as_ref() - .zcash_deserialize_into() - .expect("deserialization format should match the serialization format used by IntoDisk") + let bytes = bytes.as_ref(); + + let mut tx = None; + + // # Performance + // + // Move CPU-intensive deserialization cryptography into the rayon thread pool. + // This avoids blocking the tokio executor. + rayon::in_place_scope_fifo(|scope| { + scope.spawn_fifo(|_scope| { + tx = Some(bytes.as_ref().zcash_deserialize_into().expect( + "deserialization format should match the serialization format used by IntoDisk", + )); + }); + }); + + tx.expect("scope has already run") } } diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 8674945b..56089c6d 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -111,8 +111,11 @@ impl ZebraDb { // Manually fetch the entire block's transactions let mut transactions = Vec::new(); - // TODO: is this loop more efficient if we store the number of transactions? - // is the difference large enough to matter? + // TODO: + // - split disk reads from deserialization, and run deserialization in parallel, + // this improves performance for blocks with multiple large shielded transactions + // - is this loop more efficient if we store the number of transactions? + // - is the difference large enough to matter? for tx_index in 0..=Transaction::max_allocation() { let tx_loc = TransactionLocation::from_u64(height, tx_index); diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index 1a52ef05..cf5de2dd 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -208,7 +208,7 @@ fn out_of_order_committing_strategy() -> BoxedStrategy>> { Just(blocks).prop_shuffle().boxed() } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn empty_state_still_responds_to_requests() -> Result<()> { zebra_test::init(); diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index cef025d0..4aed985d 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -120,7 +120,7 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> { /// Check that a network stack with an empty state responds to block requests with `notfound`. /// /// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> { let ( // real services diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 3323324b..06ca8eaa 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -543,7 +543,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> } /// Check if a transaction that fails verification is rejected by the mempool. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; @@ -555,8 +555,6 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); let rejected_tx = unmined_transactions.next().unwrap().clone(); - time::pause(); - // Enable the mempool mempool.enable(&mut recent_syncs).await; @@ -614,7 +612,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { } /// Check if a transaction that fails download is _not_ rejected. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; @@ -626,8 +624,6 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); let rejected_valid_tx = unmined_transactions.next().unwrap().clone(); - time::pause(); - // Enable the mempool mempool.enable(&mut recent_syncs).await;