diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index a87e2c68..4577a958 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -9,3 +9,76 @@ mod unready_service; pub use discover::PeerDiscover; pub use set::PeerSet; + +use std::pin::Pin; + +use futures::future::Future; +use tower::Service; + +use crate::config::Config; +use crate::protocol::internal::{Request, Response}; +use crate::BoxedStdError; +use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector}; + +type BoxedZebraService = Box< + dyn Service< + Request, + Response = Response, + Error = BoxedStdError, + Future = Pin> + Send>>, + > + Send + + 'static, +>; + +/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. +pub fn init(config: Config, inbound_service: S) -> (BoxedZebraService, TimestampCollector) +where + S: Service + Clone + Send + 'static, + S::Future: Send, +{ + use futures::{ + future, + stream::{FuturesUnordered, StreamExt}, + }; + use tokio::net::TcpStream; + use tower::{ + buffer::Buffer, + discover::{Change, ServiceStream}, + ServiceExt, + }; + use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; + + let tc = TimestampCollector::new(); + let pc = Buffer::new(PeerConnector::new(config.clone(), inbound_service, &tc), 1); + + // construct a stream of services XXX currently the stream is based on a + // static stream from config.initial_peers; this should be replaced with a + // channel that starts with initial_peers but also accetps incoming, dials + // new, etc. + let client_stream = PeakEwmaDiscover::new( + ServiceStream::new( + config + .initial_peers + .into_iter() + .map(|addr| { + let mut pc = pc.clone(); + async move { + let stream = TcpStream::connect(addr).await?; + pc.ready().await?; + let client = pc.call((stream, addr)).await?; + Ok::<_, BoxedStdError>(Change::Insert(addr, client)) + } + }) + .collect::>() + // Discard any errored connections... + .filter(|result| future::ready(result.is_ok())), + ), + config.ewma_default_rtt, + config.ewma_decay_time, + NoInstrument, + ); + + let peer_set = PeerSet::new(client_stream); + + (Box::new(peer_set), tc) +} diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 5b7af917..27c05a30 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -216,7 +216,8 @@ where { type Response = Response; type Error = BoxedStdError; - type Future = Pin>>>; + type Future = + Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Process peer discovery updates. diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index 66f1347b..24bc3146 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -73,76 +73,57 @@ impl ConnectCmd { 1, ); - use tokio::net::TcpStream; + let mut config = app_config().network.clone(); - let config = app_config().network.clone(); - let collector = TimestampCollector::new(); - let mut pc = Buffer::new( - PeerConnector::new(config, Network::Mainnet, node, &collector), - 1, - ); + // Until we finish fleshing out the peerset -- particularly + // pulling more peers -- we don't want to start with a single + // initial peer. So make a throwaway connection to the first, + // extract a list of addresses, and discard everything else. + // All the setup is kept in a sub-scope so we know we're not reusing it. + // + // Later, this should turn into initial_peers = vec![self.addr]; + config.initial_peers = { + use tokio::net::TcpStream; - let tcp_stream = TcpStream::connect(self.addr).await?; - pc.ready() - .await - .map_err(failure::Error::from_boxed_compat)?; - let mut client = pc - .call((tcp_stream, self.addr)) - .await - .map_err(failure::Error::from_boxed_compat)?; + let collector = TimestampCollector::new(); + let mut pc = Buffer::new( + PeerConnector::new(config.clone(), node.clone(), &collector), + 1, + ); - client.ready().await?; + let tcp_stream = TcpStream::connect(self.addr).await?; + pc.ready() + .await + .map_err(failure::Error::from_boxed_compat)?; + let mut client = pc + .call((tcp_stream, self.addr)) + .await + .map_err(failure::Error::from_boxed_compat)?; - let addrs = match client.call(Request::GetPeers).await? { - Response::Peers(addrs) => addrs, - _ => bail!("Got wrong response type"), + client.ready().await?; + + let addrs = match client.call(Request::GetPeers).await? { + Response::Peers(addrs) => addrs, + _ => bail!("Got wrong response type"), + }; + info!( + addrs.len = addrs.len(), + "got addresses from first connected peer" + ); + + addrs.into_iter().map(|meta| meta.addr).collect::>() }; - info!( - addrs.len = addrs.len(), - "got addresses from first connected peer" - ); - use failure::Error; - use futures::{ - future, - stream::{FuturesUnordered, StreamExt}, - }; - use std::time::Duration; - use tower::discover::{Change, ServiceStream}; - use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; - - // construct a stream of services - let client_stream = PeakEwmaDiscover::new( - ServiceStream::new( - addrs - .into_iter() - .map(|meta| { - let mut pc = pc.clone(); - async move { - let stream = TcpStream::connect(meta.addr).await?; - pc.ready().await?; - let client = pc.call((stream, meta.addr)).await?; - Ok::<_, BoxedStdError>(Change::Insert(meta.addr, client)) - } - }) - .collect::>() - // Discard any errored connections... - .filter(|result| future::ready(result.is_ok())), - ), - Duration::from_secs(1), // default rtt estimate - Duration::from_secs(60), // decay time - NoInstrument, - ); - - info!("finished constructing discover"); - - let mut peer_set = PeerSet::new(client_stream); + let (mut peer_set, _tc) = zebra_network::peer_set::init(config, node); info!("waiting for peer_set ready"); peer_set.ready().await.map_err(Error::from_boxed_compat)?; info!("peer_set became ready, constructing addr requests"); + use failure::Error; + use futures::stream::{FuturesUnordered, StreamExt}; + let mut addr_reqs = FuturesUnordered::new(); for i in 0..10usize { info!(i, "awaiting peer_set ready"); diff --git a/zebrad/src/commands/listen.rs b/zebrad/src/commands/listen.rs index 930d2777..84762bab 100644 --- a/zebrad/src/commands/listen.rs +++ b/zebrad/src/commands/listen.rs @@ -75,10 +75,7 @@ impl ListenCmd { let config = app_config().network.clone(); let collector = TimestampCollector::new(); - let mut pc = Buffer::new( - PeerConnector::new(config, Network::Mainnet, node, &collector), - 1, - ); + let mut pc = Buffer::new(PeerConnector::new(config, node, &collector), 1); let mut listener = TcpListener::bind(self.addr).await?;