diff --git a/zebrad/src/commands.rs b/zebrad/src/commands.rs index 3509566d..92632270 100644 --- a/zebrad/src/commands.rs +++ b/zebrad/src/commands.rs @@ -1,6 +1,5 @@ //! Zebrad Subcommands -mod connect; mod generate; mod revhex; mod seed; @@ -9,8 +8,7 @@ mod version; use self::ZebradCmd::*; use self::{ - connect::ConnectCmd, generate::GenerateCmd, revhex::RevhexCmd, seed::SeedCmd, start::StartCmd, - version::VersionCmd, + generate::GenerateCmd, revhex::RevhexCmd, seed::SeedCmd, start::StartCmd, version::VersionCmd, }; use crate::config::ZebradConfig; @@ -30,10 +28,6 @@ pub enum ZebradCmd { #[options(help = "generate a skeleton configuration")] Generate(GenerateCmd), - /// The `connect` subcommand - #[options(help = "testing stub for dumping network messages")] - Connect(ConnectCmd), - /// The `help` subcommand #[options(help = "get usage information")] Help(Help), @@ -67,7 +61,7 @@ impl ZebradCmd { match self { // List all the commands, so new commands have to make a choice here Generate(_) | Help(_) | Revhex(_) | Version(_) => true, - Connect(_) | Seed(_) | Start(_) => false, + Seed(_) | Start(_) => false, } } @@ -81,7 +75,7 @@ impl ZebradCmd { pub(crate) fn is_server(&self) -> bool { match self { // List all the commands, so new commands have to make a choice here - Connect(_) | Seed(_) | Start(_) => true, + Seed(_) | Start(_) => true, Generate(_) | Help(_) | Revhex(_) | Version(_) => false, } } diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs deleted file mode 100644 index 6a8cc2ca..00000000 --- a/zebrad/src/commands/connect.rs +++ /dev/null @@ -1,216 +0,0 @@ -//! 
`connect` subcommand - test stub for talking to zcashd - -use crate::{components::tokio::TokioComponent, prelude::*}; - -use abscissa_core::{Command, Options, Runnable}; -use color_eyre::eyre::{eyre, Report, WrapErr}; -use futures::{ - prelude::*, - stream::{FuturesUnordered, StreamExt}, -}; -use std::collections::BTreeSet; -use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; - -use zebra_chain::{block::BlockHeaderHash, types::BlockHeight}; -use zebra_consensus::parameters; - -/// `connect` subcommand -#[derive(Command, Debug, Options)] -pub struct ConnectCmd { - /// The address of the node to connect to. - #[options( - help = "The address of the node to connect to.", - default = "127.0.0.1:8233" - )] - addr: std::net::SocketAddr, -} - -impl Runnable for ConnectCmd { - /// Start the application. - fn run(&self) { - info!(connect.addr = ?self.addr); - - let network = app_config().network.network; - let genesis_hash = parameters::genesis_hash(network); - - let rt = app_writer() - .state_mut() - .components - .get_downcast_mut::<TokioComponent>() - .expect("TokioComponent should be available") - .rt - .take(); - - let result = rt - .expect("runtime should not already be taken") - .block_on(self.connect(genesis_hash)); - - match result { - Ok(()) => {} - Err(e) => { - eprintln!("Error: {:?}", e); - std::process::exit(1); - } - } - } -} - -impl ConnectCmd { - async fn connect(&self, genesis_hash: BlockHeaderHash) -> Result<(), Report> { - info!(?self, "begin tower-based peer handling test stub"); - - // The service that our node uses to respond to requests by peers - let node = Buffer::new( - service_fn(|req| async move { - info!(?req); - Ok::<zebra_network::Response, Error>(zebra_network::Response::Nil) - }), - 1, - ); - - let mut config = app_config().network.clone(); - // Use a different listen addr so that we don't conflict with another local node. - config.listen_addr = "0.0.0.0:38233".parse()?; - // Connect only to the specified peer.
- config.initial_mainnet_peers.insert(self.addr.to_string()); - - let state = zebra_state::in_memory::init(); - let (peer_set, _address_book) = zebra_network::init(config, node).await; - let retry_peer_set = tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone()); - - let mut downloaded_block_heights = BTreeSet::<BlockHeight>::new(); - downloaded_block_heights.insert(BlockHeight(0)); - - let mut connect = Connect { - retry_peer_set, - peer_set, - state, - tip: genesis_hash, - block_requests: FuturesUnordered::new(), - requested_block_heights: 0, - downloaded_block_heights, - }; - - connect.connect().await - } -} - -type Error = Box<dyn std::error::Error + Send + Sync + 'static>; - -struct Connect<ZN, ZS> -where - ZN: Service<zebra_network::Request>, -{ - retry_peer_set: tower::retry::Retry<zebra_network::RetryErrors, ZN>, - peer_set: ZN, - state: ZS, - tip: BlockHeaderHash, - block_requests: FuturesUnordered<ZN::Future>, - requested_block_heights: usize, - downloaded_block_heights: BTreeSet<BlockHeight>, -} - -impl<ZN, ZS> Connect<ZN, ZS> -where - ZN: Service<zebra_network::Request, Response = zebra_network::Response, Error = Error> - + Send - + Clone - + 'static, - ZN::Future: Send, - ZS: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error> - + Send - + Clone - + 'static, - ZS::Future: Send, -{ - async fn connect(&mut self) -> Result<(), Report> { - // TODO(jlusby): Replace with real state service - - while self.requested_block_heights < 700_000 { - let hashes = self.next_hashes().await?; - self.tip = *hashes.last().unwrap(); - - // Request the corresponding blocks in chunks - self.request_blocks(hashes).await?; - - // Allow at most 300 block requests in flight. - self.drain_requests(300).await?; - } - - self.drain_requests(0).await?; - - let eternity = future::pending::<()>(); - eternity.await; - - Ok(()) - } - - async fn next_hashes(&mut self) -> Result<Vec<BlockHeaderHash>, Report> { - // Request the next 500 hashes. - self.retry_peer_set - .ready_and() - .await - .map_err(|e| eyre!(e))?
- .call(zebra_network::Request::FindBlocks { - known_blocks: vec![self.tip], - stop: None, - }) - .await - .map_err(|e| eyre!(e)) - .wrap_err("request failed, TODO implement retry") - .map(|response| match response { - zebra_network::Response::BlockHeaderHashes(hashes) => hashes, - _ => unreachable!("FindBlocks always gets a BlockHeaderHashes response"), - }) - .map(|hashes| { - info!( - new_hashes = hashes.len(), - requested = self.requested_block_heights, - in_flight = self.block_requests.len(), - downloaded = self.downloaded_block_heights.len(), - highest = self.downloaded_block_heights.iter().next_back().unwrap().0, - "requested more hashes" - ); - self.requested_block_heights += hashes.len(); - hashes - }) - } - - async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> { - for chunk in hashes.chunks(10usize) { - let request = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( - zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()), - ); - - self.block_requests.push(request); - } - - Ok(()) - } - - async fn drain_requests(&mut self, request_goal: usize) -> Result<(), Report> { - while self.block_requests.len() > request_goal { - match self.block_requests.next().await { - Some(Ok(zebra_network::Response::Blocks(blocks))) => { - for block in blocks { - self.downloaded_block_heights - .insert(block.coinbase_height().unwrap()); - self.state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::AddBlock { block }) - .await - .map_err(|e| eyre!(e))?; - } - } - Some(Err(e)) => { - error!(%e); - } - _ => continue, - } - } - - Ok(()) - } -}