From aaef94c2bf3062d0195cdbb020a85003e6eb54ce Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Mon, 7 Jun 2021 01:13:46 -0300 Subject: [PATCH] Prevent burst of reconnection attempts (#2251) * Rate-limit new outbound peer connections Set the rate-limiting sleep timer to use a delay added to the maximum between the next peer connection instant and now. This ensures that the timer always sleeps at least the time used for the delay. This change fixes rate-limiting new outbound peer connections, since before there could be a burst of attempts until the deadline progressed to the current instant. Fixes #2216 * Create `MetaAddr::alternate_node_strategy` helper Creates arbitrary `MetaAddr`s as if they were network nodes that sent their listening address. * Test outbound peer connection rate limiting Tests if connections are rate limited to 10 per second, and also tests that sleeping before continuing with the attempts still respets the rate limit and does not result in a burst of reconnection attempts. --- zebra-network/src/meta_addr/arbitrary.rs | 6 ++ zebra-network/src/peer_set/candidate_set.rs | 4 +- .../src/peer_set/candidate_set/tests/prop.rs | 73 ++++++++++++++++++- 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index 5c4d2872..8c85c95a 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -16,6 +16,12 @@ impl MetaAddr { }) .boxed() } + + pub fn alternate_node_strategy() -> BoxedStrategy { + canonical_socket_addr() + .prop_map(|address| MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK)) + .boxed() + } } impl Arbitrary for MetaAddr { diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index ef84b340..2197ca14 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,7 +1,7 @@ use std::{cmp::min, mem, sync::Arc, time::Duration}; use futures::stream::{FuturesUnordered, StreamExt}; -use tokio::time::{sleep, sleep_until, timeout, Sleep}; +use tokio::time::{sleep, sleep_until, timeout, Instant, Sleep}; use tower::{Service, ServiceExt}; use zebra_chain::serialization::DateTime32; @@ -282,7 +282,7 @@ where /// new peer connections are initiated at least /// `MIN_PEER_CONNECTION_INTERVAL` apart. pub async fn next(&mut self) -> Option { - let current_deadline = self.next_peer_min_wait.deadline(); + let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now()); let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); mem::swap(&mut self.next_peer_min_wait, &mut sleep); diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index e35139cc..da6c12ea 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -1,9 +1,19 @@ +use std::{ + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + use proptest::{collection::vec, prelude::*}; +use tokio::{ + runtime::Runtime, + time::{sleep, timeout}, +}; +use tracing::Span; use zebra_chain::serialization::DateTime32; -use super::super::validate_addrs; -use crate::types::MetaAddr; +use super::super::{validate_addrs, CandidateSet}; +use crate::{types::MetaAddr, AddressBook, BoxError, Config, Request, Response}; proptest! { /// Test that validated gossiped peers never have a `last_seen` time that's in the future. @@ -21,3 +31,62 @@ proptest! { } } } + +proptest! { + #![proptest_config(ProptestConfig::with_cases(16))] + + /// Test that new outbound peer connections are rate-limited. + #[test] + fn new_outbound_peer_connections_are_rate_limited( + peers in vec(MetaAddr::alternate_node_strategy(), 10), + initial_candidates in 0..4usize, + extra_candidates in 0..4usize, + ) { + zebra_test::init(); + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let peer_service = tower::service_fn(|_| async { + unreachable!("Mock peer service is never used"); + }); + + let mut address_book = AddressBook::new(&Config::default(), Span::none()); + address_book.extend(peers); + + let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service); + + let checks = async move { + // Check rate limiting for initial peers + check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await; + // Sleep more than the rate limiting delay + sleep(Duration::from_millis(400)).await; + // Check that the next peers are still respecting the rate limiting, without causing a + // burst of reconnections + check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await; + }; + + assert!(runtime.block_on(timeout(Duration::from_secs(10), checks)).is_ok()); + } +} + +/// Check if obtaining a certain number of reconnection peers is rate limited. +/// +/// # Panics +/// +/// Will panic if a reconnection peer is returned too quickly, or if no reconnection peer is +/// returned. +async fn check_candidates_rate_limiting(candidate_set: &mut CandidateSet, candidates: usize) +where + S: tower::Service, + S::Future: Send + 'static, +{ + let mut minimum_reconnect_instant = Instant::now(); + + for _ in 0..candidates { + assert!(candidate_set.next().await.is_some()); + assert!(Instant::now() >= minimum_reconnect_instant); + + minimum_reconnect_instant += CandidateSet::::MIN_PEER_CONNECTION_INTERVAL; + } +}