From 8d01750459d179e0cdc6823d6c16783c3679bf3e Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 27 Oct 2021 20:46:43 -0300 Subject: [PATCH] Rate-limit initial seed peer connections (#2943) * Rate-limit initial seed peer connections * Revert "Rate-limit initial seed peer connections" This reverts commit f779a1eb9e1ffd5497e96dfe8ae1be4340d5d24d. * Simplify logic * Avoid cooperative async task starvation in the peer crawler and listener If we don't yield in these loops, they can run for a long time before tokio forces them to yield. * Add test * Check for task panics in initial peers test * Remove duplicate code in rebase Co-authored-by: teor --- zebra-network/src/peer_set/initialize.rs | 28 ++++-- .../src/peer_set/initialize/tests/vectors.rs | 90 ++++++++++++++++++- 2 files changed, 107 insertions(+), 11 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 7c0bce0f..d01a6a31 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -13,7 +13,11 @@ use futures::{ TryFutureExt, }; use rand::seq::SliceRandom; -use tokio::{net::TcpListener, sync::broadcast, time::Instant}; +use tokio::{ + net::TcpListener, + sync::broadcast, + time::{sleep, Instant}, +}; use tower::{ buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, util::BoxService, Service, ServiceExt, @@ -248,7 +252,10 @@ where // # Security // - // TODO: rate-limit initial seed peer connections (#2326) + // Resists distributed denial of service attacks by making sure that + // new peer connections are initiated at least + // [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] + // apart. // // # Correctness // @@ -258,17 +265,24 @@ where // single `FuturesUnordered` to completion, and handshakes have a short timeout. let mut handshakes: FuturesUnordered<_> = initial_peers .into_iter() - .map(|addr| { + .enumerate() + .map(|(i, addr)| { let connection_tracker = active_outbound_connections.track_connection(); let req = OutboundConnectorRequest { addr, connection_tracker, }; - outbound_connector - .clone() - .oneshot(req) - .map_err(move |e| (addr, e)) + let outbound_connector = outbound_connector.clone(); + async move { + // Rate-limit the connection, sleeping for an interval according + // to its index in the list. + sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await; + outbound_connector + .oneshot(req) + .map_err(move |e| (addr, e)) + .await + } }) .collect(); diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index beefc2fd..71366de8 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -17,13 +17,14 @@ use std::{ collections::HashSet, net::{Ipv4Addr, SocketAddr}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use futures::{ channel::{mpsc, oneshot}, - FutureExt, + FutureExt, StreamExt, }; +use tokio::task::JoinHandle; use tower::{discover::Change, service_fn, Service}; use tracing::Span; @@ -31,11 +32,11 @@ use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::Dat use zebra_test::net::random_known_port; use crate::{ - init, + constants, init, meta_addr::MetaAddr, peer::{self, ErrorSlot, OutboundConnectorRequest}, peer_set::{ - initialize::{crawl_and_dial, PeerChange}, + initialize::{add_initial_peers, crawl_and_dial, PeerChange}, set::MorePeers, ActiveConnectionCounter, CandidateSet, }, @@ -587,6 +588,42 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { ); } +/// Test if the initial seed peer connections is rate-limited. +#[tokio::test] +async fn add_initial_peers_is_rate_limited() { + zebra_test::init(); + + // We don't need to actually connect to the peers; we only need to check + // if the connection attempts is rate-limited. Therefore, just return an error. + let outbound_connector = + service_fn(|_| async { Err("test outbound connector always returns errors".into()) }); + + const PEER_COUNT: usize = 10; + + let before = Instant::now(); + + let (initial_peers_task_handle, peerset_rx) = + spawn_add_initial_peers(PEER_COUNT, outbound_connector); + let connections = peerset_rx.take(PEER_COUNT).collect::>().await; + + let elapsed = Instant::now() - before; + + assert_eq!(connections.len(), PEER_COUNT); + // Make sure the rate limiting worked by checking if it took long enough + assert!( + elapsed > constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul((PEER_COUNT - 1) as u32), + "elapsed only {:?}", + elapsed + ); + + let initial_peers_result = initial_peers_task_handle.await; + assert!( + matches!(initial_peers_result, Ok(Ok(_))), + "unexpected error or panic in add_initial_peers task: {:?}", + initial_peers_result, + ); +} + /// Open a local listener on `listen_addr` for `network`. /// Asserts that the local listener address works as expected. async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) { @@ -763,3 +800,48 @@ where (config, peerset_rx) } + +/// Initialize a task that connects to `peer_count` initial peers using the +/// given connector. +/// +/// Dummy IPs are used. +/// +/// Returns the task [`JoinHandle`], and the peer set receiver. +fn spawn_add_initial_peers( + peer_count: usize, + outbound_connector: C, +) -> ( + JoinHandle>, + mpsc::Receiver, +) +where + C: Service< + OutboundConnectorRequest, + Response = Change, + Error = BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, +{ + // Create a list of dummy IPs and initialize a config using them as the + // initial peers. + let mut peers = HashSet::new(); + for address_number in 0..peer_count { + peers.insert( + SocketAddr::new(Ipv4Addr::new(127, 1, 1, address_number as _).into(), 1).to_string(), + ); + } + let config = Config { + initial_mainnet_peers: peers, + network: Network::Mainnet, + ..Config::default() + }; + + let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); + + let add_fut = add_initial_peers(config, outbound_connector, peerset_tx); + let add_task_handle = tokio::spawn(add_fut); + + (add_task_handle, peerset_rx) +}