Skip download and verification if the transaction is already in the mempool or state (#2718)
* Check if tx already exists in mempool or state before downloading * Reorder checks * Add rejected test; refactor into separate function * Wrap mempool in buffered service * Rename RejectedTransactionsById -> RejectedTransactionsIds * Add RejectedTransactionIds response; fix request name * Organize imports * add a test for Storage::rejected_transactions * add test for mempool `Request::RejectedTransactionIds` * change buffer size to 1 in the test Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com> Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
This commit is contained in:
parent
be59dd2b93
commit
a2993e8df0
|
|
@ -28,11 +28,15 @@ use color_eyre::eyre::{eyre, Report};
|
||||||
use futures::{select, FutureExt};
|
use futures::{select, FutureExt};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::builder::ServiceBuilder;
|
use tower::builder::ServiceBuilder;
|
||||||
|
use tower::util::BoxService;
|
||||||
|
|
||||||
use crate::components::{tokio::RuntimeRun, Inbound};
|
|
||||||
use crate::config::ZebradConfig;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
components::{mempool, tokio::TokioComponent, ChainSync},
|
components::{
|
||||||
|
mempool::{self, Mempool},
|
||||||
|
tokio::{RuntimeRun, TokioComponent},
|
||||||
|
ChainSync, Inbound,
|
||||||
|
},
|
||||||
|
config::ZebradConfig,
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -65,7 +69,8 @@ impl StartCmd {
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
info!("initializing mempool");
|
info!("initializing mempool");
|
||||||
let mempool = mempool::Mempool::new(config.network.network);
|
let mempool_service = BoxService::new(Mempool::new(config.network.network));
|
||||||
|
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
|
||||||
|
|
||||||
info!("initializing network");
|
info!("initializing network");
|
||||||
// The service that our node uses to respond to requests by peers. The
|
// The service that our node uses to respond to requests by peers. The
|
||||||
|
|
@ -80,7 +85,7 @@ impl StartCmd {
|
||||||
state.clone(),
|
state.clone(),
|
||||||
chain_verifier.clone(),
|
chain_verifier.clone(),
|
||||||
tx_verifier.clone(),
|
tx_verifier.clone(),
|
||||||
mempool,
|
mempool.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
let (peer_set, address_book) =
|
let (peer_set, address_book) =
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,11 @@ use zebra_consensus::transaction;
|
||||||
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
|
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
|
||||||
use zebra_network::AddressBook;
|
use zebra_network::AddressBook;
|
||||||
|
|
||||||
use super::mempool::downloads::{
|
use super::mempool::{
|
||||||
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
self as mp,
|
||||||
|
downloads::{
|
||||||
|
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
// Re-use the syncer timeouts for consistency.
|
// Re-use the syncer timeouts for consistency.
|
||||||
use super::{
|
use super::{
|
||||||
|
|
@ -38,13 +41,14 @@ use downloads::Downloads as BlockDownloads;
|
||||||
|
|
||||||
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
||||||
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
||||||
|
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
|
||||||
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
|
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
|
||||||
type TxVerifier = Buffer<
|
type TxVerifier = Buffer<
|
||||||
BoxService<transaction::Request, transaction::Response, TransactionError>,
|
BoxService<transaction::Request, transaction::Response, TransactionError>,
|
||||||
transaction::Request,
|
transaction::Request,
|
||||||
>;
|
>;
|
||||||
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
|
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
|
||||||
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
|
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State, Mempool>;
|
||||||
|
|
||||||
pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);
|
pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);
|
||||||
|
|
||||||
|
|
@ -134,7 +138,7 @@ pub struct Inbound {
|
||||||
state: State,
|
state: State,
|
||||||
|
|
||||||
/// A service that manages transactions in the memory pool.
|
/// A service that manages transactions in the memory pool.
|
||||||
mempool: mempool::Mempool,
|
mempool: Mempool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Inbound {
|
impl Inbound {
|
||||||
|
|
@ -143,7 +147,7 @@ impl Inbound {
|
||||||
state: State,
|
state: State,
|
||||||
block_verifier: BlockVerifier,
|
block_verifier: BlockVerifier,
|
||||||
tx_verifier: TxVerifier,
|
tx_verifier: TxVerifier,
|
||||||
mempool: mempool::Mempool,
|
mempool: Mempool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network_setup: Setup::AwaitingNetwork {
|
network_setup: Setup::AwaitingNetwork {
|
||||||
|
|
@ -195,6 +199,7 @@ impl Service<zn::Request> for Inbound {
|
||||||
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
|
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
|
||||||
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
|
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
|
||||||
self.state.clone(),
|
self.state.clone(),
|
||||||
|
self.mempool.clone(),
|
||||||
));
|
));
|
||||||
result = Ok(());
|
result = Ok(());
|
||||||
Setup::Initialized {
|
Setup::Initialized {
|
||||||
|
|
@ -350,6 +355,7 @@ impl Service<zn::Request> for Inbound {
|
||||||
zn::Request::PushTransaction(_transaction) => {
|
zn::Request::PushTransaction(_transaction) => {
|
||||||
debug!("ignoring unimplemented request");
|
debug!("ignoring unimplemented request");
|
||||||
// TODO: send to Tx Download & Verify Stream
|
// TODO: send to Tx Download & Verify Stream
|
||||||
|
// https://github.com/ZcashFoundation/zebra/issues/2692
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::AdvertiseTransactionIds(transactions) => {
|
zn::Request::AdvertiseTransactionIds(transactions) => {
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ use std::collections::HashSet;
|
||||||
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
||||||
|
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::{builder::ServiceBuilder, ServiceExt};
|
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
parameters::Network,
|
parameters::Network,
|
||||||
|
|
@ -26,6 +26,9 @@ async fn mempool_requests_for_transactions() {
|
||||||
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
|
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
|
||||||
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
||||||
|
|
||||||
|
let mempool_service = BoxService::new(mempool_service);
|
||||||
|
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
||||||
|
|
||||||
let (block_verifier, transaction_verifier) =
|
let (block_verifier, transaction_verifier) =
|
||||||
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||||
.await;
|
.await;
|
||||||
|
|
@ -39,7 +42,7 @@ async fn mempool_requests_for_transactions() {
|
||||||
state_service,
|
state_service,
|
||||||
block_verifier.clone(),
|
block_verifier.clone(),
|
||||||
transaction_verifier.clone(),
|
transaction_verifier.clone(),
|
||||||
mempool_service,
|
mempool,
|
||||||
));
|
));
|
||||||
|
|
||||||
// Test `Request::MempoolTransactionIds`
|
// Test `Request::MempoolTransactionIds`
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ use zebra_chain::{
|
||||||
transaction::{UnminedTx, UnminedTxId},
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::BoxError;
|
pub use crate::BoxError;
|
||||||
|
|
||||||
mod crawler;
|
mod crawler;
|
||||||
pub mod downloads;
|
pub mod downloads;
|
||||||
|
|
@ -35,12 +35,14 @@ pub use self::storage::tests::unmined_transactions_in_blocks;
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
TransactionIds,
|
TransactionIds,
|
||||||
TransactionsById(HashSet<UnminedTxId>),
|
TransactionsById(HashSet<UnminedTxId>),
|
||||||
|
RejectedTransactionIds(HashSet<UnminedTxId>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Response {
|
pub enum Response {
|
||||||
Transactions(Vec<UnminedTx>),
|
Transactions(Vec<UnminedTx>),
|
||||||
TransactionIds(Vec<UnminedTxId>),
|
TransactionIds(Vec<UnminedTxId>),
|
||||||
|
RejectedTransactionIds(Vec<UnminedTxId>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mempool async management and query service.
|
/// Mempool async management and query service.
|
||||||
|
|
@ -93,6 +95,11 @@ impl Service<Request> for Mempool {
|
||||||
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
|
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
|
||||||
async move { rsp }.boxed()
|
async move { rsp }.boxed()
|
||||||
}
|
}
|
||||||
|
Request::RejectedTransactionIds(ids) => {
|
||||||
|
let rsp = Ok(self.storage.clone().rejected_transactions(ids))
|
||||||
|
.map(Response::RejectedTransactionIds);
|
||||||
|
async move { rsp }.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ use zebra_consensus::transaction as tx;
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
|
use crate::components::mempool as mp;
|
||||||
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
|
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
|
||||||
|
|
||||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
|
@ -85,7 +86,7 @@ pub enum DownloadAction {
|
||||||
/// Represents a [`Stream`] of download and verification tasks.
|
/// Represents a [`Stream`] of download and verification tasks.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Downloads<ZN, ZV, ZS>
|
pub struct Downloads<ZN, ZV, ZS, ZM>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -93,6 +94,8 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
|
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
|
ZM::Future: Send,
|
||||||
{
|
{
|
||||||
// Services
|
// Services
|
||||||
/// A service that forwards requests to connected peers, and returns their
|
/// A service that forwards requests to connected peers, and returns their
|
||||||
|
|
@ -105,6 +108,9 @@ where
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
state: ZS,
|
state: ZS,
|
||||||
|
|
||||||
|
/// A service that manages the mempool.
|
||||||
|
mempool: ZM,
|
||||||
|
|
||||||
// Internal downloads state
|
// Internal downloads state
|
||||||
/// A list of pending transaction download and verify tasks.
|
/// A list of pending transaction download and verify tasks.
|
||||||
#[pin]
|
#[pin]
|
||||||
|
|
@ -115,7 +121,7 @@ where
|
||||||
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
|
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
|
impl<ZN, ZV, ZS, ZM> Stream for Downloads<ZN, ZV, ZS, ZM>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -123,6 +129,8 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
|
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
|
ZM::Future: Send,
|
||||||
{
|
{
|
||||||
type Item = Result<UnminedTxId, BoxError>;
|
type Item = Result<UnminedTxId, BoxError>;
|
||||||
|
|
||||||
|
|
@ -158,7 +166,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
|
impl<ZN, ZV, ZS, ZM> Downloads<ZN, ZV, ZS, ZM>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -166,6 +174,8 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
|
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
|
ZM::Future: Send,
|
||||||
{
|
{
|
||||||
/// Initialize a new download stream with the provided `network` and
|
/// Initialize a new download stream with the provided `network` and
|
||||||
/// `verifier` services.
|
/// `verifier` services.
|
||||||
|
|
@ -173,11 +183,12 @@ where
|
||||||
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
||||||
/// timeout limits should be applied to the `network` service passed into
|
/// timeout limits should be applied to the `network` service passed into
|
||||||
/// this constructor.
|
/// this constructor.
|
||||||
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
|
pub fn new(network: ZN, verifier: ZV, state: ZS, mempool: ZM) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network,
|
network,
|
||||||
verifier,
|
verifier,
|
||||||
state,
|
state,
|
||||||
|
mempool,
|
||||||
pending: FuturesUnordered::new(),
|
pending: FuturesUnordered::new(),
|
||||||
cancel_handles: HashMap::new(),
|
cancel_handles: HashMap::new(),
|
||||||
}
|
}
|
||||||
|
|
@ -213,19 +224,11 @@ where
|
||||||
|
|
||||||
let network = self.network.clone();
|
let network = self.network.clone();
|
||||||
let verifier = self.verifier.clone();
|
let verifier = self.verifier.clone();
|
||||||
let state = self.state.clone();
|
let mut state = self.state.clone();
|
||||||
|
let mut mempool = self.mempool.clone();
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
// TODO: adapt this for transaction / mempool
|
Self::should_download(&mut state, &mut mempool, txid).await?;
|
||||||
// // Check if the block is already in the state.
|
|
||||||
// // BUG: check if the hash is in any chain (#862).
|
|
||||||
// // Depth only checks the main chain.
|
|
||||||
// match state.oneshot(zs::Request::Depth(hash)).await {
|
|
||||||
// Ok(zs::Response::Depth(None)) => Ok(()),
|
|
||||||
// Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
|
|
||||||
// Ok(_) => unreachable!("wrong response"),
|
|
||||||
// Err(e) => Err(e),
|
|
||||||
// }?;
|
|
||||||
|
|
||||||
let height = match state.oneshot(zs::Request::Tip).await {
|
let height = match state.oneshot(zs::Request::Tip).await {
|
||||||
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
|
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
|
||||||
|
|
@ -298,4 +301,67 @@ where
|
||||||
|
|
||||||
DownloadAction::AddedToQueue
|
DownloadAction::AddedToQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if transaction should be downloaded and verified.
|
||||||
|
///
|
||||||
|
/// If it is already in the mempool (or in its rejected list)
|
||||||
|
/// or in state, then it shouldn't be downloaded (and an error is returned).
|
||||||
|
async fn should_download(
|
||||||
|
state: &mut ZS,
|
||||||
|
mempool: &mut ZM,
|
||||||
|
txid: UnminedTxId,
|
||||||
|
) -> Result<(), BoxError> {
|
||||||
|
// Check if the transaction is already in the mempool.
|
||||||
|
match mempool
|
||||||
|
.ready_and()
|
||||||
|
.await?
|
||||||
|
.call(mp::Request::TransactionsById(
|
||||||
|
[txid].iter().cloned().collect(),
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(mp::Response::Transactions(txs)) => {
|
||||||
|
if txs.is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err("already present in mempool".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(_) => unreachable!("wrong response"),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
// Check if the transaction is in the mempool rejected list.
|
||||||
|
match mempool
|
||||||
|
.oneshot(mp::Request::RejectedTransactionIds(
|
||||||
|
[txid].iter().cloned().collect(),
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(mp::Response::RejectedTransactionIds(txs)) => {
|
||||||
|
if txs.is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err("in mempool rejected list".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(_) => unreachable!("wrong response"),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
// Check if the transaction is already in the state.
|
||||||
|
match state
|
||||||
|
.ready_and()
|
||||||
|
.await?
|
||||||
|
.call(zs::Request::Transaction(txid.mined_id()))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(zs::Response::Transaction(None)) => Ok(()),
|
||||||
|
Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()),
|
||||||
|
Ok(_) => unreachable!("wrong response"),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -103,4 +103,12 @@ impl Storage {
|
||||||
.filter(|tx| tx_ids.contains(&tx.id))
|
.filter(|tx| tx_ids.contains(&tx.id))
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
|
||||||
|
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
|
||||||
|
tx_ids
|
||||||
|
.into_iter()
|
||||||
|
.filter(|tx| self.rejected.contains_key(tx))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,19 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
|
||||||
assert!(!storage.clone().contains(&tx.id));
|
assert!(!storage.clone().contains(&tx.id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE`
|
||||||
|
let all_ids: HashSet<UnminedTxId> = unmined_transactions.iter().map(|tx| tx.id).collect();
|
||||||
|
let rejected_ids: HashSet<UnminedTxId> = unmined_transactions
|
||||||
|
.iter()
|
||||||
|
.take(total_transactions - MEMPOOL_SIZE)
|
||||||
|
.map(|tx| tx.id)
|
||||||
|
.collect();
|
||||||
|
// Convert response to a `HashSet` as we need a fixed order to compare.
|
||||||
|
let rejected_response: HashSet<UnminedTxId> =
|
||||||
|
storage.rejected_transactions(all_ids).into_iter().collect();
|
||||||
|
|
||||||
|
assert_eq!(rejected_response, rejected_ids);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,15 +22,21 @@ async fn mempool_service_basic() -> Result<(), Report> {
|
||||||
.oneshot(Request::TransactionIds)
|
.oneshot(Request::TransactionIds)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let transaction_ids = match response {
|
let genesis_transaction_ids = match response {
|
||||||
Response::TransactionIds(ids) => ids,
|
Response::TransactionIds(ids) => ids,
|
||||||
_ => unreachable!("will never happen in this test"),
|
_ => unreachable!("will never happen in this test"),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Test `Request::TransactionsById`
|
// Test `Request::TransactionsById`
|
||||||
let hash_set = transaction_ids.iter().copied().collect::<HashSet<_>>();
|
let genesis_transactions_hash_set = genesis_transaction_ids
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.collect::<HashSet<_>>();
|
||||||
let response = service
|
let response = service
|
||||||
.oneshot(Request::TransactionsById(hash_set))
|
.clone()
|
||||||
|
.oneshot(Request::TransactionsById(
|
||||||
|
genesis_transactions_hash_set.clone(),
|
||||||
|
))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let transactions = match response {
|
let transactions = match response {
|
||||||
|
|
@ -42,5 +48,26 @@ async fn mempool_service_basic() -> Result<(), Report> {
|
||||||
// response of `Request::TransactionsById`
|
// response of `Request::TransactionsById`
|
||||||
assert_eq!(genesis_transactions.1[0], transactions[0]);
|
assert_eq!(genesis_transactions.1[0], transactions[0]);
|
||||||
|
|
||||||
|
// Insert more transactions into the mempool storage.
|
||||||
|
// This will cause the genesis transaction to be moved into rejected.
|
||||||
|
let more_transactions = unmined_transactions_in_blocks(10, network);
|
||||||
|
for tx in more_transactions.1.iter().skip(1) {
|
||||||
|
service.storage.insert(tx.clone())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test `Request::RejectedTransactionIds`
|
||||||
|
let response = service
|
||||||
|
.oneshot(Request::RejectedTransactionIds(
|
||||||
|
genesis_transactions_hash_set,
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let rejected_ids = match response {
|
||||||
|
Response::RejectedTransactionIds(ids) => ids,
|
||||||
|
_ => unreachable!("will never happen in this test"),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(rejected_ids, genesis_transaction_ids);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue