From 03aa6f671f2574a9dbc21ec006278ed04cf657a4 Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Tue, 9 Mar 2021 17:36:05 -0800 Subject: [PATCH] Implement outbound connection rate limiting - includes config rename with alias (#1855) * Implement outbound connection rate limiting * fix breaking change on config Co-authored-by: teor --- zebra-network/src/config.rs | 12 ++++-- zebra-network/src/peer_set/candidate_set.rs | 47 ++++++++++++++++++--- zebra-network/src/peer_set/initialize.rs | 10 ++--- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index dd270745..2fab27a9 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -33,8 +33,14 @@ pub struct Config { /// set size to reduce Zebra's bandwidth usage. pub peerset_initial_target_size: usize, - /// How frequently we attempt to connect to a new peer. - pub new_peer_interval: Duration, + /// How frequently we attempt to crawl the network to discover new peer + /// connections. + /// + /// This duration only pertains to the rate at which zebra crawls for new + /// peers, not the rate zebra connects to new peers, which is restricted to + /// CandidateSet::PEER_CONNECTION_INTERVAL + #[serde(alias = "new_peer_interval")] + pub crawl_new_peer_interval: Duration, } impl Config { @@ -146,7 +152,7 @@ impl Default for Config { network: Network::Mainnet, initial_mainnet_peers: mainnet_peers, initial_testnet_peers: testnet_peers, - new_peer_interval: Duration::from_secs(60), + crawl_new_peer_interval: Duration::from_secs(60), // The default peerset target size should be large enough to ensure // nodes have a reliable set of peers. But it should also be limited diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 14d22e75..823ae4b0 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,7 +1,12 @@ -use std::sync::{Arc, Mutex}; +use std::{ + mem, + sync::{Arc, Mutex}, + time::Duration, +}; use chrono::Utc; use futures::stream::{FuturesUnordered, StreamExt}; +use tokio::time::{sleep, sleep_until, Sleep}; use tower::{Service, ServiceExt}; use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Response}; @@ -102,6 +107,7 @@ use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Resp pub(super) struct CandidateSet { pub(super) peer_set: Arc>, pub(super) peer_service: S, + next_peer_min_wait: Sleep, } impl CandidateSet @@ -109,11 +115,20 @@ where S: Service, S::Future: Send + 'static, { + /// The minimum time between successive calls to `CandidateSet::next()`. + /// + /// ## Security + /// + /// Zebra resists distributed denial of service attacks by making sure that new peer connections + /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. + const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); + /// Uses `peer_set` and `peer_service` to manage a [`CandidateSet`] of peers. pub fn new(peer_set: Arc>, peer_service: S) -> CandidateSet { CandidateSet { peer_set, peer_service, + next_peer_min_wait: sleep(Duration::from_secs(0)), } } @@ -188,13 +203,31 @@ where /// /// Live `Responded` peers will stay live if they keep responding, or /// become a reconnection candidate if they stop responding. - pub fn next(&mut self) -> Option { - let mut peer_set_guard = self.peer_set.lock().unwrap(); - let mut reconnect = peer_set_guard.reconnection_peers().next()?; + /// + /// ## Security + /// + /// Zebra resists distributed denial of service attacks by making sure that + /// new peer connections are initiated at least + /// `MIN_PEER_CONNECTION_INTERVAL` apart. + pub async fn next(&mut self) -> Option { + let current_deadline = self.next_peer_min_wait.deadline(); + let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); + mem::swap(&mut self.next_peer_min_wait, &mut sleep); - reconnect.last_seen = Utc::now(); - reconnect.last_connection_state = PeerAddrState::AttemptPending; - peer_set_guard.update(reconnect); + let reconnect = { + let mut peer_set_guard = self.peer_set.lock().unwrap(); + // It's okay to early return here because we're returning None + // instead of yielding the next connection. + let mut reconnect = peer_set_guard.reconnection_peers().next()?; + + reconnect.last_seen = Utc::now(); + reconnect.last_connection_state = PeerAddrState::AttemptPending; + peer_set_guard.update(reconnect); + reconnect + }; + + // This is the line that is most relevant to the above ## Security section + sleep.await; Some(reconnect) } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index d8418337..89f95252 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -171,7 +171,7 @@ where let crawl_guard = tokio::spawn( crawl_and_dial( - config.new_peer_interval, + config.crawl_new_peer_interval, demand_tx, demand_rx, candidates, @@ -269,7 +269,7 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `peer::Client` through a channel. #[instrument(skip( - new_peer_interval, + crawl_new_peer_interval, demand_tx, demand_rx, candidates, @@ -277,7 +277,7 @@ where success_tx ))] async fn crawl_and_dial( - new_peer_interval: std::time::Duration, + crawl_new_peer_interval: std::time::Duration, mut demand_tx: mpsc::Sender<()>, mut demand_rx: mpsc::Receiver<()>, mut candidates: CandidateSet, @@ -304,7 +304,7 @@ where // never terminates. handshakes.push(future::pending().boxed()); - let mut crawl_timer = tokio::time::interval(new_peer_interval); + let mut crawl_timer = tokio::time::interval(crawl_new_peer_interval); loop { metrics::gauge!( @@ -328,7 +328,7 @@ where trace!("too many in-flight handshakes, dropping demand signal"); continue; } - if let Some(candidate) = candidates.next() { + if let Some(candidate) = candidates.next().await { debug!(?candidate.addr, "attempting outbound connection in response to demand"); connector.ready_and().await?; handshakes.push(