diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 95365f90..7c4ce771 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -43,6 +43,9 @@ pub struct Config { /// How frequently we attempt to connect to a new peer. pub new_peer_interval: Duration, + + /// The initial target size for the peer set. + pub peerset_initial_target_size: usize, } impl Config { @@ -92,6 +95,7 @@ impl Default for Config { peerset_request_buffer_size: 1, handshake_timeout: Duration::from_secs(4), new_peer_interval: Duration::from_secs(120), + peerset_initial_target_size: 50, } } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 20a09bb8..c917aa3b 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -105,7 +105,7 @@ where let network = self.config.network; let fut = async move { - info!("connecting to remote peer"); + debug!("connecting to remote peer"); let mut stream = Framed::new(tcp_stream, Codec::builder().for_network(network).finish()); diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 8660a48b..4abbbd72 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -6,7 +6,6 @@ use std::{ net::SocketAddr, sync::{Arc, Mutex}, - time::Duration, }; use futures::{ @@ -72,7 +71,7 @@ where // Create an mpsc channel for peer changes, with a generous buffer. let (peerset_tx, peerset_rx) = mpsc::channel::(100); // Create an mpsc channel for peerset demand signaling. - let (demand_tx, demand_rx) = mpsc::channel::<()>(100); + let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100); // Connect the rx end to a PeerSet, wrapping new peers in load instruments. let peer_set = Buffer::new( @@ -87,7 +86,7 @@ where config.ewma_decay_time, NoInstrument, ), - demand_tx, + demand_tx.clone(), ), config.peerset_request_buffer_size, ); @@ -121,9 +120,15 @@ where let _ = candidates.update().await; info!("Sending initial request for peers"); + + for _ in 0..config.peerset_initial_target_size { + let _ = demand_tx.try_send(()); + } + tokio::spawn( crawl_and_dial( config.new_peer_interval, + demand_tx, demand_rx, candidates, connector, @@ -192,11 +197,11 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `peer::Client` through a channel. -/// -#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))] +#[instrument(skip(new_peer_interval, demand_tx, demand_rx, candidates, connector, success_tx))] async fn crawl_and_dial( - new_peer_interval: Duration, - demand_signal: mpsc::Receiver<()>, + new_peer_interval: std::time::Duration, + mut demand_tx: mpsc::Sender<()>, + mut demand_rx: mpsc::Receiver<()>, mut candidates: CandidateSet, mut connector: C, mut success_tx: mpsc::Sender, @@ -208,69 +213,68 @@ where S: Service, S::Future: Send + 'static, { - use futures::TryFutureExt; + use futures::{future::{select, Either::{Left, Right}}, TryFutureExt}; - // On creation, we are likely to have very few peers, so try to get more - // connections quickly by concurrently connecting to a large number of - // candidates. let mut handshakes = FuturesUnordered::new(); - for _ in 0..50usize { - if let Some(candidate) = candidates.next() { - connector.ready().await?; - handshakes.push( - connector - .call(candidate.addr) - // Use map_err to tag failed connections with the MetaAddr, - // so they can be reported to the CandidateSet. - .map_err(move |_| candidate), - ) - } - } - while let Some(handshake) = handshakes.next().await { - match handshake { - Ok(change) => { - debug!("Successfully dialed new peer, sending to peerset"); + // returns None when empty. + // Keeping an unresolved future in the pool means the stream + // never terminates. + handshakes.push(future::pending().boxed()); + + let mut crawl_timer = tokio::time::interval(new_peer_interval); + + loop { + // This is a little awkward because there's no select3. + match select( + select(demand_rx.next(), crawl_timer.next()), + handshakes.next(), + ) + .await + { + Left((Left((Some(_demand), _)), _)) => { + if handshakes.len() > 50 { + // This is set to trace level because when the peerset is + // congested it can generate a lot of demand signal very rapidly. + trace!("too many in-flight handshakes, dropping demand signal"); + continue; + } + if let Some(candidate) = candidates.next() { + debug!(?candidate.addr, "attempting outbound connection in response to demand"); + connector.ready().await?; + handshakes.push( + connector + .call(candidate.addr) + .map_err(move |_| candidate) + .boxed(), + ); + } else { + warn!("demand for peers but no available candidates"); + } + } + // did a drill sergeant write this? no there's just no Either3 + Left((Right((Some(_timer), _)), _)) => { + debug!("crawling for more peers"); + candidates.update().await?; + // Try to connect to a new peer. + let _ = demand_tx.try_send(()); + } + Right((Some(Ok(change)), _)) => { + // in fact all changes are Insert so this branch is always taken + if let Change::Insert(ref addr, _) = change { + debug!(candidate.addr = ?addr, "successfully dialed new peer"); + } success_tx.send(Ok(change)).await?; } - Err(candidate) => { - debug!(?candidate.addr, "marking address as failed"); + Right((Some(Err(candidate)), _)) => { + debug!(?candidate.addr, "failed to connect to peer"); candidates.report_failed(candidate); + // The demand signal that was taken out of the queue + // to attempt to connect to the failed candidate never + // turned into a connection, so add it back: + let _ = demand_tx.try_send(()); } - } - } - - let mut connect_signal = futures::stream::select( - tokio::time::interval(new_peer_interval).map(|_| ()), - demand_signal, - ); - while let Some(()) = connect_signal.next().await { - debug!("got demand signal from peer set, updating candidates"); - candidates.update().await?; - loop { - let candidate = match candidates.next() { - Some(candidate) => candidate, - None => { - warn!("got demand for more peers but no available candidates"); - break; - } - }; - - connector.ready().await?; - match connector - .call(candidate.addr) - .map_err(move |_| candidate) - .await - { - Ok(change) => { - debug!("Successfully dialed new peer, sending to peerset"); - success_tx.send(Ok(change)).await?; - break; - } - Err(candidate) => { - debug!(?candidate.addr, "marking address as failed"); - candidates.report_failed(candidate); - } - } + // If we don't match one of these patterns, shutdown. + _ => break, } } Ok(())