From 8d3f6dc026d3c0bfe1804e61c2bfe07c72846089 Mon Sep 17 00:00:00 2001
From: Deirdre Connolly
Date: Fri, 27 Aug 2021 10:36:17 -0400
Subject: [PATCH] Mempool component and storage (#2651)

* First pass at a Mempool Service, incl. a storage layer underneath

* Fixed up Mempool service and storage

* allow dead code where needed

* clippy

* typo

* only drain if the mempool is full

* add a basic storage test

* remove space

* fix test for when MEMPOOL_SIZE change

* group some imports

* add a basic mempool service test

* add clippy suggestions

* remove not needed allow dead code

* Apply suggestions from code review

Co-authored-by: teor
Co-authored-by: Alfredo Garcia

Co-authored-by: teor
---
 zebrad/src/components/mempool.rs              |  99 +++++++++++++-
 zebrad/src/components/mempool/error.rs        |   3 +
 zebrad/src/components/mempool/storage.rs      | 116 ++++++++++++++++++
 .../src/components/mempool/storage/tests.rs   |  81 ++++++++++++
 zebrad/src/components/mempool/tests.rs        |  46 +++++++
 5 files changed, 343 insertions(+), 2 deletions(-)
 create mode 100644 zebrad/src/components/mempool/storage.rs
 create mode 100644 zebrad/src/components/mempool/storage/tests.rs
 create mode 100644 zebrad/src/components/mempool/tests.rs

diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs
index 1ec888e5..7d419d0c 100644
--- a/zebrad/src/components/mempool.rs
+++ b/zebrad/src/components/mempool.rs
@@ -1,8 +1,103 @@
 //! Zebra mempool.
 
-/// Mempool-related errors.
-pub mod error;
+use std::{
+    collections::HashSet,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures::future::FutureExt;
+use tower::Service;
+
+use zebra_chain::{
+    parameters::Network,
+    transaction::{UnminedTx, UnminedTxId},
+};
+
+use crate::BoxError;
 
 mod crawler;
+mod error;
+mod storage;
+
+#[cfg(test)]
+mod tests;
 
 pub use self::crawler::Crawler;
+pub use self::error::MempoolError;
+
+/// A query that can be sent to the [`Mempool`] service.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub enum Request {
+    /// Ask for the ids of every transaction currently in the mempool.
+    TransactionIds,
+    /// Ask for the mempool transactions whose ids are in the given set.
+    TransactionsById(HashSet<UnminedTxId>),
+}
+
+/// The [`Mempool`] service's answer to a [`Request`].
+#[derive(Debug)]
+pub enum Response {
+    /// The transactions found for a [`Request::TransactionsById`].
+    Transactions(Vec<UnminedTx>),
+    /// The ids returned for a [`Request::TransactionIds`].
+    TransactionIds(Vec<UnminedTxId>),
+}
+
+/// Mempool async management and query service.
+///
+/// The mempool is the set of all verified transactions that this node is aware
+/// of that have yet to be confirmed by the Zcash network. A transaction is
+/// confirmed when it has been included in a block ('mined').
+#[derive(Clone)]
+pub struct Mempool {
+    /// The Mempool storage itself.
+    ///
+    /// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
+    /// inject transactions into `storage`, as transactions must be verified beforehand.
+    storage: storage::Storage,
+}
+
+impl Mempool {
+    /// Create a mempool service with empty storage.
+    ///
+    /// `_network` is currently unused.
+    #[allow(dead_code)]
+    pub(crate) fn new(_network: Network) -> Self {
+        Mempool {
+            storage: Default::default(),
+        }
+    }
+}
+
+impl Service<Request> for Mempool {
+    type Response = Response;
+    type Error = BoxError;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    /// Always reports the service as ready.
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    /// Answer `req` from the current storage contents.
+    ///
+    /// The storage is read synchronously, before the future is returned, so
+    /// the future owns its result and does not borrow from `self`.
+    #[instrument(name = "mempool", skip(self, req))]
+    fn call(&mut self, req: Request) -> Self::Future {
+        match req {
+            Request::TransactionIds => {
+                let res = self.storage.tx_ids();
+                async move { Ok(Response::TransactionIds(res)) }.boxed()
+            }
+            Request::TransactionsById(ids) => {
+                let txs = self.storage.transactions(ids);
+                async move { Ok(Response::Transactions(txs)) }.boxed()
+            }
+        }
+    }
+}
diff --git a/zebrad/src/components/mempool/error.rs b/zebrad/src/components/mempool/error.rs
index f3a47406..32d1a58e 100644
--- a/zebrad/src/components/mempool/error.rs
+++ b/zebrad/src/components/mempool/error.rs
@@ -22,4 +22,7 @@ pub enum MempoolError {
 
     #[error("transaction was not found in mempool")]
     NotInMempool,
+
+    #[error("transaction evicted from the mempool due to size restrictions")]
+    Excess,
 }
diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs
new file mode 100644
index 00000000..0aa3727b
--- /dev/null
+++ b/zebrad/src/components/mempool/storage.rs
@@ -0,0 +1,116 @@
+use std::collections::{HashMap, HashSet, VecDeque};
+
+use zebra_chain::{
+    block,
+    transaction::{UnminedTx, UnminedTxId},
+};
+use zebra_consensus::error::TransactionError;
+
+use super::MempoolError;
+
+#[cfg(test)]
+pub mod tests;
+
+/// The maximum number of verified transactions kept in [`Storage`].
+const MEMPOOL_SIZE: usize = 2;
+
+/// The reason a transaction id is tracked in the rejected list of [`Storage`].
+#[allow(dead_code)]
+#[derive(Clone, Debug)]
+pub enum State {
+    /// Rejected because verification failed.
+    Invalid(TransactionError),
+    /// An otherwise valid mempool transaction was mined into a block, therefore
+    /// no longer belongs in the mempool.
+    Confirmed(block::Hash),
+    /// Stayed in mempool for too long without being mined.
+    // TODO(2021-08-20): set expiration at 2 weeks? This is what Bitcoin does.
+    Expired,
+    /// Transaction fee is too low for the current mempool state.
+    LowFee,
+    /// Otherwise valid transaction removed from mempool, say because of FIFO
+    /// (first in, first out) policy.
+    Excess,
+}
+
+/// In-memory mempool storage: the verified transactions plus a record of
+/// rejected transaction ids.
+#[derive(Clone, Default)]
+pub struct Storage {
+    /// The set of verified transactions in the mempool. This is a
+    /// cache of size [`MEMPOOL_SIZE`].
+    verified: VecDeque<UnminedTx>,
+    /// The set of rejected transactions by id, and their rejection reasons.
+    rejected: HashMap<UnminedTxId, State>,
+}
+
+impl Storage {
+    /// Insert a [`UnminedTx`] into the mempool.
+    ///
+    /// If its insertion results in evicting other transactions, they will be tracked
+    /// as [`State::Excess`].
+    ///
+    /// # Errors
+    ///
+    /// Returns the [`MempoolError`] matching the recorded [`State`] if `tx` was
+    /// previously rejected, or [`MempoolError::InMempool`] if it is already stored.
+    #[allow(dead_code)]
+    pub fn insert(&mut self, tx: UnminedTx) -> Result<UnminedTxId, MempoolError> {
+        let tx_id = tx.id;
+
+        // First, check if we should reject this transaction.
+        if let Some(state) = self.rejected.get(&tx_id) {
+            return Err(match state {
+                State::Invalid(e) => MempoolError::Invalid(e.clone()),
+                State::Expired => MempoolError::Expired,
+                State::Confirmed(block_hash) => MempoolError::InBlock(*block_hash),
+                State::Excess => MempoolError::Excess,
+                State::LowFee => MempoolError::LowFee,
+            });
+        }
+
+        // If `tx` is already in the mempool, we don't change anything.
+        //
+        // Security: transactions must not get refreshed by new queries,
+        // because that allows malicious peers to keep transactions live forever.
+        if self.verified.contains(&tx) {
+            return Err(MempoolError::InMempool);
+        }
+
+        // Then, we insert into the pool.
+        self.verified.push_front(tx);
+
+        // Once inserted, we evict transactions over the pool size limit in FIFO
+        // order.
+        if self.verified.len() > MEMPOOL_SIZE {
+            for evicted_tx in self.verified.drain(MEMPOOL_SIZE..) {
+                let _ = self.rejected.insert(evicted_tx.id, State::Excess);
+            }
+
+            assert_eq!(self.verified.len(), MEMPOOL_SIZE);
+        }
+
+        Ok(tx_id)
+    }
+
+    /// Returns `true` if a [`UnminedTx`] matching an [`UnminedTxId`] is in
+    /// the mempool.
+    #[allow(dead_code)]
+    pub fn contains(&self, txid: &UnminedTxId) -> bool {
+        self.verified.iter().any(|tx| &tx.id == txid)
+    }
+
+    /// Returns the set of [`UnminedTxId`]s in the mempool.
+    pub fn tx_ids(&self) -> Vec<UnminedTxId> {
+        self.verified.iter().map(|tx| tx.id).collect()
+    }
+
+    /// Returns clones of the [`UnminedTx`]s in the mempool whose ids are in `tx_ids`.
+    pub fn transactions(&self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTx> {
+        self.verified
+            .iter()
+            .filter(|tx| tx_ids.contains(&tx.id))
+            .cloned()
+            .collect()
+    }
+}
diff --git a/zebrad/src/components/mempool/storage/tests.rs b/zebrad/src/components/mempool/storage/tests.rs
new file mode 100644
index 00000000..332dc059
--- /dev/null
+++ b/zebrad/src/components/mempool/storage/tests.rs
@@ -0,0 +1,81 @@
+use super::*;
+
+use zebra_chain::{
+    block::Block, parameters::Network, serialization::ZcashDeserializeInto, transaction::UnminedTx,
+};
+
+use color_eyre::eyre::Result;
+
+#[test]
+fn mempool_storage_basic() -> Result<()> {
+    zebra_test::init();
+
+    mempool_storage_basic_for_network(Network::Mainnet)?;
+    mempool_storage_basic_for_network(Network::Testnet)?;
+
+    Ok(())
+}
+
+fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
+    // Create an empty storage
+    let mut storage: Storage = Default::default();
+
+    // Get transactions from the test vector blocks up to and including height 10
+    let (total_transactions, unmined_transactions) = unmined_transactions_in_blocks(10, network);
+
+    // Insert them all to the storage
+    for unmined_transaction in unmined_transactions.clone() {
+        storage.insert(unmined_transaction)?;
+    }
+
+    // Only MEMPOOL_SIZE should land in verified
+    assert_eq!(storage.verified.len(), MEMPOOL_SIZE);
+
+    // The rest of the transactions will be in rejected
+    assert_eq!(storage.rejected.len(), total_transactions - MEMPOOL_SIZE);
+
+    // Make sure the last MEMPOOL_SIZE transactions we sent are in the verified
+    for tx in unmined_transactions.iter().rev().take(MEMPOOL_SIZE) {
+        assert!(storage.contains(&tx.id));
+    }
+
+    // Anything greater should not be in the verified
+    for tx in unmined_transactions
+        .iter()
+        .take(unmined_transactions.len() - MEMPOOL_SIZE)
+    {
+        assert!(!storage.contains(&tx.id));
+    }
+
+    Ok(())
+}
+
+/// Returns every transaction in the `network` test vector blocks whose height is
+/// at most `last_block_height`, together with the number of transactions.
+pub fn unmined_transactions_in_blocks(
+    last_block_height: u32,
+    network: Network,
+) -> (usize, Vec<UnminedTx>) {
+    let mut transactions = vec![];
+    let mut total = 0;
+
+    let block_iter = match network {
+        Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
+        Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
+    };
+
+    for (&height, block) in block_iter {
+        if height <= last_block_height {
+            let block = block
+                .zcash_deserialize_into::<Block>()
+                .expect("block is structurally valid");
+
+            for transaction in block.transactions.iter() {
+                transactions.push(UnminedTx::from(transaction));
+                total += 1;
+            }
+        }
+    }
+
+    (total, transactions)
+}
diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs
new file mode 100644
index 00000000..57c80704
--- /dev/null
+++ b/zebrad/src/components/mempool/tests.rs
@@ -0,0 +1,46 @@
+use super::*;
+use color_eyre::Report;
+use std::collections::HashSet;
+use storage::tests::unmined_transactions_in_blocks;
+use tower::ServiceExt;
+
+#[tokio::test]
+async fn mempool_service_basic() -> Result<(), Report> {
+    // Using the mainnet for now
+    let network = Network::Mainnet;
+
+    // get the genesis block transactions from the Zcash blockchain.
+    let genesis_transactions = unmined_transactions_in_blocks(0, network);
+    // Start the mempool service
+    let mut service = Mempool::new(network);
+    // Insert the genesis block coinbase transaction into the mempool storage.
+    service.storage.insert(genesis_transactions.1[0].clone())?;
+
+    // Test `Request::TransactionIds`
+    let response = service
+        .clone()
+        .oneshot(Request::TransactionIds)
+        .await
+        .unwrap();
+    let transaction_ids = match response {
+        Response::TransactionIds(ids) => ids,
+        _ => unreachable!("will never happen in this test"),
+    };
+
+    // Test `Request::TransactionsById`
+    let hash_set = transaction_ids.iter().copied().collect::<HashSet<_>>();
+    let response = service
+        .oneshot(Request::TransactionsById(hash_set))
+        .await
+        .unwrap();
+    let transactions = match response {
+        Response::Transactions(transactions) => transactions,
+        _ => unreachable!("will never happen in this test"),
+    };
+
+    // Make sure the transaction from the blockchain test vector is the same as the
+    // response of `Request::TransactionsById`
+    assert_eq!(genesis_transactions.1[0], transactions[0]);
+
+    Ok(())
+}