Add a debug config that enables the mempool (#2862)

* Update some comments

* Add a mempool debug_enable_at_height config

* Rename a field in the mempool crawler

* Propagate syncer channel errors through the crawler

We don't want to ignore these errors, because they might indicate a shutdown.
(Or a bug that we should fix.)

* Use debug_enable_at_height in the mempool crawler

* Log when the mempool is activated or deactivated

* Deny unknown fields and apply defaults for all configs

* Move Duration last, as required for TOML tables

* Add a basic mempool acceptance test

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
teor 2021-10-14 01:04:49 +10:00 committed by GitHub
parent be185315ad
commit b64ed62777
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 232 additions and 40 deletions

View File

@ -17,6 +17,7 @@ const MAX_SINGLE_PEER_RETRIES: usize = 2;
/// Configuration for networking code. /// Configuration for networking code.
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config { pub struct Config {
/// The address on which this node should listen for connections. /// The address on which this node should listen for connections.
/// ///
@ -64,6 +65,9 @@ pub struct Config {
/// - regularly, every time `crawl_new_peer_interval` elapses, and /// - regularly, every time `crawl_new_peer_interval` elapses, and
/// - if the peer set is busy, and there aren't any peer addresses for the /// - if the peer set is busy, and there aren't any peer addresses for the
/// next connection attempt. /// next connection attempt.
//
// Note: Durations become a TOML table, so they must be the final item in the config.
// We'll replace them with a more user-friendly format in #2847.
pub crawl_new_peer_interval: Duration, pub crawl_new_peer_interval: Duration,
} }

View File

@ -110,12 +110,17 @@ impl StartCmd {
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(), sync_status.clone(),
chain_tip_change, chain_tip_change.clone(),
peer_set.clone(), peer_set.clone(),
)); ));
let mempool_crawler_task_handle = let mempool_crawler_task_handle = mempool::Crawler::spawn(
mempool::Crawler::spawn(peer_set.clone(), mempool, sync_status); &config.mempool,
peer_set.clone(),
mempool,
sync_status,
chain_tip_change,
);
let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id( let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id(
mempool_transaction_receiver, mempool_transaction_receiver,

View File

@ -13,6 +13,7 @@ use tokio::sync::watch;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
use zebra_chain::{ use zebra_chain::{
block::Height,
chain_tip::ChainTip, chain_tip::ChainTip,
transaction::{UnminedTx, UnminedTxId}, transaction::{UnminedTx, UnminedTxId},
}; };
@ -108,9 +109,14 @@ pub struct Mempool {
/// Allows checking if we are near the tip to enable/disable the mempool. /// Allows checking if we are near the tip to enable/disable the mempool.
sync_status: SyncStatus, sync_status: SyncStatus,
/// If the state's best chain tip has reached this height, always enable the mempool.
debug_enable_at_height: Option<Height>,
/// Allow efficient access to the best tip of the blockchain. /// Allow efficient access to the best tip of the blockchain.
latest_chain_tip: zs::LatestChainTip, latest_chain_tip: zs::LatestChainTip,
/// Allows the detection of chain tip resets.
/// Allows the detection of newly added chain tip blocks,
/// and chain tip resets.
chain_tip_change: ChainTipChange, chain_tip_change: ChainTipChange,
/// Handle to the outbound service. /// Handle to the outbound service.
@ -132,7 +138,7 @@ pub struct Mempool {
impl Mempool { impl Mempool {
pub(crate) fn new( pub(crate) fn new(
_config: &Config, config: &Config,
outbound: Outbound, outbound: Outbound,
state: State, state: State,
tx_verifier: TxVerifier, tx_verifier: TxVerifier,
@ -146,6 +152,7 @@ impl Mempool {
let mut service = Mempool { let mut service = Mempool {
active_state: ActiveState::Disabled, active_state: ActiveState::Disabled,
sync_status, sync_status,
debug_enable_at_height: config.debug_enable_at_height.map(Height),
latest_chain_tip, latest_chain_tip,
chain_tip_change, chain_tip_change,
outbound, outbound,
@ -161,10 +168,39 @@ impl Mempool {
(service, transaction_receiver) (service, transaction_receiver)
} }
/// Is the mempool enabled by a debug config option?
///
/// Returns `true` when `debug_enable_at_height` is set and the best chain tip
/// height reported by `latest_chain_tip` is greater than or equal to it.
/// Returns `false` when the option is unset, or when there is no best tip height.
///
/// Logs an info-level message when the debug condition holds but the mempool
/// is not currently enabled.
fn is_enabled_by_debug(&self) -> bool {
    // optimise non-debug performance: return before querying the chain tip
    let enable_at_height = match self.debug_enable_at_height {
        Some(height) => height,
        None => return false,
    };

    // without a best tip height there is nothing to compare against
    let best_tip_height = match self.latest_chain_tip.best_tip_height() {
        Some(height) => height,
        None => return false,
    };

    let is_debug_enabled = best_tip_height >= enable_at_height;

    // only log when the mempool is not already enabled,
    // so the message is not repeated on every check
    if is_debug_enabled && !self.is_enabled() {
        info!(
            ?best_tip_height,
            ?enable_at_height,
            "enabling mempool for debugging"
        );
    }

    is_debug_enabled
}
/// Update the mempool state (enabled / disabled) depending on how close to /// Update the mempool state (enabled / disabled) depending on how close to
/// the tip is the synchronization, including side effects to state changes. /// the tip is the synchronization, including side effects to state changes.
fn update_state(&mut self) { fn update_state(&mut self) {
let is_close_to_tip = self.sync_status.is_close_to_tip(); let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
if self.is_enabled() == is_close_to_tip { if self.is_enabled() == is_close_to_tip {
// the active state is up to date // the active state is up to date
return; return;
@ -172,6 +208,8 @@ impl Mempool {
// Update enabled / disabled state // Update enabled / disabled state
if is_close_to_tip { if is_close_to_tip {
info!("activating mempool: Zebra is close to the tip");
let tx_downloads = Box::pin(TxDownloads::new( let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT), Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
@ -182,6 +220,8 @@ impl Mempool {
tx_downloads, tx_downloads,
}; };
} else { } else {
info!("deactivating mempool: Zebra is syncing lots of blocks");
self.active_state = ActiveState::Disabled self.active_state = ActiveState::Disabled
} }
} }
@ -415,6 +455,7 @@ fn reject_if_needed(
// If it was cancelled then a block was mined, or there was a network // If it was cancelled then a block was mined, or there was a network
// upgrade, etc. No reason to reject it. // upgrade, etc. No reason to reject it.
TransactionDownloadVerifyError::Cancelled => {} TransactionDownloadVerifyError::Cancelled => {}
// Consensus verification failed. Reject transaction to avoid // Consensus verification failed. Reject transaction to avoid
// having to download and verify it again just for it to fail again. // having to download and verify it again just for it to fail again.
TransactionDownloadVerifyError::Invalid(e) => { TransactionDownloadVerifyError::Invalid(e) => {

View File

@ -6,7 +6,19 @@ use serde::{Deserialize, Serialize};
/// Mempool configuration section. /// Mempool configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config { pub struct Config {
/// If the state's best chain tip has reached this height, always enable the mempool,
/// regardless of Zebra's sync status.
///
/// Set to `None` by default: Zebra always checks the sync status before enabling the mempool.
//
// TODO:
// - allow the mempool to be enabled before the genesis block is committed?
// we could replace `Option` with an enum that has an `AlwaysEnable` variant
// - move debug configs last (needs #2847)
pub debug_enable_at_height: Option<u32>,
/// The mempool transaction cost limit. /// The mempool transaction cost limit.
/// ///
/// This limits the total serialized byte size of all transactions in the mempool. /// This limits the total serialized byte size of all transactions in the mempool.
@ -24,23 +36,28 @@ pub struct Config {
/// ///
/// This corresponds to `mempoolevictionmemoryminutes` from /// This corresponds to `mempoolevictionmemoryminutes` from
/// [ZIP-401](https://zips.z.cash/zip-0401#specification). /// [ZIP-401](https://zips.z.cash/zip-0401#specification).
//
// Note: Durations become a TOML table, so they must be the final item in the config.
// We'll replace them with a more user-friendly format in #2847.
pub eviction_memory_time: Duration, pub eviction_memory_time: Duration,
} }
/// Consensus rules:
///
/// > There MUST be a configuration option mempooltxcostlimit,
/// > which SHOULD default to 80000000.
/// >
/// > There MUST be a configuration option mempoolevictionmemoryminutes,
/// > which SHOULD default to 60 [minutes].
///
/// https://zips.z.cash/zip-0401#specification
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
/// Consensus rules:
///
/// > There MUST be a configuration option mempooltxcostlimit,
/// > which SHOULD default to 80000000.
/// >
/// > There MUST be a configuration option mempoolevictionmemoryminutes,
/// > which SHOULD default to 60 [minutes].
///
/// https://zips.z.cash/zip-0401#specification
tx_cost_limit: 80_000_000, tx_cost_limit: 80_000_000,
eviction_memory_time: Duration::from_secs(60 * 60), eviction_memory_time: Duration::from_secs(60 * 60),
debug_enable_at_height: None,
} }
} }
} }

View File

@ -4,15 +4,17 @@
use std::time::Duration; use std::time::Duration;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::sleep}; use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use zebra_chain::block::Height;
use zebra_network as zn; use zebra_network as zn;
use zebra_state::ChainTipChange;
use super::{ use crate::components::{
super::{mempool, sync::SyncStatus}, mempool::{self, downloads::Gossip, Config},
downloads::Gossip, sync::SyncStatus,
}; };
#[cfg(test)] #[cfg(test)]
@ -35,9 +37,20 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler. /// The mempool transaction crawler.
pub struct Crawler<PeerSet, Mempool> { pub struct Crawler<PeerSet, Mempool> {
/// The network peer set to crawl.
peer_set: Timeout<PeerSet>, peer_set: Timeout<PeerSet>,
/// The mempool service that receives crawled transaction IDs.
mempool: Mempool, mempool: Mempool,
status: SyncStatus,
/// Allows checking if we are near the tip to enable/disable the mempool crawler.
sync_status: SyncStatus,
/// Notifies the crawler when the best chain tip height changes.
chain_tip_change: ChainTipChange,
/// If the state's best chain tip has reached this height, always enable the mempool crawler.
debug_enable_at_height: Option<Height>,
} }
impl<PeerSet, Mempool> Crawler<PeerSet, Mempool> impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
@ -51,19 +64,67 @@ where
{ {
/// Spawn an asynchronous task to run the mempool crawler. /// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn( pub fn spawn(
config: &Config,
peer_set: PeerSet, peer_set: PeerSet,
mempool: Mempool, mempool: Mempool,
status: SyncStatus, sync_status: SyncStatus,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), BoxError>> { ) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler { let crawler = Crawler {
peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT), peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
mempool, mempool,
status, sync_status,
chain_tip_change,
debug_enable_at_height: config.debug_enable_at_height.map(Height),
}; };
tokio::spawn(crawler.run()) tokio::spawn(crawler.run())
} }
/// Waits until the mempool crawler is enabled by a debug config option.
///
/// If `debug_enable_at_height` is unset, the returned future never completes.
/// Otherwise, it completes with `Ok(())` after a chain tip change whose best tip
/// height is greater than or equal to the configured height.
///
/// Returns an error if communication with the state is lost.
async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> {
    let enable_at_height = match self.debug_enable_at_height {
        Some(height) => height,
        // optimise non-debug performance: park forever without watching the tip
        None => return future::pending().await,
    };

    loop {
        // a closed channel is propagated to the caller via `?`
        let best_tip_height = self
            .chain_tip_change
            .wait_for_tip_change()
            .await?
            .best_tip_height();

        if best_tip_height >= enable_at_height {
            return Ok(());
        }
    }
}
/// Waits until the mempool crawler is enabled.
///
/// Races the sync status check against the debug config check,
/// and resolves with the result of whichever finishes first.
///
/// Returns an error if communication with the syncer or state is lost.
async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> {
    // clone the status so it can be borrowed independently of `self`
    let mut status = self.sync_status.clone();

    let close_to_tip = status.wait_until_close_to_tip();
    let debug_enabled = self.wait_until_enabled_by_debug();

    pin_mut!(close_to_tip);
    pin_mut!(debug_enabled);

    let first_ready = future::select(close_to_tip, debug_enabled).await;

    // both sides yield the same output type, so just take the finished result
    // and discard the future that was still pending
    match first_ready {
        future::Either::Left((result, _unready_debug)) => result,
        future::Either::Right((result, _unready_tip)) => result,
    }
}
/// Periodically crawl peers for transactions to include in the mempool. /// Periodically crawl peers for transactions to include in the mempool.
/// ///
/// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
@ -71,12 +132,11 @@ where
pub async fn run(mut self) -> Result<(), BoxError> { pub async fn run(mut self) -> Result<(), BoxError> {
info!("initializing mempool crawler task"); info!("initializing mempool crawler task");
while self.status.wait_until_close_to_tip().await.is_ok() { loop {
self.wait_until_enabled().await?;
self.crawl_transactions().await?; self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await; sleep(RATE_LIMIT_DELAY).await;
} }
Ok(())
} }
/// Crawl peers for transactions. /// Crawl peers for transactions.

View File

@ -3,17 +3,20 @@ use std::time::Duration;
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::time; use tokio::time;
use zebra_chain::transaction::UnminedTxId; use zebra_chain::{parameters::Network, transaction::UnminedTxId};
use zebra_network as zn; use zebra_network as zn;
use zebra_state::ChainTipSender;
use zebra_test::mock_service::{MockService, PropTestAssertion}; use zebra_test::mock_service::{MockService, PropTestAssertion};
use super::{ use crate::components::{
super::{ mempool::{
super::{mempool, sync::RecentSyncLengths}, self,
crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
downloads::Gossip, downloads::Gossip,
error::MempoolError, error::MempoolError,
Config,
}, },
Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY, sync::RecentSyncLengths,
}; };
/// The number of iterations to crawl while testing. /// The number of iterations to crawl while testing.
@ -54,7 +57,8 @@ proptest! {
sync_lengths.push(0); sync_lengths.push(0);
runtime.block_on(async move { runtime.block_on(async move {
let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths) = setup_crawler(); let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender,
) = setup_crawler();
time::pause(); time::pause();
@ -103,7 +107,7 @@ proptest! {
let transaction_id_count = transaction_ids.len(); let transaction_id_count = transaction_ids.len();
runtime.block_on(async move { runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler(); setup_crawler();
time::pause(); time::pause();
@ -144,7 +148,7 @@ proptest! {
let _guard = runtime.enter(); let _guard = runtime.enter();
runtime.block_on(async move { runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler(); setup_crawler();
time::pause(); time::pause();
@ -218,14 +222,36 @@ proptest! {
} }
/// Spawn a crawler instance using mock services. /// Spawn a crawler instance using mock services.
fn setup_crawler() -> (MockPeerSet, MockMempool, SyncStatus, RecentSyncLengths) { fn setup_crawler() -> (
MockPeerSet,
MockMempool,
SyncStatus,
RecentSyncLengths,
ChainTipSender,
) {
let peer_set = MockService::build().for_prop_tests(); let peer_set = MockService::build().for_prop_tests();
let mempool = MockService::build().for_prop_tests(); let mempool = MockService::build().for_prop_tests();
let (sync_status, recent_sync_lengths) = SyncStatus::new(); let (sync_status, recent_sync_lengths) = SyncStatus::new();
Crawler::spawn(peer_set.clone(), mempool.clone(), sync_status.clone()); // the network should be irrelevant here
let (chain_tip_sender, _latest_chain_tip, chain_tip_change) =
ChainTipSender::new(None, Network::Mainnet);
(peer_set, mempool, sync_status, recent_sync_lengths) Crawler::spawn(
&Config::default(),
peer_set.clone(),
mempool.clone(),
sync_status.clone(),
chain_tip_change,
);
(
peer_set,
mempool,
sync_status,
recent_sync_lengths,
chain_tip_sender,
)
} }
/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list. /// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.

View File

@ -714,6 +714,7 @@ fn sync_one_checkpoint_mainnet() -> Result<()> {
SMALL_CHECKPOINT_TIMEOUT, SMALL_CHECKPOINT_TIMEOUT,
None, None,
true, true,
None,
) )
.map(|_tempdir| ()) .map(|_tempdir| ())
} }
@ -730,6 +731,7 @@ fn sync_one_checkpoint_testnet() -> Result<()> {
SMALL_CHECKPOINT_TIMEOUT, SMALL_CHECKPOINT_TIMEOUT,
None, None,
true, true,
None,
) )
.map(|_tempdir| ()) .map(|_tempdir| ())
} }
@ -753,6 +755,7 @@ fn restart_stop_at_height_for_network(network: Network, height: Height) -> Resul
SMALL_CHECKPOINT_TIMEOUT, SMALL_CHECKPOINT_TIMEOUT,
None, None,
true, true,
None,
)?; )?;
// if stopping corrupts the rocksdb database, zebrad might hang or crash here // if stopping corrupts the rocksdb database, zebrad might hang or crash here
// if stopping does not write the rocksdb database to disk, Zebra will // if stopping does not write the rocksdb database to disk, Zebra will
@ -762,13 +765,30 @@ fn restart_stop_at_height_for_network(network: Network, height: Height) -> Resul
network, network,
"state is already at the configured height", "state is already at the configured height",
STOP_ON_LOAD_TIMEOUT, STOP_ON_LOAD_TIMEOUT,
Some(reuse_tempdir), reuse_tempdir,
false, false,
None,
)?; )?;
Ok(()) Ok(())
} }
/// Checks that `zebrad` activates the mempool on mainnet
/// when the debug activation height is configured.
///
/// The debug height is the genesis block height, and the sync stops at height 1,
/// so debug activation happens after the genesis block is committed.
#[test]
fn activate_mempool_mainnet() -> Result<()> {
    let stop_height = Height(1);
    let mempool_debug_height = Height(0);

    // start from a fresh temporary directory, and check the legacy chain logs
    let _tempdir = sync_until(
        stop_height,
        Mainnet,
        STOP_AT_HEIGHT_REGEX,
        SMALL_CHECKPOINT_TIMEOUT,
        None,
        true,
        Some(mempool_debug_height),
    )?;

    Ok(())
}
/// Test if `zebrad` can sync some larger checkpoints on mainnet. /// Test if `zebrad` can sync some larger checkpoints on mainnet.
/// ///
/// This test might fail or timeout on slow or unreliable networks, /// This test might fail or timeout on slow or unreliable networks,
@ -784,6 +804,7 @@ fn sync_large_checkpoints_mainnet() -> Result<()> {
LARGE_CHECKPOINT_TIMEOUT, LARGE_CHECKPOINT_TIMEOUT,
None, None,
true, true,
None,
)?; )?;
// if this sync fails, see the failure notes in `restart_stop_at_height` // if this sync fails, see the failure notes in `restart_stop_at_height`
sync_until( sync_until(
@ -791,8 +812,9 @@ fn sync_large_checkpoints_mainnet() -> Result<()> {
Mainnet, Mainnet,
"previous state height is greater than the stop height", "previous state height is greater than the stop height",
STOP_ON_LOAD_TIMEOUT, STOP_ON_LOAD_TIMEOUT,
Some(reuse_tempdir), reuse_tempdir,
false, false,
None,
)?; )?;
Ok(()) Ok(())
@ -805,6 +827,13 @@ fn sync_large_checkpoints_mainnet() -> Result<()> {
/// the output contains `stop_regex`. If `reuse_tempdir` is supplied, /// the output contains `stop_regex`. If `reuse_tempdir` is supplied,
/// use it as the test's temporary directory. /// use it as the test's temporary directory.
/// ///
/// If `check_legacy_chain` is true,
/// make sure the logs contain the legacy chain check.
///
/// If `enable_mempool_at_height` is `Some(Height(_))`,
/// configure `zebrad` to debug-enable the mempool at that height.
/// Then check the logs for the mempool being enabled.
///
/// If `stop_regex` is encountered before the process exits, kills the /// If `stop_regex` is encountered before the process exits, kills the
/// process, and mark the test as successful, even if `height` has not /// process, and mark the test as successful, even if `height` has not
/// been reached. /// been reached.
@ -819,8 +848,9 @@ fn sync_until(
network: Network, network: Network,
stop_regex: &str, stop_regex: &str,
timeout: Duration, timeout: Duration,
reuse_tempdir: Option<TempDir>, reuse_tempdir: impl Into<Option<TempDir>>,
check_legacy_chain: bool, check_legacy_chain: bool,
enable_mempool_at_height: impl Into<Option<Height>>,
) -> Result<TempDir> { ) -> Result<TempDir> {
zebra_test::init(); zebra_test::init();
@ -828,11 +858,15 @@ fn sync_until(
return testdir(); return testdir();
} }
let reuse_tempdir = reuse_tempdir.into();
let enable_mempool_at_height = enable_mempool_at_height.into();
// Use a persistent state, so we can handle large syncs // Use a persistent state, so we can handle large syncs
let mut config = persistent_test_config()?; let mut config = persistent_test_config()?;
// TODO: add convenience methods? // TODO: add convenience methods?
config.network.network = network; config.network.network = network;
config.state.debug_stop_at_height = Some(height.0); config.state.debug_stop_at_height = Some(height.0);
config.mempool.debug_enable_at_height = enable_mempool_at_height.map(|height| height.0);
let tempdir = if let Some(reuse_tempdir) = reuse_tempdir { let tempdir = if let Some(reuse_tempdir) = reuse_tempdir {
reuse_tempdir.replace_config(&mut config)? reuse_tempdir.replace_config(&mut config)?
@ -850,6 +884,11 @@ fn sync_until(
child.expect_stdout_line_matches("no legacy chain found")?; child.expect_stdout_line_matches("no legacy chain found")?;
} }
if enable_mempool_at_height.is_some() {
child.expect_stdout_line_matches("enabling mempool for debugging")?;
child.expect_stdout_line_matches("activating mempool")?;
}
child.expect_stdout_line_matches(stop_regex)?; child.expect_stdout_line_matches(stop_regex)?;
child.kill()?; child.kill()?;