From 32ea511a73b8e25d26b59c653c25eec85cb43b1c Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 15 Jun 2023 10:43:41 +1000 Subject: [PATCH] fix(net): Reduce inbound service overloads and add a timeout (#6950) * Increase concurrency limit, reduce peer broadcast * Fix a div_ceil() TODO * Document security requirements of inbound peer overload handling * Reduce drop probability and fix its formatting * Put a 5 second timeout on inbound service requests * Update drop probability tests * Add error types, metrics, and logging for InboundTimeout errors --- Cargo.lock | 1 + zebra-network/Cargo.toml | 1 + zebra-network/src/constants.rs | 2 +- zebra-network/src/peer/connection.rs | 56 +++++++++++++++---- .../src/peer/connection/tests/vectors.rs | 34 +++++------ zebra-network/src/peer/error.rs | 6 ++ zebra-network/src/peer_set/set.rs | 7 ++- zebrad/src/commands/start.rs | 10 +++- zebrad/src/components/inbound.rs | 13 +++-- zebrad/src/components/inbound/downloads.rs | 2 +- 10 files changed, 94 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02bbb1f5..1c29b1fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5759,6 +5759,7 @@ dependencies = [ "itertools", "lazy_static", "metrics 0.21.0", + "num-integer", "ordered-map", "pin-project", "proptest", diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 339086a9..36016071 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -46,6 +46,7 @@ humantime-serde = "1.1.1" indexmap = { version = "1.9.3", features = ["serde"] } itertools = "0.10.5" lazy_static = "1.4.0" +num-integer = "0.1.45" ordered-map = "0.4.2" pin-project = "1.1.0" rand = { version = "0.8.5", package = "rand" } diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index f327cce4..e137fb7b 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -349,7 +349,7 @@ pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05; /// The maximum probability of dropping a peer connection when it receives an /// [`Overloaded`](crate::PeerError::Overloaded) error. -pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95; +pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.5; /// The minimum interval between logging peer set status updates. pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60); diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 318357db..71838366 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -16,7 +16,7 @@ use futures::{ }; use rand::{thread_rng, Rng}; use tokio::time::{sleep, Sleep}; -use tower::{load_shed::error::Overloaded, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::{ @@ -1283,6 +1283,12 @@ where // before sending the next inbound request. tokio::task::yield_now().await; + // # Security + // + // Holding buffer slots for a long time can cause hangs: + // + // + // The inbound service must be called immediately after a buffer slot is reserved. if self.svc.ready().await.is_err() { self.fail_with(PeerError::ServiceShutdown).await; return; @@ -1290,12 +1296,28 @@ where let rsp = match self.svc.call(req.clone()).await { Err(e) => { - if e.is::() { + if e.is::() { + // # Security + // + // The peer request queue must have a limited length. + // The buffer and load shed layers are added in `start::start()`. tracing::debug!("inbound service is overloaded, may close connection"); let now = Instant::now(); - self.handle_inbound_overload(req, now).await; + self.handle_inbound_overload(req, now, PeerError::Overloaded) + .await; + } else if e.is::() { + // # Security + // + // Peer requests must have a timeout. + // The timeout layer is added in `start::start()`. + tracing::info!(%req, "inbound service request timed out, may close connection"); + + let now = Instant::now(); + + self.handle_inbound_overload(req, now, PeerError::InboundTimeout) + .await; } else { // We could send a reject to the remote peer, but that might cause // them to disconnect, and we might be using them to sync blocks. @@ -1431,7 +1453,8 @@ where tokio::task::yield_now().await; } - /// Handle inbound service overload error responses by randomly terminating some connections. + /// Handle inbound service overload and timeout error responses by randomly terminating some + /// connections. /// /// # Security /// @@ -1450,15 +1473,19 @@ where /// The inbound connection rate-limit also makes it hard for multiple peers to perform this /// attack, because each inbound connection can only send one inbound request before its /// probability of being disconnected increases. - async fn handle_inbound_overload(&mut self, req: Request, now: Instant) { + async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) { let prev = self.last_overload_time.replace(now); let drop_connection_probability = overload_drop_connection_probability(now, prev); if thread_rng().gen::() < drop_connection_probability { - metrics::counter!("pool.closed.loadshed", 1); + if matches!(error, PeerError::Overloaded) { + metrics::counter!("pool.closed.loadshed", 1); + } else { + metrics::counter!("pool.closed.inbound.timeout", 1); + } tracing::info!( - drop_connection_probability, + drop_connection_probability = format!("{drop_connection_probability:.3}"), remote_user_agent = ?self.connection_info.remote.user_agent, negotiated_version = ?self.connection_info.negotiated_version, peer = ?self.metrics_label, @@ -1467,14 +1494,19 @@ where remote_height = ?self.connection_info.remote.start_height, cached_addrs = ?self.cached_addrs.len(), connection_state = ?self.state, - "inbound service is overloaded, closing connection", + "inbound service {error} error, closing connection", ); - self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command())); - self.fail_with(PeerError::Overloaded).await; + self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command())); + self.fail_with(error).await; } else { - self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command())); - metrics::counter!("pool.ignored.loadshed", 1); + self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command())); + + if matches!(error, PeerError::Overloaded) { + metrics::counter!("pool.ignored.loadshed", 1); + } else { + metrics::counter!("pool.ignored.inbound.timeout", 1); + } } } } diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs index cca8c8b2..4ab4db7a 100644 --- a/zebra-network/src/peer/connection/tests/vectors.rs +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -687,11 +687,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability <= MAX_OVERLOAD_DROP_PROBABILITY, - "if the overloads are very close together, drops can optionally decrease", + "if the overloads are very close together, drops can optionally decrease: {drop_probability} <= {MAX_OVERLOAD_DROP_PROBABILITY}", ); assert!( MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, - "if the overloads are very close together, drops can only decrease slightly", + "if the overloads are very close together, drops can only decrease slightly: {drop_probability}", ); let last_probability = drop_probability; @@ -700,11 +700,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability < last_probability, - "if the overloads decrease, drops should decrease", + "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}", ); assert!( MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, - "if the overloads are very close together, drops can only decrease slightly", + "if the overloads are very close together, drops can only decrease slightly: {drop_probability}", ); let last_probability = drop_probability; @@ -713,11 +713,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability < last_probability, - "if the overloads decrease, drops should decrease", + "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}", ); assert!( MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, - "if the overloads are very close together, drops can only decrease slightly", + "if the overloads are very close together, drops can only decrease slightly: {drop_probability}", ); let last_probability = drop_probability; @@ -726,11 +726,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability < last_probability, - "if the overloads decrease, drops should decrease", + "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}", ); assert!( MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.01, - "if the overloads are very close together, drops can only decrease slightly", + "if the overloads are very close together, drops can only decrease slightly: {drop_probability}", ); let last_probability = drop_probability; @@ -739,11 +739,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability < last_probability, - "if the overloads decrease, drops should decrease", + "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}", ); assert!( - MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.5, - "if the overloads are distant, drops should decrease a lot", + MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.4, + "if the overloads are distant, drops should decrease a lot: {drop_probability}", ); let last_probability = drop_probability; @@ -752,11 +752,11 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert!( drop_probability < last_probability, - "if the overloads decrease, drops should decrease", + "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}", ); - assert!( - MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.7, - "if the overloads are distant, drops should decrease a lot", + assert_eq!( + drop_probability, MIN_OVERLOAD_DROP_PROBABILITY, + "if overloads are far apart, drops should have minimum drop probability: {drop_probability}", ); let _last_probability = drop_probability; @@ -765,14 +765,14 @@ fn overload_probability_reduces_over_time() { let drop_probability = overload_drop_connection_probability(now, Some(prev)); assert_eq!( drop_probability, MIN_OVERLOAD_DROP_PROBABILITY, - "if overloads are far apart, drops should have minimum drop probability", + "if overloads are far apart, drops should have minimum drop probability: {drop_probability}", ); // Base case: no previous overload let drop_probability = overload_drop_connection_probability(now, None); assert_eq!( drop_probability, MIN_OVERLOAD_DROP_PROBABILITY, - "if there is no previous overload time, overloads should have minimum drop probability", + "if there is no previous overload time, overloads should have minimum drop probability: {drop_probability}", ); } diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 4d842ba5..6263fb56 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -82,6 +82,11 @@ pub enum PeerError { #[error("Internal services over capacity")] Overloaded, + /// This peer request's caused an internal service timeout, so the connection was dropped + /// to shed load or prevent attacks. + #[error("Internal services timed out")] + InboundTimeout, + /// This node's internal services are no longer able to service requests. #[error("Internal services have failed or shutdown")] ServiceShutdown, @@ -142,6 +147,7 @@ impl PeerError { PeerError::Serialization(inner) => format!("Serialization({inner})").into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::Overloaded => "Overloaded".into(), + PeerError::InboundTimeout => "InboundTimeout".into(), PeerError::ServiceShutdown => "ServiceShutdown".into(), PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(), diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index b3ff2a92..0353d377 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -111,6 +111,7 @@ use futures::{ stream::FuturesUnordered, }; use itertools::Itertools; +use num_integer::div_ceil; use tokio::{ sync::{broadcast, oneshot::error::TryRecvError, watch}, task::JoinHandle, @@ -808,9 +809,11 @@ where /// Given a number of ready peers calculate to how many of them Zebra will /// actually send the request to. Return this number. pub(crate) fn number_of_peers_to_broadcast(&self) -> usize { - // We are currently sending broadcast messages to half of the total peers. + // We are currently sending broadcast messages to a third of the total peers. + const PEER_FRACTION_TO_BROADCAST: usize = 3; + // Round up, so that if we have one ready peer, it gets the request. - (self.ready_services.len() + 1) / 2 + div_ceil(self.ready_services.len(), PEER_FRACTION_TO_BROADCAST) } /// Returns the list of addresses in the peer set. diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index edbc29d2..67b9b3e7 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -84,7 +84,7 @@ use zebra_rpc::server::RpcServer; use crate::{ application::{app_version, user_agent}, components::{ - inbound::{self, InboundSetupData}, + inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME}, mempool::{self, Mempool}, sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER}, tokio::{RuntimeRun, TokioComponent}, @@ -132,10 +132,18 @@ impl StartCmd { // The service that our node uses to respond to requests by peers. The // load_shed middleware ensures that we reduce the size of the peer set // in response to excess load. + // + // # Security + // + // This layer stack is security-sensitive, modifying it can cause hangs, + // or enable denial of service attacks. + // + // See `zebra_network::Connection::drive_peer_request()` for details. let (setup_tx, setup_rx) = oneshot::channel(); let inbound = ServiceBuilder::new() .load_shed() .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY) + .timeout(MAX_INBOUND_RESPONSE_TIME) .service(Inbound::new( config.sync.full_verify_concurrency_limit, setup_rx, diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index d7c9ca08..e93aa851 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -11,6 +11,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, }; use chrono::Utc; @@ -18,6 +19,7 @@ use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, }; +use num_integer::div_ceil; use tokio::sync::oneshot::{self, error::TryRecvError}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; @@ -50,6 +52,12 @@ mod tests; use downloads::Downloads as BlockDownloads; +/// The maximum amount of time an inbound service response can take. +/// +/// If the response takes longer than this time, it will be cancelled, +/// and the peer might be disconnected. +pub const MAX_INBOUND_RESPONSE_TIME: Duration = Duration::from_secs(5); + /// The number of bytes the [`Inbound`] service will queue in response to a single block or /// transaction request, before ignoring any additional block or transaction IDs in that request. /// @@ -374,10 +382,7 @@ impl Service for Inbound { let mut peers = peers.sanitized(now); // Truncate the list - // - // TODO: replace with div_ceil once it stabilises - // https://github.com/rust-lang/rust/issues/88581 - let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR; + let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR); let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit); peers.truncate(address_limit); diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index aa8b2cf6..11200e66 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -49,7 +49,7 @@ type BoxError = Box; /// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks /// will be directed to the malicious node that originally gossiped the hash. /// Therefore, this attack can be carried out by a single malicious node. -pub const MAX_INBOUND_CONCURRENCY: usize = 20; +pub const MAX_INBOUND_CONCURRENCY: usize = 30; /// The action taken in response to a peer's gossiped block hash. pub enum DownloadAction {