diff --git a/Cargo.lock b/Cargo.lock index 2a5e4b3d..8ff4c431 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5688,6 +5688,13 @@ dependencies = [ "zebra-test", ] +[[package]] +name = "zebra-node-services" +version = "1.0.0-beta.5" +dependencies = [ + "zebra-chain", +] + [[package]] name = "zebra-rpc" version = "1.0.0-beta.0" @@ -5831,6 +5838,7 @@ dependencies = [ "zebra-chain", "zebra-consensus", "zebra-network", + "zebra-node-services", "zebra-rpc", "zebra-state", "zebra-test", diff --git a/Cargo.toml b/Cargo.toml index cae6d3b4..829aae53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "zebra-consensus", "zebra-rpc", "zebra-client", + "zebra-node-services", "zebra-test", "zebra-utils", "tower-batch", diff --git a/zebra-node-services/Cargo.toml b/zebra-node-services/Cargo.toml new file mode 100644 index 00000000..28532250 --- /dev/null +++ b/zebra-node-services/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "zebra-node-services" +authors = ["Zcash Foundation "] +license = "MIT OR Apache-2.0" +version = "1.0.0-beta.5" +edition = "2021" +repository = "https://github.com/ZcashFoundation/zebra" + +[dependencies] +zebra-chain = { path = "../zebra-chain" } diff --git a/zebra-node-services/src/lib.rs b/zebra-node-services/src/lib.rs new file mode 100644 index 00000000..b113e1fc --- /dev/null +++ b/zebra-node-services/src/lib.rs @@ -0,0 +1,10 @@ +//! The interfaces of some Zebra node services. + +pub mod mempool; + +/// Error type alias to make working with tower traits easier. +/// +/// Note: the 'static lifetime bound means that the *type* cannot have any +/// non-'static lifetimes, (e.g., when a type contains a borrow and is +/// parameterized by 'a), *not* that the object itself has 'static lifetime. +pub type BoxError = Box; diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs new file mode 100644 index 00000000..4fe0c83a --- /dev/null +++ b/zebra-node-services/src/mempool.rs @@ -0,0 +1,93 @@ +//! The Zebra mempool. +//! +//! A service that manages known unmined Zcash transactions. + +use std::collections::HashSet; + +use zebra_chain::transaction::{UnminedTx, UnminedTxId}; + +use crate::BoxError; + +mod gossip; +pub use self::gossip::Gossip; + +/// A mempool service request. +/// +/// Requests can query the current set of mempool transactions, +/// queue transactions to be downloaded and verified, or +/// run the mempool to check for newly verified transactions. +/// +/// Requests can't modify the mempool directly, +/// because all mempool transactions must be verified. +#[derive(Debug, Eq, PartialEq)] +#[allow(dead_code)] +pub enum Request { + /// Query all transaction IDs in the mempool. + TransactionIds, + + /// Query matching transactions in the mempool, + /// using a unique set of [`UnminedTxId`]s. + TransactionsById(HashSet), + + /// Query matching cached rejected transaction IDs in the mempool, + /// using a unique set of [`UnminedTxId`]s. + RejectedTransactionIds(HashSet), + + /// Queue a list of gossiped transactions or transaction IDs, or + /// crawled transaction IDs. + /// + /// The transaction downloader checks for duplicates across IDs and transactions. + Queue(Vec), + + /// Check for newly verified transactions. + /// + /// The transaction downloader does not push transactions into the mempool. + /// So a task should send this request regularly (every 5-10 seconds). + /// + /// These checks also happen for other request variants, + /// but we can't rely on peers to send queries regularly, + /// and crawler queue requests depend on peer responses. + /// Also, crawler requests aren't frequent enough for transaction propagation. + /// + /// # Correctness + /// + /// This request is required to avoid hangs in the mempool. + /// + /// The queue checker task can't call `poll_ready` directly on the [`Mempool`] service, + /// because the mempool service is wrapped in a `Buffer`. + /// Calling [`Buffer::poll_ready`] reserves a buffer slot, which can cause hangs when + /// too many slots are reserved but unused: + /// + CheckForVerifiedTransactions, +} + +/// A response to a mempool service request. +/// +/// Responses can read the current set of mempool transactions, +/// check the queued status of transactions to be downloaded and verified, or +/// confirm that the mempool has been checked for newly verified transactions. +#[derive(Debug)] +pub enum Response { + /// Returns all transaction IDs from the mempool. + TransactionIds(HashSet), + + /// Returns matching transactions from the mempool. + /// + /// Since the [`TransactionsById`] request is unique, + /// the response transactions are also unique. + Transactions(Vec), + + /// Returns matching cached rejected transaction IDs from the mempool, + RejectedTransactionIds(HashSet), + + /// Returns a list of queue results. + /// + /// These are the results of the initial queue checks. + /// The transaction may also fail download or verification later. + /// + /// Each result matches the request at the corresponding vector index. + Queued(Vec>), + + /// Confirms that the mempool has checked for recently verified transactions. + CheckedForVerifiedTransactions, +} diff --git a/zebra-node-services/src/mempool/gossip.rs b/zebra-node-services/src/mempool/gossip.rs new file mode 100644 index 00000000..2e344893 --- /dev/null +++ b/zebra-node-services/src/mempool/gossip.rs @@ -0,0 +1,35 @@ +//! Representation of a gossiped transaction to send to the mempool. + +use zebra_chain::transaction::{UnminedTx, UnminedTxId}; + +/// A gossiped transaction, which can be the transaction itself or just its ID. +#[derive(Debug, Eq, PartialEq)] +pub enum Gossip { + /// Just the ID of an unmined transaction. + Id(UnminedTxId), + + /// The full contents of an unmined transaction. + Tx(UnminedTx), +} + +impl Gossip { + /// Return the [`UnminedTxId`] of a gossiped transaction. + pub fn id(&self) -> UnminedTxId { + match self { + Gossip::Id(txid) => *txid, + Gossip::Tx(tx) => tx.id, + } + } +} + +impl From for Gossip { + fn from(txid: UnminedTxId) -> Self { + Gossip::Id(txid) + } +} + +impl From for Gossip { + fn from(tx: UnminedTx) -> Self { + Gossip::Tx(tx) + } +} diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 081bfd65..67266f2e 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -13,6 +13,7 @@ default-run = "zebrad" zebra-chain = { path = "../zebra-chain" } zebra-consensus = { path = "../zebra-consensus/" } zebra-network = { path = "../zebra-network" } +zebra-node-services = { path = "../zebra-node-services" } zebra-rpc = { path = "../zebra-rpc" } zebra-state = { path = "../zebra-state" } diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 5267a4c2..0eb5a34d 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -33,12 +33,11 @@ use zebra_network::{ constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE}, AddressBook, InventoryResponse, }; +use zebra_node_services::mempool; // Re-use the syncer timeouts for consistency. -use super::{ - mempool, mempool as mp, - sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, -}; +use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; +use crate::BoxError; use InventoryResponse::*; @@ -52,7 +51,7 @@ use downloads::Downloads as BlockDownloads; type BlockDownloadPeerSet = Buffer, zn::Request>; type State = Buffer, zs::Request>; -type Mempool = Buffer, mp::Request>; +type Mempool = Buffer, mempool::Request>; type BlockVerifier = Buffer, block::Hash, VerifyChainError>, Arc>; type GossipedBlockDownloads = BlockDownloads, Timeout, State>; diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index e5b4ca64..33d2074e 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -23,13 +23,17 @@ use zebra_chain::{ }; use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_network::{AddressBook, InventoryResponse, Request, Response}; +use zebra_node_services::mempool; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ components::{ inbound::{Inbound, InboundSetupData}, - mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool}, + mempool::{ + gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig, + Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError, + }, sync::{self, BlockGossipError, SyncStatus}, }, BoxError, @@ -487,10 +491,12 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { assert_eq!(queued_responses.len(), 1); assert_eq!( - queued_responses[0], - Err(mempool::MempoolError::StorageEffectsChain( - mempool::SameEffectsChainRejectionError::Expired - )) + queued_responses + .into_iter() + .next() + .unwrap() + .unbox_mempool_error(), + MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::Expired) ); // Test transaction 2 is gossiped @@ -750,7 +756,7 @@ async fn setup( committed_blocks.push(block_one); let (mut mempool_service, transaction_receiver) = Mempool::new( - &mempool::Config::default(), + &MempoolConfig::default(), buffered_peer_set.clone(), state_service.clone(), buffered_tx_verifier.clone(), diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index dfac2da0..89af5f22 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -22,13 +22,14 @@ use zebra_network::{ connect_isolated_tcp_direct_with_inbound, types::InventoryHash, Config as NetworkConfig, InventoryResponse, PeerError, Request, Response, SharedPeerError, }; +use zebra_node_services::mempool; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ components::{ inbound::{Inbound, InboundSetupData}, - mempool::{self, gossip_mempool_transaction_id, Mempool}, + mempool::{gossip_mempool_transaction_id, Config as MempoolConfig, Mempool}, sync::{self, BlockGossipError, SyncStatus}, }, BoxError, @@ -697,7 +698,7 @@ async fn setup( .service(BoxService::new(mock_tx_verifier.clone())); // Mempool - let mempool_config = mempool::Config::default(); + let mempool_config = MempoolConfig::default(); let (mut mempool_service, transaction_receiver) = Mempool::new( &mempool_config, peer_set.clone(), diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 9f1510cd..bdca55c1 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -30,13 +30,10 @@ use futures::{future::FutureExt, stream::Stream}; use tokio::sync::watch; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; -use zebra_chain::{ - block::Height, - chain_tip::ChainTip, - transaction::{UnminedTx, UnminedTxId}, -}; +use zebra_chain::{block::Height, chain_tip::ChainTip, transaction::UnminedTxId}; use zebra_consensus::{error::TransactionError, transaction}; use zebra_network as zn; +use zebra_node_services::mempool::{Request, Response}; use zebra_state as zs; use zebra_state::{ChainTipChange, TipAction}; @@ -65,10 +62,10 @@ pub use storage::{ }; #[cfg(test)] -pub use storage::tests::unmined_transactions_in_blocks; +pub use self::{storage::tests::unmined_transactions_in_blocks, tests::UnboxMempoolError}; use downloads::{ - Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, + Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, }; type Outbound = Buffer, zn::Request>; @@ -79,87 +76,6 @@ type TxVerifier = Buffer< >; type InboundTxDownloads = TxDownloads, Timeout, State>; -/// A mempool service request. -/// -/// Requests can query the current set of mempool transactions, -/// queue transactions to be downloaded and verified, or -/// run the mempool to check for newly verified transactions. -/// -/// Requests can't modify the mempool directly, -/// because all mempool transactions must be verified. -#[derive(Debug, Eq, PartialEq)] -#[allow(dead_code)] -pub enum Request { - /// Query all transaction IDs in the mempool. - TransactionIds, - - /// Query matching transactions in the mempool, - /// using a unique set of [`UnminedTxId`]s. - TransactionsById(HashSet), - - /// Query matching cached rejected transaction IDs in the mempool, - /// using a unique set of [`UnminedTxId`]s. - RejectedTransactionIds(HashSet), - - /// Queue a list of gossiped transactions or transaction IDs, or - /// crawled transaction IDs. - /// - /// The transaction downloader checks for duplicates across IDs and transactions. - Queue(Vec), - - /// Check for newly verified transactions. - /// - /// The transaction downloader does not push transactions into the mempool. - /// So a task should send this request regularly (every 5-10 seconds). - /// - /// These checks also happen for other request variants, - /// but we can't rely on peers to send queries regularly, - /// and crawler queue requests depend on peer responses. - /// Also, crawler requests aren't frequent enough for transaction propagation. - /// - /// # Correctness - /// - /// This request is required to avoid hangs in the mempool. - /// - /// The queue checker task can't call `poll_ready` directly on the [`Mempool`] service, - /// because the mempool service is wrapped in a `Buffer`. - /// Calling [`Buffer::poll_ready`] reserves a buffer slot, which can cause hangs when - /// too many slots are reserved but unused: - /// - CheckForVerifiedTransactions, -} - -/// A response to a mempool service request. -/// -/// Responses can read the current set of mempool transactions, -/// check the queued status of transactions to be downloaded and verified, or -/// confirm that the mempool has been checked for newly verified transactions. -#[derive(Debug)] -pub enum Response { - /// Returns all transaction IDs from the mempool. - TransactionIds(HashSet), - - /// Returns matching transactions from the mempool. - /// - /// Since the [`TransactionsById`] request is unique, - /// the response transactions are also unique. - Transactions(Vec), - - /// Returns matching cached rejected transaction IDs from the mempool, - RejectedTransactionIds(HashSet), - - /// Returns a list of queue results. - /// - /// These are the results of the initial queue checks. - /// The transaction may also fail download or verification later. - /// - /// Each result matches the request at the corresponding vector index. - Queued(Vec>), - - /// Confirms that the mempool has checked for recently verified transactions. - CheckedForVerifiedTransactions, -} - /// The state of the mempool. /// /// Indicates whether it is enabled or disabled and, if enabled, contains @@ -489,13 +405,14 @@ impl Service for Mempool { // Queue mempool candidates Request::Queue(gossiped_txs) => { - let rsp: Vec> = gossiped_txs + let rsp: Vec> = gossiped_txs .into_iter() - .map(|gossiped_tx| { + .map(|gossiped_tx| -> Result<(), MempoolError> { storage.should_download_or_verify(gossiped_tx.id())?; tx_downloads.download_if_needed_and_verify(gossiped_tx)?; Ok(()) }) + .map(|result| result.map_err(BoxError::from)) .collect(); async move { Ok(Response::Queued(rsp)) }.boxed() } @@ -522,8 +439,10 @@ impl Service for Mempool { Request::Queue(gossiped_txs) => Response::Queued( // Special case; we can signal the error inside the response, // because the inbound service ignores inner errors. - iter::repeat(Err(MempoolError::Disabled)) + iter::repeat(MempoolError::Disabled) .take(gossiped_txs.len()) + .map(BoxError::from) + .map(Err) .collect(), ), diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index c7b03370..b111f339 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -56,10 +56,11 @@ use tracing_futures::Instrument; use zebra_chain::{block::Height, transaction::UnminedTxId}; use zebra_network as zn; +use zebra_node_services::mempool::Gossip; use zebra_state::ChainTipChange; use crate::components::{ - mempool::{self, downloads::Gossip, Config}, + mempool::{self, Config}, sync::SyncStatus, }; diff --git a/zebrad/src/components/mempool/crawler/tests/prop.rs b/zebrad/src/components/mempool/crawler/tests/prop.rs index 6eab653d..e853a608 100644 --- a/zebrad/src/components/mempool/crawler/tests/prop.rs +++ b/zebrad/src/components/mempool/crawler/tests/prop.rs @@ -10,18 +10,21 @@ use tokio::time; use zebra_chain::{parameters::Network, transaction::UnminedTxId}; use zebra_network as zn; +use zebra_node_services::mempool::Gossip; use zebra_state::ChainTipSender; use zebra_test::mock_service::{MockService, PropTestAssertion}; -use crate::components::{ - mempool::{ - self, - crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}, - downloads::Gossip, - error::MempoolError, - Config, +use crate::{ + components::{ + mempool::{ + self, + crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}, + error::MempoolError, + Config, + }, + sync::RecentSyncLengths, }, - sync::RecentSyncLengths, + BoxError, }; /// The number of iterations to crawl while testing. @@ -310,8 +313,13 @@ async fn crawler_iteration( async fn respond_to_queue_request( mempool: &mut MockMempool, expected_transaction_ids: HashSet, - response: Vec>, + response: impl IntoIterator>, ) -> Result<(), TestCaseError> { + let response = response + .into_iter() + .map(|result| result.map_err(BoxError::from)) + .collect(); + mempool .expect_request_that(|req| { if let mempool::Request::Queue(req) = req { diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 0720fd3f..5d5e5ace 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -39,9 +39,10 @@ use tokio::{sync::oneshot, task::JoinHandle}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; -use zebra_chain::transaction::{self, UnminedTx, UnminedTxId, VerifiedUnminedTx}; +use zebra_chain::transaction::{self, UnminedTxId, VerifiedUnminedTx}; use zebra_consensus::transaction as tx; use zebra_network as zn; +use zebra_node_services::mempool::Gossip; use zebra_state as zs; use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; @@ -111,35 +112,6 @@ pub enum TransactionDownloadVerifyError { Invalid(#[from] zebra_consensus::error::TransactionError), } -/// A gossiped transaction, which can be the transaction itself or just its ID. -#[derive(Debug, Eq, PartialEq)] -pub enum Gossip { - Id(UnminedTxId), - Tx(UnminedTx), -} - -impl Gossip { - /// Return the [`UnminedTxId`] of a gossiped transaction. - pub fn id(&self) -> UnminedTxId { - match self { - Gossip::Id(txid) => *txid, - Gossip::Tx(tx) => tx.id, - } - } -} - -impl From for Gossip { - fn from(txid: UnminedTxId) -> Self { - Gossip::Id(txid) - } -} - -impl From for Gossip { - fn from(tx: UnminedTx) -> Self { - Gossip::Tx(tx) - } -} - /// Represents a [`Stream`] of download and verification tasks. #[pin_project(PinnedDrop)] #[derive(Debug)] diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index d2a5f543..5eb6b6f6 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -2,8 +2,13 @@ use std::pin::Pin; use tower::ServiceExt; -use super::{storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request}; -use crate::components::sync::{RecentSyncLengths, SyncStatus}; +use super::{ + error::MempoolError, storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request, +}; +use crate::{ + components::sync::{RecentSyncLengths, SyncStatus}, + BoxError, +}; mod prop; mod vector; @@ -48,3 +53,41 @@ impl Mempool { .expect("unexpected failure when checking for verified transactions"); } } + +/// Helper trait to extract the [`MempoolError`] from a [`BoxError`]. +pub trait UnboxMempoolError { + /// Extract and unbox the [`MempoolError`] stored inside `self`. + /// + /// # Panics + /// + /// If the `boxed_error` is not a boxed [`MempoolError`]. + fn unbox_mempool_error(self) -> MempoolError; +} + +impl UnboxMempoolError for MempoolError { + fn unbox_mempool_error(self) -> MempoolError { + self + } +} + +impl UnboxMempoolError for BoxError { + fn unbox_mempool_error(self) -> MempoolError { + self.downcast::() + .expect("error is not an expected `MempoolError`") + // TODO: use `Box::into_inner` when it becomes stabilized. + .as_ref() + .clone() + } +} + +impl UnboxMempoolError for Result +where + E: UnboxMempoolError, +{ + fn unbox_mempool_error(self) -> MempoolError { + match self { + Ok(_) => panic!("expected a mempool error, but got a success instead"), + Err(error) => error.unbox_mempool_error(), + } + } +} diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 1c63d224..1e1d5cf9 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -11,6 +11,7 @@ use zebra_consensus::transaction as tx; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; +use super::UnboxMempoolError; use crate::components::{ mempool::{self, storage::tests::unmined_transactions_in_blocks, *}, sync::RecentSyncLengths, @@ -228,15 +229,12 @@ async fn mempool_queue_single() -> Result<(), Report> { let mut in_mempool_count = 0; let mut evicted_count = 0; for response in queued_responses { - match response { - Ok(_) => panic!("all transactions should have been rejected"), - Err(e) => match e { - MempoolError::StorageEffectsChain( - SameEffectsChainRejectionError::RandomlyEvicted, - ) => evicted_count += 1, - MempoolError::InMempool => in_mempool_count += 1, - _ => panic!("transaction should not be rejected with reason {:?}", e), - }, + match response.unbox_mempool_error() { + MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::RandomlyEvicted) => { + evicted_count += 1 + } + MempoolError::InMempool => in_mempool_count += 1, + error => panic!("transaction should not be rejected with reason {:?}", error), } } assert_eq!(in_mempool_count, transactions.len() - 1); @@ -339,8 +337,16 @@ async fn mempool_service_disabled() -> Result<(), Report> { Response::Queued(queue_responses) => queue_responses, _ => unreachable!("will never happen in this test"), }; + assert_eq!(queued_responses.len(), 1); - assert_eq!(queued_responses[0], Err(MempoolError::Disabled)); + assert_eq!( + queued_responses + .into_iter() + .next() + .unwrap() + .unbox_mempool_error(), + MempoolError::Disabled + ); Ok(()) } @@ -588,10 +594,12 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { }; assert_eq!(queued_responses.len(), 1); assert!(matches!( - queued_responses[0], - Err(MempoolError::StorageExactTip( - ExactTipRejectionError::FailedVerification(_) - )) + queued_responses + .into_iter() + .next() + .unwrap() + .unbox_mempool_error(), + MempoolError::StorageExactTip(ExactTipRejectionError::FailedVerification(_)) )); Ok(())