Move transaction download and verify stream into the mempool service (#2741)
* Move transaction dowloader and verifier into the mempool service * add test for `Storage::contains_rejected()` * Rename DownloadAndVerify->Queue; move should_download_or_verify() to previous impl * GossipedTx -> Gossip * Revamp error handling Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com> Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
This commit is contained in:
parent
d307d43e19
commit
8825a52bb8
|
|
@ -68,10 +68,6 @@ impl StartCmd {
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
info!("initializing mempool");
|
|
||||||
let mempool_service = BoxService::new(Mempool::new(config.network.network));
|
|
||||||
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
|
|
||||||
|
|
||||||
info!("initializing network");
|
info!("initializing network");
|
||||||
// The service that our node uses to respond to requests by peers. The
|
// The service that our node uses to respond to requests by peers. The
|
||||||
// load_shed middleware ensures that we reduce the size of the peer set
|
// load_shed middleware ensures that we reduce the size of the peer set
|
||||||
|
|
@ -84,14 +80,22 @@ impl StartCmd {
|
||||||
setup_rx,
|
setup_rx,
|
||||||
state.clone(),
|
state.clone(),
|
||||||
chain_verifier.clone(),
|
chain_verifier.clone(),
|
||||||
tx_verifier.clone(),
|
|
||||||
mempool.clone(),
|
|
||||||
));
|
));
|
||||||
|
|
||||||
let (peer_set, address_book) =
|
let (peer_set, address_book) =
|
||||||
zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await;
|
zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await;
|
||||||
|
|
||||||
|
info!("initializing mempool");
|
||||||
|
let mempool_service = BoxService::new(Mempool::new(
|
||||||
|
config.network.network,
|
||||||
|
peer_set.clone(),
|
||||||
|
state.clone(),
|
||||||
|
tx_verifier,
|
||||||
|
));
|
||||||
|
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
|
||||||
|
|
||||||
setup_tx
|
setup_tx
|
||||||
.send((peer_set.clone(), address_book))
|
.send((peer_set.clone(), address_book, mempool))
|
||||||
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
||||||
|
|
||||||
info!("initializing syncer");
|
info!("initializing syncer");
|
||||||
|
|
|
||||||
|
|
@ -12,5 +12,8 @@ mod sync;
|
||||||
pub mod tokio;
|
pub mod tokio;
|
||||||
pub mod tracing;
|
pub mod tracing;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod tests;
|
||||||
|
|
||||||
pub use inbound::Inbound;
|
pub use inbound::Inbound;
|
||||||
pub use sync::ChainSync;
|
pub use sync::ChainSync;
|
||||||
|
|
|
||||||
|
|
@ -17,19 +17,12 @@ use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
use zebra_chain::block::{self, Block};
|
use zebra_chain::block::{self, Block};
|
||||||
use zebra_consensus::transaction;
|
use zebra_consensus::chain::VerifyChainError;
|
||||||
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
|
|
||||||
use zebra_network::AddressBook;
|
use zebra_network::AddressBook;
|
||||||
|
|
||||||
use super::mempool::{
|
|
||||||
self as mp,
|
|
||||||
downloads::{
|
|
||||||
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
// Re-use the syncer timeouts for consistency.
|
// Re-use the syncer timeouts for consistency.
|
||||||
use super::{
|
use super::{
|
||||||
mempool,
|
mempool, mempool as mp,
|
||||||
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
|
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -43,14 +36,9 @@ type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::
|
||||||
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
||||||
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
|
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
|
||||||
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
|
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
|
||||||
type TxVerifier = Buffer<
|
|
||||||
BoxService<transaction::Request, transaction::Response, TransactionError>,
|
|
||||||
transaction::Request,
|
|
||||||
>;
|
|
||||||
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
|
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
|
||||||
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State, Mempool>;
|
|
||||||
|
|
||||||
pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);
|
pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>, Mempool);
|
||||||
|
|
||||||
/// Tracks the internal state of the [`Inbound`] service during network setup.
|
/// Tracks the internal state of the [`Inbound`] service during network setup.
|
||||||
pub enum Setup {
|
pub enum Setup {
|
||||||
|
|
@ -66,10 +54,6 @@ pub enum Setup {
|
||||||
/// A service that verifies downloaded blocks. Given to `block_downloads`
|
/// A service that verifies downloaded blocks. Given to `block_downloads`
|
||||||
/// after the network is set up.
|
/// after the network is set up.
|
||||||
block_verifier: BlockVerifier,
|
block_verifier: BlockVerifier,
|
||||||
|
|
||||||
/// A service that verifies downloaded transactions. Given to `tx_downloads`
|
|
||||||
/// after the network is set up.
|
|
||||||
tx_verifier: TxVerifier,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Network setup is complete.
|
/// Network setup is complete.
|
||||||
|
|
@ -82,7 +66,8 @@ pub enum Setup {
|
||||||
/// A `futures::Stream` that downloads and verifies gossiped blocks.
|
/// A `futures::Stream` that downloads and verifies gossiped blocks.
|
||||||
block_downloads: Pin<Box<InboundBlockDownloads>>,
|
block_downloads: Pin<Box<InboundBlockDownloads>>,
|
||||||
|
|
||||||
tx_downloads: Pin<Box<InboundTxDownloads>>,
|
/// A service that manages transactions in the memory pool.
|
||||||
|
mempool: Mempool,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Temporary state used in the service's internal network initialization
|
/// Temporary state used in the service's internal network initialization
|
||||||
|
|
@ -136,9 +121,6 @@ pub struct Inbound {
|
||||||
|
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
state: State,
|
state: State,
|
||||||
|
|
||||||
/// A service that manages transactions in the memory pool.
|
|
||||||
mempool: Mempool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Inbound {
|
impl Inbound {
|
||||||
|
|
@ -146,17 +128,13 @@ impl Inbound {
|
||||||
network_setup: oneshot::Receiver<NetworkSetupData>,
|
network_setup: oneshot::Receiver<NetworkSetupData>,
|
||||||
state: State,
|
state: State,
|
||||||
block_verifier: BlockVerifier,
|
block_verifier: BlockVerifier,
|
||||||
tx_verifier: TxVerifier,
|
|
||||||
mempool: Mempool,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network_setup: Setup::AwaitingNetwork {
|
network_setup: Setup::AwaitingNetwork {
|
||||||
network_setup,
|
network_setup,
|
||||||
block_verifier,
|
block_verifier,
|
||||||
tx_verifier,
|
|
||||||
},
|
},
|
||||||
state,
|
state,
|
||||||
mempool,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -187,25 +165,19 @@ impl Service<zn::Request> for Inbound {
|
||||||
Setup::AwaitingNetwork {
|
Setup::AwaitingNetwork {
|
||||||
mut network_setup,
|
mut network_setup,
|
||||||
block_verifier,
|
block_verifier,
|
||||||
tx_verifier,
|
|
||||||
} => match network_setup.try_recv() {
|
} => match network_setup.try_recv() {
|
||||||
Ok((outbound, address_book)) => {
|
Ok((outbound, address_book, mempool)) => {
|
||||||
let block_downloads = Box::pin(BlockDownloads::new(
|
let block_downloads = Box::pin(BlockDownloads::new(
|
||||||
Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT),
|
Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT),
|
||||||
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
|
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
|
||||||
self.state.clone(),
|
self.state.clone(),
|
||||||
));
|
));
|
||||||
let tx_downloads = Box::pin(TxDownloads::new(
|
|
||||||
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
|
|
||||||
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
|
|
||||||
self.state.clone(),
|
|
||||||
self.mempool.clone(),
|
|
||||||
));
|
|
||||||
result = Ok(());
|
result = Ok(());
|
||||||
Setup::Initialized {
|
Setup::Initialized {
|
||||||
address_book,
|
address_book,
|
||||||
block_downloads,
|
block_downloads,
|
||||||
tx_downloads,
|
mempool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
|
|
@ -214,7 +186,6 @@ impl Service<zn::Request> for Inbound {
|
||||||
Setup::AwaitingNetwork {
|
Setup::AwaitingNetwork {
|
||||||
network_setup,
|
network_setup,
|
||||||
block_verifier,
|
block_verifier,
|
||||||
tx_verifier,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(error @ TryRecvError::Closed) => {
|
Err(error @ TryRecvError::Closed) => {
|
||||||
|
|
@ -236,16 +207,15 @@ impl Service<zn::Request> for Inbound {
|
||||||
Setup::Initialized {
|
Setup::Initialized {
|
||||||
address_book,
|
address_book,
|
||||||
mut block_downloads,
|
mut block_downloads,
|
||||||
mut tx_downloads,
|
mempool,
|
||||||
} => {
|
} => {
|
||||||
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
|
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
|
||||||
while let Poll::Ready(Some(_)) = tx_downloads.as_mut().poll_next(cx) {}
|
|
||||||
|
|
||||||
result = Ok(());
|
result = Ok(());
|
||||||
Setup::Initialized {
|
Setup::Initialized {
|
||||||
address_book,
|
address_book,
|
||||||
block_downloads,
|
block_downloads,
|
||||||
tx_downloads,
|
mempool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -326,13 +296,16 @@ impl Service<zn::Request> for Inbound {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::TransactionsById(transactions) => {
|
zn::Request::TransactionsById(transactions) => {
|
||||||
|
if let Setup::Initialized { mempool, .. } = &mut self.network_setup {
|
||||||
let request = mempool::Request::TransactionsById(transactions);
|
let request = mempool::Request::TransactionsById(transactions);
|
||||||
self.mempool.clone().oneshot(request).map_ok(|resp| match resp {
|
mempool.clone().oneshot(request).map_ok(|resp| match resp {
|
||||||
mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions),
|
mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions),
|
||||||
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
|
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
|
||||||
})
|
})
|
||||||
.boxed()
|
.boxed()
|
||||||
|
} else {
|
||||||
|
async { Ok(zn::Response::Transactions(Default::default())) }.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
zn::Request::FindBlocks { known_blocks, stop } => {
|
zn::Request::FindBlocks { known_blocks, stop } => {
|
||||||
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
||||||
|
|
@ -353,26 +326,36 @@ impl Service<zn::Request> for Inbound {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::PushTransaction(transaction) => {
|
zn::Request::PushTransaction(transaction) => {
|
||||||
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
|
if let Setup::Initialized { mempool, .. } = &mut self.network_setup {
|
||||||
tx_downloads.download_if_needed_and_verify(transaction.into());
|
mempool
|
||||||
|
.clone()
|
||||||
|
.oneshot(mempool::Request::Queue(vec![transaction.into()]))
|
||||||
|
// The response just indicates if processing was queued or not; ignore it
|
||||||
|
.map_ok(|_resp| zn::Response::Nil)
|
||||||
|
.boxed()
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
|
?transaction.id,
|
||||||
|
"ignoring `PushTransaction` request from remote peer during network setup"
|
||||||
);
|
);
|
||||||
|
async { Ok(zn::Response::TransactionIds(Default::default())) }.boxed()
|
||||||
}
|
}
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
|
||||||
}
|
}
|
||||||
zn::Request::AdvertiseTransactionIds(transactions) => {
|
zn::Request::AdvertiseTransactionIds(transactions) => {
|
||||||
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
|
if let Setup::Initialized { mempool, .. } = &mut self.network_setup {
|
||||||
for txid in transactions {
|
let transactions = transactions.into_iter().map(Into::into).collect();
|
||||||
tx_downloads.download_if_needed_and_verify(txid.into());
|
mempool
|
||||||
}
|
.clone()
|
||||||
|
.oneshot(mempool::Request::Queue(transactions))
|
||||||
|
// The response just indicates if processing was queued or not; ignore it
|
||||||
|
.map_ok(|_resp| zn::Response::Nil)
|
||||||
|
.boxed()
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
|
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
|
||||||
);
|
);
|
||||||
|
async { Ok(zn::Response::TransactionIds(Default::default())) }.boxed()
|
||||||
}
|
}
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
|
||||||
}
|
}
|
||||||
zn::Request::AdvertiseBlock(hash) => {
|
zn::Request::AdvertiseBlock(hash) => {
|
||||||
if let Setup::Initialized {
|
if let Setup::Initialized {
|
||||||
|
|
@ -389,11 +372,15 @@ impl Service<zn::Request> for Inbound {
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::MempoolTransactionIds => {
|
zn::Request::MempoolTransactionIds => {
|
||||||
self.mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
|
if let Setup::Initialized { mempool, .. } = &mut self.network_setup {
|
||||||
|
mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
|
||||||
mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
|
mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
|
||||||
_ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
|
_ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
|
||||||
})
|
})
|
||||||
.boxed()
|
.boxed()
|
||||||
|
} else {
|
||||||
|
async { Ok(zn::Response::TransactionIds(Default::default())) }.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
zn::Request::Ping(_) => {
|
zn::Request::Ping(_) => {
|
||||||
unreachable!("ping requests are handled internally");
|
unreachable!("ping requests are handled internally");
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,18 @@
|
||||||
use std::collections::HashSet;
|
use std::{collections::HashSet, net::SocketAddr, str::FromStr, sync::Arc};
|
||||||
|
|
||||||
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
||||||
|
use crate::components::tests::mock_peer_set;
|
||||||
|
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
|
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
|
||||||
|
|
||||||
|
use tracing::Span;
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
parameters::Network,
|
parameters::Network,
|
||||||
transaction::{UnminedTx, UnminedTxId},
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
};
|
};
|
||||||
use zebra_consensus::Config as ConsensusConfig;
|
use zebra_consensus::Config as ConsensusConfig;
|
||||||
use zebra_network::{Request, Response};
|
use zebra_network::{AddressBook, Request, Response};
|
||||||
use zebra_state::Config as StateConfig;
|
use zebra_state::Config as StateConfig;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
@ -18,10 +20,23 @@ async fn mempool_requests_for_transactions() {
|
||||||
let network = Network::Mainnet;
|
let network = Network::Mainnet;
|
||||||
let consensus_config = ConsensusConfig::default();
|
let consensus_config = ConsensusConfig::default();
|
||||||
let state_config = StateConfig::ephemeral();
|
let state_config = StateConfig::ephemeral();
|
||||||
|
let (peer_set, _) = mock_peer_set();
|
||||||
|
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
|
||||||
|
let address_book = Arc::new(std::sync::Mutex::new(address_book));
|
||||||
|
|
||||||
let (state, _, _) = zebra_state::init(state_config, network);
|
let (state, _, _) = zebra_state::init(state_config, network);
|
||||||
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||||
let mut mempool_service = Mempool::new(network);
|
|
||||||
|
let (block_verifier, transaction_verifier) =
|
||||||
|
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let mut mempool_service = Mempool::new(
|
||||||
|
network,
|
||||||
|
peer_set.clone(),
|
||||||
|
state_service.clone(),
|
||||||
|
transaction_verifier,
|
||||||
|
);
|
||||||
|
|
||||||
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
|
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
|
||||||
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
||||||
|
|
@ -29,10 +44,7 @@ async fn mempool_requests_for_transactions() {
|
||||||
let mempool_service = BoxService::new(mempool_service);
|
let mempool_service = BoxService::new(mempool_service);
|
||||||
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
||||||
|
|
||||||
let (block_verifier, transaction_verifier) =
|
let (setup_tx, setup_rx) = oneshot::channel();
|
||||||
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
|
||||||
.await;
|
|
||||||
let (_setup_tx, setup_rx) = oneshot::channel();
|
|
||||||
|
|
||||||
let inbound_service = ServiceBuilder::new()
|
let inbound_service = ServiceBuilder::new()
|
||||||
.load_shed()
|
.load_shed()
|
||||||
|
|
@ -41,10 +53,12 @@ async fn mempool_requests_for_transactions() {
|
||||||
setup_rx,
|
setup_rx,
|
||||||
state_service,
|
state_service,
|
||||||
block_verifier.clone(),
|
block_verifier.clone(),
|
||||||
transaction_verifier.clone(),
|
|
||||||
mempool,
|
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let r = setup_tx.send((peer_set.clone(), address_book, mempool));
|
||||||
|
// We can't expect or unwrap because the returned Result does not implement Debug
|
||||||
|
assert!(r.is_ok());
|
||||||
|
|
||||||
// Test `Request::MempoolTransactionIds`
|
// Test `Request::MempoolTransactionIds`
|
||||||
let request = inbound_service
|
let request = inbound_service
|
||||||
.clone()
|
.clone()
|
||||||
|
|
|
||||||
|
|
@ -7,13 +7,16 @@ use std::{
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::future::FutureExt;
|
use futures::{future::FutureExt, stream::Stream};
|
||||||
use tower::Service;
|
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
parameters::Network,
|
parameters::Network,
|
||||||
transaction::{UnminedTx, UnminedTxId},
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
};
|
};
|
||||||
|
use zebra_consensus::{error::TransactionError, transaction};
|
||||||
|
use zebra_network as zn;
|
||||||
|
use zebra_state as zs;
|
||||||
|
|
||||||
pub use crate::BoxError;
|
pub use crate::BoxError;
|
||||||
|
|
||||||
|
|
@ -30,12 +33,25 @@ pub use self::error::MempoolError;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub use self::storage::tests::unmined_transactions_in_blocks;
|
pub use self::storage::tests::unmined_transactions_in_blocks;
|
||||||
|
|
||||||
|
use self::downloads::{
|
||||||
|
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
||||||
|
};
|
||||||
|
|
||||||
|
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
||||||
|
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
||||||
|
type TxVerifier = Buffer<
|
||||||
|
BoxService<transaction::Request, transaction::Response, TransactionError>,
|
||||||
|
transaction::Request,
|
||||||
|
>;
|
||||||
|
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
TransactionIds,
|
TransactionIds,
|
||||||
TransactionsById(HashSet<UnminedTxId>),
|
TransactionsById(HashSet<UnminedTxId>),
|
||||||
RejectedTransactionIds(HashSet<UnminedTxId>),
|
RejectedTransactionIds(HashSet<UnminedTxId>),
|
||||||
|
Queue(Vec<Gossip>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -43,6 +59,7 @@ pub enum Response {
|
||||||
Transactions(Vec<UnminedTx>),
|
Transactions(Vec<UnminedTx>),
|
||||||
TransactionIds(Vec<UnminedTxId>),
|
TransactionIds(Vec<UnminedTxId>),
|
||||||
RejectedTransactionIds(Vec<UnminedTxId>),
|
RejectedTransactionIds(Vec<UnminedTxId>),
|
||||||
|
Queued(Vec<Result<(), MempoolError>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mempool async management and query service.
|
/// Mempool async management and query service.
|
||||||
|
|
@ -50,20 +67,33 @@ pub enum Response {
|
||||||
/// The mempool is the set of all verified transactions that this node is aware
|
/// The mempool is the set of all verified transactions that this node is aware
|
||||||
/// of that have yet to be confirmed by the Zcash network. A transaction is
|
/// of that have yet to be confirmed by the Zcash network. A transaction is
|
||||||
/// confirmed when it has been included in a block ('mined').
|
/// confirmed when it has been included in a block ('mined').
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Mempool {
|
pub struct Mempool {
|
||||||
/// The Mempool storage itself.
|
/// The Mempool storage itself.
|
||||||
///
|
///
|
||||||
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
|
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
|
||||||
/// inject transactions into `storage`, as transactions must be verified beforehand.
|
/// inject transactions into `storage`, as transactions must be verified beforehand.
|
||||||
storage: storage::Storage,
|
storage: storage::Storage,
|
||||||
|
|
||||||
|
/// The transaction dowload and verify stream.
|
||||||
|
tx_downloads: Pin<Box<InboundTxDownloads>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Mempool {
|
impl Mempool {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub(crate) fn new(_network: Network) -> Self {
|
pub(crate) fn new(
|
||||||
|
_network: Network,
|
||||||
|
outbound: Outbound,
|
||||||
|
state: State,
|
||||||
|
tx_verifier: TxVerifier,
|
||||||
|
) -> Self {
|
||||||
|
let tx_downloads = Box::pin(TxDownloads::new(
|
||||||
|
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
|
||||||
|
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
|
||||||
|
state,
|
||||||
|
));
|
||||||
Mempool {
|
Mempool {
|
||||||
storage: Default::default(),
|
storage: Default::default(),
|
||||||
|
tx_downloads,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,6 +102,21 @@ impl Mempool {
|
||||||
pub fn storage(&mut self) -> &mut storage::Storage {
|
pub fn storage(&mut self) -> &mut storage::Storage {
|
||||||
&mut self.storage
|
&mut self.storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if transaction should be downloaded and/or verified.
|
||||||
|
///
|
||||||
|
/// If it is already in the mempool (or in its rejected list)
|
||||||
|
/// then it shouldn't be downloaded/verified.
|
||||||
|
fn should_download_or_verify(&mut self, txid: UnminedTxId) -> Result<(), MempoolError> {
|
||||||
|
// Check if the transaction is already in the mempool.
|
||||||
|
if self.storage.clone().contains(&txid) {
|
||||||
|
return Err(MempoolError::InMempool);
|
||||||
|
}
|
||||||
|
if self.storage.clone().contains_rejected(&txid) {
|
||||||
|
return Err(MempoolError::Rejected);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<Request> for Mempool {
|
impl Service<Request> for Mempool {
|
||||||
|
|
@ -80,7 +125,14 @@ impl Service<Request> for Mempool {
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
// Clean up completed download tasks and add to mempool if successful
|
||||||
|
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
|
||||||
|
if let Ok(tx) = r {
|
||||||
|
// TODO: should we do something with the result?
|
||||||
|
let _ = self.storage.insert(tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,6 +152,18 @@ impl Service<Request> for Mempool {
|
||||||
.map(Response::RejectedTransactionIds);
|
.map(Response::RejectedTransactionIds);
|
||||||
async move { rsp }.boxed()
|
async move { rsp }.boxed()
|
||||||
}
|
}
|
||||||
|
Request::Queue(gossiped_txs) => {
|
||||||
|
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
|
||||||
|
.into_iter()
|
||||||
|
.map(|gossiped_tx| {
|
||||||
|
self.should_download_or_verify(gossiped_tx.id())?;
|
||||||
|
self.tx_downloads
|
||||||
|
.download_if_needed_and_verify(gossiped_tx)?;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
async move { Ok(Response::Queued(rsp)) }.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,11 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use proptest::prelude::*;
|
use proptest::prelude::*;
|
||||||
use tokio::{
|
use tokio::time::{self, timeout};
|
||||||
sync::mpsc::{self, UnboundedReceiver},
|
|
||||||
time::{self, timeout},
|
|
||||||
};
|
|
||||||
use tower::{buffer::Buffer, util::BoxService, BoxError};
|
|
||||||
|
|
||||||
use zebra_network::{Request, Response};
|
use zebra_network::Request;
|
||||||
|
|
||||||
|
use crate::components::tests::mock_peer_set;
|
||||||
|
|
||||||
use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};
|
use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};
|
||||||
|
|
||||||
|
|
@ -78,29 +76,3 @@ proptest! {
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a mock service to represent a [`PeerSet`][zebra_network::PeerSet] and intercept the
|
|
||||||
/// requests it receives.
|
|
||||||
///
|
|
||||||
/// The intercepted requests are sent through an unbounded channel to the receiver that's also
|
|
||||||
/// returned from this function.
|
|
||||||
fn mock_peer_set() -> (
|
|
||||||
Buffer<BoxService<Request, Response, BoxError>, Request>,
|
|
||||||
UnboundedReceiver<Request>,
|
|
||||||
) {
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let proxy_service = tower::service_fn(move |request| {
|
|
||||||
let sender = sender.clone();
|
|
||||||
|
|
||||||
async move {
|
|
||||||
let _ = sender.send(request);
|
|
||||||
|
|
||||||
Ok(Response::TransactionIds(vec![]))
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let service = Buffer::new(BoxService::new(proxy_service), 10);
|
|
||||||
|
|
||||||
(service, receiver)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -21,9 +21,10 @@ use zebra_consensus::transaction as tx;
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
use crate::components::mempool as mp;
|
|
||||||
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
|
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
|
||||||
|
|
||||||
|
use super::MempoolError;
|
||||||
|
|
||||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
|
||||||
/// Controls how long we wait for a transaction download request to complete.
|
/// Controls how long we wait for a transaction download request to complete.
|
||||||
|
|
@ -62,59 +63,41 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
|
||||||
/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions
|
/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions
|
||||||
/// will be directed to the malicious node that originally gossiped the hash.
|
/// will be directed to the malicious node that originally gossiped the hash.
|
||||||
/// Therefore, this attack can be carried out by a single malicious node.
|
/// Therefore, this attack can be carried out by a single malicious node.
|
||||||
const MAX_INBOUND_CONCURRENCY: usize = 10;
|
pub(crate) const MAX_INBOUND_CONCURRENCY: usize = 10;
|
||||||
|
|
||||||
/// The action taken in response to a peer's gossiped transaction hash.
|
|
||||||
pub enum DownloadAction {
|
|
||||||
/// The transaction hash was successfully queued for download and verification.
|
|
||||||
AddedToQueue,
|
|
||||||
|
|
||||||
/// The transaction hash is already queued, so this request was ignored.
|
|
||||||
///
|
|
||||||
/// Another peer has already gossiped the same hash to us, or the mempool crawler has fetched it.
|
|
||||||
AlreadyQueued,
|
|
||||||
|
|
||||||
/// The queue is at capacity, so this request was ignored.
|
|
||||||
///
|
|
||||||
/// The mempool crawler should discover this transaction later.
|
|
||||||
/// If it is mined into a block, it will be downloaded by the syncer, or the inbound block downloader.
|
|
||||||
///
|
|
||||||
/// The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
|
|
||||||
FullQueue,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A gossiped transaction, which can be the transaction itself or just its ID.
|
/// A gossiped transaction, which can be the transaction itself or just its ID.
|
||||||
pub enum GossipedTx {
|
#[derive(Debug)]
|
||||||
|
pub enum Gossip {
|
||||||
Id(UnminedTxId),
|
Id(UnminedTxId),
|
||||||
Tx(UnminedTx),
|
Tx(UnminedTx),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GossipedTx {
|
impl Gossip {
|
||||||
/// Return the [`UnminedTxId`] of a gossiped transaction.
|
/// Return the [`UnminedTxId`] of a gossiped transaction.
|
||||||
fn id(&self) -> UnminedTxId {
|
pub fn id(&self) -> UnminedTxId {
|
||||||
match self {
|
match self {
|
||||||
GossipedTx::Id(txid) => *txid,
|
Gossip::Id(txid) => *txid,
|
||||||
GossipedTx::Tx(tx) => tx.id,
|
Gossip::Tx(tx) => tx.id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<UnminedTxId> for GossipedTx {
|
impl From<UnminedTxId> for Gossip {
|
||||||
fn from(txid: UnminedTxId) -> Self {
|
fn from(txid: UnminedTxId) -> Self {
|
||||||
GossipedTx::Id(txid)
|
Gossip::Id(txid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<UnminedTx> for GossipedTx {
|
impl From<UnminedTx> for Gossip {
|
||||||
fn from(tx: UnminedTx) -> Self {
|
fn from(tx: UnminedTx) -> Self {
|
||||||
GossipedTx::Tx(tx)
|
Gossip::Tx(tx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents a [`Stream`] of download and verification tasks.
|
/// Represents a [`Stream`] of download and verification tasks.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Downloads<ZN, ZV, ZS, ZM>
|
pub struct Downloads<ZN, ZV, ZS>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -122,8 +105,6 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
|
||||||
ZM::Future: Send,
|
|
||||||
{
|
{
|
||||||
// Services
|
// Services
|
||||||
/// A service that forwards requests to connected peers, and returns their
|
/// A service that forwards requests to connected peers, and returns their
|
||||||
|
|
@ -136,20 +117,17 @@ where
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
state: ZS,
|
state: ZS,
|
||||||
|
|
||||||
/// A service that manages the mempool.
|
|
||||||
mempool: ZM,
|
|
||||||
|
|
||||||
// Internal downloads state
|
// Internal downloads state
|
||||||
/// A list of pending transaction download and verify tasks.
|
/// A list of pending transaction download and verify tasks.
|
||||||
#[pin]
|
#[pin]
|
||||||
pending: FuturesUnordered<JoinHandle<Result<UnminedTxId, (BoxError, UnminedTxId)>>>,
|
pending: FuturesUnordered<JoinHandle<Result<UnminedTx, (BoxError, UnminedTxId)>>>,
|
||||||
|
|
||||||
/// A list of channels that can be used to cancel pending transaction download and
|
/// A list of channels that can be used to cancel pending transaction download and
|
||||||
/// verify tasks.
|
/// verify tasks.
|
||||||
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
|
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<ZN, ZV, ZS, ZM> Stream for Downloads<ZN, ZV, ZS, ZM>
|
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -157,10 +135,8 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
|
||||||
ZM::Future: Send,
|
|
||||||
{
|
{
|
||||||
type Item = Result<UnminedTxId, BoxError>;
|
type Item = Result<UnminedTx, BoxError>;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
let this = self.project();
|
let this = self.project();
|
||||||
|
|
@ -175,9 +151,9 @@ where
|
||||||
// TODO: this would be cleaner with poll_map (#2693)
|
// TODO: this would be cleaner with poll_map (#2693)
|
||||||
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
|
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
|
||||||
match join_result.expect("transaction download and verify tasks must not panic") {
|
match join_result.expect("transaction download and verify tasks must not panic") {
|
||||||
Ok(hash) => {
|
Ok(tx) => {
|
||||||
this.cancel_handles.remove(&hash);
|
this.cancel_handles.remove(&tx.id);
|
||||||
Poll::Ready(Some(Ok(hash)))
|
Poll::Ready(Some(Ok(tx)))
|
||||||
}
|
}
|
||||||
Err((e, hash)) => {
|
Err((e, hash)) => {
|
||||||
this.cancel_handles.remove(&hash);
|
this.cancel_handles.remove(&hash);
|
||||||
|
|
@ -194,7 +170,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<ZN, ZV, ZS, ZM> Downloads<ZN, ZV, ZS, ZM>
|
impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
|
|
@ -202,8 +178,6 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
|
|
||||||
ZM::Future: Send,
|
|
||||||
{
|
{
|
||||||
/// Initialize a new download stream with the provided `network` and
|
/// Initialize a new download stream with the provided `network` and
|
||||||
/// `verifier` services.
|
/// `verifier` services.
|
||||||
|
|
@ -211,12 +185,11 @@ where
|
||||||
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
||||||
/// timeout limits should be applied to the `network` service passed into
|
/// timeout limits should be applied to the `network` service passed into
|
||||||
/// this constructor.
|
/// this constructor.
|
||||||
pub fn new(network: ZN, verifier: ZV, state: ZS, mempool: ZM) -> Self {
|
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network,
|
network,
|
||||||
verifier,
|
verifier,
|
||||||
state,
|
state,
|
||||||
mempool,
|
|
||||||
pending: FuturesUnordered::new(),
|
pending: FuturesUnordered::new(),
|
||||||
cancel_handles: HashMap::new(),
|
cancel_handles: HashMap::new(),
|
||||||
}
|
}
|
||||||
|
|
@ -226,7 +199,10 @@ where
|
||||||
///
|
///
|
||||||
/// Returns the action taken in response to the queue request.
|
/// Returns the action taken in response to the queue request.
|
||||||
#[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
|
#[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
|
||||||
pub fn download_if_needed_and_verify(&mut self, gossiped_tx: GossipedTx) -> DownloadAction {
|
pub fn download_if_needed_and_verify(
|
||||||
|
&mut self,
|
||||||
|
gossiped_tx: Gossip,
|
||||||
|
) -> Result<(), MempoolError> {
|
||||||
let txid = gossiped_tx.id();
|
let txid = gossiped_tx.id();
|
||||||
|
|
||||||
if self.cancel_handles.contains_key(&txid) {
|
if self.cancel_handles.contains_key(&txid) {
|
||||||
|
|
@ -236,7 +212,7 @@ where
|
||||||
?MAX_INBOUND_CONCURRENCY,
|
?MAX_INBOUND_CONCURRENCY,
|
||||||
"transaction id already queued for inbound download: ignored transaction"
|
"transaction id already queued for inbound download: ignored transaction"
|
||||||
);
|
);
|
||||||
return DownloadAction::AlreadyQueued;
|
return Err(MempoolError::AlreadyQueued);
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
|
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
|
||||||
|
|
@ -246,7 +222,7 @@ where
|
||||||
?MAX_INBOUND_CONCURRENCY,
|
?MAX_INBOUND_CONCURRENCY,
|
||||||
"too many transactions queued for inbound download: ignored transaction"
|
"too many transactions queued for inbound download: ignored transaction"
|
||||||
);
|
);
|
||||||
return DownloadAction::FullQueue;
|
return Err(MempoolError::FullQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This oneshot is used to signal cancellation to the download task.
|
// This oneshot is used to signal cancellation to the download task.
|
||||||
|
|
@ -255,10 +231,10 @@ where
|
||||||
let network = self.network.clone();
|
let network = self.network.clone();
|
||||||
let verifier = self.verifier.clone();
|
let verifier = self.verifier.clone();
|
||||||
let mut state = self.state.clone();
|
let mut state = self.state.clone();
|
||||||
let mut mempool = self.mempool.clone();
|
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
Self::should_download_or_verify(&mut state, &mut mempool, txid).await?;
|
// Don't download/verify if the transaction is already in the state.
|
||||||
|
Self::transaction_in_state(&mut state, txid).await?;
|
||||||
|
|
||||||
let height = match state.oneshot(zs::Request::Tip).await {
|
let height = match state.oneshot(zs::Request::Tip).await {
|
||||||
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
|
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
|
||||||
|
|
@ -269,7 +245,7 @@ where
|
||||||
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;
|
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;
|
||||||
|
|
||||||
let tx = match gossiped_tx {
|
let tx = match gossiped_tx {
|
||||||
GossipedTx::Id(txid) => {
|
Gossip::Id(txid) => {
|
||||||
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
|
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
|
||||||
|
|
||||||
let tx = match network.oneshot(req).await? {
|
let tx = match network.oneshot(req).await? {
|
||||||
|
|
@ -282,7 +258,7 @@ where
|
||||||
metrics::counter!("gossip.downloaded.transaction.count", 1);
|
metrics::counter!("gossip.downloaded.transaction.count", 1);
|
||||||
tx
|
tx
|
||||||
}
|
}
|
||||||
GossipedTx::Tx(tx) => {
|
Gossip::Tx(tx) => {
|
||||||
metrics::counter!("gossip.pushed.transaction.count", 1);
|
metrics::counter!("gossip.pushed.transaction.count", 1);
|
||||||
tx
|
tx
|
||||||
}
|
}
|
||||||
|
|
@ -290,18 +266,19 @@ where
|
||||||
|
|
||||||
let result = verifier
|
let result = verifier
|
||||||
.oneshot(tx::Request::Mempool {
|
.oneshot(tx::Request::Mempool {
|
||||||
transaction: tx,
|
transaction: tx.clone(),
|
||||||
height,
|
height,
|
||||||
})
|
})
|
||||||
|
.map_ok(|_hash| tx)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
tracing::debug!(?txid, ?result, "verified transaction for the mempool");
|
tracing::debug!(?txid, ?result, "verified transaction for the mempool");
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
.map_ok(|hash| {
|
.map_ok(|tx| {
|
||||||
metrics::counter!("gossip.verified.transaction.count", 1);
|
metrics::counter!("gossip.verified.transaction.count", 1);
|
||||||
hash
|
tx
|
||||||
})
|
})
|
||||||
// Tack the hash onto the error so we can remove the cancel handle
|
// Tack the hash onto the error so we can remove the cancel handle
|
||||||
// on failure as well as on success.
|
// on failure as well as on success.
|
||||||
|
|
@ -335,57 +312,11 @@ where
|
||||||
);
|
);
|
||||||
metrics::gauge!("gossip.queued.transaction.count", self.pending.len() as _);
|
metrics::gauge!("gossip.queued.transaction.count", self.pending.len() as _);
|
||||||
|
|
||||||
DownloadAction::AddedToQueue
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if transaction should be downloaded and/or verified.
|
|
||||||
///
|
|
||||||
/// If it is already in the mempool (or in its rejected list)
|
|
||||||
/// or in state, then it shouldn't be downloaded/verified
|
|
||||||
/// (and an error is returned).
|
|
||||||
async fn should_download_or_verify(
|
|
||||||
state: &mut ZS,
|
|
||||||
mempool: &mut ZM,
|
|
||||||
txid: UnminedTxId,
|
|
||||||
) -> Result<(), BoxError> {
|
|
||||||
// Check if the transaction is already in the mempool.
|
|
||||||
match mempool
|
|
||||||
.ready_and()
|
|
||||||
.await?
|
|
||||||
.call(mp::Request::TransactionsById(
|
|
||||||
[txid].iter().cloned().collect(),
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(mp::Response::Transactions(txs)) => {
|
|
||||||
if txs.is_empty() {
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
|
||||||
Err("already present in mempool".into())
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Ok(_) => unreachable!("wrong response"),
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
// Check if the transaction is in the mempool rejected list.
|
|
||||||
match mempool
|
|
||||||
.oneshot(mp::Request::RejectedTransactionIds(
|
|
||||||
[txid].iter().cloned().collect(),
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(mp::Response::RejectedTransactionIds(txs)) => {
|
|
||||||
if txs.is_empty() {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
Err("in mempool rejected list".into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(_) => unreachable!("wrong response"),
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
|
/// Check if transaction is already in the state.
|
||||||
|
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
|
||||||
// Check if the transaction is already in the state.
|
// Check if the transaction is already in the state.
|
||||||
match state
|
match state
|
||||||
.ready_and()
|
.ready_and()
|
||||||
|
|
|
||||||
|
|
@ -25,4 +25,22 @@ pub enum MempoolError {
|
||||||
|
|
||||||
#[error("transaction evicted from the mempool due to size restrictions")]
|
#[error("transaction evicted from the mempool due to size restrictions")]
|
||||||
Excess,
|
Excess,
|
||||||
|
|
||||||
|
#[error("transaction is in the mempool rejected list")]
|
||||||
|
Rejected,
|
||||||
|
|
||||||
|
/// The transaction hash is already queued, so this request was ignored.
|
||||||
|
///
|
||||||
|
/// Another peer has already gossiped the same hash to us, or the mempool crawler has fetched it.
|
||||||
|
#[error("transaction dropped because it is already queued for download")]
|
||||||
|
AlreadyQueued,
|
||||||
|
|
||||||
|
/// The queue is at capacity, so this request was ignored.
|
||||||
|
///
|
||||||
|
/// The mempool crawler should discover this transaction later.
|
||||||
|
/// If it is mined into a block, it will be downloaded by the syncer, or the inbound block downloader.
|
||||||
|
///
|
||||||
|
/// The queue's capacity is [`super::downloads::MAX_INBOUND_CONCURRENCY`].
|
||||||
|
#[error("transaction dropped because the queue is full")]
|
||||||
|
FullQueue,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,6 +104,12 @@ impl Storage {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if a [`UnminedTx`] matching an [`UnminedTxId`] is in
|
||||||
|
/// the mempool rejected list.
|
||||||
|
pub fn contains_rejected(self, txid: &UnminedTxId) -> bool {
|
||||||
|
self.rejected.contains_key(txid)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
|
/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
|
||||||
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
|
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
|
||||||
tx_ids
|
tx_ids
|
||||||
|
|
|
||||||
|
|
@ -55,11 +55,21 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
|
||||||
.map(|tx| tx.id)
|
.map(|tx| tx.id)
|
||||||
.collect();
|
.collect();
|
||||||
// Convert response to a `HashSet` as we need a fixed order to compare.
|
// Convert response to a `HashSet` as we need a fixed order to compare.
|
||||||
let rejected_response: HashSet<UnminedTxId> =
|
let rejected_response: HashSet<UnminedTxId> = storage
|
||||||
storage.rejected_transactions(all_ids).into_iter().collect();
|
.clone()
|
||||||
|
.rejected_transactions(all_ids)
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
assert_eq!(rejected_response, rejected_ids);
|
assert_eq!(rejected_response, rejected_ids);
|
||||||
|
|
||||||
|
// Use `contains_rejected` to make sure the first id stored is now rejected
|
||||||
|
assert!(storage
|
||||||
|
.clone()
|
||||||
|
.contains_rejected(&unmined_transactions[0].id));
|
||||||
|
// Use `contains_rejected` to make sure the last id stored is not rejected
|
||||||
|
assert!(!storage.contains_rejected(&unmined_transactions[unmined_transactions.len() - 1].id));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,24 +2,40 @@ use super::*;
|
||||||
use color_eyre::Report;
|
use color_eyre::Report;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use storage::tests::unmined_transactions_in_blocks;
|
use storage::tests::unmined_transactions_in_blocks;
|
||||||
use tower::ServiceExt;
|
use tower::{ServiceBuilder, ServiceExt};
|
||||||
|
|
||||||
|
use zebra_consensus::Config as ConsensusConfig;
|
||||||
|
use zebra_state::Config as StateConfig;
|
||||||
|
|
||||||
|
use crate::components::tests::mock_peer_set;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn mempool_service_basic() -> Result<(), Report> {
|
async fn mempool_service_basic() -> Result<(), Report> {
|
||||||
// Using the mainnet for now
|
// Using the mainnet for now
|
||||||
let network = Network::Mainnet;
|
let network = Network::Mainnet;
|
||||||
|
let consensus_config = ConsensusConfig::default();
|
||||||
|
let state_config = StateConfig::ephemeral();
|
||||||
|
let (peer_set, _) = mock_peer_set();
|
||||||
|
|
||||||
|
let (state, _, _) = zebra_state::init(state_config, network);
|
||||||
|
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||||
|
let (_chain_verifier, tx_verifier) =
|
||||||
|
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
// get the genesis block transactions from the Zcash blockchain.
|
// get the genesis block transactions from the Zcash blockchain.
|
||||||
let genesis_transactions = unmined_transactions_in_blocks(0, network);
|
let genesis_transactions = unmined_transactions_in_blocks(0, network);
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
let mut service = Mempool::new(network);
|
let mut service = Mempool::new(network, peer_set, state_service.clone(), tx_verifier);
|
||||||
// Insert the genesis block coinbase transaction into the mempool storage.
|
// Insert the genesis block coinbase transaction into the mempool storage.
|
||||||
service.storage.insert(genesis_transactions.1[0].clone())?;
|
service.storage.insert(genesis_transactions.1[0].clone())?;
|
||||||
|
|
||||||
// Test `Request::TransactionIds`
|
// Test `Request::TransactionIds`
|
||||||
let response = service
|
let response = service
|
||||||
.clone()
|
.ready_and()
|
||||||
.oneshot(Request::TransactionIds)
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(Request::TransactionIds)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let genesis_transaction_ids = match response {
|
let genesis_transaction_ids = match response {
|
||||||
|
|
@ -33,7 +49,9 @@ async fn mempool_service_basic() -> Result<(), Report> {
|
||||||
.copied()
|
.copied()
|
||||||
.collect::<HashSet<_>>();
|
.collect::<HashSet<_>>();
|
||||||
let response = service
|
let response = service
|
||||||
.clone()
|
.ready_and()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
.oneshot(Request::TransactionsById(
|
.oneshot(Request::TransactionsById(
|
||||||
genesis_transactions_hash_set.clone(),
|
genesis_transactions_hash_set.clone(),
|
||||||
))
|
))
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
||||||
|
use tower::{buffer::Buffer, util::BoxService};
|
||||||
|
use zebra_network::{BoxError, Request, Response};
|
||||||
|
|
||||||
|
/// Create a mock service to represent a [`PeerSet`][zebra_network::PeerSet] and intercept the
|
||||||
|
/// requests it receives.
|
||||||
|
///
|
||||||
|
/// The intercepted requests are sent through an unbounded channel to the receiver that's also
|
||||||
|
/// returned from this function.
|
||||||
|
pub(crate) fn mock_peer_set() -> (
|
||||||
|
Buffer<BoxService<Request, Response, BoxError>, Request>,
|
||||||
|
UnboundedReceiver<Request>,
|
||||||
|
) {
|
||||||
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let proxy_service = tower::service_fn(move |request| {
|
||||||
|
let sender = sender.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let _ = sender.send(request);
|
||||||
|
|
||||||
|
Ok(Response::TransactionIds(vec![]))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let service = Buffer::new(BoxService::new(proxy_service), 10);
|
||||||
|
|
||||||
|
(service, receiver)
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue