Reply to `Request::MempoolTransactionIds` with mempool content (#2720)
* reply to `Request::MempoolTransactionIds` * remove boilerplate * get storage from mempool with a method * change panic message * try fix for mac * use normal init instead of init_tests for state service * newline * rustfmt * fix test build
This commit is contained in:
parent
44ac06775b
commit
9c220afdc8
|
|
@ -64,6 +64,9 @@ impl StartCmd {
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
info!("initializing mempool");
|
||||||
|
let mempool = mempool::Mempool::new(config.network.network);
|
||||||
|
|
||||||
info!("initializing network");
|
info!("initializing network");
|
||||||
// The service that our node uses to respond to requests by peers. The
|
// The service that our node uses to respond to requests by peers. The
|
||||||
// load_shed middleware ensures that we reduce the size of the peer set
|
// load_shed middleware ensures that we reduce the size of the peer set
|
||||||
|
|
@ -77,6 +80,7 @@ impl StartCmd {
|
||||||
state.clone(),
|
state.clone(),
|
||||||
chain_verifier.clone(),
|
chain_verifier.clone(),
|
||||||
tx_verifier.clone(),
|
tx_verifier.clone(),
|
||||||
|
mempool,
|
||||||
));
|
));
|
||||||
|
|
||||||
let (peer_set, address_book) =
|
let (peer_set, address_book) =
|
||||||
|
|
|
||||||
|
|
@ -25,9 +25,15 @@ use super::mempool::downloads::{
|
||||||
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
||||||
};
|
};
|
||||||
// Re-use the syncer timeouts for consistency.
|
// Re-use the syncer timeouts for consistency.
|
||||||
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
|
use super::{
|
||||||
|
mempool,
|
||||||
|
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
|
||||||
|
};
|
||||||
|
|
||||||
mod downloads;
|
mod downloads;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
use downloads::Downloads as BlockDownloads;
|
use downloads::Downloads as BlockDownloads;
|
||||||
|
|
||||||
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
||||||
|
|
@ -126,6 +132,9 @@ pub struct Inbound {
|
||||||
|
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
state: State,
|
state: State,
|
||||||
|
|
||||||
|
/// A service that manages transactions in the memory pool.
|
||||||
|
mempool: mempool::Mempool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Inbound {
|
impl Inbound {
|
||||||
|
|
@ -134,6 +143,7 @@ impl Inbound {
|
||||||
state: State,
|
state: State,
|
||||||
block_verifier: BlockVerifier,
|
block_verifier: BlockVerifier,
|
||||||
tx_verifier: TxVerifier,
|
tx_verifier: TxVerifier,
|
||||||
|
mempool: mempool::Mempool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network_setup: Setup::AwaitingNetwork {
|
network_setup: Setup::AwaitingNetwork {
|
||||||
|
|
@ -142,6 +152,7 @@ impl Inbound {
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
},
|
},
|
||||||
state,
|
state,
|
||||||
|
mempool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -372,8 +383,11 @@ impl Service<zn::Request> for Inbound {
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::MempoolTransactionIds => {
|
zn::Request::MempoolTransactionIds => {
|
||||||
debug!("ignoring unimplemented request");
|
self.mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
|
||||||
|
_ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
|
||||||
|
})
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::Ping(_) => {
|
zn::Request::Ping(_) => {
|
||||||
unreachable!("ping requests are handled internally");
|
unreachable!("ping requests are handled internally");
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,69 @@
|
||||||
|
use tower::ServiceExt;
|
||||||
|
|
||||||
|
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
||||||
|
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tower::builder::ServiceBuilder;
|
||||||
|
|
||||||
|
use zebra_chain::{
|
||||||
|
parameters::Network,
|
||||||
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
|
};
|
||||||
|
use zebra_consensus::Config as ConsensusConfig;
|
||||||
|
use zebra_network::{Request, Response};
|
||||||
|
use zebra_state::Config as StateConfig;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mempool_requests_for_transaction_ids() {
|
||||||
|
let network = Network::Mainnet;
|
||||||
|
let consensus_config = ConsensusConfig::default();
|
||||||
|
let state_config = StateConfig::ephemeral();
|
||||||
|
|
||||||
|
let (state, _, _) = zebra_state::init(state_config, network);
|
||||||
|
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||||
|
let mut mempool_service = Mempool::new(network);
|
||||||
|
|
||||||
|
let added_transaction_ids: Vec<UnminedTxId> =
|
||||||
|
add_some_stuff_to_mempool(&mut mempool_service, network)
|
||||||
|
.iter()
|
||||||
|
.map(|t| t.id)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let (block_verifier, transaction_verifier) =
|
||||||
|
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||||
|
.await;
|
||||||
|
let (_setup_tx, setup_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let inbound_service = ServiceBuilder::new()
|
||||||
|
.load_shed()
|
||||||
|
.buffer(1)
|
||||||
|
.service(super::Inbound::new(
|
||||||
|
setup_rx,
|
||||||
|
state_service,
|
||||||
|
block_verifier.clone(),
|
||||||
|
transaction_verifier.clone(),
|
||||||
|
mempool_service,
|
||||||
|
));
|
||||||
|
|
||||||
|
let request = inbound_service
|
||||||
|
.oneshot(Request::MempoolTransactionIds)
|
||||||
|
.await;
|
||||||
|
match request {
|
||||||
|
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
|
||||||
|
_ => unreachable!(
|
||||||
|
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
|
||||||
|
// get the genesis block transactions from the Zcash blockchain.
|
||||||
|
let genesis_transactions = unmined_transactions_in_blocks(0, network);
|
||||||
|
// Insert the genesis block coinbase transaction into the mempool storage.
|
||||||
|
mempool_service
|
||||||
|
.storage()
|
||||||
|
.insert(genesis_transactions.1[0].clone())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
genesis_transactions.1
|
||||||
|
}
|
||||||
|
|
@ -27,6 +27,8 @@ mod tests;
|
||||||
|
|
||||||
pub use self::crawler::Crawler;
|
pub use self::crawler::Crawler;
|
||||||
pub use self::error::MempoolError;
|
pub use self::error::MempoolError;
|
||||||
|
#[cfg(test)]
|
||||||
|
pub use self::storage::tests::unmined_transactions_in_blocks;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
|
@ -62,6 +64,12 @@ impl Mempool {
|
||||||
storage: Default::default(),
|
storage: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the storage field of the mempool for testing purposes.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn storage(&mut self) -> &mut storage::Storage {
|
||||||
|
&mut self.storage
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<Request> for Mempool {
|
impl Service<Request> for Mempool {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue