From df65b8cb6523b399ab3f538344b14c9672face5f Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 28 Oct 2021 17:55:05 -0300 Subject: [PATCH] Restore and update mempool tests (#2966) * Restore mempool_storage_basic * Restore storage_is_cleared_on_chain_resets * Restore mempool_service_basic() and mempool_queue() * Fix tests and repeat multiple times to catch intermittent bugs Co-authored-by: Deirdre Connolly Co-authored-by: Alfredo Garcia --- .../mempool/storage/tests/vectors.rs | 98 ++++++++ zebrad/src/components/mempool/tests/prop.rs | 111 ++++++++- zebrad/src/components/mempool/tests/vector.rs | 235 +++++++++++++++++- 3 files changed, 436 insertions(+), 8 deletions(-) diff --git a/zebrad/src/components/mempool/storage/tests/vectors.rs b/zebrad/src/components/mempool/storage/tests/vectors.rs index 4e2ecb8c..cadb5958 100644 --- a/zebrad/src/components/mempool/storage/tests/vectors.rs +++ b/zebrad/src/components/mempool/storage/tests/vectors.rs @@ -18,6 +18,9 @@ use crate::components::mempool::{ /// so we use a large enough value that will never be reached in the tests. const EVICTION_MEMORY_TIME: Duration = Duration::from_secs(60 * 60); +/// Transaction count used in some tests to derive the mempool test size. +const MEMPOOL_TX_COUNT: usize = 4; + #[test] fn mempool_storage_crud_exact_mainnet() { zebra_test::init(); @@ -50,6 +53,101 @@ fn mempool_storage_crud_exact_mainnet() { assert!(!storage.contains_transaction_exact(&unmined_tx.transaction.id)); } +#[test] +fn mempool_storage_basic() -> Result<()> { + zebra_test::init(); + + // Test multiple times to catch intermittent bugs since eviction is randomized + for _ in 0..10 { + mempool_storage_basic_for_network(Network::Mainnet)?; + mempool_storage_basic_for_network(Network::Testnet)?; + } + + Ok(()) +} + +fn mempool_storage_basic_for_network(network: Network) -> Result<()> { + // Get transactions from the first 10 blocks of the Zcash blockchain + let unmined_transactions: Vec<_> = unmined_transactions_in_blocks(..=10, network).collect(); + + assert!( + MEMPOOL_TX_COUNT < unmined_transactions.len(), + "inconsistent MEMPOOL_TX_COUNT value for this test; decrease it" + ); + + // Use the sum of the costs of the first `MEMPOOL_TX_COUNT` transactions + // as the cost limit + let tx_cost_limit = unmined_transactions + .iter() + .take(MEMPOOL_TX_COUNT) + .map(|tx| tx.cost()) + .sum(); + + // Create an empty storage + let mut storage: Storage = Storage::new(&config::Config { + tx_cost_limit, + ..Default::default() + }); + + // Insert them all to the storage + let mut maybe_inserted_transactions = Vec::new(); + let mut some_rejected_transactions = Vec::new(); + for unmined_transaction in unmined_transactions.clone() { + let result = storage.insert(unmined_transaction.clone()); + match result { + Ok(_) => { + // While the transaction was inserted here, it can be rejected later. + maybe_inserted_transactions.push(unmined_transaction); + } + Err(_) => { + // Other transactions can be rejected on a successful insert, + // so not all rejected transactions will be added. + // Note that `some_rejected_transactions` can be empty since `insert` only + // returns a rejection error if the transaction being inserted is the one + // that was randomly evicted. + some_rejected_transactions.push(unmined_transaction); + } + } + } + // Since transactions are rejected randomly we can't test exact numbers. + // We know the first MEMPOOL_TX_COUNT must have been inserted successfully. + assert!(maybe_inserted_transactions.len() >= MEMPOOL_TX_COUNT); + assert_eq!( + some_rejected_transactions.len() + maybe_inserted_transactions.len(), + unmined_transactions.len() + ); + + // Test if the actual number of inserted/rejected transactions is consistent. + assert!(storage.verified.transaction_count() <= maybe_inserted_transactions.len()); + assert!(storage.rejected_transaction_count() >= some_rejected_transactions.len()); + + // Test if rejected transactions were actually rejected. + for tx in some_rejected_transactions.iter() { + assert!(!storage.contains_transaction_exact(&tx.transaction.id)); + } + + // Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE` + let all_ids: HashSet = unmined_transactions + .iter() + .map(|tx| tx.transaction.id) + .collect(); + + // Convert response to a `HashSet`, because the order of the response doesn't matter. + let all_rejected_ids: HashSet = + storage.rejected_transactions(all_ids).into_iter().collect(); + + let some_rejected_ids = some_rejected_transactions + .iter() + .map(|tx| tx.transaction.id) + .collect::>(); + + // Test if the rejected transactions we have are a subset of the actually + // rejected transactions. + assert!(some_rejected_ids.is_subset(&all_rejected_ids)); + + Ok(()) +} + #[test] fn mempool_storage_crud_same_effects_mainnet() { zebra_test::init(); diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs index a26ab75f..b0c69bce 100644 --- a/zebrad/src/components/mempool/tests/prop.rs +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -1,12 +1,13 @@ //! Randomised property tests for the mempool. +use proptest::collection::vec; use proptest::prelude::*; use proptest_derive::Arbitrary; use tokio::time; use tower::{buffer::Buffer, util::BoxService}; -use zebra_chain::{parameters::Network, transaction::VerifiedUnminedTx}; +use zebra_chain::{block, parameters::Network, transaction::VerifiedUnminedTx}; use zebra_consensus::{error::TransactionError, transaction as tx}; use zebra_network as zn; use zebra_state::{self as zs, ChainTipBlock, ChainTipSender}; @@ -26,6 +27,8 @@ type MockState = MockService; /// A [`MockService`] representing the Zebra transaction verifier service. type MockTxVerifier = MockService; +const CHAIN_LENGTH: usize = 10; + proptest! { /// Test if the mempool storage is cleared on a chain reset. #[test] @@ -81,6 +84,94 @@ proptest! { })?; } + /// Test if the mempool storage is cleared on multiple chain resets. + #[test] + fn storage_is_cleared_on_chain_resets( + network in any::(), + mut previous_chain_tip in any::(), + mut transactions in vec(any::(), 0..CHAIN_LENGTH), + fake_chain_tips in vec(any::(), 0..CHAIN_LENGTH), + ) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + runtime.block_on(async move { + let ( + mut mempool, + mut peer_set, + mut state_service, + mut tx_verifier, + mut recent_syncs, + mut chain_tip_sender, + ) = setup(network); + + time::pause(); + + mempool.enable(&mut recent_syncs).await; + + // Set the initial chain tip. + chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.clone()); + + // Call the mempool so that it is aware of the initial chain tip. + mempool.dummy_call().await; + + for (fake_chain_tip, transaction) in fake_chain_tips.iter().zip(transactions.iter_mut()) { + // Obtain a new chain tip based on the previous one. + let chain_tip = fake_chain_tip.to_chain_tip_block(&previous_chain_tip); + + // Adjust the transaction expiry height based on the new chain + // tip height so that the mempool does not evict the transaction + // when there is a chain growth. + if let Some(expiry_height) = transaction.transaction.transaction.expiry_height() { + if chain_tip.height >= expiry_height { + let mut tmp_tx = (*transaction.transaction.transaction).clone(); + + // Set a new expiry height that is greater than the + // height of the current chain tip. + *tmp_tx.expiry_height_mut() = block::Height(chain_tip.height.0 + 1); + transaction.transaction = tmp_tx.into(); + } + } + + // Insert the dummy transaction into the mempool. + mempool + .storage() + .insert(transaction.clone()) + .expect("Inserting a transaction should succeed"); + + // Set the new chain tip. + chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone()); + + // Call the mempool so that it is aware of the new chain tip. + mempool.dummy_call().await; + + match fake_chain_tip { + FakeChainTip::Grow(_) => { + // The mempool should not be empty because we had a regular chain growth. + prop_assert_ne!(mempool.storage().transaction_count(), 0); + } + + FakeChainTip::Reset(_) => { + // The mempool should be empty because we had a chain tip reset. + prop_assert_eq!(mempool.storage().transaction_count(), 0); + }, + } + + // Remember the current chain tip so that the next one can refer to it. + previous_chain_tip = chain_tip; + } + + peer_set.expect_no_requests().await?; + state_service.expect_no_requests().await?; + tx_verifier.expect_no_requests().await?; + + Ok(()) + })?; + } + /// Test if the mempool storage is cleared if the syncer falls behind and starts to catch up. #[test] fn storage_is_cleared_if_syncer_falls_behind( @@ -185,3 +276,21 @@ enum FakeChainTip { Grow(ChainTipBlock), Reset(ChainTipBlock), } + +impl FakeChainTip { + /// Returns a new [`ChainTipBlock`] placed on top of the previous block if + /// the chain is supposed to grow. Otherwise returns a [`ChainTipBlock`] + /// that does not reference the previous one. + fn to_chain_tip_block(&self, previous: &ChainTipBlock) -> ChainTipBlock { + match self { + Self::Grow(chain_tip_block) => ChainTipBlock { + hash: chain_tip_block.hash, + height: block::Height(previous.height.0 + 1), + transaction_hashes: chain_tip_block.transaction_hashes.clone(), + previous_block_hash: previous.hash, + }, + + Self::Reset(chain_tip_block) => chain_tip_block.clone(), + } + } +} diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 0cc38602..5fd76886 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -1,6 +1,6 @@ //! Fixed test vectors for the mempool. -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use color_eyre::Report; use tokio::time; @@ -25,13 +25,233 @@ type StateService = Buffer, /// A [`MockService`] representing the Zebra transaction verifier service. type MockTxVerifier = MockService; +#[tokio::test] +async fn mempool_service_basic() -> Result<(), Report> { + // Test multiple times to catch intermittent bugs since eviction is randomized + for _ in 0..10 { + mempool_service_basic_single().await?; + } + Ok(()) +} + +async fn mempool_service_basic_single() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + + // get the genesis block transactions from the Zcash blockchain. + let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); + let genesis_transaction = unmined_transactions + .next() + .expect("Missing genesis transaction"); + let last_transaction = unmined_transactions.next_back().unwrap(); + let more_transactions = unmined_transactions.collect::>(); + + // Use as cost limit the costs of all transactions that will be + // inserted except one (the genesis block transaction). + let cost_limit = more_transactions.iter().map(|tx| tx.cost()).sum(); + + let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + setup(network, cost_limit).await; + + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; + + // Insert the genesis block coinbase transaction into the mempool storage. + let mut inserted_ids = HashSet::new(); + service.storage().insert(genesis_transaction.clone())?; + inserted_ids.insert(genesis_transaction.transaction.id); + + // Test `Request::TransactionIds` + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + let genesis_transaction_ids = match response { + Response::TransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + // Test `Request::TransactionsById` + let genesis_transactions_hash_set = genesis_transaction_ids + .iter() + .copied() + .collect::>(); + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionsById( + genesis_transactions_hash_set.clone(), + )) + .await + .unwrap(); + let transactions = match response { + Response::Transactions(transactions) => transactions, + _ => unreachable!("will never happen in this test"), + }; + + // Make sure the transaction from the blockchain test vector is the same as the + // response of `Request::TransactionsById` + assert_eq!(genesis_transaction.transaction, transactions[0]); + + // Insert more transactions into the mempool storage. + // This will cause the genesis transaction to be moved into rejected. + // Skip the last (will be used later) + for tx in more_transactions { + inserted_ids.insert(tx.transaction.id); + // Error must be ignored because a insert can trigger an eviction and + // an error is returned if the transaction being inserted in chosen. + let _ = service.storage().insert(tx.clone()); + } + + // Test `Request::RejectedTransactionIds` + let response = service + .ready_and() + .await + .unwrap() + .call(Request::RejectedTransactionIds( + genesis_transactions_hash_set, + )) + .await + .unwrap(); + let rejected_ids = match response { + Response::RejectedTransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + assert!(rejected_ids.is_subset(&inserted_ids)); + + // Test `Request::Queue` + // Use the ID of the last transaction in the list + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![last_transaction.transaction.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(service.tx_downloads().in_flight(), 1); + + Ok(()) +} + +#[tokio::test] +async fn mempool_queue() -> Result<(), Report> { + // Test multiple times to catch intermittent bugs since eviction is randomized + for _ in 0..10 { + mempool_queue_single().await?; + } + Ok(()) +} + +async fn mempool_queue_single() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + + // Get transactions to use in the test + let unmined_transactions = unmined_transactions_in_blocks(..=10, network); + let mut transactions = unmined_transactions.collect::>(); + // Split unmined_transactions into: + // [transactions..., new_tx] + // A transaction not in the mempool that will be Queued + let new_tx = transactions.pop().unwrap(); + + // Use as cost limit the costs of all transactions that will be + // inserted except the last. + let cost_limit = transactions + .iter() + .take(transactions.len() - 1) + .map(|tx| tx.cost()) + .sum(); + + let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + setup(network, cost_limit).await; + + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; + + // Insert [transactions...] into the mempool storage. + // This will cause the at least one transaction to be rejected, since + // the cost limit is the sum of all costs except of the last transaction. + for tx in transactions.iter() { + // Error must be ignored because a insert can trigger an eviction and + // an error is returned if the transaction being inserted in chosen. + let _ = service.storage().insert(tx.clone()); + } + + // Test `Request::Queue` for a new transaction + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![new_tx.transaction.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + + // Test `Request::Queue` with all previously inserted transactions. + // They should all be rejected; either because they are already in the mempool, + // or because they are in the recently evicted list. + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue( + transactions + .iter() + .map(|tx| tx.transaction.id.into()) + .collect(), + )) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), transactions.len()); + + // Check if the responses are consistent + let mut in_mempool_count = 0; + let mut evicted_count = 0; + for response in queued_responses { + match response { + Ok(_) => panic!("all transactions should have been rejected"), + Err(e) => match e { + MempoolError::StorageEffectsChain( + SameEffectsChainRejectionError::RandomlyEvicted, + ) => evicted_count += 1, + MempoolError::InMempool => in_mempool_count += 1, + _ => panic!("transaction should not be rejected with reason {:?}", e), + }, + } + } + assert_eq!(in_mempool_count, transactions.len() - 1); + assert_eq!(evicted_count, 1); + + Ok(()) +} + #[tokio::test] async fn mempool_service_disabled() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = - setup(network).await; + setup(network, u64::MAX).await; // get the genesis block transactions from the Zcash blockchain. let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network); @@ -138,7 +358,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> { let network = Network::Mainnet; let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = - setup(network).await; + setup(network, u64::MAX).await; time::pause(); @@ -233,7 +453,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> let network = Network::Mainnet; let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = - setup(network).await; + setup(network, u64::MAX).await; // Enable the mempool let _ = mempool.enable(&mut recent_syncs).await; @@ -301,7 +521,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { let network = Network::Mainnet; let (mut mempool, _peer_set, mut state_service, mut tx_verifier, mut recent_syncs) = - setup(network).await; + setup(network, u64::MAX).await; // Get transactions to use in the test let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); @@ -384,7 +604,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { let network = Network::Mainnet; let (mut mempool, mut peer_set, mut state_service, _tx_verifier, mut recent_syncs) = - setup(network).await; + setup(network, u64::MAX).await; // Get transactions to use in the test let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); @@ -465,6 +685,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { /// Create a new [`Mempool`] instance using mocked services. async fn setup( network: Network, + tx_cost_limit: u64, ) -> ( Mempool, MockPeerSet, @@ -484,7 +705,7 @@ async fn setup( let (mempool, _mempool_transaction_receiver) = Mempool::new( &mempool::Config { - tx_cost_limit: u64::MAX, + tx_cost_limit, ..Default::default() }, Buffer::new(BoxService::new(peer_set.clone()), 1),