diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index bc1a26fc..e6c1b3b9 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -17,6 +17,7 @@ const MAX_SINGLE_PEER_RETRIES: usize = 2; /// Configuration for networking code. #[derive(Clone, Debug, Serialize)] +#[serde(deny_unknown_fields, default)] pub struct Config { /// The address on which this node should listen for connections. /// @@ -64,6 +65,9 @@ pub struct Config { /// - regularly, every time `crawl_new_peer_interval` elapses, and /// - if the peer set is busy, and there aren't any peer addresses for the /// next connection attempt. + // + // Note: Durations become a TOML table, so they must be the final item in the config + // We'll replace them with a more user-friendly format in #2847 pub crawl_new_peer_interval: Duration, } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 06e77a42..0a012ebf 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -110,12 +110,17 @@ impl StartCmd { let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( sync_status.clone(), - chain_tip_change, + chain_tip_change.clone(), peer_set.clone(), )); - let mempool_crawler_task_handle = - mempool::Crawler::spawn(peer_set.clone(), mempool, sync_status); + let mempool_crawler_task_handle = mempool::Crawler::spawn( + &config.mempool, + peer_set.clone(), + mempool, + sync_status, + chain_tip_change, + ); let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id( mempool_transaction_receiver, diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 186cf8a2..94bcba90 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -13,6 +13,7 @@ use tokio::sync::watch; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ + block::Height, chain_tip::ChainTip, transaction::{UnminedTx, UnminedTxId}, }; @@ -108,9 +109,14 @@ pub struct Mempool { /// Allows checking if we are near the tip to enable/disable the mempool. sync_status: SyncStatus, + /// If the state's best chain tip has reached this height, always enable the mempool. + debug_enable_at_height: Option, + /// Allow efficient access to the best tip of the blockchain. latest_chain_tip: zs::LatestChainTip, - /// Allows the detection of chain tip resets. + + /// Allows the detection of newly added chain tip blocks, + /// and chain tip resets. chain_tip_change: ChainTipChange, /// Handle to the outbound service. @@ -132,7 +138,7 @@ pub struct Mempool { impl Mempool { pub(crate) fn new( - _config: &Config, + config: &Config, outbound: Outbound, state: State, tx_verifier: TxVerifier, @@ -146,6 +152,7 @@ impl Mempool { let mut service = Mempool { active_state: ActiveState::Disabled, sync_status, + debug_enable_at_height: config.debug_enable_at_height.map(Height), latest_chain_tip, chain_tip_change, outbound, @@ -161,10 +168,39 @@ impl Mempool { (service, transaction_receiver) } + /// Is the mempool enabled by a debug config option? + fn is_enabled_by_debug(&self) -> bool { + let mut is_debug_enabled = false; + + // optimise non-debug performance + if self.debug_enable_at_height.is_none() { + return is_debug_enabled; + } + + let enable_at_height = self + .debug_enable_at_height + .expect("unexpected debug_enable_at_height: just checked for None"); + + if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() { + is_debug_enabled = best_tip_height >= enable_at_height; + + if is_debug_enabled && !self.is_enabled() { + info!( + ?best_tip_height, + ?enable_at_height, + "enabling mempool for debugging" + ); + } + } + + is_debug_enabled + } + /// Update the mempool state (enabled / disabled) depending on how close to /// the tip is the synchronization, including side effects to state changes. fn update_state(&mut self) { - let is_close_to_tip = self.sync_status.is_close_to_tip(); + let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug(); + if self.is_enabled() == is_close_to_tip { // the active state is up to date return; @@ -172,6 +208,8 @@ impl Mempool { // Update enabled / disabled state if is_close_to_tip { + info!("activating mempool: Zebra is close to the tip"); + let tx_downloads = Box::pin(TxDownloads::new( Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT), @@ -182,6 +220,8 @@ impl Mempool { tx_downloads, }; } else { + info!("deactivating mempool: Zebra is syncing lots of blocks"); + self.active_state = ActiveState::Disabled } } @@ -415,6 +455,7 @@ fn reject_if_needed( // If it was cancelled then a block was mined, or there was a network // upgrade, etc. No reason to reject it. TransactionDownloadVerifyError::Cancelled => {} + // Consensus verification failed. Reject transaction to avoid // having to download and verify it again just for it to fail again. TransactionDownloadVerifyError::Invalid(e) => { diff --git a/zebrad/src/components/mempool/config.rs b/zebrad/src/components/mempool/config.rs index 08592b5c..aab68787 100644 --- a/zebrad/src/components/mempool/config.rs +++ b/zebrad/src/components/mempool/config.rs @@ -6,7 +6,19 @@ use serde::{Deserialize, Serialize}; /// Mempool configuration section. #[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields, default)] pub struct Config { + /// If the state's best chain tip has reached this height, always enable the mempool, + /// regardless of Zebra's sync status. + /// + /// Set to `None` by default: Zebra always checks the sync status before enabling the mempool. + // + // TODO: + // - allow the mempool to be enabled before the genesis block is committed? + // we could replace `Option` with an enum that has an `AlwaysEnable` variant + // - move debug configs last (needs #2847) + pub debug_enable_at_height: Option, + /// The mempool transaction cost limit. /// /// This limits the total serialized byte size of all transactions in the mempool. @@ -24,23 +36,28 @@ pub struct Config { /// /// This corresponds to `mempoolevictionmemoryminutes` from /// [ZIP-401](https://zips.z.cash/zip-0401#specification). + // + // Note: Durations become a TOML table, so they must be the final item in the config + // We'll replace them with a more user-friendly format in #2847 pub eviction_memory_time: Duration, } -/// Consensus rules: -/// -/// > There MUST be a configuration option mempooltxcostlimit, -/// > which SHOULD default to 80000000. -/// > -/// > There MUST be a configuration option mempoolevictionmemoryminutes, -/// > which SHOULD default to 60 [minutes]. -/// -/// https://zips.z.cash/zip-0401#specification impl Default for Config { fn default() -> Self { Self { + /// Consensus rules: + /// + /// > There MUST be a configuration option mempooltxcostlimit, + /// > which SHOULD default to 80000000. + /// > + /// > There MUST be a configuration option mempoolevictionmemoryminutes, + /// > which SHOULD default to 60 [minutes]. + /// + /// https://zips.z.cash/zip-0401#specification tx_cost_limit: 80_000_000, eviction_memory_time: Duration::from_secs(60 * 60), + + debug_enable_at_height: None, } } } diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index c93002c6..fa156caf 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -4,15 +4,17 @@ use std::time::Duration; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{task::JoinHandle, time::sleep}; +use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt}; +use tokio::{sync::watch, task::JoinHandle, time::sleep}; use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; +use zebra_chain::block::Height; use zebra_network as zn; +use zebra_state::ChainTipChange; -use super::{ - super::{mempool, sync::SyncStatus}, - downloads::Gossip, +use crate::components::{ + mempool::{self, downloads::Gossip, Config}, + sync::SyncStatus, }; #[cfg(test)] @@ -35,9 +37,20 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// The mempool transaction crawler. pub struct Crawler { + /// The network peer set to crawl. peer_set: Timeout, + + /// The mempool service that receives crawled transaction IDs. mempool: Mempool, - status: SyncStatus, + + /// Allows checking if we are near the tip to enable/disable the mempool crawler. + sync_status: SyncStatus, + + /// Notifies the crawler when the best chain tip height changes. + chain_tip_change: ChainTipChange, + + /// If the state's best chain tip has reached this height, always enable the mempool crawler. + debug_enable_at_height: Option, } impl Crawler @@ -51,19 +64,67 @@ where { /// Spawn an asynchronous task to run the mempool crawler. pub fn spawn( + config: &Config, peer_set: PeerSet, mempool: Mempool, - status: SyncStatus, + sync_status: SyncStatus, + chain_tip_change: ChainTipChange, ) -> JoinHandle> { let crawler = Crawler { peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT), mempool, - status, + sync_status, + chain_tip_change, + debug_enable_at_height: config.debug_enable_at_height.map(Height), }; tokio::spawn(crawler.run()) } + /// Waits until the mempool crawler is enabled by a debug config option. + /// + /// Returns an error if communication with the state is lost. + async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> { + // optimise non-debug performance + if self.debug_enable_at_height.is_none() { + return future::pending().await; + } + + let enable_at_height = self + .debug_enable_at_height + .expect("unexpected debug_enable_at_height: just checked for None"); + + loop { + let best_tip_height = self + .chain_tip_change + .wait_for_tip_change() + .await? + .best_tip_height(); + + if best_tip_height >= enable_at_height { + return Ok(()); + } + } + } + + /// Waits until the mempool crawler is enabled. + /// + /// Returns an error if communication with the syncer or state is lost. + async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> { + let mut sync_status = self.sync_status.clone(); + let tip_future = sync_status.wait_until_close_to_tip(); + let debug_future = self.wait_until_enabled_by_debug(); + + pin_mut!(tip_future); + pin_mut!(debug_future); + + let (result, _unready_future) = future::select(tip_future, debug_future) + .await + .factor_first(); + + result + } + /// Periodically crawl peers for transactions to include in the mempool. /// /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when @@ -71,12 +132,11 @@ where pub async fn run(mut self) -> Result<(), BoxError> { info!("initializing mempool crawler task"); - while self.status.wait_until_close_to_tip().await.is_ok() { + loop { + self.wait_until_enabled().await?; self.crawl_transactions().await?; sleep(RATE_LIMIT_DELAY).await; } - - Ok(()) } /// Crawl peers for transactions. diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index 7a97b72c..29279e2f 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -3,17 +3,20 @@ use std::time::Duration; use proptest::{collection::vec, prelude::*}; use tokio::time; -use zebra_chain::transaction::UnminedTxId; +use zebra_chain::{parameters::Network, transaction::UnminedTxId}; use zebra_network as zn; +use zebra_state::ChainTipSender; use zebra_test::mock_service::{MockService, PropTestAssertion}; -use super::{ - super::{ - super::{mempool, sync::RecentSyncLengths}, +use crate::components::{ + mempool::{ + self, + crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}, downloads::Gossip, error::MempoolError, + Config, }, - Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY, + sync::RecentSyncLengths, }; /// The number of iterations to crawl while testing. @@ -54,7 +57,8 @@ proptest! { sync_lengths.push(0); runtime.block_on(async move { - let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths) = setup_crawler(); + let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender, + ) = setup_crawler(); time::pause(); @@ -103,7 +107,7 @@ proptest! { let transaction_id_count = transaction_ids.len(); runtime.block_on(async move { - let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = + let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) = setup_crawler(); time::pause(); @@ -144,7 +148,7 @@ proptest! { let _guard = runtime.enter(); runtime.block_on(async move { - let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = + let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) = setup_crawler(); time::pause(); @@ -218,14 +222,36 @@ proptest! { } /// Spawn a crawler instance using mock services. -fn setup_crawler() -> (MockPeerSet, MockMempool, SyncStatus, RecentSyncLengths) { +fn setup_crawler() -> ( + MockPeerSet, + MockMempool, + SyncStatus, + RecentSyncLengths, + ChainTipSender, +) { let peer_set = MockService::build().for_prop_tests(); let mempool = MockService::build().for_prop_tests(); let (sync_status, recent_sync_lengths) = SyncStatus::new(); - Crawler::spawn(peer_set.clone(), mempool.clone(), sync_status.clone()); + // the network should be irrelevant here + let (chain_tip_sender, _latest_chain_tip, chain_tip_change) = + ChainTipSender::new(None, Network::Mainnet); - (peer_set, mempool, sync_status, recent_sync_lengths) + Crawler::spawn( + &Config::default(), + peer_set.clone(), + mempool.clone(), + sync_status.clone(), + chain_tip_change, + ); + + ( + peer_set, + mempool, + sync_status, + recent_sync_lengths, + chain_tip_sender, + ) } /// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list. diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 6d002642..f44f2f6d 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -714,6 +714,7 @@ fn sync_one_checkpoint_mainnet() -> Result<()> { SMALL_CHECKPOINT_TIMEOUT, None, true, + None, ) .map(|_tempdir| ()) } @@ -730,6 +731,7 @@ fn sync_one_checkpoint_testnet() -> Result<()> { SMALL_CHECKPOINT_TIMEOUT, None, true, + None, ) .map(|_tempdir| ()) } @@ -753,6 +755,7 @@ fn restart_stop_at_height_for_network(network: Network, height: Height) -> Resul SMALL_CHECKPOINT_TIMEOUT, None, true, + None, )?; // if stopping corrupts the rocksdb database, zebrad might hang or crash here // if stopping does not write the rocksdb database to disk, Zebra will @@ -762,13 +765,30 @@ fn restart_stop_at_height_for_network(network: Network, height: Height) -> Resul network, "state is already at the configured height", STOP_ON_LOAD_TIMEOUT, - Some(reuse_tempdir), + reuse_tempdir, false, + None, )?; Ok(()) } +/// Test if `zebrad` can activate the mempool on mainnet. +/// Debug activation happens after committing the genesis block. +#[test] +fn activate_mempool_mainnet() -> Result<()> { + sync_until( + Height(1), + Mainnet, + STOP_AT_HEIGHT_REGEX, + SMALL_CHECKPOINT_TIMEOUT, + None, + true, + Some(Height(0)), + ) + .map(|_tempdir| ()) +} + /// Test if `zebrad` can sync some larger checkpoints on mainnet. /// /// This test might fail or timeout on slow or unreliable networks, @@ -784,6 +804,7 @@ fn sync_large_checkpoints_mainnet() -> Result<()> { LARGE_CHECKPOINT_TIMEOUT, None, true, + None, )?; // if this sync fails, see the failure notes in `restart_stop_at_height` sync_until( @@ -791,8 +812,9 @@ fn sync_large_checkpoints_mainnet() -> Result<()> { Mainnet, "previous state height is greater than the stop height", STOP_ON_LOAD_TIMEOUT, - Some(reuse_tempdir), + reuse_tempdir, false, + None, )?; Ok(()) @@ -805,6 +827,13 @@ fn sync_large_checkpoints_mainnet() -> Result<()> { /// the output contains `stop_regex`. If `reuse_tempdir` is supplied, /// use it as the test's temporary directory. /// +/// If `check_legacy_chain` is true, +/// make sure the logs contain the legacy chain check. +/// +/// If `enable_mempool_at_height` is `Some(Height(_))`, +/// configure `zebrad` to debug-enable the mempool at that height. +/// Then check the logs for the mempool being enabled. +/// /// If `stop_regex` is encountered before the process exits, kills the /// process, and mark the test as successful, even if `height` has not /// been reached. @@ -819,8 +848,9 @@ fn sync_until( network: Network, stop_regex: &str, timeout: Duration, - reuse_tempdir: Option, + reuse_tempdir: impl Into>, check_legacy_chain: bool, + enable_mempool_at_height: impl Into>, ) -> Result { zebra_test::init(); @@ -828,11 +858,15 @@ fn sync_until( return testdir(); } + let reuse_tempdir = reuse_tempdir.into(); + let enable_mempool_at_height = enable_mempool_at_height.into(); + // Use a persistent state, so we can handle large syncs let mut config = persistent_test_config()?; // TODO: add convenience methods? config.network.network = network; config.state.debug_stop_at_height = Some(height.0); + config.mempool.debug_enable_at_height = enable_mempool_at_height.map(|height| height.0); let tempdir = if let Some(reuse_tempdir) = reuse_tempdir { reuse_tempdir.replace_config(&mut config)? @@ -850,6 +884,11 @@ fn sync_until( child.expect_stdout_line_matches("no legacy chain found")?; } + if enable_mempool_at_height.is_some() { + child.expect_stdout_line_matches("enabling mempool for debugging")?; + child.expect_stdout_line_matches("activating mempool")?; + } + child.expect_stdout_line_matches(stop_regex)?; child.kill()?;