From 16a4110475fb51dce30558a2f759214faabe9935 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 30 Sep 2021 15:20:43 -0300 Subject: [PATCH] Cancel all mempool download and verify tasks when a network upgrade activates --- zebrad/src/components/mempool.rs | 3 +- zebrad/src/components/mempool/downloads.rs | 15 +++ zebrad/src/components/mempool/tests.rs | 141 ++++++++++++++++++--- 3 files changed, 141 insertions(+), 18 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 64b05f72..3372f8b0 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -248,9 +248,10 @@ impl Service for Mempool { } => { if let Some(tip_action) = self.chain_tip_change.last_tip_change() { match tip_action { - // Clear the mempool if there has been a chain tip reset. + // Clear the mempool and cancel downloads if there has been a chain tip reset. TipAction::Reset { .. } => { storage.clear(); + tx_downloads.cancel_all(); } // Cancel downloads/verifications of transactions with the same // IDs as recently mined transactions. diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 05524a6b..bb7d9b0a 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -334,6 +334,21 @@ where } } + /// Cancel all running tasks and reset the downloader state. + // Note: copied from zebrad/src/components/sync/downloads.rs + pub fn cancel_all(&mut self) { + // Replace the pending task list with an empty one and drop it. + let _ = std::mem::take(&mut self.pending); + // Signal cancellation to all running tasks. + // Since we already dropped the JoinHandles above, they should + // fail silently. + for (_hash, cancel) in self.cancel_handles.drain() { + let _ = cancel.send(()); + } + assert!(self.pending.is_empty()); + assert!(self.cancel_handles.is_empty()); + } + /// Get the number of currently in-flight download tasks. // Note: copied from zebrad/src/components/sync/downloads.rs #[allow(dead_code)] diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 6e7c4b87..b82724d0 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -406,23 +406,6 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .await .unwrap(); - // Queue transaction from block 2 for download - let txid = block2.transactions[0].unmined_id(); - let response = mempool - .ready_and() - .await - .unwrap() - .call(Request::Queue(vec![txid.into()])) - .await - .unwrap(); - let queued_responses = match response { - Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); - assert_eq!(mempool.tx_downloads().in_flight(), 1); - // Query the mempool to make it poll chain_tip_change let _response = mempool .ready_and() @@ -452,6 +435,25 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .await .unwrap(); + // Queue transaction from block 2 for download. + // It can't be queued before because block 1 triggers a network upgrade, + // which cancels all downloads. + let txid = block2.transactions[0].unmined_id(); + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads().in_flight(), 1); + // Push block 2 to the state state_service .oneshot(zebra_state::Request::CommitFinalizedBlock( @@ -481,3 +483,108 @@ async fn mempool_cancel_mined() -> Result<(), Report> { Ok(()) } + +#[tokio::test] +async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> { + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = mempool.enable(&mut recent_syncs).await; + assert!(mempool.is_enabled()); + + // Push the genesis block to the state + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Queue transaction from block 2 for download + let txid = block2.transactions[0].unmined_id(); + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads().in_flight(), 1); + + // Query the mempool to make it poll chain_tip_change + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + + // Push block 1 to the state. This is considered a network upgrade, + // and thus must cancel all pending transaction downloads. + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block1.clone().into(), + )) + .await + .unwrap(); + + // Query the mempool to make it poll chain_tip_change + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + + // Check if download was cancelled. + assert_eq!(mempool.tx_downloads().in_flight(), 0); + + Ok(()) +}