diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index d3b99621..3dabd727 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -51,6 +51,8 @@ use crate::{ #[cfg(test)] mod tests; +mod recent_by_ip; + /// A successful outbound peer connection attempt or inbound connection handshake. /// /// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections @@ -576,6 +578,9 @@ where + Clone, S::Future: Send + 'static, { + let mut recent_inbound_connections = + recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip)); + let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with( config.peerset_inbound_connection_limit(), "Inbound Connections", @@ -605,10 +610,14 @@ where if active_inbound_connections.update_count() >= config.peerset_inbound_connection_limit() + || recent_inbound_connections.is_past_limit_or_add(addr.ip()) { // Too many open inbound connections or pending handshakes already. // Close the connection. std::mem::drop(tcp_stream); + // Allow invalid connections to be cleared quickly, + // but still put a limit on our CPU and network usage from failed connections. + tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await; continue; } diff --git a/zebra-network/src/peer_set/initialize/recent_by_ip.rs b/zebra-network/src/peer_set/initialize/recent_by_ip.rs new file mode 100644 index 00000000..b2fcf750 --- /dev/null +++ b/zebra-network/src/peer_set/initialize/recent_by_ip.rs @@ -0,0 +1,94 @@ +//! A set of IPs from recent connection attempts. + +use std::{ + collections::{HashMap, VecDeque}, + net::IpAddr, + time::{Duration, Instant}, +}; + +use crate::constants; + +#[cfg(test)] +mod tests; + +#[derive(Debug)] +/// Stores IPs of recently attempted inbound connections. +pub struct RecentByIp { + /// The list of IPs in decreasing connection age order. + pub by_time: VecDeque<(IpAddr, Instant)>, + + /// Stores IPs for recently attempted inbound connections. + pub by_ip: HashMap, + + /// The maximum number of peer connections Zebra will keep for a given IP address + /// before it drops any additional peer connections with that IP. + pub max_connections_per_ip: usize, + + /// The duration to wait after an entry is added before removing it. + pub time_limit: Duration, +} + +impl Default for RecentByIp { + fn default() -> Self { + Self::new(None, None) + } +} + +impl RecentByIp { + /// Creates a new [`RecentByIp`] + pub fn new(time_limit: Option, max_connections_per_ip: Option) -> Self { + let (by_time, by_ip) = Default::default(); + Self { + by_time, + by_ip, + time_limit: time_limit.unwrap_or(constants::MIN_PEER_RECONNECTION_DELAY), + max_connections_per_ip: max_connections_per_ip + .unwrap_or(constants::DEFAULT_MAX_CONNS_PER_IP), + } + } + + /// Prunes outdated entries, checks if there's a recently attempted inbound connection with + /// this IP, and adds the entry to `by_time`, and `by_ip` if needed. + /// + /// Returns true if the recently attempted inbound connection count is past the configured limit. + pub fn is_past_limit_or_add(&mut self, ip: IpAddr) -> bool { + let now = Instant::now(); + self.prune_by_time(now); + + let count = self.by_ip.entry(ip).or_default(); + if *count >= self.max_connections_per_ip { + true + } else { + *count += 1; + self.by_time.push_back((ip, now)); + false + } + } + + /// Prunes entries older than `time_limit`, decrementing or removing their counts in `by_ip`. + fn prune_by_time(&mut self, now: Instant) { + // Currently saturates to zero: + // + // + // This discards the whole structure if the time limit is very large, + // which is unexpected, but stops this list growing without limit. + // After the handshake, the peer set will remove any duplicate connections over the limit. + let age_limit = now - self.time_limit; + + // `by_time` must be sorted for this to work. + let split_off_idx = self.by_time.partition_point(|&(_, time)| time <= age_limit); + + let updated_by_time = self.by_time.split_off(split_off_idx); + + for (ip, _) in &self.by_time { + if let Some(count) = self.by_ip.get_mut(ip) { + *count -= 1; + if *count == 0 { + self.by_ip.remove(ip); + } + } + } + + self.by_time = updated_by_time; + } +} diff --git a/zebra-network/src/peer_set/initialize/recent_by_ip/tests.rs b/zebra-network/src/peer_set/initialize/recent_by_ip/tests.rs new file mode 100644 index 00000000..e5a589cd --- /dev/null +++ b/zebra-network/src/peer_set/initialize/recent_by_ip/tests.rs @@ -0,0 +1,69 @@ +//! Fixed test vectors for recent IP limits. + +use std::time::Duration; + +use crate::peer_set::initialize::recent_by_ip::RecentByIp; + +#[test] +fn old_connection_attempts_are_pruned() { + const TEST_TIME_LIMIT: Duration = Duration::from_secs(5); + + let _init_guard = zebra_test::init(); + + let mut recent_connections = RecentByIp::new(Some(TEST_TIME_LIMIT), None); + let ip = "127.0.0.1".parse().expect("should parse"); + + assert!( + !recent_connections.is_past_limit_or_add(ip), + "should not be past limit" + ); + assert!( + recent_connections.is_past_limit_or_add(ip), + "should be past max_connections_per_ip limit" + ); + + std::thread::sleep(TEST_TIME_LIMIT / 3); + + assert!( + recent_connections.is_past_limit_or_add(ip), + "should still contain entry after a third of the time limit" + ); + + std::thread::sleep(3 * TEST_TIME_LIMIT / 4); + + assert!( + !recent_connections.is_past_limit_or_add(ip), + "should prune entry after 13/12 * time_limit" + ); + + const TEST_MAX_CONNS_PER_IP: usize = 3; + + let mut recent_connections = + RecentByIp::new(Some(TEST_TIME_LIMIT), Some(TEST_MAX_CONNS_PER_IP)); + + for _ in 0..TEST_MAX_CONNS_PER_IP { + assert!( + !recent_connections.is_past_limit_or_add(ip), + "should not be past limit" + ); + } + + assert!( + recent_connections.is_past_limit_or_add(ip), + "should be past max_connections_per_ip limit" + ); + + std::thread::sleep(TEST_TIME_LIMIT / 3); + + assert!( + recent_connections.is_past_limit_or_add(ip), + "should still be past limit after a third of the reconnection delay" + ); + + std::thread::sleep(3 * TEST_TIME_LIMIT / 4); + + assert!( + !recent_connections.is_past_limit_or_add(ip), + "should prune entry after 13/12 * time_limit" + ); +} diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 73c15077..c871ab43 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -727,7 +727,7 @@ async fn listener_peer_limit_zero_handshake_panic() { }); let (_config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await; + spawn_inbound_listener_with_peer_limit(0, None, unreachable_inbound_handshaker).await; let peer_result = peerset_rx.try_next(); assert!( @@ -752,7 +752,7 @@ async fn listener_peer_limit_one_handshake_error() { service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); let (_config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await; + spawn_inbound_listener_with_peer_limit(1, None, error_inbound_handshaker).await; let peer_result = peerset_rx.try_next(); assert!( @@ -794,8 +794,12 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { Ok(fake_client) }); - let (config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await; + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit( + 1, + usize::MAX, + success_disconnect_inbound_handshaker, + ) + .await; let mut peer_count: usize = 0; loop { @@ -853,7 +857,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { }); let (config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await; + spawn_inbound_listener_with_peer_limit(1, None, success_stay_open_inbound_handshaker).await; let mut peer_change_count: usize = 0; loop { @@ -917,7 +921,7 @@ async fn listener_peer_limit_default_handshake_error() { service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); let (_config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await; + spawn_inbound_listener_with_peer_limit(None, None, error_inbound_handshaker).await; let peer_result = peerset_rx.try_next(); assert!( @@ -963,8 +967,12 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { Ok(fake_client) }); - let (config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await; + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit( + None, + usize::MAX, + success_disconnect_inbound_handshaker, + ) + .await; let mut peer_count: usize = 0; loop { @@ -1022,7 +1030,8 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { }); let (config, mut peerset_rx) = - spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await; + spawn_inbound_listener_with_peer_limit(None, None, success_stay_open_inbound_handshaker) + .await; let mut peer_change_count: usize = 0; loop { @@ -1609,6 +1618,7 @@ where /// Returns the generated [`Config`], and the peer set receiver. async fn spawn_inbound_listener_with_peer_limit( peerset_initial_target_size: impl Into>, + max_connections_per_ip: impl Into>, listen_handshaker: S, ) -> (Config, mpsc::Receiver) where @@ -1623,6 +1633,9 @@ where let listen_addr = "127.0.0.1:0".parse().unwrap(); let mut config = Config { listen_addr, + max_connections_per_ip: max_connections_per_ip + .into() + .unwrap_or(constants::DEFAULT_MAX_CONNS_PER_IP), ..Config::default() };