diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index fd2edc3b..a1ccccd6 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -38,6 +38,7 @@ zebra-chain = { path = "../zebra-chain" } [dev-dependencies] proptest = "0.10" proptest-derive = "0.3" +tokio = { version = "0.3.6", features = ["test-util"] } toml = "0.5" zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 4df38859..8b1541f6 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -49,6 +49,22 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20); /// connected peer. pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60); +/// The minimum time between successive calls to [`CandidateSet::next()`][Self::next]. +/// +/// ## Security +/// +/// Zebra resists distributed denial of service attacks by making sure that new peer connections +/// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. +pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); + +/// The minimum time between successive calls to [`CandidateSet::update()`][Self::update]. +/// +/// ## Security +/// +/// Zebra resists distributed denial of service attacks by making sure that requests for more +/// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart. +pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(10); + /// The number of GetAddr requests sent when crawling for new peers. /// /// ## SECURITY diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 2197ca14..2133f259 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -108,7 +108,8 @@ mod tests; pub(super) struct CandidateSet { pub(super) address_book: Arc>, pub(super) peer_service: S, - next_peer_min_wait: Sleep, + wait_next_handshake: Sleep, + min_next_crawl: Instant, } impl CandidateSet @@ -116,14 +117,6 @@ where S: Service, S::Future: Send + 'static, { - /// The minimum time between successive calls to `CandidateSet::next()`. - /// - /// ## Security - /// - /// Zebra resists distributed denial of service attacks by making sure that new peer connections - /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. - const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); - /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers. pub fn new( address_book: Arc>, @@ -132,7 +125,8 @@ where CandidateSet { address_book, peer_service, - next_peer_min_wait: sleep(Duration::from_secs(0)), + wait_next_handshake: sleep(Duration::from_secs(0)), + min_next_crawl: Instant::now(), } } @@ -165,6 +159,13 @@ where /// /// [`next`][Self::next] puts peers into the [`AttemptPending`] state. /// + /// ## Security + /// + /// This call is rate-limited to prevent sending a burst of repeated requests for new peer + /// addresses. Each call will only update the [`CandidateSet`] if more time + /// than [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed since + /// the last call. Otherwise, the update is skipped. + /// /// [`Responded`]: crate::PeerAddrState::Responded /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped /// [`Failed`]: crate::PeerAddrState::Failed @@ -178,20 +179,27 @@ where /// /// See [`update_initial`][Self::update_initial] for details. async fn update_timeout(&mut self, fanout_limit: Option) -> Result<(), BoxError> { - // CORRECTNESS + // SECURITY // - // Use a timeout to avoid deadlocks when there are no connected - // peers, and: - // - we're waiting on a handshake to complete so there are peers, or - // - another task that handles or adds peers is waiting on this task - // to complete. - if let Ok(fanout_result) = - timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await - { - fanout_result?; - } else { - // update must only return an error for permanent failures - info!("timeout waiting for the peer service to become ready"); + // Rate limit sending `GetAddr` messages to peers. + if self.min_next_crawl <= Instant::now() { + // CORRECTNESS + // + // Use a timeout to avoid deadlocks when there are no connected + // peers, and: + // - we're waiting on a handshake to complete so there are peers, or + // - another task that handles or adds peers is waiting on this task + // to complete. + if let Ok(fanout_result) = + timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await + { + fanout_result?; + } else { + // update must only return an error for permanent failures + info!("timeout waiting for the peer service to become ready"); + } + + self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL; } Ok(()) @@ -280,11 +288,11 @@ where /// /// Zebra resists distributed denial of service attacks by making sure that /// new peer connections are initiated at least - /// `MIN_PEER_CONNECTION_INTERVAL` apart. + /// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart. pub async fn next(&mut self) -> Option { - let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now()); - let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); - mem::swap(&mut self.next_peer_min_wait, &mut sleep); + let current_deadline = self.wait_next_handshake.deadline().max(Instant::now()); + let mut sleep = sleep_until(current_deadline + constants::MIN_PEER_CONNECTION_INTERVAL); + mem::swap(&mut self.wait_next_handshake, &mut sleep); // # Correctness // diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index da6c12ea..f68d4168 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -13,7 +13,10 @@ use tracing::Span; use zebra_chain::serialization::DateTime32; use super::super::{validate_addrs, CandidateSet}; -use crate::{types::MetaAddr, AddressBook, BoxError, Config, Request, Response}; +use crate::{ + constants::MIN_PEER_CONNECTION_INTERVAL, types::MetaAddr, AddressBook, BoxError, Config, + Request, Response, +}; proptest! { /// Test that validated gossiped peers never have a `last_seen` time that's in the future. @@ -87,6 +90,6 @@ where assert!(candidate_set.next().await.is_some()); assert!(Instant::now() >= minimum_reconnect_instant); - minimum_reconnect_instant += CandidateSet::::MIN_PEER_CONNECTION_INTERVAL; + minimum_reconnect_instant += MIN_PEER_CONNECTION_INTERVAL; } } diff --git a/zebra-network/src/peer_set/candidate_set/tests/vectors.rs b/zebra-network/src/peer_set/candidate_set/tests/vectors.rs index 4da9a8ab..4118447c 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/vectors.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/vectors.rs @@ -1,14 +1,32 @@ use std::{ + collections::VecDeque, convert::TryInto, + iter, net::{IpAddr, SocketAddr}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration as StdDuration, }; use chrono::{DateTime, Duration, Utc}; +use futures::future; +use tokio::{ + runtime::Runtime, + time::{self, Instant}, +}; +use tower::Service; +use tracing::Span; use zebra_chain::serialization::DateTime32; -use super::super::validate_addrs; -use crate::types::{MetaAddr, PeerServices}; +use super::super::{validate_addrs, CandidateSet}; +use crate::{ + constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL}, + types::{MetaAddr, PeerServices}, + AddressBook, Config, Request, Response, +}; /// Test that offset is applied when all addresses have `last_seen` times in the future. #[test] @@ -115,6 +133,92 @@ fn rejects_all_addresses_if_applying_offset_causes_an_underflow() { assert!(validated_peers.next().is_none()); } +/// Test that calls to [`CandidateSet::update`] are rate limited. +#[test] +fn candidate_set_updates_are_rate_limited() { + // Run the test for enough time for `update` to actually run three times + const INTERVALS_TO_RUN: u32 = 3; + // How many times should `update` be called in each rate limit interval + const POLL_FREQUENCY_FACTOR: u32 = 3; + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let address_book = AddressBook::new(&Config::default(), Span::none()); + let (peer_service, call_count) = mock_peer_service(); + let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service); + + runtime.block_on(async move { + time::pause(); + + let time_limit = Instant::now() + + INTERVALS_TO_RUN * MIN_PEER_GET_ADDR_INTERVAL + + StdDuration::from_secs(1); + + while Instant::now() <= time_limit { + candidate_set + .update() + .await + .expect("Call to CandidateSet::update should not fail"); + + time::advance(MIN_PEER_GET_ADDR_INTERVAL / POLL_FREQUENCY_FACTOR).await; + } + + assert_eq!( + call_count.load(Ordering::SeqCst), + INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT + ); + }); +} + +/// Test that a call to [`CandidateSet::update`] after a call to [`CandidateSet::update_inital`] is +/// rate limited. +#[test] +fn candidate_set_update_after_update_initial_is_rate_limited() { + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let address_book = AddressBook::new(&Config::default(), Span::none()); + let (peer_service, call_count) = mock_peer_service(); + let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service); + + runtime.block_on(async move { + time::pause(); + + // Call `update_initial` first + candidate_set + .update_initial(GET_ADDR_FANOUT) + .await + .expect("Call to CandidateSet::update should not fail"); + + assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT); + + // The following two calls to `update` should be skipped + candidate_set + .update() + .await + .expect("Call to CandidateSet::update should not fail"); + time::advance(MIN_PEER_GET_ADDR_INTERVAL / 2).await; + candidate_set + .update() + .await + .expect("Call to CandidateSet::update should not fail"); + + assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT); + + // After waiting for at least the minimum interval the call to `update` should succeed + time::advance(MIN_PEER_GET_ADDR_INTERVAL).await; + candidate_set + .update() + .await + .expect("Call to CandidateSet::update should not fail"); + + assert_eq!(call_count.load(Ordering::SeqCst), 2 * GET_ADDR_FANOUT); + }); +} + +// Utility functions + /// Create a mock list of gossiped [`MetaAddr`]s with the specified `last_seen_times`. /// /// The IP address and port of the generated ports should not matter for the test. @@ -135,3 +239,49 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator>) }) .collect() } + +/// Create a mock `PeerSet` service that checks that requests to it are rate limited. +/// +/// The function also returns an atomic counter, that can be used for checking how many times the +/// service was called. +fn mock_peer_service() -> ( + impl Service< + Request, + Response = Response, + Future = future::Ready>, + Error = E, + > + 'static, + Arc, +) { + let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL; + + let call_counter = Arc::new(AtomicUsize::new(0)); + let call_counter_to_return = call_counter.clone(); + + let mut peer_request_tracker: VecDeque<_> = + iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect(); + + let service = tower::service_fn(move |request| { + match request { + Request::Peers => { + // Get time from queue that the request is authorized to be sent + let authorized_request_time = peer_request_tracker + .pop_front() + .expect("peer_request_tracker should always have GET_ADDR_FANOUT elements"); + // Check that the request was rate limited + assert!(Instant::now() >= authorized_request_time); + // Push a new authorization, updated by the rate limit interval + peer_request_tracker.push_back(Instant::now() + rate_limit_interval); + + // Increment count of calls + call_counter.fetch_add(1, Ordering::SeqCst); + + // Return an empty list of peer addresses + future::ok(Response::Peers(vec![])) + } + _ => unreachable!("Received an unexpected internal message: {:?}", request), + } + }); + + (service, call_counter_to_return) +}