From c9215c66377150674c86e7113344b90cdba48c5e Mon Sep 17 00:00:00 2001
From: teor
Date: Wed, 17 May 2023 09:54:45 +1000
Subject: [PATCH] fix(security): Limit blocks and transactions sent in response to a single request (#6679)

* Limit mempool transactions sent in response to a single request

* Limit block count and size sent in response to a single request
---
 zebrad/src/components/inbound.rs | 124 ++++++++++++++++++++++---------
 1 file changed, 89 insertions(+), 35 deletions(-)

diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs
index 24a5050c..f3029ec5 100644
--- a/zebrad/src/components/inbound.rs
+++ b/zebrad/src/components/inbound.rs
@@ -24,7 +24,11 @@ use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
 use zebra_network as zn;
 use zebra_state as zs;
 
-use zebra_chain::{block, transaction::UnminedTxId};
+use zebra_chain::{
+    block::{self, Block},
+    serialization::ZcashSerialize,
+    transaction::UnminedTxId,
+};
 use zebra_consensus::chain::VerifyChainError;
 use zebra_network::{
     constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
@@ -46,6 +50,25 @@ mod tests;
 
 use downloads::Downloads as BlockDownloads;
 
+/// The number of bytes the [`Inbound`] service will queue in response to a single block or
+/// transaction request, before ignoring any additional block or transaction IDs in that request.
+///
+/// This is the same as `zcashd`'s default send buffer limit:
+///
+/// as used in `ProcessGetData()`:
+///
+pub const GETDATA_SENT_BYTES_LIMIT: usize = 1_000_000;
+
+/// The maximum number of blocks the [`Inbound`] service will queue in response to a block request,
+/// before ignoring any additional block IDs in that request.
+///
+/// This is the same as `zcashd`'s request limit:
+///
+///
+/// (Zebra's request limit is one block in transit per peer, because it fans out block requests to
+/// many peers instead of just a few peers.)
+pub const GETDATA_MAX_BLOCK_COUNT: usize = 16;
+
 type BlockDownloadPeerSet =
     Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
 type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
@@ -369,53 +392,65 @@ impl Service<zn::Request> for Inbound {
             }
             zn::Request::BlocksByHash(hashes) => {
                 // We return an available or missing response to each inventory request,
-                // unless the request is empty.
+                // unless the request is empty, or it reaches a response limit.
                 if hashes.is_empty() {
                     return async { Ok(zn::Response::Nil) }.boxed();
                 }
 
-                // Correctness:
-                //
-                // We can't use `call_all` here, because it can hold one buffer slot per concurrent
-                // future, until the `CallAll` struct is dropped. We can't hold those slots in this
-                // future because:
-                // * we're not sure when the returned future will complete, and
-                // * we don't limit how many returned futures can be concurrently running
-                // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
-                use futures::stream::TryStreamExt;
-                hashes
-                    .iter()
-                    .cloned()
-                    .map(|hash| zs::Request::Block(hash.into()))
-                    .map(|request| state.clone().oneshot(request))
-                    .collect::<futures::stream::FuturesOrdered<_>>()
-                    .try_filter_map(|response| async move {
-                        Ok(match response {
-                            zs::Response::Block(Some(block)) => Some(block),
-                            zs::Response::Block(None) => None,
-                            _ => unreachable!("wrong response from state"),
-                        })
-                    })
-                    .try_collect::<Vec<_>>()
-                    .map_ok(|blocks| {
-                        // Work out which hashes were missing.
-                        let available_hashes: HashSet<block::Hash> = blocks.iter().map(|block| block.hash()).collect();
-                        let available = blocks.into_iter().map(Available);
-                        let missing = hashes.into_iter().filter(|hash| !available_hashes.contains(hash)).map(Missing);
+                let state = state.clone();
 
-                        zn::Response::Blocks(available.chain(missing).collect())
-                    })
-                    .boxed()
+                async move {
+                    let mut blocks: Vec<InventoryResponse<Arc<Block>, block::Hash>> = Vec::new();
+                    let mut total_size = 0;
+
+                    // Ignore any block hashes past the response limit.
+                    // This saves us expensive database lookups.
+                    for &hash in hashes.iter().take(GETDATA_MAX_BLOCK_COUNT) {
+                        // We check the limit after including at least one block, so that we can
+                        // send blocks greater than 1 MB (but only one at a time).
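+                        //
+                        // For example, with the 1 MB limit (block sizes here are hypothetical,
+                        // for illustration): a single 1.8 MB block is still sent, because the
+                        // check runs before any bytes have been counted. Two 0.6 MB blocks are
+                        // both sent, but a third block is then skipped, because the 1.2 MB
+                        // already queued reaches the limit.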
+                        if total_size >= GETDATA_SENT_BYTES_LIMIT {
+                            break;
+                        }
+
+                        let response = state.clone().ready().await?.call(zs::Request::Block(hash.into())).await?;
+
+                        // Add the block response to the list, and update the running total size.
+                        //
+                        // If there was a database error, return the error,
+                        // and stop processing further hashes.
+                        match response {
+                            zs::Response::Block(Some(block)) => {
+                                // If checking the serialized size of the block performs badly,
+                                // return the size from the state using a wrapper type.
+                                total_size += block.zcash_serialized_size();
+
+                                blocks.push(Available(block))
+                            },
+                            // We don't need to limit the size of the missing block IDs list,
+                            // because it is already limited to the size of the getdata request
+                            // sent by the peer. (Their content and encodings are the same.)
+                            zs::Response::Block(None) => blocks.push(Missing(hash)),
+                            _ => unreachable!("wrong response from state"),
+                        }
+                    }
+
+                    // The network layer handles splitting this response into multiple `block`
+                    // messages, and a `notfound` message if needed.
+                    Ok(zn::Response::Blocks(blocks))
+                }.boxed()
             }
             zn::Request::TransactionsById(req_tx_ids) => {
                 // We return an available or missing response to each inventory request,
-                // unless the request is empty.
+                // unless the request is empty, or it reaches a response limit.
                 if req_tx_ids.is_empty() {
                     return async { Ok(zn::Response::Nil) }.boxed();
                 }
 
                 let request = mempool::Request::TransactionsById(req_tx_ids.clone());
                 mempool.clone().oneshot(request).map_ok(move |resp| {
+                    let mut total_size = 0;
+
                     let transactions = match resp {
                         mempool::Response::Transactions(transactions) => transactions,
                         _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
@@ -423,12 +458,31 @@ impl Service<zn::Request> for Inbound {
                     };
 
                     // Work out which transaction IDs were missing.
                     let available_tx_ids: HashSet<UnminedTxId> = transactions.iter().map(|tx| tx.id).collect();
-                    let available = transactions.into_iter().map(Available);
+                    // We don't need to limit the size of the missing transaction IDs list,
+                    // because it is already limited to the size of the getdata request
+                    // sent by the peer. (Their content and encodings are the same.)
                     let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing);
 
+                    // If we skip sending some transactions because the limit has been reached,
+                    // they aren't reported as missing. This matches `zcashd`'s behaviour:
+                    //
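+                    // For example, with the 1 MB limit (transaction sizes here are hypothetical,
+                    // for illustration): two 0.7 MB transactions are both sent, because each one
+                    // is checked before its own size is counted. A third transaction is skipped,
+                    // because the 1.4 MB already queued exceeds the limit, and it is not
+                    // reported as missing.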
+                    let available = transactions.into_iter().take_while(|tx| {
+                        // We check the limit after including the transaction,
+                        // so that we can send transactions greater than 1 MB
+                        // (but only one at a time).
+                        let within_limit = total_size < GETDATA_SENT_BYTES_LIMIT;
+
+                        total_size += tx.size;
+
+                        within_limit
+                    }).map(Available);
+
+                    // The network layer handles splitting this response into multiple `tx`
+                    // messages, and a `notfound` message if needed.
                     zn::Response::Transactions(available.chain(missing).collect())
                 }).boxed()
             }
+            // Find* responses are already size-limited by the state.
             zn::Request::FindBlocks { known_blocks, stop } => {
                 let request = zs::Request::FindBlockHashes { known_blocks, stop };
                 state.clone().oneshot(request).map_ok(|resp| match resp {