fix(net): Reduce inbound service overloads and add a timeout (#6950)

* Increase concurrency limit, reduce peer broadcast

* Fix a div_ceil() TODO

* Document security requirements of inbound peer overload handling

* Reduce drop probability and fix its formatting

* Put a 5 second timeout on inbound service requests

* Update drop probability tests

* Add error types, metrics, and logging for InboundTimeout errors
This commit is contained in:
teor 2023-06-15 10:43:41 +10:00 committed by GitHub
parent 912693bf0a
commit 32ea511a73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 94 additions and 38 deletions

View File

@ -5759,6 +5759,7 @@ dependencies = [
"itertools", "itertools",
"lazy_static", "lazy_static",
"metrics 0.21.0", "metrics 0.21.0",
"num-integer",
"ordered-map", "ordered-map",
"pin-project", "pin-project",
"proptest", "proptest",

View File

@ -46,6 +46,7 @@ humantime-serde = "1.1.1"
indexmap = { version = "1.9.3", features = ["serde"] } indexmap = { version = "1.9.3", features = ["serde"] }
itertools = "0.10.5" itertools = "0.10.5"
lazy_static = "1.4.0" lazy_static = "1.4.0"
num-integer = "0.1.45"
ordered-map = "0.4.2" ordered-map = "0.4.2"
pin-project = "1.1.0" pin-project = "1.1.0"
rand = { version = "0.8.5", package = "rand" } rand = { version = "0.8.5", package = "rand" }

View File

@ -349,7 +349,7 @@ pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05;
/// The maximum probability of dropping a peer connection when it receives an /// The maximum probability of dropping a peer connection when it receives an
/// [`Overloaded`](crate::PeerError::Overloaded) error. /// [`Overloaded`](crate::PeerError::Overloaded) error.
pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95; pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.5;
/// The minimum interval between logging peer set status updates. /// The minimum interval between logging peer set status updates.
pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60); pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60);

View File

@ -16,7 +16,7 @@ use futures::{
}; };
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use tokio::time::{sleep, Sleep}; use tokio::time::{sleep, Sleep};
use tower::{load_shed::error::Overloaded, Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing_futures::Instrument; use tracing_futures::Instrument;
use zebra_chain::{ use zebra_chain::{
@ -1283,6 +1283,12 @@ where
// before sending the next inbound request. // before sending the next inbound request.
tokio::task::yield_now().await; tokio::task::yield_now().await;
// # Security
//
// Holding buffer slots for a long time can cause hangs:
// <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
//
// The inbound service must be called immediately after a buffer slot is reserved.
if self.svc.ready().await.is_err() { if self.svc.ready().await.is_err() {
self.fail_with(PeerError::ServiceShutdown).await; self.fail_with(PeerError::ServiceShutdown).await;
return; return;
@ -1290,12 +1296,28 @@ where
let rsp = match self.svc.call(req.clone()).await { let rsp = match self.svc.call(req.clone()).await {
Err(e) => { Err(e) => {
if e.is::<Overloaded>() { if e.is::<tower::load_shed::error::Overloaded>() {
// # Security
//
// The peer request queue must have a limited length.
// The buffer and load shed layers are added in `start::start()`.
tracing::debug!("inbound service is overloaded, may close connection"); tracing::debug!("inbound service is overloaded, may close connection");
let now = Instant::now(); let now = Instant::now();
self.handle_inbound_overload(req, now).await; self.handle_inbound_overload(req, now, PeerError::Overloaded)
.await;
} else if e.is::<tower::timeout::error::Elapsed>() {
// # Security
//
// Peer requests must have a timeout.
// The timeout layer is added in `start::start()`.
tracing::info!(%req, "inbound service request timed out, may close connection");
let now = Instant::now();
self.handle_inbound_overload(req, now, PeerError::InboundTimeout)
.await;
} else { } else {
// We could send a reject to the remote peer, but that might cause // We could send a reject to the remote peer, but that might cause
// them to disconnect, and we might be using them to sync blocks. // them to disconnect, and we might be using them to sync blocks.
@ -1431,7 +1453,8 @@ where
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
/// Handle inbound service overload error responses by randomly terminating some connections. /// Handle inbound service overload and timeout error responses by randomly terminating some
/// connections.
/// ///
/// # Security /// # Security
/// ///
@ -1450,15 +1473,19 @@ where
/// The inbound connection rate-limit also makes it hard for multiple peers to perform this /// The inbound connection rate-limit also makes it hard for multiple peers to perform this
/// attack, because each inbound connection can only send one inbound request before its /// attack, because each inbound connection can only send one inbound request before its
/// probability of being disconnected increases. /// probability of being disconnected increases.
async fn handle_inbound_overload(&mut self, req: Request, now: Instant) { async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) {
let prev = self.last_overload_time.replace(now); let prev = self.last_overload_time.replace(now);
let drop_connection_probability = overload_drop_connection_probability(now, prev); let drop_connection_probability = overload_drop_connection_probability(now, prev);
if thread_rng().gen::<f32>() < drop_connection_probability { if thread_rng().gen::<f32>() < drop_connection_probability {
metrics::counter!("pool.closed.loadshed", 1); if matches!(error, PeerError::Overloaded) {
metrics::counter!("pool.closed.loadshed", 1);
} else {
metrics::counter!("pool.closed.inbound.timeout", 1);
}
tracing::info!( tracing::info!(
drop_connection_probability, drop_connection_probability = format!("{drop_connection_probability:.3}"),
remote_user_agent = ?self.connection_info.remote.user_agent, remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version, negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label, peer = ?self.metrics_label,
@ -1467,14 +1494,19 @@ where
remote_height = ?self.connection_info.remote.start_height, remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(), cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state, connection_state = ?self.state,
"inbound service is overloaded, closing connection", "inbound service {error} error, closing connection",
); );
self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command())); self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command()));
self.fail_with(PeerError::Overloaded).await; self.fail_with(error).await;
} else { } else {
self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command())); self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command()));
metrics::counter!("pool.ignored.loadshed", 1);
if matches!(error, PeerError::Overloaded) {
metrics::counter!("pool.ignored.loadshed", 1);
} else {
metrics::counter!("pool.ignored.inbound.timeout", 1);
}
} }
} }
} }

View File

@ -687,11 +687,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability <= MAX_OVERLOAD_DROP_PROBABILITY, drop_probability <= MAX_OVERLOAD_DROP_PROBABILITY,
"if the overloads are very close together, drops can optionally decrease", "if the overloads are very close together, drops can optionally decrease: {drop_probability} <= {MAX_OVERLOAD_DROP_PROBABILITY}",
); );
assert!( assert!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001,
"if the overloads are very close together, drops can only decrease slightly", "if the overloads are very close together, drops can only decrease slightly: {drop_probability}",
); );
let last_probability = drop_probability; let last_probability = drop_probability;
@ -700,11 +700,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability < last_probability, drop_probability < last_probability,
"if the overloads decrease, drops should decrease", "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}",
); );
assert!( assert!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001,
"if the overloads are very close together, drops can only decrease slightly", "if the overloads are very close together, drops can only decrease slightly: {drop_probability}",
); );
let last_probability = drop_probability; let last_probability = drop_probability;
@ -713,11 +713,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability < last_probability, drop_probability < last_probability,
"if the overloads decrease, drops should decrease", "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}",
); );
assert!( assert!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001, MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.001,
"if the overloads are very close together, drops can only decrease slightly", "if the overloads are very close together, drops can only decrease slightly: {drop_probability}",
); );
let last_probability = drop_probability; let last_probability = drop_probability;
@ -726,11 +726,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability < last_probability, drop_probability < last_probability,
"if the overloads decrease, drops should decrease", "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}",
); );
assert!( assert!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.01, MAX_OVERLOAD_DROP_PROBABILITY - drop_probability < 0.01,
"if the overloads are very close together, drops can only decrease slightly", "if the overloads are very close together, drops can only decrease slightly: {drop_probability}",
); );
let last_probability = drop_probability; let last_probability = drop_probability;
@ -739,11 +739,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability < last_probability, drop_probability < last_probability,
"if the overloads decrease, drops should decrease", "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}",
); );
assert!( assert!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.5, MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.4,
"if the overloads are distant, drops should decrease a lot", "if the overloads are distant, drops should decrease a lot: {drop_probability}",
); );
let last_probability = drop_probability; let last_probability = drop_probability;
@ -752,11 +752,11 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert!( assert!(
drop_probability < last_probability, drop_probability < last_probability,
"if the overloads decrease, drops should decrease", "if the overloads decrease, drops should decrease: {drop_probability} < {last_probability}",
); );
assert!( assert_eq!(
MAX_OVERLOAD_DROP_PROBABILITY - drop_probability > 0.7, drop_probability, MIN_OVERLOAD_DROP_PROBABILITY,
"if the overloads are distant, drops should decrease a lot", "if overloads are far apart, drops should have minimum drop probability: {drop_probability}",
); );
let _last_probability = drop_probability; let _last_probability = drop_probability;
@ -765,14 +765,14 @@ fn overload_probability_reduces_over_time() {
let drop_probability = overload_drop_connection_probability(now, Some(prev)); let drop_probability = overload_drop_connection_probability(now, Some(prev));
assert_eq!( assert_eq!(
drop_probability, MIN_OVERLOAD_DROP_PROBABILITY, drop_probability, MIN_OVERLOAD_DROP_PROBABILITY,
"if overloads are far apart, drops should have minimum drop probability", "if overloads are far apart, drops should have minimum drop probability: {drop_probability}",
); );
// Base case: no previous overload // Base case: no previous overload
let drop_probability = overload_drop_connection_probability(now, None); let drop_probability = overload_drop_connection_probability(now, None);
assert_eq!( assert_eq!(
drop_probability, MIN_OVERLOAD_DROP_PROBABILITY, drop_probability, MIN_OVERLOAD_DROP_PROBABILITY,
"if there is no previous overload time, overloads should have minimum drop probability", "if there is no previous overload time, overloads should have minimum drop probability: {drop_probability}",
); );
} }

View File

@ -82,6 +82,11 @@ pub enum PeerError {
#[error("Internal services over capacity")] #[error("Internal services over capacity")]
Overloaded, Overloaded,
/// This peer request caused an internal service timeout, so the connection was dropped /// This peer request caused an internal service timeout, so the connection was dropped
/// to shed load or prevent attacks.
#[error("Internal services timed out")]
InboundTimeout,
/// This node's internal services are no longer able to service requests. /// This node's internal services are no longer able to service requests.
#[error("Internal services have failed or shutdown")] #[error("Internal services have failed or shutdown")]
ServiceShutdown, ServiceShutdown,
@ -142,6 +147,7 @@ impl PeerError {
PeerError::Serialization(inner) => format!("Serialization({inner})").into(), PeerError::Serialization(inner) => format!("Serialization({inner})").into(),
PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(),
PeerError::Overloaded => "Overloaded".into(), PeerError::Overloaded => "Overloaded".into(),
PeerError::InboundTimeout => "InboundTimeout".into(),
PeerError::ServiceShutdown => "ServiceShutdown".into(), PeerError::ServiceShutdown => "ServiceShutdown".into(),
PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), PeerError::NotFoundResponse(_) => "NotFoundResponse".into(),
PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(), PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(),

View File

@ -111,6 +111,7 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use itertools::Itertools; use itertools::Itertools;
use num_integer::div_ceil;
use tokio::{ use tokio::{
sync::{broadcast, oneshot::error::TryRecvError, watch}, sync::{broadcast, oneshot::error::TryRecvError, watch},
task::JoinHandle, task::JoinHandle,
@ -808,9 +809,11 @@ where
/// Given a number of ready peers calculate to how many of them Zebra will /// Given a number of ready peers calculate to how many of them Zebra will
/// actually send the request to. Return this number. /// actually send the request to. Return this number.
pub(crate) fn number_of_peers_to_broadcast(&self) -> usize { pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
// We are currently sending broadcast messages to half of the total peers. // We are currently sending broadcast messages to a third of the total peers.
const PEER_FRACTION_TO_BROADCAST: usize = 3;
// Round up, so that if we have one ready peer, it gets the request. // Round up, so that if we have one ready peer, it gets the request.
(self.ready_services.len() + 1) / 2 div_ceil(self.ready_services.len(), PEER_FRACTION_TO_BROADCAST)
} }
/// Returns the list of addresses in the peer set. /// Returns the list of addresses in the peer set.

View File

@ -84,7 +84,7 @@ use zebra_rpc::server::RpcServer;
use crate::{ use crate::{
application::{app_version, user_agent}, application::{app_version, user_agent},
components::{ components::{
inbound::{self, InboundSetupData}, inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
mempool::{self, Mempool}, mempool::{self, Mempool},
sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER}, sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
tokio::{RuntimeRun, TokioComponent}, tokio::{RuntimeRun, TokioComponent},
@ -132,10 +132,18 @@ impl StartCmd {
// The service that our node uses to respond to requests by peers. The // The service that our node uses to respond to requests by peers. The
// load_shed middleware ensures that we reduce the size of the peer set // load_shed middleware ensures that we reduce the size of the peer set
// in response to excess load. // in response to excess load.
//
// # Security
//
// This layer stack is security-sensitive, modifying it can cause hangs,
// or enable denial of service attacks.
//
// See `zebra_network::Connection::drive_peer_request()` for details.
let (setup_tx, setup_rx) = oneshot::channel(); let (setup_tx, setup_rx) = oneshot::channel();
let inbound = ServiceBuilder::new() let inbound = ServiceBuilder::new()
.load_shed() .load_shed()
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY) .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.timeout(MAX_INBOUND_RESPONSE_TIME)
.service(Inbound::new( .service(Inbound::new(
config.sync.full_verify_concurrency_limit, config.sync.full_verify_concurrency_limit,
setup_rx, setup_rx,

View File

@ -11,6 +11,7 @@ use std::{
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use chrono::Utc; use chrono::Utc;
@ -18,6 +19,7 @@ use futures::{
future::{FutureExt, TryFutureExt}, future::{FutureExt, TryFutureExt},
stream::Stream, stream::Stream,
}; };
use num_integer::div_ceil;
use tokio::sync::oneshot::{self, error::TryRecvError}; use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
@ -50,6 +52,12 @@ mod tests;
use downloads::Downloads as BlockDownloads; use downloads::Downloads as BlockDownloads;
/// The maximum amount of time an inbound service response can take.
///
/// If the response takes longer than this time, it will be cancelled,
/// and the peer might be disconnected.
pub const MAX_INBOUND_RESPONSE_TIME: Duration = Duration::from_secs(5);
/// The number of bytes the [`Inbound`] service will queue in response to a single block or /// The number of bytes the [`Inbound`] service will queue in response to a single block or
/// transaction request, before ignoring any additional block or transaction IDs in that request. /// transaction request, before ignoring any additional block or transaction IDs in that request.
/// ///
@ -374,10 +382,7 @@ impl Service<zn::Request> for Inbound {
let mut peers = peers.sanitized(now); let mut peers = peers.sanitized(now);
// Truncate the list // Truncate the list
// let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
// TODO: replace with div_ceil once it stabilises
// https://github.com/rust-lang/rust/issues/88581
let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR;
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit); let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit); peers.truncate(address_limit);

View File

@ -49,7 +49,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks /// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks
/// will be directed to the malicious node that originally gossiped the hash. /// will be directed to the malicious node that originally gossiped the hash.
/// Therefore, this attack can be carried out by a single malicious node. /// Therefore, this attack can be carried out by a single malicious node.
pub const MAX_INBOUND_CONCURRENCY: usize = 20; pub const MAX_INBOUND_CONCURRENCY: usize = 30;
/// The action taken in response to a peer's gossiped block hash. /// The action taken in response to a peer's gossiped block hash.
pub enum DownloadAction { pub enum DownloadAction {