Cancel pending download tasks when the mempool is disabled (#2886)

* Impl Drop, Default and take() for ActiveState

* Refactor Mempool::poll_ready to check disabled and reset first

Also remove some levels of nesting.

* Use the same code for dropping and resetting the mempool

* Document where the tasks are dropped when switching states

* Log mempool resets at info level

And add heights to mempool enable/disable/reset logs

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
teor 2021-10-19 03:39:56 +10:00 committed by GitHub
parent 40c907dd09
commit 42ce79aad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 59 deletions

View File

@ -96,6 +96,27 @@ enum ActiveState {
}, },
} }
impl Default for ActiveState {
fn default() -> Self {
ActiveState::Disabled
}
}
impl Drop for ActiveState {
fn drop(&mut self) {
if let ActiveState::Enabled { tx_downloads, .. } = self {
tx_downloads.cancel_all();
}
}
}
impl ActiveState {
/// Returns the current state, leaving a [`Disabled`] in its place.
fn take(&mut self) -> Self {
std::mem::take(self)
}
}
/// Mempool async management and query service. /// Mempool async management and query service.
/// ///
/// The mempool is the set of all verified transactions that this node is aware /// The mempool is the set of all verified transactions that this node is aware
@ -207,7 +228,10 @@ impl Mempool {
// Update enabled / disabled state // Update enabled / disabled state
if is_close_to_tip { if is_close_to_tip {
info!("activating mempool: Zebra is close to the tip"); info!(
tip_height = ?self.latest_chain_tip.best_tip_height(),
"activating mempool: Zebra is close to the tip"
);
let tx_downloads = Box::pin(TxDownloads::new( let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
@ -219,8 +243,13 @@ impl Mempool {
tx_downloads, tx_downloads,
}; };
} else { } else {
info!("deactivating mempool: Zebra is syncing lots of blocks"); info!(
tip_height = ?self.latest_chain_tip.best_tip_height(),
"deactivating mempool: Zebra is syncing lots of blocks"
);
// This drops the previous ActiveState::Enabled,
// cancelling its download tasks.
self.active_state = ActiveState::Disabled self.active_state = ActiveState::Disabled
} }
} }
@ -254,68 +283,81 @@ impl Service<Request> for Mempool {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.update_state(); self.update_state();
match &mut self.active_state { // When the mempool is disabled we still return that the service is ready.
ActiveState::Enabled { // Otherwise, callers could block waiting for the mempool to be enabled,
storage, // which may not be the desired behavior.
tx_downloads, if !self.is_enabled() {
} => { return Poll::Ready(Ok(()));
// Collect inserted transaction ids. }
let mut send_to_peers_ids = HashSet::<_>::new();
// Clean up completed download tasks and add to mempool if successful. let tip_action = self.chain_tip_change.last_tip_change();
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
match r {
Ok(tx) => {
if let Ok(inserted_id) = storage.insert(tx.clone()) {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}
}
Err((txid, e)) => {
storage.reject_if_needed(txid, e);
// TODO: should we also log the result?
}
};
}
// Handle best chain tip changes // Clear the mempool and cancel downloads if there has been a chain tip reset.
if let Some(tip_action) = self.chain_tip_change.last_tip_change() { if matches!(tip_action, Some(TipAction::Reset { .. })) {
match tip_action { info!(
// Clear the mempool and cancel downloads if there has been a chain tip reset. tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
TipAction::Reset { .. } => { "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
storage.clear(); );
tx_downloads.cancel_all();
} // Use the same code for dropping and resetting the mempool,
TipAction::Grow { block } => { // to avoid subtle bugs.
// Cancel downloads/verifications/storage of transactions
// with the same mined IDs as recently mined transactions. // Drop the current contents of the state,
let mined_ids = block.transaction_hashes.iter().cloned().collect(); // cancelling any pending download tasks,
tx_downloads.cancel(&mined_ids); // and dropping completed verification results.
storage.remove_same_effects(&mined_ids); std::mem::drop(self.active_state.take());
storage.clear_tip_rejections();
// Re-initialise an empty state.
self.update_state();
return Poll::Ready(Ok(()));
}
if let ActiveState::Enabled {
storage,
tx_downloads,
} = &mut self.active_state
{
// Collect inserted transaction ids.
let mut send_to_peers_ids = HashSet::<_>::new();
// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
match r {
Ok(tx) => {
if let Ok(inserted_id) = storage.insert(tx.clone()) {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
} }
} }
} Err((txid, e)) => {
storage.reject_if_needed(txid, e);
// Remove expired transactions from the mempool. // TODO: should we also log the result?
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { }
let expired_transactions = storage.remove_expired_transactions(tip_height); };
// Remove transactions that are expired from the peers list
send_to_peers_ids = Self::remove_expired_from_peer_list(
&send_to_peers_ids,
&expired_transactions,
);
}
// Send transactions that were not rejected nor expired to peers
if !send_to_peers_ids.is_empty() {
let _ = self.transaction_sender.send(send_to_peers_ids)?;
}
} }
ActiveState::Disabled => {
// When the mempool is disabled we still return that the service is ready. // Handle best chain tip changes
// Otherwise, callers could block waiting for the mempool to be enabled, if let Some(TipAction::Grow { block }) = tip_action {
// which may not be the desired behavior. // Cancel downloads/verifications/storage of transactions
// with the same mined IDs as recently mined transactions.
let mined_ids = block.transaction_hashes.iter().cloned().collect();
tx_downloads.cancel(&mined_ids);
storage.remove_same_effects(&mined_ids);
storage.clear_tip_rejections();
}
// Remove expired transactions from the mempool.
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
let expired_transactions = storage.remove_expired_transactions(tip_height);
// Remove transactions that are expired from the peers list
send_to_peers_ids =
Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
}
// Send transactions that were not rejected nor expired to peers
if !send_to_peers_ids.is_empty() {
let _ = self.transaction_sender.send(send_to_peers_ids)?;
} }
} }

View File

@ -225,6 +225,7 @@ impl Storage {
} }
/// Clears the whole mempool storage. /// Clears the whole mempool storage.
#[allow(dead_code)]
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.verified.clear(); self.verified.clear();
self.tip_rejected_exact.clear(); self.tip_rejected_exact.clear();