From 30c9618207458b3fc397182d2ff211274834ff64 Mon Sep 17 00:00:00 2001 From: Marek Date: Thu, 23 Sep 2021 21:09:44 +0200 Subject: [PATCH] Clear mempool at a network upgrade (#2773) * Update the expiry TODO * Clear the mempool at a chain tip reset * Clear the mempool by using a sync method (#2777) * Clear the mempool by using a sync method * Update docs * Apply suggestions from code review Co-authored-by: teor * Refactor last_tip_change() * Apply suggestions from code review Co-authored-by: Janito Vaqueiro Ferreira Filho * Fix brackets * Use best_tip_block instead of manual borrowing Co-authored-by: teor Co-authored-by: teor Co-authored-by: Janito Vaqueiro Ferreira Filho Co-authored-by: teor Co-authored-by: Janito Vaqueiro Ferreira Filho --- zebra-state/src/service/chain_tip.rs | 23 ++++++++++++++- .../src/service/chain_tip/tests/prop.rs | 2 +- .../src/service/chain_tip/tests/vectors.rs | 6 ++-- zebra-state/src/service/tests.rs | 29 +++---------------- zebrad/src/components/mempool.rs | 7 ++++- zebrad/src/components/mempool/storage.rs | 6 ++++ 6 files changed, 42 insertions(+), 31 deletions(-) diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 7c581902..88aac9f7 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -310,7 +310,7 @@ impl ChainTipChange { /// /// If a lot of blocks are committed at the same time, /// the change will skip some blocks, and return a [`Reset`]. - pub async fn tip_change(&mut self) -> Result { + pub async fn wait_for_tip_change(&mut self) -> Result { let block = self.tip_block_change().await?; let action = self.action(block.clone()); @@ -320,6 +320,27 @@ impl ChainTipChange { Ok(action) } + /// Returns: + /// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called. + /// - `None` if there has been no change. + /// + /// See [`wait_for_tip_change`] for details. + pub fn last_tip_change(&mut self) -> Option { + // Obtain the tip block. + let block = self.best_tip_block()?; + + // Ignore an unchanged tip. + if Some(block.hash) == self.last_change_hash { + return None; + } + + let action = self.action(block.clone()); + + self.last_change_hash = Some(block.hash); + + Some(action) + } + /// Return an action based on `block` and the last change we returned. fn action(&self, block: ChainTipBlock) -> TipAction { // check for an edge case that's dealt with by other code diff --git a/zebra-state/src/service/chain_tip/tests/prop.rs b/zebra-state/src/service/chain_tip/tests/prop.rs index 0ffd70cd..3a7a3cf0 100644 --- a/zebra-state/src/service/chain_tip/tests/prop.rs +++ b/zebra-state/src/service/chain_tip/tests/prop.rs @@ -100,7 +100,7 @@ proptest! { prop_assert_eq!( chain_tip_change - .tip_change() + .wait_for_tip_change() .now_or_never() .transpose() .expect("watch sender is not dropped"), diff --git a/zebra-state/src/service/chain_tip/tests/vectors.rs b/zebra-state/src/service/chain_tip/tests/vectors.rs index 30597c6b..9b80dfd6 100644 --- a/zebra-state/src/service/chain_tip/tests/vectors.rs +++ b/zebra-state/src/service/chain_tip/tests/vectors.rs @@ -40,7 +40,7 @@ fn chain_tip_change_is_initially_not_ready() { ChainTipSender::new(None, Mainnet); let first = chain_tip_change - .tip_change() + .wait_for_tip_change() .now_or_never() .transpose() .expect("watch sender is not dropped"); @@ -49,7 +49,7 @@ fn chain_tip_change_is_initially_not_ready() { // try again, just to be sure let first = chain_tip_change - .tip_change() + .wait_for_tip_change() .now_or_never() .transpose() .expect("watch sender is not dropped"); @@ -60,7 +60,7 @@ fn chain_tip_change_is_initially_not_ready() { #[allow(clippy::redundant_clone)] let first_clone = chain_tip_change .clone() - .tip_change() + .wait_for_tip_change() .now_or_never() .transpose() .expect("watch sender is not dropped"); diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index 22913990..be35cc45 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -1,6 +1,6 @@ use std::{convert::TryInto, env, sync::Arc}; -use futures::{stream::FuturesUnordered, FutureExt}; +use futures::stream::FuturesUnordered; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use zebra_chain::{ @@ -300,14 +300,7 @@ proptest! { let (mut state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); prop_assert_eq!(latest_chain_tip.best_tip_height(), None); - prop_assert_eq!( - chain_tip_change - .tip_change() - .now_or_never() - .transpose() - .expect("watch sender is not dropped"), - None - ); + prop_assert_eq!(chain_tip_change.last_tip_change(), None); for block in finalized_blocks { let expected_block = block.clone(); @@ -323,14 +316,7 @@ proptest! { state_service.queue_and_commit_finalized(block); prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); - prop_assert_eq!( - chain_tip_change - .tip_change() - .now_or_never() - .transpose() - .expect("watch sender is not dropped"), - Some(expected_action) - ); + prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); } for block in non_finalized_blocks { @@ -346,14 +332,7 @@ proptest! { state_service.queue_and_commit_non_finalized(block); prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); - prop_assert_eq!( - chain_tip_change - .tip_change() - .now_or_never() - .transpose() - .expect("watch sender is not dropped"), - Some(expected_action) - ); + prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); } } diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index a75f799b..884aea93 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -17,7 +17,7 @@ use zebra_chain::{ use zebra_consensus::{error::TransactionError, transaction}; use zebra_network as zn; use zebra_state as zs; -use zs::ChainTipChange; +use zebra_state::{ChainTipChange, TipAction}; pub use crate::BoxError; @@ -142,6 +142,11 @@ impl Service for Mempool { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Clear the mempool if there has been a chain tip reset. + if let Some(TipAction::Reset { .. }) = self.chain_tip_change.last_tip_change() { + self.storage.clear(); + } + // Clean up completed download tasks and add to mempool if successful while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) { if let Ok(tx) = r { diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index 29c9d063..6f706c85 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -140,4 +140,10 @@ impl Storage { .filter(|tx| self.rejected.contains_key(tx)) .collect() } + + /// Clears the whole mempool storage. + pub fn clear(&mut self) { + self.verified.clear(); + self.rejected.clear(); + } }