From 0833d31ec7e59164a0fadd0ab7797aa6e859cb7f Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 21 Oct 2019 22:39:05 -0700 Subject: [PATCH] Use a timer to add peers by interval. --- zebra-network/src/config.rs | 3 +++ zebra-network/src/peer_set.rs | 30 ++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 8f91a2b4..aad65cdb 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -24,6 +24,8 @@ pub struct Config { pub peerset_request_buffer_size: usize, /// The timeout for peer handshakes. pub handshake_timeout: Duration, + /// How frequently we attempt to connect to a new peer. + pub new_peer_interval: Duration, } impl Default for Config { @@ -39,6 +41,7 @@ impl Default for Config { ewma_decay_time: Duration::from_secs(60), peerset_request_buffer_size: 1, handshake_timeout: Duration::from_secs(4), + new_peer_interval: Duration::from_secs(120), } } } diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index bcccc7be..c4c6469f 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -7,6 +7,7 @@ use std::{ net::SocketAddr, pin::Pin, sync::{Arc, Mutex}, + time::Duration, }; use futures::{ @@ -140,7 +141,14 @@ where info!("Sending initial request for peers"); tokio::spawn( - crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| { + crawl_and_dial( + config.new_peer_interval, + demand_rx, + candidates, + peer_connector, + peerset_tx, + ) + .map(|result| { if let Err(e) = result { error!(%e); } @@ -212,9 +220,16 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `PeerClient` through a channel. /// -#[instrument(skip(demand_signal, candidates, peer_connector, success_tx))] +#[instrument(skip( + new_peer_interval, + demand_signal, + candidates, + peer_connector, + success_tx +))] async fn crawl_and_dial( - mut demand_signal: mpsc::Receiver<()>, + new_peer_interval: Duration, + demand_signal: mpsc::Receiver<()>, mut candidates: CandidateSet, peer_connector: C, mut success_tx: mpsc::Sender, @@ -271,9 +286,12 @@ where } } - // XXX instead of just responding to demand, we could respond to demand *or* - // to a interval timer (to continuously grow the peer set). - while let Some(()) = demand_signal.next().await { + use tokio::timer::Interval; + let mut connect_signal = futures::stream::select( + Interval::new_interval(new_peer_interval).map(|_| ()), + demand_signal, + ); + while let Some(()) = connect_signal.next().await { debug!("got demand signal from peer set, updating candidates"); candidates.update().await?; loop {