From 1d0ebf89c611ba31635b05fa453d83ab7b2f077a Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Fri, 18 Sep 2020 12:18:22 -0700 Subject: [PATCH] zebrad: move seed command into inbound component Remove the seed command entirely, and make the behavior it provided (responding to `Request::Peers`) part of the ordinary functioning of the start command. The new `Inbound` service should be expanded to handle all request types. --- book/src/user/run.md | 2 - zebrad/src/commands.rs | 11 +-- zebrad/src/commands/seed.rs | 144 ------------------------------- zebrad/src/commands/start.rs | 41 +++++---- zebrad/src/components.rs | 2 + zebrad/src/components/inbound.rs | 98 +++++++++++++++++++++ zebrad/tests/acceptance.rs | 49 ----------- 7 files changed, 127 insertions(+), 220 deletions(-) delete mode 100644 zebrad/src/commands/seed.rs create mode 100644 zebrad/src/components/inbound.rs diff --git a/book/src/user/run.md b/book/src/user/run.md index b35dbd00..32a6d9ef 100644 --- a/book/src/user/run.md +++ b/book/src/user/run.md @@ -10,8 +10,6 @@ structure, and documentation for all of the config options can be found [here](https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html). * `zebrad start` starts a full node. -* `zebrad seed` starts a crawler that can power a DNS seeder, but does not - attempt to sync the chain state. ## Return Codes diff --git a/zebrad/src/commands.rs b/zebrad/src/commands.rs index 1809029a..85790856 100644 --- a/zebrad/src/commands.rs +++ b/zebrad/src/commands.rs @@ -2,14 +2,11 @@ mod generate; mod revhex; -mod seed; mod start; mod version; use self::ZebradCmd::*; -use self::{ - generate::GenerateCmd, revhex::RevhexCmd, seed::SeedCmd, start::StartCmd, version::VersionCmd, -}; +use self::{generate::GenerateCmd, revhex::RevhexCmd, start::StartCmd, version::VersionCmd}; use crate::config::ZebradConfig; @@ -36,10 +33,6 @@ pub enum ZebradCmd { #[options(help = "reverses the endianness of a hex string, like a block or transaction hash")] Revhex(RevhexCmd), - /// The `seed` subcommand - #[options(help = "dns seeder")] - Seed(SeedCmd), - /// The `start` subcommand #[options(help = "start the application")] Start(StartCmd), @@ -56,7 +49,7 @@ impl ZebradCmd { pub(crate) fn is_server(&self) -> bool { match self { // List all the commands, so new commands have to make a choice here - Seed(_) | Start(_) => true, + Start(_) => true, Generate(_) | Help(_) | Revhex(_) | Version(_) => false, } } diff --git a/zebrad/src/commands/seed.rs b/zebrad/src/commands/seed.rs deleted file mode 100644 index 379e676a..00000000 --- a/zebrad/src/commands/seed.rs +++ /dev/null @@ -1,144 +0,0 @@ -//! `seed` subcommand - runs a dns seeder - -use std::{ - future::Future, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; - -use abscissa_core::{Command, Options, Runnable}; -use futures::{channel::oneshot, prelude::*}; -use tower::{buffer::Buffer, Service, ServiceExt}; - -use zebra_network::{AddressBook, BoxError, Request, Response}; - -use crate::components::tokio::RuntimeRun; -use crate::prelude::*; -use color_eyre::eyre::{eyre, Report}; - -/// Whether our `SeedService` is poll_ready or not. -#[derive(Debug)] -enum SeederState { - /// Waiting for the address book to be shared with us via the oneshot channel. - AwaitingAddressBook(oneshot::Receiver>>), - /// Address book received, ready to service requests. - Ready(Arc>), -} - -#[derive(Debug)] -struct SeedService { - state: SeederState, -} - -impl Service for SeedService { - type Response = Response; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - #[instrument(skip(self, _cx))] - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - match self.state { - SeederState::Ready(_) => Poll::Ready(Ok(())), - SeederState::AwaitingAddressBook(ref mut rx) => match rx.try_recv() { - Err(e) => { - error!("oneshot sender dropped, failing service: {:?}", e); - Poll::Ready(Err(e.into())) - } - Ok(None) => { - trace!("awaiting address book, service is unready"); - Poll::Pending - } - Ok(Some(address_book)) => { - debug!("received address_book via oneshot, service becomes ready"); - self.state = SeederState::Ready(address_book); - Poll::Ready(Ok(())) - } - }, - } - } - - // Note: the generated span applies only to this function, not - // to the future, but this is OK because the current implementation - // is not actually async. - #[instrument] - fn call(&mut self, req: Request) -> Self::Future { - let address_book = if let SeederState::Ready(address_book) = &self.state { - address_book - } else { - panic!("SeedService::call without SeedService::poll_ready"); - }; - - let response = match req { - Request::Peers => { - debug!("selecting peers to gossip"); - let mut peers = address_book.lock().unwrap().sanitized(); - // truncate the list so that we do not trivially reveal our entire peer set. - peers.truncate(50); - Ok(Response::Peers(peers)) - } - _ => { - debug!("ignoring request"); - Ok(Response::Nil) - } - }; - Box::pin(futures::future::ready(response)) - } -} - -/// `seed` subcommand -/// -/// A DNS seeder command to spider and collect as many valid peer -/// addresses as we can. -// This is not a unit-like struct because it makes Command and Options sad. -#[derive(Command, Debug, Default, Options)] -pub struct SeedCmd {} - -impl Runnable for SeedCmd { - /// Start the application. - fn run(&self) { - info!("Starting zebrad in seed mode"); - - use crate::components::tokio::TokioComponent; - - let rt = app_writer() - .state_mut() - .components - .get_downcast_mut::() - .expect("TokioComponent should be available") - .rt - .take(); - - rt.expect("runtime should not already be taken") - .run(self.seed()); - } -} - -impl SeedCmd { - async fn seed(&self) -> Result<(), Report> { - info!("begin tower-based peer handling test stub"); - - let (addressbook_tx, addressbook_rx) = oneshot::channel(); - let seed_service = SeedService { - state: SeederState::AwaitingAddressBook(addressbook_rx), - }; - let buffered_svc = Buffer::new(seed_service, 1); - - let config = app_config().network.clone(); - - let (mut peer_set, address_book) = zebra_network::init(config, buffered_svc).await; - - let _ = addressbook_tx.send(address_book); - - info!("waiting for peer_set ready"); - peer_set.ready_and().await.map_err(|e| eyre!(e))?; - - info!("peer_set became ready"); - - let eternity = future::pending::<()>(); - eternity.await; - - Ok(()) - } -} diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index c25ea55e..e61c0ebe 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -19,17 +19,18 @@ //! * This task runs in the background and continuously queries the network for //! new blocks to be verified and added to the local state -use crate::components::tokio::RuntimeRun; +use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; +use color_eyre::eyre::{eyre, Report}; +use tokio::sync::oneshot; +use tower::builder::ServiceBuilder; + +use crate::components::{tokio::RuntimeRun, Inbound}; use crate::config::ZebradConfig; use crate::{ components::{tokio::TokioComponent, ChainSync}, prelude::*, }; -use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; -use color_eyre::eyre::Report; -use tower::{buffer::Buffer, service_fn}; - /// `start` subcommand #[derive(Command, Debug, Options)] pub struct StartCmd { @@ -40,10 +41,13 @@ pub struct StartCmd { impl StartCmd { async fn start(&self) -> Result<(), Report> { - info!(?self, "starting to connect to the network"); + let config = app_config().clone(); + info!(?config); - let config = app_config(); + info!("initializing node state"); let state = zebra_state::init(config.state.clone(), config.network.network); + + info!("initializing chain verifier"); let verifier = zebra_consensus::chain::init( config.consensus.clone(), config.network.network, @@ -51,16 +55,21 @@ impl StartCmd { ) .await; - // The service that our node uses to respond to requests by peers - let node = Buffer::new( - service_fn(|req| async move { - debug!(?req, "inbound peer request"); - Ok::(zebra_network::Response::Nil) - }), - 1, - ); - let (peer_set, _address_book) = zebra_network::init(config.network.clone(), node).await; + info!("initializing network"); + // The service that our node uses to respond to requests by peers + let (setup_tx, setup_rx) = oneshot::channel(); + let inbound = ServiceBuilder::new() + .load_shed() + .buffer(20) + .service(Inbound::new(setup_rx)); + + let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await; + setup_tx + .send((peer_set.clone(), address_book)) + .map_err(|_| eyre!("could not send setup data to inbound service"))?; + + info!("initializing syncer"); let mut syncer = ChainSync::new(config.network.network, peer_set, state, verifier); syncer.sync().await diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index a53e52e9..a6861489 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -1,6 +1,8 @@ +mod inbound; pub mod metrics; mod sync; pub mod tokio; pub mod tracing; +pub use inbound::Inbound; pub use sync::ChainSync; diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs new file mode 100644 index 00000000..04b36653 --- /dev/null +++ b/zebrad/src/components/inbound.rs @@ -0,0 +1,98 @@ +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use futures::future::FutureExt; +use tokio::sync::oneshot; +use tower::{buffer::Buffer, util::BoxService, Service}; + +use zebra_network as zn; +use zebra_network::AddressBook; + +type Outbound = Buffer, zn::Request>; + +pub type SetupData = (Outbound, Arc>); + +pub struct Inbound { + // invariant: outbound, address_book are Some if network_setup is None + // + // why not use an enum for the inbound state? because it would mean + // match-wrapping the body of Service::call rather than just expect()ing + // some Options. + network_setup: Option>, + outbound: Option, + address_book: Option>>, +} + +impl Inbound { + pub fn new(network_setup: oneshot::Receiver) -> Self { + Self { + network_setup: Some(network_setup), + outbound: None, + address_book: None, + } + } +} + +impl Service for Inbound { + type Response = zn::Response; + type Error = zn::BoxError; + type Future = + Pin> + Send + 'static>>; + + #[instrument(skip(self, _cx))] + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + use oneshot::error::TryRecvError; + match self.network_setup.take() { + Some(mut rx) => match rx.try_recv() { + Ok((outbound, address_book)) => { + self.outbound = Some(outbound); + self.address_book = Some(address_book); + self.network_setup = None; + Poll::Ready(Ok(())) + } + Err(e @ TryRecvError::Closed) => { + // returning poll_ready(err) means that poll_ready should + // never be called again, but put the oneshot back so we + // error again in case someone does. + self.network_setup = Some(rx); + Poll::Ready(Err(e.into())) + } + Err(TryRecvError::Empty) => { + self.network_setup = Some(rx); + Poll::Pending + } + }, + None => Poll::Ready(Ok(())), + } + } + + #[instrument(skip(self))] + fn call(&mut self, req: zn::Request) -> Self::Future { + match req { + zn::Request::Peers => { + // We could truncate the list to try to not reveal our entire + // peer set. But because we don't monitor repeated requests, + // this wouldn't actually achieve anything, because a crawler + // could just repeatedly query it. + let mut peers = self + .address_book + .as_ref() + .unwrap() + .lock() + .unwrap() + .sanitized(); + const MAX_ADDR: usize = 1000; // bitcoin protocol constant + peers.truncate(MAX_ADDR); + async { Ok(zn::Response::Peers(peers)) }.boxed() + } + _ => { + debug!("ignoring unimplemented request"); + async { Ok(zn::Response::Nil) }.boxed() + } + } + } +} diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 30d317e0..f47dc760 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -218,52 +218,6 @@ fn revhex_args() -> Result<()> { Ok(()) } -#[test] -fn seed_no_args() -> Result<()> { - zebra_test::init(); - let testdir = testdir()?.with_config(default_test_config()?)?; - - let mut child = testdir.spawn_child(&["-v", "seed"])?; - - // Run the program and kill it at 1 second - std::thread::sleep(Duration::from_secs(1)); - child.kill()?; - - let output = child.wait_with_output()?; - let output = output.assert_failure()?; - - output.stdout_contains(r"Starting zebrad in seed mode")?; - - // Make sure the command was killed - output.assert_was_killed()?; - - Ok(()) -} - -#[test] -fn seed_args() -> Result<()> { - zebra_test::init(); - let testdir = testdir()?.with_config(default_test_config()?)?; - let testdir = &testdir; - - // unexpected free argument `argument` - let child = testdir.spawn_child(&["seed", "argument"])?; - let output = child.wait_with_output()?; - output.assert_failure()?; - - // unrecognized option `-f` - let child = testdir.spawn_child(&["seed", "-f"])?; - let output = child.wait_with_output()?; - output.assert_failure()?; - - // unexpected free argument `start` - let child = testdir.spawn_child(&["seed", "start"])?; - let output = child.wait_with_output()?; - output.assert_failure()?; - - Ok(()) -} - #[test] fn start_no_args() -> Result<()> { zebra_test::init(); @@ -279,8 +233,6 @@ fn start_no_args() -> Result<()> { let output = child.wait_with_output()?; let output = output.assert_failure()?; - // start is the default mode, so we check for end of line, to distinguish it - // from seed output.stdout_contains(r"Starting zebrad$")?; // Make sure the command was killed @@ -455,7 +407,6 @@ fn valid_generated_config_test() -> Result<()> { // they use the generated config. So parallel execution can cause port and // cache conflicts. valid_generated_config("start", r"Starting zebrad$")?; - valid_generated_config("seed", r"Starting zebrad in seed mode")?; Ok(()) }