diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index bb0f1031..fd072508 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -6,7 +6,7 @@ license = "MIT OR Apache-2.0" edition = "2018" [features] -proptest-impl = ["proptest", "zebra-test"] +proptest-impl = ["proptest", "proptest-derive", "zebra-test"] [dependencies] zebra-chain = { path = "../zebra-chain" } @@ -34,6 +34,7 @@ rlimit = "0.5.4" multiset = "0.0.5" proptest = { version = "0.10.1", optional = true } +proptest-derive = { version = "0.3", optional = true } zebra-test = { path = "../zebra-test/", optional = true } [dev-dependencies] diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index ffda8645..3621509f 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -41,6 +41,9 @@ pub use service::{ }; #[cfg(any(test, feature = "proptest-impl"))] -pub use service::init_test; +pub use service::{ + chain_tip::{ChainTipBlock, ChainTipSender}, + init_test, +}; pub(crate) use request::ContextuallyValidBlock; diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 167da08d..38e76e20 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -10,6 +10,9 @@ use std::sync::Arc; use tokio::sync::watch; use tracing::instrument; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + use zebra_chain::{ block, chain_tip::ChainTip, @@ -33,6 +36,7 @@ type ChainTipData = Option; /// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`], /// and [`ChainTipChange`]. #[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct ChainTipBlock { /// The hash of the best chain tip block. pub hash: block::Hash, diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 8cb6991b..363d0d08 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -61,6 +61,7 @@ proptest = "0.10" proptest-derive = "0.3" zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } +zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } zebra-test = { path = "../zebra-test" } [features] diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 3372f8b0..66289c2a 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -40,8 +40,6 @@ use self::downloads::{ Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, }; -#[cfg(test)] -use super::sync::RecentSyncLengths; use super::sync::SyncStatus; type Outbound = Buffer, zn::Request>; @@ -175,44 +173,6 @@ impl Mempool { } } - /// Get the storage field of the mempool for testing purposes. - #[cfg(test)] - pub fn storage(&mut self) -> &mut storage::Storage { - match &mut self.active_state { - ActiveState::Disabled => panic!("mempool must be enabled"), - ActiveState::Enabled { storage, .. } => storage, - } - } - - /// Get the transaction downloader of the mempool for testing purposes. - #[cfg(test)] - pub fn tx_downloads(&self) -> &Pin> { - match &self.active_state { - ActiveState::Disabled => panic!("mempool must be enabled"), - ActiveState::Enabled { tx_downloads, .. } => tx_downloads, - } - } - - /// Enable the mempool by pretending the synchronization is close to the tip. - #[cfg(test)] - pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) { - use tower::ServiceExt; - // Pretend we're close to tip - SyncStatus::sync_close_to_tip(recent_syncs); - // Make a dummy request to poll the mempool and make it enable itself - let _ = self.oneshot(Request::TransactionIds).await; - } - - /// Disable the mempool by pretending the synchronization is far from the tip. - #[cfg(test)] - pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) { - use tower::ServiceExt; - // Pretend we're far from the tip - SyncStatus::sync_far_from_tip(recent_syncs); - // Make a dummy request to poll the mempool and make it disable itself - let _ = self.oneshot(Request::TransactionIds).await; - } - /// Check if transaction should be downloaded and/or verified. /// /// If it is already in the mempool (or in its rejected list) diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index b82724d0..0511d883 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -1,590 +1,50 @@ -use super::*; -use color_eyre::Report; -use std::{collections::HashSet, sync::Arc}; -use storage::tests::unmined_transactions_in_blocks; -use tokio::time; -use tower::{ServiceBuilder, ServiceExt}; +use std::pin::Pin; -use zebra_chain::block::Block; -use zebra_chain::serialization::ZcashDeserializeInto; -use zebra_consensus::Config as ConsensusConfig; -use zebra_state::Config as StateConfig; -use zebra_test::mock_service::MockService; +use tower::ServiceExt; -#[tokio::test] -async fn mempool_service_basic() -> Result<(), Report> { - // Using the mainnet for now - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); +use super::{storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request}; +use crate::components::sync::{RecentSyncLengths, SyncStatus}; - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; +mod prop; +mod vector; - // get the genesis block transactions from the Zcash blockchain. - let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); - let genesis_transaction = unmined_transactions - .next() - .expect("Missing genesis transaction"); - let txid = unmined_transactions.next_back().unwrap().id; - let more_transactions = unmined_transactions; - - // Start the mempool service - let mut service = Mempool::new( - network, - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - ); - - // Enable the mempool - let _ = service.enable(&mut recent_syncs).await; - - // Insert the genesis block coinbase transaction into the mempool storage. - service.storage().insert(genesis_transaction.clone())?; - - // Test `Request::TransactionIds` - let response = service - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - let genesis_transaction_ids = match response { - Response::TransactionIds(ids) => ids, - _ => unreachable!("will never happen in this test"), - }; - - // Test `Request::TransactionsById` - let genesis_transactions_hash_set = genesis_transaction_ids - .iter() - .copied() - .collect::>(); - let response = service - .ready_and() - .await - .unwrap() - .call(Request::TransactionsById( - genesis_transactions_hash_set.clone(), - )) - .await - .unwrap(); - let transactions = match response { - Response::Transactions(transactions) => transactions, - _ => unreachable!("will never happen in this test"), - }; - - // Make sure the transaction from the blockchain test vector is the same as the - // response of `Request::TransactionsById` - assert_eq!(genesis_transaction, transactions[0]); - - // Insert more transactions into the mempool storage. - // This will cause the genesis transaction to be moved into rejected. - // Skip the last (will be used later) - for tx in more_transactions { - service.storage().insert(tx.clone())?; - } - - // Test `Request::RejectedTransactionIds` - let response = service - .ready_and() - .await - .unwrap() - .call(Request::RejectedTransactionIds( - genesis_transactions_hash_set, - )) - .await - .unwrap(); - let rejected_ids = match response { - Response::RejectedTransactionIds(ids) => ids, - _ => unreachable!("will never happen in this test"), - }; - - assert_eq!(rejected_ids, genesis_transaction_ids); - - // Test `Request::Queue` - // Use the ID of the last transaction in the list - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - assert_eq!(service.tx_downloads().in_flight(), 1); - - Ok(()) -} - -#[tokio::test] -async fn mempool_queue() -> Result<(), Report> { - // Using the mainnet for now - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - // Get transactions to use in the test - let unmined_transactions = unmined_transactions_in_blocks(..=10, network); - let mut transactions = unmined_transactions; - // Split unmined_transactions into: - // [rejected_tx, transactions..., stored_tx, new_tx] - // - // The first transaction to be added in the mempool which will be eventually - // put in the rejected list - let rejected_tx = transactions.next().unwrap().clone(); - // A transaction not in the mempool that will be Queued - let new_tx = transactions.next_back().unwrap(); - // The last transaction that will be added in the mempool (and thus not rejected) - let stored_tx = transactions.next_back().unwrap().clone(); - - // Start the mempool service - let mut service = Mempool::new( - network, - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - ); - - // Enable the mempool - let _ = service.enable(&mut recent_syncs).await; - - // Insert [rejected_tx, transactions..., stored_tx] into the mempool storage. - // Insert the genesis block coinbase transaction into the mempool storage. - service.storage().insert(rejected_tx.clone())?; - // Insert more transactions into the mempool storage. - // This will cause the `rejected_tx` to be moved into rejected. - for tx in transactions { - service.storage().insert(tx.clone())?; - } - service.storage().insert(stored_tx.clone())?; - - // Test `Request::Queue` for a new transaction - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![new_tx.id.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - - // Test `Request::Queue` for a transaction already in the mempool - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![stored_tx.id.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert_eq!(queued_responses[0], Err(MempoolError::InMempool)); - - // Test `Request::Queue` for a transaction rejected by the mempool - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![rejected_tx.id.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert_eq!(queued_responses[0], Err(MempoolError::Rejected)); - - Ok(()) -} - -#[tokio::test] -async fn mempool_service_disabled() -> Result<(), Report> { - // Using the mainnet for now - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - - let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); - let state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - // get the genesis block transactions from the Zcash blockchain. - let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); - let genesis_transaction = unmined_transactions - .next() - .expect("Missing genesis transaction"); - let more_transactions = unmined_transactions; - - // Start the mempool service - let mut service = Mempool::new( - network, - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - ); - - // Test if mempool is disabled (it should start disabled) - assert!(!service.is_enabled()); - - // Enable the mempool - let _ = service.enable(&mut recent_syncs).await; - - assert!(service.is_enabled()); - - // Insert the genesis block coinbase transaction into the mempool storage. - service.storage().insert(genesis_transaction.clone())?; - - // Test if the mempool answers correctly (i.e. is enabled) - let response = service - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - let _genesis_transaction_ids = match response { - Response::TransactionIds(ids) => ids, - _ => unreachable!("will never happen in this test"), - }; - - // Queue a transaction for download - // Use the ID of the last transaction in the list - let txid = more_transactions.last().unwrap().id; - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - assert_eq!(service.tx_downloads().in_flight(), 1); - - // Disable the mempool - let _ = service.disable(&mut recent_syncs).await; - - // Test if mempool is disabled again - assert!(!service.is_enabled()); - - // Test if the mempool returns no transactions when disabled - let response = service - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - match response { - Response::TransactionIds(ids) => { - assert_eq!( - ids.len(), - 0, - "mempool should return no transactions when disabled" - ) +impl Mempool { + /// Get the storage field of the mempool for testing purposes. + pub fn storage(&mut self) -> &mut Storage { + match &mut self.active_state { + ActiveState::Disabled => panic!("mempool must be enabled"), + ActiveState::Enabled { storage, .. } => storage, } - _ => unreachable!("will never happen in this test"), - }; - - // Test if the mempool returns to Queue requests correctly when disabled - let response = service - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert_eq!(queued_responses[0], Err(MempoolError::Disabled)); - - Ok(()) -} - -#[tokio::test] -async fn mempool_cancel_mined() -> Result<(), Report> { - let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES - .zcash_deserialize_into() - .unwrap(); - let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES - .zcash_deserialize_into() - .unwrap(); - - // Using the mainnet for now - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - time::pause(); - - // Start the mempool service - let mut mempool = Mempool::new( - network, - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - ); - - // Enable the mempool - let _ = mempool.enable(&mut recent_syncs).await; - assert!(mempool.is_enabled()); - - // Push the genesis block to the state - let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .ready_and() - .await - .unwrap() - .call(zebra_state::Request::CommitFinalizedBlock( - genesis_block.clone().into(), - )) - .await - .unwrap(); - - // Query the mempool to make it poll chain_tip_change - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - - // Push block 1 to the state - state_service - .ready_and() - .await - .unwrap() - .call(zebra_state::Request::CommitFinalizedBlock( - block1.clone().into(), - )) - .await - .unwrap(); - - // Query the mempool to make it poll chain_tip_change - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - - // Queue transaction from block 2 for download. - // It can't be queued before because block 1 triggers a network upgrade, - // which cancels all downloads. - let txid = block2.transactions[0].unmined_id(); - let response = mempool - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - assert_eq!(mempool.tx_downloads().in_flight(), 1); - - // Push block 2 to the state - state_service - .oneshot(zebra_state::Request::CommitFinalizedBlock( - block2.clone().into(), - )) - .await - .unwrap(); - - // This is done twice because after the first query the cancellation - // is picked up by select!, and after the second the mempool gets the - // result and the download future is removed. - for _ in 0..2 { - // Query the mempool just to poll it and make it cancel the download. - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - // Sleep to avoid starvation and make sure the cancellation is picked up. - time::sleep(time::Duration::from_millis(100)).await; } - // Check if download was cancelled. - assert_eq!(mempool.tx_downloads().in_flight(), 0); + /// Get the transaction downloader of the mempool for testing purposes. + pub fn tx_downloads(&self) -> &Pin> { + match &self.active_state { + ActiveState::Disabled => panic!("mempool must be enabled"), + ActiveState::Enabled { tx_downloads, .. } => tx_downloads, + } + } - Ok(()) -} - -#[tokio::test] -async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> { - let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES - .zcash_deserialize_into() - .unwrap(); - let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES - .zcash_deserialize_into() - .unwrap(); - - // Using the mainnet for now - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - let (_chain_verifier, tx_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - // Start the mempool service - let mut mempool = Mempool::new( - network, - Buffer::new(BoxService::new(peer_set), 1), - state_service.clone(), - tx_verifier, - sync_status, - latest_chain_tip, - chain_tip_change, - ); - - // Enable the mempool - let _ = mempool.enable(&mut recent_syncs).await; - assert!(mempool.is_enabled()); - - // Push the genesis block to the state - let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .ready_and() - .await - .unwrap() - .call(zebra_state::Request::CommitFinalizedBlock( - genesis_block.clone().into(), - )) - .await - .unwrap(); - - // Queue transaction from block 2 for download - let txid = block2.transactions[0].unmined_id(); - let response = mempool - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - assert_eq!(mempool.tx_downloads().in_flight(), 1); - - // Query the mempool to make it poll chain_tip_change - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - - // Push block 1 to the state. This is considered a network upgrade, - // and thus must cancel all pending transaction downloads. - state_service - .ready_and() - .await - .unwrap() - .call(zebra_state::Request::CommitFinalizedBlock( - block1.clone().into(), - )) - .await - .unwrap(); - - // Query the mempool to make it poll chain_tip_change - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - - // Check if download was cancelled. - assert_eq!(mempool.tx_downloads().in_flight(), 0); - - Ok(()) + /// Enable the mempool by pretending the synchronization is close to the tip. + pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) { + // Pretend we're close to tip + SyncStatus::sync_close_to_tip(recent_syncs); + // Make a dummy request to poll the mempool and make it enable itself + self.dummy_call().await; + } + + /// Disable the mempool by pretending the synchronization is far from the tip. + pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) { + // Pretend we're far from the tip + SyncStatus::sync_far_from_tip(recent_syncs); + // Make a dummy request to poll the mempool and make it disable itself + self.dummy_call().await; + } + + /// Perform a dummy service call so that `poll_ready` is called. + pub async fn dummy_call(&mut self) { + self.oneshot(Request::Queue(vec![])) + .await + .expect("Queuing no transactions shouldn't fail"); + } } diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs new file mode 100644 index 00000000..f863040b --- /dev/null +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -0,0 +1,171 @@ +use proptest::prelude::*; +use tokio::time; +use tower::{buffer::Buffer, util::BoxService}; + +use zebra_chain::{parameters::Network, transaction::UnminedTx}; +use zebra_consensus::{error::TransactionError, transaction as tx}; +use zebra_network as zn; +use zebra_state::{self as zs, ChainTipBlock, ChainTipSender}; +use zebra_test::mock_service::{MockService, PropTestAssertion}; + +use super::super::Mempool; +use crate::components::sync::{RecentSyncLengths, SyncStatus}; + +/// A [`MockService`] representing the network service. +type MockPeerSet = MockService; + +/// A [`MockService`] representing the Zebra state service. +type MockState = MockService; + +/// A [`MockService`] representing the Zebra transaction verifier service. +type MockTxVerifier = MockService; + +proptest! { + /// Test if the mempool storage is cleared on a chain reset. + #[test] + fn storage_is_cleared_on_chain_reset( + network in any::(), + transaction in any::(), + chain_tip in any::(), + ) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + runtime.block_on(async move { + let ( + mut mempool, + mut peer_set, + mut state_service, + mut tx_verifier, + mut recent_syncs, + mut chain_tip_sender, + ) = setup(network); + + time::pause(); + + mempool.enable(&mut recent_syncs).await; + + // Insert a dummy transaction. + mempool + .storage() + .insert(transaction) + .expect("Inserting a transaction should succeed"); + + // The first call to `poll_ready` shouldn't clear the storage yet. + mempool.dummy_call().await; + + prop_assert_eq!(mempool.storage().tx_ids().len(), 1); + + // Simulate a chain reset. + chain_tip_sender.set_finalized_tip(chain_tip); + + // This time a call to `poll_ready` should clear the storage. + mempool.dummy_call().await; + + prop_assert!(mempool.storage().tx_ids().is_empty()); + + peer_set.expect_no_requests().await?; + state_service.expect_no_requests().await?; + tx_verifier.expect_no_requests().await?; + + Ok(()) + })?; + } + + /// Test if the mempool storage is cleared if the syncer falls behind and starts to catch up. + #[test] + fn storage_is_cleared_if_syncer_falls_behind( + network in any::(), + transaction in any::(), + ) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + runtime.block_on(async move { + let ( + mut mempool, + mut peer_set, + mut state_service, + mut tx_verifier, + mut recent_syncs, + _chain_tip_sender, + ) = setup(network); + + time::pause(); + + mempool.enable(&mut recent_syncs).await; + + // Insert a dummy transaction. + mempool + .storage() + .insert(transaction) + .expect("Inserting a transaction should succeed"); + + // The first call to `poll_ready` shouldn't clear the storage yet. + mempool.dummy_call().await; + + prop_assert_eq!(mempool.storage().tx_ids().len(), 1); + + // Simulate the synchronizer catching up to the network chain tip. + mempool.disable(&mut recent_syncs).await; + + // This time a call to `poll_ready` should clear the storage. + mempool.dummy_call().await; + + // Enable the mempool again so the storage can be accessed. + mempool.enable(&mut recent_syncs).await; + + prop_assert!(mempool.storage().tx_ids().is_empty()); + + peer_set.expect_no_requests().await?; + state_service.expect_no_requests().await?; + tx_verifier.expect_no_requests().await?; + + Ok(()) + })?; + } +} + +/// Create a new [`Mempool`] instance using mocked services. +fn setup( + network: Network, +) -> ( + Mempool, + MockPeerSet, + MockState, + MockTxVerifier, + RecentSyncLengths, + ChainTipSender, +) { + let peer_set = MockService::build().for_prop_tests(); + let state_service = MockService::build().for_prop_tests(); + let tx_verifier = MockService::build().for_prop_tests(); + + let (sync_status, recent_syncs) = SyncStatus::new(); + let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network); + + let mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set.clone()), 1), + Buffer::new(BoxService::new(state_service.clone()), 1), + Buffer::new(BoxService::new(tx_verifier.clone()), 1), + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + ( + mempool, + peer_set, + state_service, + tx_verifier, + recent_syncs, + chain_tip_sender, + ) +} diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs new file mode 100644 index 00000000..dba5c1e6 --- /dev/null +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -0,0 +1,560 @@ +use std::{collections::HashSet, sync::Arc}; + +use color_eyre::Report; +use tokio::time; +use tower::{ServiceBuilder, ServiceExt}; + +use zebra_chain::{block::Block, serialization::ZcashDeserializeInto}; +use zebra_consensus::Config as ConsensusConfig; +use zebra_state::Config as StateConfig; +use zebra_test::mock_service::MockService; + +use super::super::{storage::tests::unmined_transactions_in_blocks, *}; + +#[tokio::test] +async fn mempool_service_basic() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // get the genesis block transactions from the Zcash blockchain. + let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); + let genesis_transaction = unmined_transactions + .next() + .expect("Missing genesis transaction"); + let last_transaction = unmined_transactions.next_back().unwrap(); + let more_transactions = unmined_transactions; + + // Start the mempool service + let mut service = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; + + // Insert the genesis block coinbase transaction into the mempool storage. + service.storage().insert(genesis_transaction.clone())?; + + // Test `Request::TransactionIds` + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + let genesis_transaction_ids = match response { + Response::TransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + // Test `Request::TransactionsById` + let genesis_transactions_hash_set = genesis_transaction_ids + .iter() + .copied() + .collect::>(); + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionsById( + genesis_transactions_hash_set.clone(), + )) + .await + .unwrap(); + let transactions = match response { + Response::Transactions(transactions) => transactions, + _ => unreachable!("will never happen in this test"), + }; + + // Make sure the transaction from the blockchain test vector is the same as the + // response of `Request::TransactionsById` + assert_eq!(genesis_transaction, transactions[0]); + + // Insert more transactions into the mempool storage. + // This will cause the genesis transaction to be moved into rejected. + // Skip the last (will be used later) + for tx in more_transactions { + service.storage().insert(tx.clone())?; + } + + // Test `Request::RejectedTransactionIds` + let response = service + .ready_and() + .await + .unwrap() + .call(Request::RejectedTransactionIds( + genesis_transactions_hash_set, + )) + .await + .unwrap(); + let rejected_ids = match response { + Response::RejectedTransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + assert_eq!(rejected_ids, genesis_transaction_ids); + + // Test `Request::Queue` + // Use the ID of the last transaction in the list + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![last_transaction.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(service.tx_downloads().in_flight(), 1); + + Ok(()) +} + +#[tokio::test] +async fn mempool_queue() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // Get transactions to use in the test + let unmined_transactions = unmined_transactions_in_blocks(..=10, network); + let mut transactions = unmined_transactions; + // Split unmined_transactions into: + // [rejected_tx, transactions..., stored_tx, new_tx] + // + // The first transaction to be added in the mempool which will be eventually + // put in the rejected list + let rejected_tx = transactions.next().unwrap().clone(); + // A transaction not in the mempool that will be Queued + let new_tx = transactions.next_back().unwrap(); + // The last transaction that will be added in the mempool (and thus not rejected) + let stored_tx = transactions.next_back().unwrap().clone(); + + // Start the mempool service + let mut service = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; + + // Insert [rejected_tx, transactions..., stored_tx] into the mempool storage. + // Insert the genesis block coinbase transaction into the mempool storage. + service.storage().insert(rejected_tx.clone())?; + // Insert more transactions into the mempool storage. + // This will cause the `rejected_tx` to be moved into rejected. + for tx in transactions { + service.storage().insert(tx.clone())?; + } + service.storage().insert(stored_tx.clone())?; + + // Test `Request::Queue` for a new transaction + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![new_tx.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + + // Test `Request::Queue` for a transaction already in the mempool + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![stored_tx.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert_eq!(queued_responses[0], Err(MempoolError::InMempool)); + + // Test `Request::Queue` for a transaction rejected by the mempool + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![rejected_tx.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert_eq!(queued_responses[0], Err(MempoolError::Rejected)); + + Ok(()) +} + +#[tokio::test] +async fn mempool_service_disabled() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); + let state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // get the genesis block transactions from the Zcash blockchain. + let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); + let genesis_transaction = unmined_transactions + .next() + .expect("Missing genesis transaction"); + let more_transactions = unmined_transactions; + + // Start the mempool service + let mut service = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Test if mempool is disabled (it should start disabled) + assert!(!service.is_enabled()); + + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; + + assert!(service.is_enabled()); + + // Insert the genesis block coinbase transaction into the mempool storage. + service.storage().insert(genesis_transaction.clone())?; + + // Test if the mempool answers correctly (i.e. is enabled) + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + let _genesis_transaction_ids = match response { + Response::TransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + // Queue a transaction for download + // Use the ID of the last transaction in the list + let txid = more_transactions.last().unwrap().id; + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(service.tx_downloads().in_flight(), 1); + + // Disable the mempool + let _ = service.disable(&mut recent_syncs).await; + + // Test if mempool is disabled again + assert!(!service.is_enabled()); + + // Test if the mempool returns no transactions when disabled + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + match response { + Response::TransactionIds(ids) => { + assert_eq!( + ids.len(), + 0, + "mempool should return no transactions when disabled" + ) + } + _ => unreachable!("will never happen in this test"), + }; + + // Test if the mempool returns to Queue requests correctly when disabled + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert_eq!(queued_responses[0], Err(MempoolError::Disabled)); + + Ok(()) +} + +#[tokio::test] +async fn mempool_cancel_mined() -> Result<(), Report> { + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + time::pause(); + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = mempool.enable(&mut recent_syncs).await; + assert!(mempool.is_enabled()); + + // Push the genesis block to the state + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Query the mempool to make it poll chain_tip_change + mempool.dummy_call().await; + + // Push block 1 to the state + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block1.clone().into(), + )) + .await + .unwrap(); + + // Query the mempool to make it poll chain_tip_change + mempool.dummy_call().await; + + // Queue transaction from block 2 for download. + // It can't be queued before because block 1 triggers a network upgrade, + // which cancels all downloads. + let txid = block2.transactions[0].unmined_id(); + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads().in_flight(), 1); + + // Push block 2 to the state + state_service + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block2.clone().into(), + )) + .await + .unwrap(); + + // This is done twice because after the first query the cancellation + // is picked up by select!, and after the second the mempool gets the + // result and the download future is removed. + for _ in 0..2 { + // Query the mempool just to poll it and make it cancel the download. + mempool.dummy_call().await; + // Sleep to avoid starvation and make sure the cancellation is picked up. + time::sleep(time::Duration::from_millis(100)).await; + } + + // Check if download was cancelled. + assert_eq!(mempool.tx_downloads().in_flight(), 0); + + Ok(()) +} + +#[tokio::test] +async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> { + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = mempool.enable(&mut recent_syncs).await; + assert!(mempool.is_enabled()); + + // Push the genesis block to the state + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Queue transaction from block 2 for download + let txid = block2.transactions[0].unmined_id(); + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads().in_flight(), 1); + + // Query the mempool to make it poll chain_tip_change + mempool.dummy_call().await; + + // Push block 1 to the state. This is considered a network upgrade, + // and thus must cancel all pending transaction downloads. + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block1.clone().into(), + )) + .await + .unwrap(); + + // Query the mempool to make it poll chain_tip_change + mempool.dummy_call().await; + + // Check if download was cancelled. + assert_eq!(mempool.tx_downloads().in_flight(), 0); + + Ok(()) +}