fix(security): Limit blocks and transactions sent in response to a single request (#6679)
* Limit mempool transactions sent in response to a single request * Limit block count and size, sent in response to a single request
This commit is contained in:
parent
81e8198ad8
commit
c9215c6637
|
|
@ -24,7 +24,11 @@ use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, Service
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
use zebra_chain::{block, transaction::UnminedTxId};
|
use zebra_chain::{
|
||||||
|
block::{self, Block},
|
||||||
|
serialization::ZcashSerialize,
|
||||||
|
transaction::UnminedTxId,
|
||||||
|
};
|
||||||
use zebra_consensus::chain::VerifyChainError;
|
use zebra_consensus::chain::VerifyChainError;
|
||||||
use zebra_network::{
|
use zebra_network::{
|
||||||
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
|
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
|
||||||
|
|
@ -46,6 +50,25 @@ mod tests;
|
||||||
|
|
||||||
use downloads::Downloads as BlockDownloads;
|
use downloads::Downloads as BlockDownloads;
|
||||||
|
|
||||||
|
/// The number of bytes the [`Inbound`] service will queue in response to a single block or
|
||||||
|
/// transaction request, before ignoring any additional block or transaction IDs in that request.
|
||||||
|
///
|
||||||
|
/// This is the same as `zcashd`'s default send buffer limit:
|
||||||
|
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/net.h#L84>
|
||||||
|
/// as used in `ProcessGetData()`:
|
||||||
|
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
|
||||||
|
pub const GETDATA_SENT_BYTES_LIMIT: usize = 1_000_000;
|
||||||
|
|
||||||
|
/// The maximum number of blocks the [`Inbound`] service will queue in response to a block request,
|
||||||
|
/// before ignoring any additional block IDs in that request.
|
||||||
|
///
|
||||||
|
/// This is the same as `zcashd`'s request limit:
|
||||||
|
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.h#L108>
|
||||||
|
///
|
||||||
|
/// (Zebra's request limit is one block in transit per peer, because it fans out block requests to
|
||||||
|
/// many peers instead of just a few peers.)
|
||||||
|
pub const GETDATA_MAX_BLOCK_COUNT: usize = 16;
|
||||||
|
|
||||||
type BlockDownloadPeerSet =
|
type BlockDownloadPeerSet =
|
||||||
Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
|
||||||
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
|
||||||
|
|
@ -369,53 +392,65 @@ impl Service<zn::Request> for Inbound {
|
||||||
}
|
}
|
||||||
zn::Request::BlocksByHash(hashes) => {
|
zn::Request::BlocksByHash(hashes) => {
|
||||||
// We return an available or missing response to each inventory request,
|
// We return an available or missing response to each inventory request,
|
||||||
// unless the request is empty.
|
// unless the request is empty, or it reaches a response limit.
|
||||||
if hashes.is_empty() {
|
if hashes.is_empty() {
|
||||||
return async { Ok(zn::Response::Nil) }.boxed();
|
return async { Ok(zn::Response::Nil) }.boxed();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Correctness:
|
let state = state.clone();
|
||||||
//
|
|
||||||
// We can't use `call_all` here, because it can hold one buffer slot per concurrent
|
|
||||||
// future, until the `CallAll` struct is dropped. We can't hold those slots in this
|
|
||||||
// future because:
|
|
||||||
// * we're not sure when the returned future will complete, and
|
|
||||||
// * we don't limit how many returned futures can be concurrently running
|
|
||||||
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
|
||||||
use futures::stream::TryStreamExt;
|
|
||||||
hashes
|
|
||||||
.iter()
|
|
||||||
.cloned()
|
|
||||||
.map(|hash| zs::Request::Block(hash.into()))
|
|
||||||
.map(|request| state.clone().oneshot(request))
|
|
||||||
.collect::<futures::stream::FuturesOrdered<_>>()
|
|
||||||
.try_filter_map(|response| async move {
|
|
||||||
Ok(match response {
|
|
||||||
zs::Response::Block(Some(block)) => Some(block),
|
|
||||||
zs::Response::Block(None) => None,
|
|
||||||
_ => unreachable!("wrong response from state"),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.try_collect::<Vec<_>>()
|
|
||||||
.map_ok(|blocks| {
|
|
||||||
// Work out which hashes were missing.
|
|
||||||
let available_hashes: HashSet<block::Hash> = blocks.iter().map(|block| block.hash()).collect();
|
|
||||||
let available = blocks.into_iter().map(Available);
|
|
||||||
let missing = hashes.into_iter().filter(|hash| !available_hashes.contains(hash)).map(Missing);
|
|
||||||
|
|
||||||
zn::Response::Blocks(available.chain(missing).collect())
|
async move {
|
||||||
})
|
let mut blocks: Vec<InventoryResponse<Arc<Block>, block::Hash>> = Vec::new();
|
||||||
.boxed()
|
let mut total_size = 0;
|
||||||
|
|
||||||
|
// Ignore any block hashes past the response limit.
|
||||||
|
// This saves us expensive database lookups.
|
||||||
|
for &hash in hashes.iter().take(GETDATA_MAX_BLOCK_COUNT) {
|
||||||
|
// We check the limit after including at least one block, so that we can
|
||||||
|
// send blocks greater than 1 MB (but only one at a time)
|
||||||
|
if total_size >= GETDATA_SENT_BYTES_LIMIT {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = state.clone().ready().await?.call(zs::Request::Block(hash.into())).await?;
|
||||||
|
|
||||||
|
// Add the block responses to the list, while updating the size limit.
|
||||||
|
//
|
||||||
|
// If there was a database error, return the error,
|
||||||
|
// and stop processing further chunks.
|
||||||
|
match response {
|
||||||
|
zs::Response::Block(Some(block)) => {
|
||||||
|
// If checking the serialized size of the block performs badly,
|
||||||
|
// return the size from the state using a wrapper type.
|
||||||
|
total_size += block.zcash_serialized_size();
|
||||||
|
|
||||||
|
blocks.push(Available(block))
|
||||||
|
},
|
||||||
|
// We don't need to limit the size of the missing block IDs list,
|
||||||
|
// because it is already limited to the size of the getdata request
|
||||||
|
// sent by the peer. (Their content and encodings are the same.)
|
||||||
|
zs::Response::Block(None) => blocks.push(Missing(hash)),
|
||||||
|
_ => unreachable!("wrong response from state"),
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// The network layer handles splitting this response into multiple `block`
|
||||||
|
// messages, and a `notfound` message if needed.
|
||||||
|
Ok(zn::Response::Blocks(blocks))
|
||||||
|
}.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::TransactionsById(req_tx_ids) => {
|
zn::Request::TransactionsById(req_tx_ids) => {
|
||||||
// We return an available or missing response to each inventory request,
|
// We return an available or missing response to each inventory request,
|
||||||
// unless the request is empty.
|
// unless the request is empty, or it reaches a response limit.
|
||||||
if req_tx_ids.is_empty() {
|
if req_tx_ids.is_empty() {
|
||||||
return async { Ok(zn::Response::Nil) }.boxed();
|
return async { Ok(zn::Response::Nil) }.boxed();
|
||||||
}
|
}
|
||||||
|
|
||||||
let request = mempool::Request::TransactionsById(req_tx_ids.clone());
|
let request = mempool::Request::TransactionsById(req_tx_ids.clone());
|
||||||
mempool.clone().oneshot(request).map_ok(move |resp| {
|
mempool.clone().oneshot(request).map_ok(move |resp| {
|
||||||
|
let mut total_size = 0;
|
||||||
|
|
||||||
let transactions = match resp {
|
let transactions = match resp {
|
||||||
mempool::Response::Transactions(transactions) => transactions,
|
mempool::Response::Transactions(transactions) => transactions,
|
||||||
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
|
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
|
||||||
|
|
@ -423,12 +458,31 @@ impl Service<zn::Request> for Inbound {
|
||||||
|
|
||||||
// Work out which transaction IDs were missing.
|
// Work out which transaction IDs were missing.
|
||||||
let available_tx_ids: HashSet<UnminedTxId> = transactions.iter().map(|tx| tx.id).collect();
|
let available_tx_ids: HashSet<UnminedTxId> = transactions.iter().map(|tx| tx.id).collect();
|
||||||
let available = transactions.into_iter().map(Available);
|
// We don't need to limit the size of the missing transaction IDs list,
|
||||||
|
// because it is already limited to the size of the getdata request
|
||||||
|
// sent by the peer. (Their content and encodings are the same.)
|
||||||
let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing);
|
let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing);
|
||||||
|
|
||||||
|
// If we skip sending some transactions because the limit has been reached,
|
||||||
|
// they aren't reported as missing. This matches `zcashd`'s behaviour:
|
||||||
|
// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
|
||||||
|
let available = transactions.into_iter().take_while(|tx| {
|
||||||
|
// We check the limit after including the transaction,
|
||||||
|
// so that we can send transactions greater than 1 MB
|
||||||
|
// (but only one at a time)
|
||||||
|
let within_limit = total_size < GETDATA_SENT_BYTES_LIMIT;
|
||||||
|
|
||||||
|
total_size += tx.size;
|
||||||
|
|
||||||
|
within_limit
|
||||||
|
}).map(Available);
|
||||||
|
|
||||||
|
// The network layer handles splitting this response into multiple `tx`
|
||||||
|
// messages, and a `notfound` message if needed.
|
||||||
zn::Response::Transactions(available.chain(missing).collect())
|
zn::Response::Transactions(available.chain(missing).collect())
|
||||||
}).boxed()
|
}).boxed()
|
||||||
}
|
}
|
||||||
|
// Find* responses are already size-limited by the state.
|
||||||
zn::Request::FindBlocks { known_blocks, stop } => {
|
zn::Request::FindBlocks { known_blocks, stop } => {
|
||||||
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
||||||
state.clone().oneshot(request).map_ok(|resp| match resp {
|
state.clone().oneshot(request).map_ok(|resp| match resp {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue