From b274ee4066ed07b7ce664f2e41f1f8eb95dc5e9e Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 13 Oct 2021 03:31:54 +1000 Subject: [PATCH] Pass the mempool config to the mempool (#2861) * Split mempool config into its own module Also: - expand config docs - clean up mempool imports * Pass the mempool config to the mempool * Create the transaction sender channel inside the mempool 1/2 This simplifies all the code that calls the mempool. Also: - update the mempool enabled state before returning the new mempool - add some test module doc comments * Refactor a setup function out of the mempool unit tests 2/2 Also: - update the setup function to handle the latest mempool changes * Clarify a comment Co-authored-by: Conrado Gouvea --- zebrad/src/commands/start.rs | 17 +- zebrad/src/components/inbound/tests.rs | 8 +- zebrad/src/components/mempool.rs | 64 ++--- zebrad/src/components/mempool/config.rs | 46 ++++ zebrad/src/components/mempool/tests/prop.rs | 15 +- zebrad/src/components/mempool/tests/vector.rs | 228 +++++------------- 6 files changed, 152 insertions(+), 226 deletions(-) create mode 100644 zebrad/src/components/mempool/config.rs diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 3aef33da..06e77a42 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -26,10 +26,8 @@ use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; use futures::{select, FutureExt}; -use std::collections::HashSet; use tokio::sync::oneshot; -use tower::builder::ServiceBuilder; -use tower::util::BoxService; +use tower::{builder::ServiceBuilder, util::BoxService}; use crate::{ components::{ @@ -92,20 +90,17 @@ impl StartCmd { ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier); info!("initializing mempool"); - - let (mempool_transaction_sender, mempool_transaction_receiver) = - tokio::sync::watch::channel(HashSet::new()); - - let mempool_service = BoxService::new(Mempool::new( + let (mempool, mempool_transaction_receiver) = Mempool::new( + &config.mempool, peer_set.clone(), state, tx_verifier, sync_status.clone(), latest_chain_tip, chain_tip_change.clone(), - mempool_transaction_sender, - )); - let mempool = ServiceBuilder::new().buffer(20).service(mempool_service); + ); + let mempool = BoxService::new(mempool); + let mempool = ServiceBuilder::new().buffer(20).service(mempool); setup_tx .send((peer_set.clone(), address_book, mempool.clone())) diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index ec937a2f..73efe4a0 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -1,3 +1,5 @@ +//! Inbound service tests. + use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc}; use futures::FutureExt; @@ -576,16 +578,14 @@ async fn setup( .unwrap(); committed_blocks.push(block_one); - let (transaction_sender, transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut mempool_service = Mempool::new( + let (mut mempool_service, transaction_receiver) = Mempool::new( + &mempool::Config::default(), buffered_peer_set.clone(), state_service.clone(), buffered_tx_verifier.clone(), sync_status.clone(), latest_chain_tip, chain_tip_change.clone(), - transaction_sender, ); // Enable the mempool diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 4d4b12ba..a423f972 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -1,14 +1,11 @@ //! Zebra mempool. -use serde::{Deserialize, Serialize}; - use std::{ collections::HashSet, future::Future, iter, pin::Pin, task::{Context, Poll}, - time::Duration, }; use futures::{future::FutureExt, stream::Stream}; @@ -24,8 +21,9 @@ use zebra_network as zn; use zebra_state as zs; use zebra_state::{ChainTipChange, TipAction}; -pub use crate::BoxError; +use crate::components::sync::SyncStatus; +mod config; mod crawler; pub mod downloads; mod error; @@ -35,23 +33,24 @@ mod storage; #[cfg(test)] mod tests; -pub use self::crawler::Crawler; -pub use self::error::MempoolError; -pub use self::storage::{ +pub use crate::BoxError; + +pub use config::Config; +pub use crawler::Crawler; +pub use error::MempoolError; +pub use gossip::gossip_mempool_transaction_id; +pub use storage::{ ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, }; #[cfg(test)] -pub use self::storage::tests::unmined_transactions_in_blocks; -pub use gossip::gossip_mempool_transaction_id; +pub use storage::tests::unmined_transactions_in_blocks; -use self::downloads::{ +use downloads::{ Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, }; -use super::sync::SyncStatus; - type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; type TxVerifier = Buffer< @@ -97,30 +96,6 @@ enum ActiveState { }, } -/// Mempool configuration section. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Config { - /// The transaction cost limit - pub tx_cost_limit: u32, - /// Max amount of minutes for transactions to be in recently evicted - pub eviction_memory_time: Duration, -} - -/// Consensus rules: -/// -/// - There MUST be a configuration option mempooltxcostlimit, which SHOULD default to 80000000. -/// - There MUST be a configuration option mempoolevictionmemoryminutes, which SHOULD default to 60. -/// -/// https://zips.z.cash/zip-0401#specification -impl Default for Config { - fn default() -> Self { - Self { - tx_cost_limit: 80_000_000, - eviction_memory_time: Duration::from_secs(60 * 60), - } - } -} - /// Mempool async management and query service. /// /// The mempool is the set of all verified transactions that this node is aware @@ -157,15 +132,18 @@ pub struct Mempool { impl Mempool { pub(crate) fn new( + _config: &Config, outbound: Outbound, state: State, tx_verifier: TxVerifier, sync_status: SyncStatus, latest_chain_tip: zs::LatestChainTip, chain_tip_change: ChainTipChange, - transaction_sender: watch::Sender>, - ) -> Self { - Mempool { + ) -> (Self, watch::Receiver>) { + let (transaction_sender, transaction_receiver) = + tokio::sync::watch::channel(HashSet::new()); + + let mut service = Mempool { active_state: ActiveState::Disabled, sync_status, latest_chain_tip, @@ -174,7 +152,13 @@ impl Mempool { state, tx_verifier, transaction_sender, - } + }; + + // Make sure `is_enabled` is accurate. + // Otherwise, it is only updated in `poll_ready`, right before each service call. + service.update_state(); + + (service, transaction_receiver) } /// Update the mempool state (enabled / disabled) depending on how close to diff --git a/zebrad/src/components/mempool/config.rs b/zebrad/src/components/mempool/config.rs new file mode 100644 index 00000000..08592b5c --- /dev/null +++ b/zebrad/src/components/mempool/config.rs @@ -0,0 +1,46 @@ +//! User-configurable mempool parameters. + +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +/// Mempool configuration section. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Config { + /// The mempool transaction cost limit. + /// + /// This limits the total serialized byte size of all transactions in the mempool. + /// + /// This corresponds to `mempooltxcostlimit` from [ZIP-401](https://zips.z.cash/zip-0401#specification). + pub tx_cost_limit: u32, + + /// The mempool transaction eviction age limit. + /// + /// This limits the maximum amount of time evicted transaction IDs stay in the mempool rejection list. + /// Transactions are randomly evicted from the mempool when the mempool reaches [`tx_cost_limit`]. + /// + /// (Transactions can also be rejected by the mempool for other reasons. + /// Different rejection reasons can have different age limits.) + /// + /// This corresponds to `mempoolevictionmemoryminutes` from + /// [ZIP-401](https://zips.z.cash/zip-0401#specification). + pub eviction_memory_time: Duration, +} + +/// Consensus rules: +/// +/// > There MUST be a configuration option mempooltxcostlimit, +/// > which SHOULD default to 80000000. +/// > +/// > There MUST be a configuration option mempoolevictionmemoryminutes, +/// > which SHOULD default to 60 [minutes]. +/// +/// https://zips.z.cash/zip-0401#specification +impl Default for Config { + fn default() -> Self { + Self { + tx_cost_limit: 80_000_000, + eviction_memory_time: Duration::from_secs(60 * 60), + } + } +} diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs index efe20c8f..393a9eb5 100644 --- a/zebrad/src/components/mempool/tests/prop.rs +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -1,5 +1,6 @@ +//! Randomised property tests for the mempool. + use proptest::prelude::*; -use std::collections::HashSet; use tokio::time; use tower::{buffer::Buffer, util::BoxService}; @@ -9,8 +10,10 @@ use zebra_network as zn; use zebra_state::{self as zs, ChainTipBlock, ChainTipSender}; use zebra_test::mock_service::{MockService, PropTestAssertion}; -use super::super::Mempool; -use crate::components::sync::{RecentSyncLengths, SyncStatus}; +use crate::components::{ + mempool::{self, Mempool}, + sync::{RecentSyncLengths, SyncStatus}, +}; /// A [`MockService`] representing the network service. type MockPeerSet = MockService; @@ -151,16 +154,14 @@ fn setup( let (sync_status, recent_syncs) = SyncStatus::new(); let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network); - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mempool = Mempool::new( + let (mempool, _transaction_receiver) = Mempool::new( + &mempool::Config::default(), Buffer::new(BoxService::new(peer_set.clone()), 1), Buffer::new(BoxService::new(state_service.clone()), 1), Buffer::new(BoxService::new(tx_verifier.clone()), 1), sync_status, latest_chain_tip, chain_tip_change, - transaction_sender, ); ( diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index c60295b8..e84262e5 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -1,3 +1,5 @@ +//! Fixed test vectors for the mempool. + use std::{collections::HashSet, sync::Arc}; use color_eyre::Report; @@ -5,27 +7,31 @@ use tokio::time; use tower::{ServiceBuilder, ServiceExt}; use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto}; -use zebra_consensus::Config as ConsensusConfig; +use zebra_consensus::transaction as tx; use zebra_state::Config as StateConfig; -use zebra_test::mock_service::MockService; +use zebra_test::mock_service::{MockService, PanicAssertion}; -use super::super::{storage::tests::unmined_transactions_in_blocks, *}; +use crate::components::{ + mempool::{self, storage::tests::unmined_transactions_in_blocks, *}, + sync::RecentSyncLengths, +}; + +/// A [`MockService`] representing the network service. +type MockPeerSet = MockService; + +/// The unmocked Zebra state service's type. +type StateService = Buffer, zs::Request>; + +/// A [`MockService`] representing the Zebra transaction verifier service. +type MockTxVerifier = MockService; #[tokio::test] async fn mempool_service_basic() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; + let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; // get the genesis block transactions from the Zcash blockchain. let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); @@ -35,19 +41,6 @@ async fn mempool_service_basic() -> Result<(), Report> { let last_transaction = unmined_transactions.next_back().unwrap(); let more_transactions = unmined_transactions; - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut service = Mempool::new( - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Enable the mempool let _ = service.enable(&mut recent_syncs).await; @@ -138,17 +131,9 @@ async fn mempool_service_basic() -> Result<(), Report> { async fn mempool_queue() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; + let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; // Get transactions to use in the test let unmined_transactions = unmined_transactions_in_blocks(..=10, network); @@ -164,19 +149,6 @@ async fn mempool_queue() -> Result<(), Report> { // The last transaction that will be added in the mempool (and thus not rejected) let stored_tx = transactions.next_back().unwrap().clone(); - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut service = Mempool::new( - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Enable the mempool let _ = service.enable(&mut recent_syncs).await; @@ -247,16 +219,9 @@ async fn mempool_queue() -> Result<(), Report> { async fn mempool_service_disabled() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; + let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; // get the genesis block transactions from the Zcash blockchain. let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); @@ -265,19 +230,6 @@ async fn mempool_service_disabled() -> Result<(), Report> { .expect("Missing genesis transaction"); let more_transactions = unmined_transactions; - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut service = Mempool::new( - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Test if mempool is disabled (it should start disabled) assert!(!service.is_enabled()); @@ -374,33 +326,12 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; + let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; time::pause(); - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut mempool = Mempool::new( - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Enable the mempool let _ = mempool.enable(&mut recent_syncs).await; assert!(mempool.is_enabled()); @@ -490,30 +421,9 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut mempool = Mempool::new( - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); + let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; // Enable the mempool let _ = mempool.enable(&mut recent_syncs).await; @@ -579,18 +489,9 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, _tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - let mut tx_verifier = MockService::build().for_unit_tests(); + let (mut mempool, _peer_set, mut state_service, mut tx_verifier, mut recent_syncs) = + setup(network).await; // Get transactions to use in the test let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); @@ -598,19 +499,6 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { time::pause(); - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut mempool = Mempool::new( - Buffer::new(BoxService::new(peer_set.clone()), 1), - state_service.clone(), - Buffer::new(BoxService::new(tx_verifier.clone()), 1), - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Enable the mempool let _ = mempool.enable(&mut recent_syncs).await; @@ -684,17 +572,9 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let mut peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; + let (mut mempool, mut peer_set, mut state_service, _tx_verifier, mut recent_syncs) = + setup(network).await; // Get transactions to use in the test let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); @@ -702,19 +582,6 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { time::pause(); - // Start the mempool service - let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); - - let mut mempool = Mempool::new( - Buffer::new(BoxService::new(peer_set.clone()), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - transaction_sender, - ); - // Enable the mempool let _ = mempool.enable(&mut recent_syncs).await; @@ -778,3 +645,36 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { Ok(()) } + +/// Create a new [`Mempool`] instance using mocked services. +async fn setup( + network: Network, +) -> ( + Mempool, + MockPeerSet, + StateService, + MockTxVerifier, + RecentSyncLengths, +) { + let peer_set = MockService::build().for_unit_tests(); + + let state_config = StateConfig::ephemeral(); + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); + let state_service = ServiceBuilder::new().buffer(1).service(state); + + let tx_verifier = MockService::build().for_unit_tests(); + + let (sync_status, recent_syncs) = SyncStatus::new(); + + let (mempool, _mempool_transaction_receiver) = Mempool::new( + &mempool::Config::default(), + Buffer::new(BoxService::new(peer_set.clone()), 1), + state_service.clone(), + Buffer::new(BoxService::new(tx_verifier.clone()), 1), + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + (mempool, peer_set, state_service, tx_verifier, recent_syncs) +}