Use a timer to add peers by interval.

This commit is contained in:
Henry de Valence 2019-10-21 22:39:05 -07:00
parent 9a779a639f
commit 0833d31ec7
2 changed files with 27 additions and 6 deletions

View File

@ -24,6 +24,8 @@ pub struct Config {
pub peerset_request_buffer_size: usize, pub peerset_request_buffer_size: usize,
/// The timeout for peer handshakes. /// The timeout for peer handshakes.
pub handshake_timeout: Duration, pub handshake_timeout: Duration,
/// How frequently we attempt to connect to a new peer.
pub new_peer_interval: Duration,
} }
impl Default for Config { impl Default for Config {
@ -39,6 +41,7 @@ impl Default for Config {
ewma_decay_time: Duration::from_secs(60), ewma_decay_time: Duration::from_secs(60),
peerset_request_buffer_size: 1, peerset_request_buffer_size: 1,
handshake_timeout: Duration::from_secs(4), handshake_timeout: Duration::from_secs(4),
new_peer_interval: Duration::from_secs(120),
} }
} }
} }

View File

@ -7,6 +7,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration,
}; };
use futures::{ use futures::{
@ -140,7 +141,14 @@ where
info!("Sending initial request for peers"); info!("Sending initial request for peers");
tokio::spawn( tokio::spawn(
crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| { crawl_and_dial(
config.new_peer_interval,
demand_rx,
candidates,
peer_connector,
peerset_tx,
)
.map(|result| {
if let Err(e) = result { if let Err(e) = result {
error!(%e); error!(%e);
} }
@ -212,9 +220,16 @@ where
/// Given a channel that signals a need for new peers, try to connect to a peer /// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `PeerClient` through a channel. /// and send the resulting `PeerClient` through a channel.
/// ///
#[instrument(skip(demand_signal, candidates, peer_connector, success_tx))] #[instrument(skip(
new_peer_interval,
demand_signal,
candidates,
peer_connector,
success_tx
))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
mut demand_signal: mpsc::Receiver<()>, new_peer_interval: Duration,
demand_signal: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
peer_connector: C, peer_connector: C,
mut success_tx: mpsc::Sender<PeerChange>, mut success_tx: mpsc::Sender<PeerChange>,
@ -271,9 +286,12 @@ where
} }
} }
// XXX instead of just responding to demand, we could respond to demand *or* use tokio::timer::Interval;
// to a interval timer (to continuously grow the peer set). let mut connect_signal = futures::stream::select(
while let Some(()) = demand_signal.next().await { Interval::new_interval(new_peer_interval).map(|_| ()),
demand_signal,
);
while let Some(()) = connect_signal.next().await {
debug!("got demand signal from peer set, updating candidates"); debug!("got demand signal from peer set, updating candidates");
candidates.update().await?; candidates.update().await?;
loop { loop {