From 303c8cf5ef273db7e11b2d177f66e39d1fdd6cea Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 19 Oct 2021 05:23:21 +1000 Subject: [PATCH] Add a queue checker task, to make sure mempool transactions propagate (#2888) * Guarantee unique IDs in mempool service responses * Guarantee unique IDs in crawler task mempool Queue requests Also update the tests to use unique IDs. * Add a CheckForVerifiedTransactions mempool request Also document the mempool request and response variants. * Spawn a QueueChecker task to check for newly verified transactions This task makes sure that transactions reliably propagate, rather than relying on peer requests or responses to trigger propagation. * Update the start command documentation Co-authored-by: Alfredo Garcia --- zebrad/src/commands/start.rs | 47 ++++++++-- zebrad/src/components/mempool.rs | 80 ++++++++++++++++- .../src/components/mempool/queue_checker.rs | 86 +++++++++++++++++++ zebrad/src/components/mempool/tests.rs | 4 +- 4 files changed, 205 insertions(+), 12 deletions(-) create mode 100644 zebrad/src/components/mempool/queue_checker.rs diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 0a012ebf..ff532755 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -4,24 +4,49 @@ //! //! A zebra node consists of the following services and tasks: //! +//! Peers: //! * Network Service -//! * primary interface to the node +//! * primary external interface to the node //! * handles all external network requests for the Zcash protocol //! * via zebra_network::Message and zebra_network::Response //! * provides an interface to the rest of the network for other services and //! tasks running within this node //! * via zebra_network::Request +//! +//! Blocks & Mempool Transactions: //! * Consensus Service //! * handles all validation logic for the node -//! * verifies blocks using zebra-chain and zebra-script, then stores verified -//! blocks in zebra-state +//! 
* verifies blocks using zebra-chain, then stores verified blocks in zebra-state +//! * verifies mempool and block transactions using zebra-chain and zebra-script, +//! and returns verified mempool transactions for mempool storage +//! * Inbound Service +//! * handles requests from peers for network data, chain data, and mempool transactions +//! * spawns download and verify tasks for each gossiped block +//! * sends gossiped transactions to the mempool service +//! +//! Blocks: //! * Sync Task //! * runs in the background and continuously queries the network for //! new blocks to be verified and added to the local state -//! * Inbound Service -//! * handles requests from peers for network data and chain data -//! * performs transaction and block diffusion -//! * downloads and verifies gossiped blocks and transactions +//! * spawns download and verify tasks for each crawled block +//! * State Service +//! * contextually verifies blocks +//! * handles in-memory storage of multiple non-finalized chains +//! * handles permanent storage of the best finalized chain +//! * Block Gossip Task +//! * runs in the background and continuously queries the state for +//! newly committed blocks to be gossiped to peers +//! +//! Mempool Transactions: +//! * Mempool Service +//! * activates when the syncer is near the chain tip +//! * spawns download and verify tasks for each crawled or gossiped transaction +//! * handles in-memory storage of unmined transactions +//! * Queue Checker Task +//! * runs in the background, polling the mempool to store newly verified transactions +//! * Transaction Gossip Task +//! * runs in the background and gossips newly added mempool transactions +//! 
to peers use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; @@ -117,11 +142,13 @@ impl StartCmd { let mempool_crawler_task_handle = mempool::Crawler::spawn( &config.mempool, peer_set.clone(), - mempool, + mempool.clone(), sync_status, chain_tip_change, ); + let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool); + let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id( mempool_transaction_receiver, peer_set, @@ -138,6 +165,10 @@ impl StartCmd { .expect("unexpected panic in the mempool crawler") .map_err(|e| eyre!(e)), + mempool_queue_result = mempool_queue_checker_task_handle.fuse() => mempool_queue_result + .expect("unexpected panic in the mempool queue checker") + .map_err(|e| eyre!(e)), + tx_gossip_result = tx_gossip_task_handle.fuse() => tx_gossip_result .expect("unexpected panic in the transaction gossip task") .map_err(|e| eyre!(e)), diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 06947889..69a18596 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -29,6 +29,7 @@ mod crawler; pub mod downloads; mod error; pub mod gossip; +mod queue_checker; mod storage; #[cfg(test)] @@ -40,6 +41,7 @@ pub use config::Config; pub use crawler::Crawler; pub use error::MempoolError; pub use gossip::gossip_mempool_transaction_id; +pub use queue_checker::QueueChecker; pub use storage::{ ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, }; @@ -59,21 +61,75 @@ type TxVerifier = Buffer< >; type InboundTxDownloads = TxDownloads, Timeout, State>; +/// A mempool service request. +/// +/// Requests can query the current set of mempool transactions, +/// queue transactions to be downloaded and verified, or +/// run the mempool to check for newly verified transactions. 
+/// +/// Requests can't modify the mempool directly, +/// because all mempool transactions must be verified. #[derive(Debug, Eq, PartialEq)] #[allow(dead_code)] pub enum Request { + /// Query all transaction IDs in the mempool. TransactionIds, + + /// Query matching transactions in the mempool, + /// using a unique set of [`UnminedTxId`]s. TransactionsById(HashSet), + + /// Query matching cached rejected transaction IDs in the mempool, + /// using a unique set of [`UnminedTxId`]s. RejectedTransactionIds(HashSet), + + /// Queue a list of gossiped transactions or transaction IDs, or + /// crawled transaction IDs. + /// + /// The transaction downloader checks for duplicates across IDs and transactions. Queue(Vec), + + /// Check for newly verified transactions. + /// + /// The transaction downloader does not push transactions into the mempool. + /// So a task should send this request regularly (every 5-10 seconds). + /// + /// These checks also happen for other request variants, + /// but we can't rely on peers to send queries regularly, + /// and crawler queue requests depend on peer responses. + /// Also, crawler requests aren't frequent enough for transaction propagation. + CheckForVerifiedTransactions, } +/// A response to a mempool service request. +/// +/// Responses can read the current set of mempool transactions, +/// check the queued status of transactions to be downloaded and verified, or +/// confirm that the mempool has been checked for newly verified transactions. #[derive(Debug)] pub enum Response { - Transactions(Vec), + /// Returns all transaction IDs from the mempool. TransactionIds(HashSet), + + /// Returns matching transactions from the mempool. + /// + /// Since the IDs in a [`Request::TransactionsById`] request are unique, + /// the response transactions are also unique. + Transactions(Vec), + + /// Returns matching cached rejected transaction IDs from the mempool. RejectedTransactionIds(HashSet), + + /// Returns a list of queue results. 
+ /// + /// These are the results of the initial queue checks. + /// The transaction may also fail download or verification later. + /// + /// Each result matches the request at the corresponding vector index. Queued(Vec>), + + /// Confirms that the mempool has checked for recently verified transactions. + CheckedForVerifiedTransactions, } /// The state of the mempool. @@ -375,6 +431,7 @@ impl Service for Mempool { storage, tx_downloads, } => match req { + // Queries Request::TransactionIds => { let res = storage.tx_ids().collect(); async move { Ok(Response::TransactionIds(res)) }.boxed() @@ -387,6 +444,8 @@ impl Service for Mempool { let res = storage.rejected_transactions(ids).collect(); async move { Ok(Response::RejectedTransactionIds(res)) }.boxed() } + + // Queue mempool candidates Request::Queue(gossiped_txs) => { let rsp: Vec> = gossiped_txs .into_iter() @@ -398,23 +457,40 @@ impl Service for Mempool { .collect(); async move { Ok(Response::Queued(rsp)) }.boxed() } + + // Store successfully downloaded and verified transactions in the mempool + Request::CheckForVerifiedTransactions => { + // all the work for this request is done in poll_ready + async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed() + } }, ActiveState::Disabled => { // We can't return an error since that will cause a disconnection // by the peer connection handler. Therefore, return successful // empty responses. let resp = match req { + // Empty Queries Request::TransactionIds => Response::TransactionIds(Default::default()), Request::TransactionsById(_) => Response::Transactions(Default::default()), Request::RejectedTransactionIds(_) => { Response::RejectedTransactionIds(Default::default()) } - // Special case; we can signal the error inside the response. + + // Don't queue mempool candidates Request::Queue(gossiped_txs) => Response::Queued( + // Special case; we can signal the error inside the response, + // because the inbound service ignores inner errors. 
iter::repeat(Err(MempoolError::Disabled)) .take(gossiped_txs.len()) .collect(), ), + + // Check if the mempool should be enabled. + // This request makes sure mempools are debug-enabled in the acceptance tests. + Request::CheckForVerifiedTransactions => { + // all the work for this request is done in poll_ready + Response::CheckedForVerifiedTransactions + } }; async move { Ok(resp) }.boxed() } diff --git a/zebrad/src/components/mempool/queue_checker.rs b/zebrad/src/components/mempool/queue_checker.rs new file mode 100644 index 00000000..4d828dc5 --- /dev/null +++ b/zebrad/src/components/mempool/queue_checker.rs @@ -0,0 +1,86 @@ +//! Zebra Mempool queue checker. +//! +//! The queue checker periodically sends a request to the mempool, +//! so that newly verified transactions are added to the mempool, +//! and gossiped to peers. +//! +//! The mempool performs these actions on every request, +//! but we can't guarantee that requests will arrive from peers +//! on a regular basis. +//! +//! Crawler queue requests are also too infrequent, +//! and they only happen if peers respond within the timeout. + +use std::time::Duration; + +use tokio::{task::JoinHandle, time::sleep}; +use tower::{BoxError, Service, ServiceExt}; + +use crate::components::mempool; + +/// The delay between queue check events. +/// +/// This interval is chosen so that there are a significant number of +/// queue checks in each target block interval. +/// +/// This allows transactions to propagate across the network for each block, +/// even if some peers are poorly connected. +const RATE_LIMIT_DELAY: Duration = Duration::from_secs(5); + +/// The mempool queue checker. +/// +/// The queue checker relies on the mempool to ignore requests when the mempool is inactive. +pub struct QueueChecker { + /// The mempool service that receives the periodic queue check requests. 
+ mempool: Mempool, +} + +impl QueueChecker +where + Mempool: + Service + Send + 'static, + Mempool::Future: Send, +{ + /// Spawn an asynchronous task to run the mempool queue checker. + pub fn spawn(mempool: Mempool) -> JoinHandle> { + let queue_checker = QueueChecker { mempool }; + + tokio::spawn(queue_checker.run()) + } + + /// Periodically check if the mempool has newly verified transactions. + /// + /// Runs until the mempool returns an error, + /// which happens when Zebra is shutting down. + pub async fn run(mut self) -> Result<(), BoxError> { + info!("initializing mempool queue checker task"); + + loop { + sleep(RATE_LIMIT_DELAY).await; + self.check_queue().await?; + } + } + + /// Check if the mempool has newly verified transactions. + async fn check_queue(&mut self) -> Result<(), BoxError> { + debug!("checking for newly verified mempool transactions"); + + // Since this is an internal request, we don't expect any errors. + // So we propagate any unexpected errors to the task that spawned us. + let response = self + .mempool + .ready_and() + .await? + .call(mempool::Request::CheckForVerifiedTransactions) + .await?; + + match response { + mempool::Response::CheckedForVerifiedTransactions => {} + _ => { + unreachable!("mempool did not respond with checked queue to mempool queue checker") + } + }; + + Ok(()) + } +} diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 0511d883..d2a5f543 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -43,8 +43,8 @@ impl Mempool { /// Perform a dummy service call so that `poll_ready` is called. pub async fn dummy_call(&mut self) { - self.oneshot(Request::Queue(vec![])) + self.oneshot(Request::CheckForVerifiedTransactions) .await - .expect("Queuing no transactions shouldn't fail"); + .expect("unexpected failure when checking for verified transactions"); } }