diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs
index 52dbf59e..99465cdd 100644
--- a/zebra-network/src/peer_set.rs
+++ b/zebra-network/src/peer_set.rs
@@ -54,11 +54,18 @@ pub type BoxedZebraService = Box<
 type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>;
 
 /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
-pub fn init<S>(
+pub async fn init<S>(
     config: Config,
     inbound_service: S,
 ) -> (
-    impl Service<Request, Response = Response, Error = BoxedStdError> + Send + Clone + 'static,
+    impl Service<
+        Request,
+        Response = Response,
+        Error = BoxedStdError,
+        Future = impl Future<Output = Result<Response, BoxedStdError>> + Send,
+    > + Send
+        + Clone
+        + 'static,
     Arc<Mutex<AddressBook>>,
 )
 where
@@ -77,7 +84,7 @@ where
     let (demand_tx, demand_rx) = mpsc::channel::<()>(100);
 
     // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
-    let peer_set = Buffer::new(
+    let mut peer_set = Buffer::new(
         PeerSet::new(
             PeakEwmaDiscover::new(
                 ServiceStream::new(
@@ -118,7 +125,16 @@
     );
 
     // 3. Outgoing peers we connect to in response to load.
+    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
+
+    // We need to await candidates.update() here, because Zcashd only sends one
+    // `addr` message per connection, and if we only have one initial peer we
+    // need to ensure that its `addr` message is used by the crawler.
+    // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>
+    let _ = candidates.update().await;
+
+    info!("Sending initial request for peers");
 
     tokio::spawn(
         crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| {
             if let Err(e) = result {
@@ -127,7 +143,7 @@
         }),
     );
 
-    (Box::new(peer_set), address_book)
+    (peer_set, address_book)
 }
 
 /// Use the provided `peer_connector` to connect to `initial_peers`, then send
@@ -192,16 +208,11 @@ where
 /// Given a channel that signals a need for new peers, try to connect to a peer
 /// and send the resulting `PeerClient` through a channel.
 ///
-#[instrument(skip(
-    demand_signal,
-    candidates,
-    peer_connector,
-    success_tx
-))]
+#[instrument(skip(demand_signal, candidates, peer_connector, success_tx))]
 async fn crawl_and_dial<C, S>(
     mut demand_signal: mpsc::Receiver<()>,
     mut candidates: CandidateSet<S>,
-    mut peer_connector: C,
+    peer_connector: C,
     mut success_tx: mpsc::Sender<PeerChange>,
 ) -> Result<(), BoxedStdError>
 where
@@ -210,25 +221,76 @@ where
     S: Service<Request, Response = Response, Error = BoxedStdError>,
     S::Future: Send + 'static,
 {
 
+    // XXX this kind of boilerplate didn't exist before we made PeerConnector
+    // take (TcpStream, SocketAddr), which made it so that we could share code
+    // between inbound and outbound handshakes. Probably the cleanest way to
+    // make it go away again is to rename "Connector" to "Handshake" (since it
+    // is really responsible just for the handshake) and to have a "Connector"
+    // Service wrapper around "Handshake" that opens a TCP stream.
+    // We could also probably make the Handshake service `Clone` directly,
+    // which might be more efficient than using a Buffer wrapper.
+    use crate::types::MetaAddr;
+    use futures::TryFutureExt;
+    let try_connect = |candidate: MetaAddr| {
+        let mut pc = peer_connector.clone();
+        async move {
+            let stream = TcpStream::connect(candidate.addr).await?;
+            pc.ready().await?;
+            pc.call((stream, candidate.addr))
+                .await
+                .map(|client| Change::Insert(candidate.addr, client))
+        }
+        // Use map_err to tag failed connections with the MetaAddr,
+        // so they can be reported to the CandidateSet.
+        .map_err(move |_| candidate)
+    };
+
+    // On creation, we are likely to have very few peers, so try to get more
+    // connections quickly by concurrently connecting to a large number of
+    // candidates.
+    let mut handshakes = FuturesUnordered::new();
+    for _ in 0..50usize {
+        if let Some(candidate) = candidates.next() {
+            handshakes.push(try_connect(candidate))
+        }
+    }
+    while let Some(handshake) = handshakes.next().await {
+        match handshake {
+            Ok(change) => {
+                debug!("Successfully dialed new peer, sending to peerset");
+                success_tx.send(Ok(change)).await?;
+            }
+            Err(candidate) => {
+                debug!(?candidate.addr, "marking address as failed");
+                candidates.report_failed(candidate);
+            }
+        }
+    }
+
     // XXX instead of just responding to demand, we could respond to demand *or*
     // to an interval timer (to continuously grow the peer set).
     while let Some(()) = demand_signal.next().await {
-        debug!("Got demand signal from peer set");
+        debug!("got demand signal from peer set, updating candidates");
+        candidates.update().await?;
         loop {
-            candidates.update().await?;
-            // If we were unable to get a candidate, keep looping to crawl more.
-            let addr = match candidates.next() {
-                Some(candidate) => candidate.addr,
-                None => continue,
+            let candidate = match candidates.next() {
+                Some(candidate) => candidate,
+                None => {
+                    warn!("got demand for more peers but no available candidates");
+                    break;
+                }
             };
-            if let Ok(stream) = TcpStream::connect(addr).await {
-                peer_connector.ready().await?;
-                if let Ok(client) = peer_connector.call((stream, addr)).await {
+            match try_connect(candidate).await {
+                Ok(change) => {
                     debug!("Successfully dialed new peer, sending to peerset");
-                    success_tx.send(Ok(Change::Insert(addr, client))).await?;
+                    success_tx.send(Ok(change)).await?;
                     break;
                 }
+                Err(candidate) => {
+                    debug!(?candidate.addr, "marking address as failed");
+                    candidates.report_failed(candidate);
+                }
             }
         }
     }
diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs
index 7d954fed..13ac33c9 100644
--- a/zebrad/src/commands/connect.rs
+++ b/zebrad/src/commands/connect.rs
@@ -70,7 +70,7 @@ impl ConnectCmd {
         config.initial_peers = vec![self.addr];
 
-        let (mut peer_set, address_book) = zebra_network::init(config, node);
+        let (mut peer_set, address_book) = zebra_network::init(config, node).await;
 
         info!("waiting for peer_set ready");
         peer_set.ready().await.map_err(Error::from_boxed_compat)?;
 
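Note (reviewer sketch, not part of the patch): the XXX comment in `crawl_and_dial` proposes renaming `PeerConnector` to `Handshake` and wrapping it in a `Connector` service that owns the TCP dial. A minimal sketch of that idea using the tower `Service` trait might look like the following; the `Connector` and `PeerClient`/`BoxedStdError` names and bounds below are hypothetical stand-ins, simplified from the real zebra-network types:

```rust
use std::{
    future::Future,
    net::SocketAddr,
    pin::Pin,
    task::{Context, Poll},
};

use tokio::net::TcpStream;
use tower::Service;

// Hypothetical stand-ins; the real types live in zebra-network.
type BoxedStdError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub struct PeerClient;

/// A connector that owns the TCP dial and delegates the protocol
/// handshake to an inner `Handshake`-style service.
pub struct Connector<H> {
    handshake: H,
}

impl<H> Service<SocketAddr> for Connector<H>
where
    H: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError>
        + Clone
        + Send
        + 'static,
    H::Future: Send,
{
    type Response = PeerClient;
    type Error = BoxedStdError;
    type Future = Pin<Box<dyn Future<Output = Result<PeerClient, BoxedStdError>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // The connector is ready whenever the inner handshake service is.
        self.handshake.poll_ready(cx)
    }

    fn call(&mut self, addr: SocketAddr) -> Self::Future {
        // Standard tower pattern: take the instance that was polled ready
        // and leave a fresh clone in its place, so the returned future
        // does not borrow `self`.
        let clone = self.handshake.clone();
        let mut handshake = std::mem::replace(&mut self.handshake, clone);
        Box::pin(async move {
            let stream = TcpStream::connect(addr).await?;
            handshake.call((stream, addr)).await
        })
    }
}
```

With a wrapper along these lines, `try_connect` in `crawl_and_dial` would shrink to a single `connector.call(candidate.addr)`, while the `(TcpStream, SocketAddr)` handshake logic would stay shared between the inbound and outbound paths.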