From df656a8bf02671a4065d668a1f26536d53639389 Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Fri, 12 Jun 2020 09:20:58 -0700 Subject: [PATCH] Reorganize `connect` subcommand for readibility (#450) --- zebrad/src/commands/connect.rs | 234 +++++++++++++++++++-------------- 1 file changed, 137 insertions(+), 97 deletions(-) diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index db804602..95a822bf 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -1,12 +1,22 @@ //! `connect` subcommand - test stub for talking to zcashd -use crate::prelude::*; - +use crate::{components::tokio::TokioComponent, prelude::*}; use abscissa_core::{Command, Options, Runnable}; - use color_eyre::Report; -use eyre::eyre; -use futures::prelude::*; +use eyre::{eyre, WrapErr}; +use futures::{ + prelude::*, + stream::{FuturesUnordered, StreamExt}, +}; +use std::collections::BTreeSet; +use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; +use zebra_chain::{block::BlockHeaderHash, types::BlockHeight}; + +// genesis +static GENESIS: BlockHeaderHash = BlockHeaderHash([ + 8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38, 177, + 29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0, +]); /// `connect` subcommand #[derive(Command, Debug, Options)] @@ -24,7 +34,6 @@ impl Runnable for ConnectCmd { fn run(&self) { info!(connect.addr = ?self.addr); - use crate::components::tokio::TokioComponent; let rt = app_writer() .state_mut() .components @@ -49,8 +58,7 @@ impl Runnable for ConnectCmd { impl ConnectCmd { async fn connect(&self) -> Result<(), Report> { - info!("begin tower-based peer handling test stub"); - use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; + info!(?self, "begin tower-based peer handling test stub"); // The service that our node uses to respond to requests by peers let node = Buffer::new( @@ -67,111 +75,143 @@ impl ConnectCmd { // Connect only to the specified peer. config.initial_mainnet_peers.insert(self.addr.to_string()); - let mut state = zebra_state::in_memory::init(); - let (mut peer_set, _address_book) = zebra_network::init(config, node).await; - let mut retry_peer_set = - tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone()); + let state = zebra_state::in_memory::init(); + let (peer_set, _address_book) = zebra_network::init(config, node).await; + let retry_peer_set = tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone()); - info!("waiting for peer_set ready"); - peer_set.ready_and().await.map_err(|e| eyre!(e))?; - - info!("peer_set became ready"); - - use futures::stream::{FuturesUnordered, StreamExt}; - use std::collections::BTreeSet; - use zebra_chain::block::BlockHeaderHash; - use zebra_chain::types::BlockHeight; - - // genesis - let mut tip = BlockHeaderHash([ - 8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38, - 177, 29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0, - ]); - - // TODO(jlusby): Replace with real state service let mut downloaded_block_heights = BTreeSet::::new(); downloaded_block_heights.insert(BlockHeight(0)); - let mut block_requests = FuturesUnordered::new(); - let mut requested_block_heights = 0; + let mut connect = Connect { + retry_peer_set, + peer_set, + state, + tip: GENESIS, + block_requests: FuturesUnordered::new(), + requested_block_heights: 0, + downloaded_block_heights, + }; - while requested_block_heights < 700_000 { - // Request the next 500 hashes. - let hashes = if let Ok(zebra_network::Response::BlockHeaderHashes(hashes)) = - retry_peer_set - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_network::Request::FindBlocks { - known_blocks: vec![tip], - stop: None, - }) - .await - { - info!( - new_hashes = hashes.len(), - requested = requested_block_heights, - in_flight = block_requests.len(), - downloaded = downloaded_block_heights.len(), - highest = downloaded_block_heights.iter().next_back().unwrap().0, - "requested more hashes" - ); - requested_block_heights += hashes.len(); - hashes - } else { - panic!("request failed, TODO implement retry"); - }; + connect.connect().await + } +} - tip = *hashes.last().unwrap(); +type Error = Box; + +struct Connect +where + ZN: Service, +{ + retry_peer_set: tower::retry::Retry, + peer_set: ZN, + state: ZS, + tip: BlockHeaderHash, + block_requests: FuturesUnordered, + requested_block_heights: usize, + downloaded_block_heights: BTreeSet, +} + +impl Connect +where + ZN: Service + + Send + + Clone + + 'static, + ZN::Future: Send, + ZS: Service + + Send + + Clone + + 'static, + ZS::Future: Send, +{ + async fn connect(&mut self) -> Result<(), Report> { + // TODO(jlusby): Replace with real state service + + while self.requested_block_heights < 700_000 { + let hashes = self.next_hashes().await?; + self.tip = *hashes.last().unwrap(); // Request the corresponding blocks in chunks - for chunk in hashes.chunks(10usize) { - let request = peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( - zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()), - ); - - block_requests.push(request); - } + self.request_blocks(hashes).await?; // Allow at most 300 block requests in flight. - while block_requests.len() > 300 { - match block_requests.next().await { - Some(Ok(zebra_network::Response::Blocks(blocks))) => { - for block in blocks { - downloaded_block_heights.insert(block.coinbase_height().unwrap()); - state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::AddBlock { block }) - .await - .map_err(|e| eyre!(e))?; - } - } - Some(Err(e)) => { - error!(%e); - } - _ => continue, - } - } + self.drain_requests(300).await?; } - while let Some(Ok(zebra_network::Response::Blocks(blocks))) = block_requests.next().await { - for block in blocks { - downloaded_block_heights.insert(block.coinbase_height().unwrap()); - state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::AddBlock { block }) - .await - .map_err(|e| eyre!(e))?; - } - } + self.drain_requests(0).await?; let eternity = future::pending::<()>(); eternity.await; Ok(()) } + + async fn next_hashes(&mut self) -> Result, Report> { + // Request the next 500 hashes. + self.retry_peer_set + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_network::Request::FindBlocks { + known_blocks: vec![self.tip], + stop: None, + }) + .await + .map_err(|e| eyre!(e)) + .wrap_err("request failed, TODO implement retry") + .map(|response| match response { + zebra_network::Response::BlockHeaderHashes(hashes) => hashes, + _ => unreachable!("FindBlocks always gets a BlockHeaderHashes response"), + }) + .map(|hashes| { + info!( + new_hashes = hashes.len(), + requested = self.requested_block_heights, + in_flight = self.block_requests.len(), + downloaded = self.downloaded_block_heights.len(), + highest = self.downloaded_block_heights.iter().next_back().unwrap().0, + "requested more hashes" + ); + self.requested_block_heights += hashes.len(); + hashes + }) + } + + async fn request_blocks(&mut self, hashes: Vec) -> Result<(), Report> { + for chunk in hashes.chunks(10usize) { + let request = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( + zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()), + ); + + self.block_requests.push(request); + } + + Ok(()) + } + + async fn drain_requests(&mut self, request_goal: usize) -> Result<(), Report> { + while self.block_requests.len() > request_goal { + match self.block_requests.next().await { + Some(Ok(zebra_network::Response::Blocks(blocks))) => { + for block in blocks { + self.downloaded_block_heights + .insert(block.coinbase_height().unwrap()); + self.state + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::AddBlock { block }) + .await + .map_err(|e| eyre!(e))?; + } + } + Some(Err(e)) => { + error!(%e); + } + _ => continue, + } + } + + Ok(()) + } }