From 2de93bba8ed2695e7aa269de45f6fb5e770b21cf Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 21 Oct 2021 20:04:46 -0300 Subject: [PATCH] Limit the number of initial peers (#2913) * limit the number of initial peers * Move more code out of zebra_network::initialize * Always limit the number of initial peers in the Config This way, we can never get the unused peers out. * Revert "Always limit the number of initial peers in the Config" This reverts commit 81ede597c885de7289ce19faa3c21a3439dd293c. Actually, this doesn't work, because we want those extra peers. * Minor tweaks Co-authored-by: Deirdre Connolly Co-authored-by: teor --- zebra-network/src/peer_set/initialize.rs | 48 ++++++++++++++++++------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index df418f60..aa3819ed 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -3,7 +3,7 @@ // Portions of this submodule were adapted from tower-balance, // which is (c) 2019 Tower Contributors (MIT licensed). -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use futures::{ channel::mpsc, @@ -12,6 +12,7 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, TryFutureExt, }; +use rand::seq::SliceRandom; use tokio::{net::TcpListener, sync::broadcast, time::Instant}; use tower::{ buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, @@ -141,11 +142,7 @@ where let config = config.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); - async move { - let initial_peers = config.initial_peers().await; - add_initial_peers(initial_peers, outbound_connector, peerset_tx).await - } - .boxed() + async move { add_initial_peers(&config, outbound_connector, peerset_tx).await }.boxed() }; let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); @@ -192,11 +189,11 @@ where (peer_set, address_book) } -/// Use the provided `handshaker` to connect to `initial_peers`, then send -/// the results over `peerset_tx`. -#[instrument(skip(initial_peers, outbound_connector, peerset_tx))] +/// Use the provided `outbound_connector` to connect to the configured initial peers, +/// then send the resulting peer connections over `peerset_tx`. +#[instrument(skip(config, outbound_connector, peerset_tx))] async fn add_initial_peers( - initial_peers: std::collections::HashSet, + config: &Config, outbound_connector: S, mut peerset_tx: mpsc::Sender, ) -> Result @@ -208,14 +205,15 @@ where > + Clone, S::Future: Send + 'static, { - let initial_peer_count = initial_peers.len(); + let initial_peers = limit_initial_peers(config).await; + let mut handshake_success_total: usize = 0; let mut handshake_error_total: usize = 0; let mut active_outbound_connections = ActiveConnectionCounter::new_counter(); info!( - ?initial_peer_count, + initial_peer_count = ?initial_peers.len(), ?initial_peers, "connecting to initial peer set" ); @@ -286,6 +284,32 @@ where Ok(active_outbound_connections) } +/// Limit the number of `initial_peers` addresses entries to the configured +/// `peerset_initial_target_size`. +/// +/// The result is randomly chosen entries from the provided set of addresses. +async fn limit_initial_peers(config: &Config) -> HashSet { + let initial_peers = config.initial_peers().await; + let initial_peer_count = initial_peers.len(); + + // Limit the number of initial peers to `config.peerset_initial_target_size` + if initial_peer_count > config.peerset_initial_target_size { + info!( + "Limiting the initial peers list from {} to {}", + initial_peer_count, config.peerset_initial_target_size + ); + } + + let initial_peers_vect: Vec = initial_peers.iter().copied().collect(); + + // TODO: add unused peers to the AddressBook (#2931) + // https://docs.rs/rand/0.8.4/rand/seq/trait.SliceRandom.html#tymethod.partial_shuffle + initial_peers_vect + .choose_multiple(&mut rand::thread_rng(), config.peerset_initial_target_size) + .copied() + .collect() +} + /// Open a peer connection listener on `config.listen_addr`, /// returning the opened [`TcpListener`], and the address it is bound to. ///