diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index e2a13ced..8fc49d63 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -55,6 +55,7 @@ once_cell = "1.8" regex = "1.4.6" semver = "1.0.3" tempdir = "0.3.7" +tokio = { version = "0.3.6", features = ["full", "test-util"] } proptest = "0.10" proptest-derive = "0.3" diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index cb3a5514..ed74c45d 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -25,13 +25,14 @@ use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; +use futures::{select, FutureExt}; use tokio::sync::oneshot; use tower::builder::ServiceBuilder; use crate::components::{tokio::RuntimeRun, Inbound}; use crate::config::ZebradConfig; use crate::{ - components::{tokio::TokioComponent, ChainSync}, + components::{mempool, tokio::TokioComponent, ChainSync}, prelude::*, }; @@ -79,9 +80,15 @@ impl StartCmd { info!("initializing syncer"); // TODO: use sync_length_receiver to activate the mempool (#2592) - let (syncer, _sync_length_receiver) = ChainSync::new(&config, peer_set, state, verifier); + let (syncer, _sync_length_receiver) = + ChainSync::new(&config, peer_set.clone(), state, verifier); - syncer.sync().await + select! { + result = syncer.sync().fuse() => result, + _ = mempool::Crawler::spawn(peer_set).fuse() => { + unreachable!("The mempool crawler only stops if it panics"); + } + } } } diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index 1b78d397..29e97d22 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -6,6 +6,7 @@ //! don't fit the async context well. mod inbound; +pub mod mempool; pub mod metrics; mod sync; pub mod tokio; diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs new file mode 100644 index 00000000..0c38da71 --- /dev/null +++ b/zebrad/src/components/mempool.rs @@ -0,0 +1,5 @@ +//! Zebra mempool. + +mod crawler; + +pub use self::crawler::Crawler; diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs new file mode 100644 index 00000000..6203e14a --- /dev/null +++ b/zebrad/src/components/mempool/crawler.rs @@ -0,0 +1,99 @@ +//! Zebra Mempool crawler. +//! +//! The crawler periodically requests transactions from peers in order to populate the mempool. + +use std::time::Duration; + +use futures::{stream, StreamExt, TryStreamExt}; +use tokio::{sync::Mutex, task::JoinHandle, time::sleep}; +use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; + +use zebra_network::{Request, Response}; + +#[cfg(test)] +mod tests; + +/// The number of peers to request transactions from per crawl event. +const FANOUT: usize = 4; + +/// The delay between crawl events. +const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75); + +/// The time to wait for a peer response. +/// +/// # Correctness +/// +/// If this timeout is removed or set too high, the crawler may hang waiting for a peer to respond. +/// +/// If this timeout is set too low, the crawler may fail to populate the mempool. +const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); + +/// The mempool transaction crawler. +pub struct Crawler { + peer_set: Mutex>, +} + +impl Crawler +where + S: Service + Clone + Send + 'static, + S::Future: Send, +{ + /// Spawn an asynchronous task to run the mempool crawler. + pub fn spawn(peer_set: S) -> JoinHandle<()> { + let crawler = Crawler { + peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)), + }; + + tokio::spawn(crawler.run()) + } + + /// Periodically crawl peers for transactions to include in the mempool. + pub async fn run(self) { + loop { + self.wait_until_enabled().await; + self.crawl_transactions().await; + sleep(RATE_LIMIT_DELAY).await; + } + } + + /// Wait until the mempool is enabled. + async fn wait_until_enabled(&self) { + // TODO: Check if synchronizing up to chain tip has finished (#2603). + } + + /// Crawl peers for transactions. + /// + /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool. + async fn crawl_transactions(&self) { + let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT); + let peer_set = self.peer_set.lock().await.clone(); + + trace!("Crawling for mempool transactions"); + + peer_set + .call_all(requests) + .unordered() + .and_then(|response| self.handle_response(response)) + // TODO: Reduce the log level of the errors (#2655). + .inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error)) + .for_each(|_| async {}) + .await; + } + + /// Handle a peer's response to the crawler's request for transactions. + async fn handle_response(&self, response: Response) -> Result<(), BoxError> { + let transaction_ids = match response { + Response::TransactionIds(ids) => ids, + _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"), + }; + + trace!( + "Mempool crawler received {} transaction IDs", + transaction_ids.len() + ); + + // TODO: Download transactions and send them to the mempool (#2650) + + Ok(()) + } +} diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs new file mode 100644 index 00000000..35b71d04 --- /dev/null +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use tokio::{ + sync::mpsc::{self, UnboundedReceiver}, + time::{self, timeout}, +}; +use tower::{buffer::Buffer, util::BoxService, BoxError}; + +use zebra_network::{Request, Response}; + +use super::{Crawler, FANOUT, RATE_LIMIT_DELAY}; + +/// The number of iterations to crawl while testing. +/// +/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. +/// See more information in [`MAX_REQUEST_DELAY`]. +const CRAWL_ITERATIONS: usize = 4; + +/// The maximum time to wait for a request to arrive before considering it won't arrive. +/// +/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. +/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for +/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`. +const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250); + +/// The amount of time to advance beyond the expected instant that the crawler wakes up. +const ERROR_MARGIN: Duration = Duration::from_millis(100); + +#[tokio::test] +async fn crawler_requests_for_transaction_ids() { + let (peer_set, mut requests) = mock_peer_set(); + + Crawler::spawn(peer_set); + + time::pause(); + + for _ in 0..CRAWL_ITERATIONS { + for _ in 0..FANOUT { + let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + + assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); + } + + let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + + assert!(extra_request.is_err()); + + time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await; + } +} + +/// Create a mock service to represent a [`PeerSet`][zebra_network::PeerSet] and intercept the +/// requests it receives. +/// +/// The intercepted requests are sent through an unbounded channel to the receiver that's also +/// returned from this function. +fn mock_peer_set() -> ( + Buffer, Request>, + UnboundedReceiver, +) { + let (sender, receiver) = mpsc::unbounded_channel(); + + let proxy_service = tower::service_fn(move |request| { + let sender = sender.clone(); + + async move { + let _ = sender.send(request); + + Ok(Response::TransactionIds(vec![])) + } + }); + + let service = Buffer::new(BoxService::new(proxy_service), 10); + + (service, receiver) +}