Cancel all mempool download and verify tasks when a network upgrade activates

This commit is contained in:
Conrado Gouvea 2021-09-30 15:20:43 -03:00 committed by Deirdre Connolly
parent 0325ba9f1f
commit 16a4110475
3 changed files with 141 additions and 18 deletions

View File

@ -248,9 +248,10 @@ impl Service<Request> for Mempool {
} => {
if let Some(tip_action) = self.chain_tip_change.last_tip_change() {
match tip_action {
// Clear the mempool if there has been a chain tip reset.
// Clear the mempool and cancel downloads if there has been a chain tip reset.
TipAction::Reset { .. } => {
storage.clear();
tx_downloads.cancel_all();
}
// Cancel downloads/verifications of transactions with the same
// IDs as recently mined transactions.

View File

@ -334,6 +334,21 @@ where
}
}
/// Cancel all running tasks and reset the downloader state.
// Note: copied from zebrad/src/components/sync/downloads.rs
pub fn cancel_all(&mut self) {
// Replace the pending task list with an empty one and drop it.
let _ = std::mem::take(&mut self.pending);
// Signal cancellation to all running tasks.
// Since we already dropped the JoinHandles above, they should
// fail silently.
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.send(());
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
}
/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]

View File

@ -406,23 +406,6 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
.await
.unwrap();
// Queue transaction from block 2 for download
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
@ -452,6 +435,25 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
.await
.unwrap();
// Queue transaction from block 2 for download.
// It can't be queued before because block 1 triggers a network upgrade,
// which cancels all downloads.
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Push block 2 to the state
state_service
.oneshot(zebra_state::Request::CommitFinalizedBlock(
@ -481,3 +483,108 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Start the mempool service
let mut mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Queue transaction from block 2 for download
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Push block 1 to the state. This is considered a network upgrade,
// and thus must cancel all pending transaction downloads.
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
Ok(())
}