From 42ce79aad9b65e342e429c38c90969b5657ffd79 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 19 Oct 2021 03:39:56 +1000 Subject: [PATCH] Cancel pending download tasks when the mempool is disabled (#2886) * Impl Drop, Default and take() for ActiveState * Refactor Mempool::poll_ready to check disabled and reset first Also remove some levels of nesting. * Use the same code for dropping and resetting the mempool * Document where the tasks are dropped when switching states * Log mempool resets at info level And add heights to mempool enable/disable/reset logs Co-authored-by: Conrado Gouvea --- zebrad/src/components/mempool.rs | 160 ++++++++++++++--------- zebrad/src/components/mempool/storage.rs | 1 + 2 files changed, 102 insertions(+), 59 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 066ae57b..06947889 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -96,6 +96,27 @@ enum ActiveState { }, } +impl Default for ActiveState { + fn default() -> Self { + ActiveState::Disabled + } +} + +impl Drop for ActiveState { + fn drop(&mut self) { + if let ActiveState::Enabled { tx_downloads, .. } = self { + tx_downloads.cancel_all(); + } + } +} + +impl ActiveState { + /// Returns the current state, leaving a [`Disabled`] in its place. + fn take(&mut self) -> Self { + std::mem::take(self) + } +} + /// Mempool async management and query service. /// /// The mempool is the set of all verified transactions that this node is aware @@ -207,7 +228,10 @@ impl Mempool { // Update enabled / disabled state if is_close_to_tip { - info!("activating mempool: Zebra is close to the tip"); + info!( + tip_height = ?self.latest_chain_tip.best_tip_height(), + "activating mempool: Zebra is close to the tip" + ); let tx_downloads = Box::pin(TxDownloads::new( Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), @@ -219,8 +243,13 @@ impl Mempool { tx_downloads, }; } else { - info!("deactivating mempool: Zebra is syncing lots of blocks"); + info!( + tip_height = ?self.latest_chain_tip.best_tip_height(), + "deactivating mempool: Zebra is syncing lots of blocks" + ); + // This drops the previous ActiveState::Enabled, + // cancelling its download tasks. self.active_state = ActiveState::Disabled } } @@ -254,68 +283,81 @@ impl Service for Mempool { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.update_state(); - match &mut self.active_state { - ActiveState::Enabled { - storage, - tx_downloads, - } => { - // Collect inserted transaction ids. - let mut send_to_peers_ids = HashSet::<_>::new(); + // When the mempool is disabled we still return that the service is ready. + // Otherwise, callers could block waiting for the mempool to be enabled, + // which may not be the desired behavior. + if !self.is_enabled() { + return Poll::Ready(Ok(())); + } - // Clean up completed download tasks and add to mempool if successful. - while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { - match r { - Ok(tx) => { - if let Ok(inserted_id) = storage.insert(tx.clone()) { - // Save transaction ids that we will send to peers - send_to_peers_ids.insert(inserted_id); - } - } - Err((txid, e)) => { - storage.reject_if_needed(txid, e); - // TODO: should we also log the result? - } - }; - } + let tip_action = self.chain_tip_change.last_tip_change(); - // Handle best chain tip changes - if let Some(tip_action) = self.chain_tip_change.last_tip_change() { - match tip_action { - // Clear the mempool and cancel downloads if there has been a chain tip reset. - TipAction::Reset { .. } => { - storage.clear(); - tx_downloads.cancel_all(); - } - TipAction::Grow { block } => { - // Cancel downloads/verifications/storage of transactions - // with the same mined IDs as recently mined transactions. - let mined_ids = block.transaction_hashes.iter().cloned().collect(); - tx_downloads.cancel(&mined_ids); - storage.remove_same_effects(&mined_ids); - storage.clear_tip_rejections(); + // Clear the mempool and cancel downloads if there has been a chain tip reset. + if matches!(tip_action, Some(TipAction::Reset { .. })) { + info!( + tip_height = ?tip_action.as_ref().unwrap().best_tip_height(), + "resetting mempool: switched best chain, skipped blocks, or activated network upgrade" + ); + + // Use the same code for dropping and resetting the mempool, + // to avoid subtle bugs. + + // Drop the current contents of the state, + // cancelling any pending download tasks, + // and dropping completed verification results. + std::mem::drop(self.active_state.take()); + + // Re-initialise an empty state. + self.update_state(); + + return Poll::Ready(Ok(())); + } + + if let ActiveState::Enabled { + storage, + tx_downloads, + } = &mut self.active_state + { + // Collect inserted transaction ids. + let mut send_to_peers_ids = HashSet::<_>::new(); + + // Clean up completed download tasks and add to mempool if successful. + while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { + match r { + Ok(tx) => { + if let Ok(inserted_id) = storage.insert(tx.clone()) { + // Save transaction ids that we will send to peers + send_to_peers_ids.insert(inserted_id); } } - } - - // Remove expired transactions from the mempool. - if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { - let expired_transactions = storage.remove_expired_transactions(tip_height); - // Remove transactions that are expired from the peers list - send_to_peers_ids = Self::remove_expired_from_peer_list( - &send_to_peers_ids, - &expired_transactions, - ); - } - - // Send transactions that were not rejected nor expired to peers - if !send_to_peers_ids.is_empty() { - let _ = self.transaction_sender.send(send_to_peers_ids)?; - } + Err((txid, e)) => { + storage.reject_if_needed(txid, e); + // TODO: should we also log the result? + } + }; } - ActiveState::Disabled => { - // When the mempool is disabled we still return that the service is ready. - // Otherwise, callers could block waiting for the mempool to be enabled, - // which may not be the desired behavior. + + // Handle best chain tip changes + if let Some(TipAction::Grow { block }) = tip_action { + // Cancel downloads/verifications/storage of transactions + // with the same mined IDs as recently mined transactions. + let mined_ids = block.transaction_hashes.iter().cloned().collect(); + tx_downloads.cancel(&mined_ids); + storage.remove_same_effects(&mined_ids); + storage.clear_tip_rejections(); + } + + // Remove expired transactions from the mempool. + if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { + let expired_transactions = storage.remove_expired_transactions(tip_height); + // Remove transactions that are expired from the peers list + send_to_peers_ids = + Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions); + } + + // Send transactions that were not rejected nor expired to peers + if !send_to_peers_ids.is_empty() { + let _ = self.transaction_sender.send(send_to_peers_ids)?; } } diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index d9a97af4..851512cd 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -225,6 +225,7 @@ impl Storage { } /// Clears the whole mempool storage. + #[allow(dead_code)] pub fn clear(&mut self) { self.verified.clear(); self.tip_rejected_exact.clear();