From dd1f0a6dccb17a590e71bfd7ec95b72899f346d4 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 7 Oct 2021 18:34:01 -0300 Subject: [PATCH] Add transactions that failed verification to the mempool rejected list (#2821) * Add transactions that failed verification to the mempool rejected list * Add tests * Work with recent changes --- zebrad/src/components/mempool.rs | 54 ++++- zebrad/src/components/mempool/downloads.rs | 69 ++++-- zebrad/src/components/mempool/error.rs | 3 + zebrad/src/components/mempool/storage.rs | 28 +++ zebrad/src/components/mempool/tests/vector.rs | 201 ++++++++++++++++++ 5 files changed, 332 insertions(+), 23 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 5203ac95..b1340cde 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -41,7 +41,8 @@ pub use self::storage::{ pub use self::storage::tests::unmined_transactions_in_blocks; use self::downloads::{ - Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, + Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT, + TRANSACTION_VERIFY_TIMEOUT, }; use super::sync::SyncStatus; @@ -227,11 +228,17 @@ impl Service for Mempool { // Clean up completed download tasks and add to mempool if successful. while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { - if let Ok(tx) = r { - // Storage handles conflicting transactions or a full mempool internally, - // so just ignore the storage result here - let _ = storage.insert(tx); - } + match r { + Ok(tx) => { + // Storage handles conflicting transactions or a full mempool internally, + // so just ignore the storage result here + let _ = storage.insert(tx); + } + Err((txid, e)) => { + reject_if_needed(storage, txid, e); + // TODO: should we also log the result? + } + }; } // Remove expired transactions from the mempool. @@ -327,3 +334,38 @@ fn remove_expired_transactions( // expiry height is effecting data, so we match by non-malleable TXID storage.remove_same_effects(&txid_set); } + +/// Add a transaction that failed download and verification to the rejected list +/// if needed, depending on the reason for the failure. +fn reject_if_needed( + storage: &mut storage::Storage, + txid: UnminedTxId, + e: TransactionDownloadVerifyError, +) { + match e { + // Rejecting a transaction already in state would speed up further + // download attempts without checking the state. However it would + // make the reject list grow forever. + // TODO: revisit after reviewing the rejected list cleanup criteria? + // TODO: if we decide to reject it, then we need to pass the block hash + // to State::Confirmed. This would require the zs::Response::Transaction + // to include the hash, which would need to be implemented. + TransactionDownloadVerifyError::InState | + // An unknown error in the state service, better do nothing + TransactionDownloadVerifyError::StateError(_) | + // Sync has just started. Mempool shouldn't even be enabled, so will not + // happen in practice. + TransactionDownloadVerifyError::NoTip | + // If download failed, do nothing; the crawler will end up trying to + // download it again. + TransactionDownloadVerifyError::DownloadFailed(_) | + // If it was cancelled then a block was mined, or there was a network + // upgrade, etc. No reason to reject it. + TransactionDownloadVerifyError::Cancelled => {} + // Consensus verification failed. Reject transaction to avoid + // having to download and verify it again just for it to fail again. + TransactionDownloadVerifyError::Invalid(e) => { + storage.reject(txid, ExactTipRejectionError::FailedVerification(e).into()) + } + } +} diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 8bd7bfd8..64b1f390 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -5,13 +5,13 @@ use std::{ time::Duration, }; -use color_eyre::eyre::eyre; use futures::{ future::TryFutureExt, ready, stream::{FuturesUnordered, Stream}, }; use pin_project::pin_project; +use thiserror::Error; use tokio::{sync::oneshot, task::JoinHandle}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; @@ -65,6 +65,29 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT; /// Therefore, this attack can be carried out by a single malicious node. pub(crate) const MAX_INBOUND_CONCURRENCY: usize = 10; +/// Errors that can occur while downloading and verifying a transaction. +#[derive(Error, Debug)] +#[allow(dead_code)] +pub enum TransactionDownloadVerifyError { + #[error("transaction is already in state")] + InState, + + #[error("error in state service")] + StateError(#[source] BoxError), + + #[error("transaction not validated because the tip is empty")] + NoTip, + + #[error("error downloading transaction")] + DownloadFailed(#[source] BoxError), + + #[error("transaction download / verification was cancelled")] + Cancelled, + + #[error("transaction did not pass consensus validation")] + Invalid(#[from] zebra_consensus::error::TransactionError), +} + /// A gossiped transaction, which can be the transaction itself or just its ID. #[derive(Debug, Eq, PartialEq)] pub enum Gossip { @@ -120,7 +143,9 @@ where // Internal downloads state /// A list of pending transaction download and verify tasks. #[pin] - pending: FuturesUnordered>>, + pending: FuturesUnordered< + JoinHandle>, + >, /// A list of channels that can be used to cancel pending transaction download and /// verify tasks. @@ -136,7 +161,7 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -157,7 +182,7 @@ where } Err((e, hash)) => { this.cancel_handles.remove(&hash); - Poll::Ready(Some(Err(e))) + Poll::Ready(Some(Err((hash, e)))) } } } else { @@ -237,21 +262,27 @@ where Self::transaction_in_state(&mut state, txid).await?; let height = match state.oneshot(zs::Request::Tip).await { - Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()), + Ok(zs::Response::Tip(None)) => Err(TransactionDownloadVerifyError::NoTip), Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height), Ok(_) => unreachable!("wrong response"), - Err(e) => Err(e), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), }?; - let height = (height + 1).ok_or_else(|| eyre!("no next height"))?; + let height = (height + 1).expect("must have next height"); let tx = match gossiped_tx { Gossip::Id(txid) => { let req = zn::Request::TransactionsById(std::iter::once(txid).collect()); - let tx = match network.oneshot(req).await? { - zn::Response::Transactions(mut txs) => txs - .pop() - .expect("successful response has the transaction in it"), + let tx = match network + .oneshot(req) + .await + .map_err(|e| TransactionDownloadVerifyError::DownloadFailed(e))? + { + zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| { + TransactionDownloadVerifyError::DownloadFailed( + "no transactions returned".into(), + ) + })?, _ => unreachable!("wrong response to transaction request"), }; @@ -274,7 +305,7 @@ where tracing::debug!(?txid, ?result, "verified transaction for the mempool"); - result + result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into())) } .map_ok(|tx| { metrics::counter!("gossip.verified.transaction.count", 1); @@ -292,7 +323,7 @@ where _ = &mut cancel_rx => { tracing::trace!("task cancelled prior to completion"); metrics::counter!("gossip.cancelled.count", 1); - Err(("canceled".into(), txid)) + Err((TransactionDownloadVerifyError::Cancelled, txid)) } verification = fut => verification, } @@ -357,18 +388,22 @@ where } /// Check if transaction is already in the state. - async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> { + async fn transaction_in_state( + state: &mut ZS, + txid: UnminedTxId, + ) -> Result<(), TransactionDownloadVerifyError> { // Check if the transaction is already in the state. match state .ready_and() - .await? + .await + .map_err(|e| TransactionDownloadVerifyError::StateError(e))? .call(zs::Request::Transaction(txid.mined_id())) .await { Ok(zs::Response::Transaction(None)) => Ok(()), - Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()), + Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState), Ok(_) => unreachable!("wrong response"), - Err(e) => Err(e), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), }?; Ok(()) diff --git a/zebrad/src/components/mempool/error.rs b/zebrad/src/components/mempool/error.rs index 1b1ad6cd..0c56bc66 100644 --- a/zebrad/src/components/mempool/error.rs +++ b/zebrad/src/components/mempool/error.rs @@ -52,4 +52,7 @@ pub enum MempoolError { #[error("mempool is disabled since synchronization is behind the chain tip")] Disabled, + + #[error("error calling a service")] + ServiceError, } diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index 798f997a..b6ef49d7 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -76,6 +76,18 @@ pub enum SameEffectsChainRejectionError { RandomlyEvicted, } +/// Storage error that combines all other specific error types. +#[derive(Error, Clone, Debug, PartialEq, Eq)] +#[allow(dead_code)] +pub enum RejectionError { + #[error(transparent)] + ExactTip(#[from] ExactTipRejectionError), + #[error(transparent)] + SameEffectsTip(#[from] SameEffectsTipRejectionError), + #[error(transparent)] + SameEffectsChain(#[from] SameEffectsChainRejectionError), +} + #[derive(Default)] pub struct Storage { /// The set of verified transactions in the mempool. This is a @@ -279,6 +291,22 @@ impl Storage { + self.chain_rejected_same_effects.len() } + /// Add a transaction to the rejected list for the given reason. + pub fn reject(&mut self, txid: UnminedTxId, reason: RejectionError) { + match reason { + RejectionError::ExactTip(e) => { + self.tip_rejected_exact.insert(txid, e); + } + RejectionError::SameEffectsTip(e) => { + self.tip_rejected_same_effects.insert(txid.mined_id(), e); + } + RejectionError::SameEffectsChain(e) => { + self.chain_rejected_same_effects.insert(txid.mined_id(), e); + } + } + self.limit_rejection_list_memory(); + } + /// Returns `true` if a [`UnminedTx`] matching an [`UnminedTxId`] is in /// any mempool rejected list. /// diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index e7c7fce2..4101001a 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -563,3 +563,204 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> Ok(()) } + +/// Check if a transaction that fails verification is rejected by the mempool. +#[tokio::test] +async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, _tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + let mut tx_verifier = MockService::build().for_unit_tests(); + + // Get transactions to use in the test + let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); + let rejected_tx = unmined_transactions.next().unwrap().clone(); + + time::pause(); + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set.clone()), 1), + state_service.clone(), + Buffer::new(BoxService::new(tx_verifier.clone()), 1), + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = mempool.enable(&mut recent_syncs).await; + + // Push the genesis block to the state, since downloader needs a valid tip. + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Queue first transaction for verification + // (queue the transaction itself to avoid a download). + let request = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![rejected_tx.clone().into()])); + // Make the mock verifier return that the transaction is invalid. + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + responder.respond(Err(TransactionError::BadBalance)); + }); + let (response, _) = futures::join!(request, verification); + let queued_responses = match response.unwrap() { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + // Check that the request was enqueued successfully. + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + + for _ in 0..2 { + // Query the mempool just to poll it and make get the downloader/verifier result. + mempool.dummy_call().await; + // Sleep to avoid starvation and make sure the verification failure is picked up. + time::sleep(time::Duration::from_millis(100)).await; + } + + // Try to queue the same transaction by its ID and check if it's correctly + // rejected. + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![rejected_tx.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(matches!( + queued_responses[0], + Err(MempoolError::StorageExactTip( + ExactTipRejectionError::FailedVerification(_) + )) + )); + + Ok(()) +} + +/// Check if a transaction that fails download is _not_ rejected. +#[tokio::test] +async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let mut peer_set = MockService::build().for_unit_tests(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // Get transactions to use in the test + let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network); + let rejected_valid_tx = unmined_transactions.next().unwrap().clone(); + + time::pause(); + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set.clone()), 1), + state_service.clone(), + tx_verifier, + sync_status, + latest_chain_tip, + chain_tip_change, + ); + + // Enable the mempool + let _ = mempool.enable(&mut recent_syncs).await; + + // Push the genesis block to the state, since downloader needs a valid tip. + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Queue second transaction for download and verification. + let request = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![rejected_valid_tx.id.into()])); + // Make the mock peer set return that the download failed. + let verification = peer_set + .expect_request_that(|r| matches!(r, zn::Request::TransactionsById(_))) + .map(|responder| { + responder.respond(zn::Response::Transactions(vec![])); + }); + let (response, _) = futures::join!(request, verification); + let queued_responses = match response.unwrap() { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + // Check that the request was enqueued successfully. + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + + for _ in 0..2 { + // Query the mempool just to poll it and make get the downloader/verifier result. + mempool.dummy_call().await; + // Sleep to avoid starvation and make sure the download failure is picked up. + time::sleep(time::Duration::from_millis(100)).await; + } + + // Try to queue the same transaction by its ID and check if it's not being + // rejected. + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![rejected_valid_tx.id.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + + Ok(()) +}