Stop closing connections on unexpected messages, Credit: Equilibrium (#3120)

* Ignore unsupported messages from peers

* Ignore unknown message commands from peers

* Implement Display for Request, Response, Handler, connection::State

* Stop ignoring some completed `Response`s

* Stop returning NotFound errors, use the response instead

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-12-01 05:26:17 +10:00 committed by GitHub
parent e6ffe374d4
commit a358c410f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 239 additions and 93 deletions

View File

@ -7,7 +7,7 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::{collections::HashSet, pin::Pin, sync::Arc};
use std::{collections::HashSet, fmt, pin::Pin, sync::Arc};
use futures::{
future::{self, Either},
@ -48,7 +48,7 @@ pub(super) enum Handler {
FindBlocks,
FindHeaders,
BlocksByHash {
hashes: HashSet<block::Hash>,
pending_hashes: HashSet<block::Hash>,
blocks: Vec<Arc<Block>>,
},
TransactionsById {
@ -58,6 +58,39 @@ pub(super) enum Handler {
MempoolTransactionIds,
}
impl fmt::Display for Handler {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&match self {
Handler::Finished(Ok(response)) => format!("Finished({})", response),
Handler::Finished(Err(error)) => format!("Finished({})", error),
Handler::Ping(_) => "Ping".to_string(),
Handler::Peers => "Peers".to_string(),
Handler::FindBlocks => "FindBlocks".to_string(),
Handler::FindHeaders => "FindHeaders".to_string(),
Handler::BlocksByHash {
pending_hashes,
blocks,
} => format!(
"BlocksByHash {{ pending_hashes: {}, blocks: {} }}",
pending_hashes.len(),
blocks.len()
),
Handler::TransactionsById {
pending_ids,
transactions,
} => format!(
"TransactionsById {{ pending_ids: {}, transactions: {} }}",
pending_ids.len(),
transactions.len()
),
Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(),
})
}
}
impl Handler {
/// Try to handle `msg` as a response to a client request, possibly consuming
/// it in the process.
@ -127,19 +160,20 @@ impl Handler {
// connection open, so the inbound service can process transactions from good
// peers (case 2).
ignored_msg = Some(Message::Tx(transaction));
if !transactions.is_empty() {
if !pending_ids.is_empty() {
// if our peers start sending mixed solicited and unsolicited transactions,
// we should update this code to handle those responses
error!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response");
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error?
// Should we fake a NotFound response here? (#1515)
let missing_transaction_ids = pending_ids.iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
info!(
"unexpected transaction from peer: \
transaction responses should be sent in a continuous sequence, \
followed by notfound. \
Using partial received transactions as the peer response"
);
}
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
}
}
// `zcashd` peers actually return this response
@ -160,24 +194,27 @@ impl Handler {
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
if missing_transaction_ids != pending_ids {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
// if these errors are noisy, we should replace them with debugs
error!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
}
if missing_transaction_ids.len() != missing_invs.len() {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
info!(
"unexpected notfound message from peer: \
all remaining transaction hashes should be listed in the notfound. \
Using partial received transactions as the peer response"
);
}
if !transactions.is_empty() {
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(missing_invs)))
if missing_transaction_ids.len() != missing_invs.len() {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
info!(
"unexpected notfound message from peer: \
notfound contains duplicate hashes or non-transaction hashes. \
Using partial received transactions as the peer response"
);
}
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
}
// `zcashd` returns requested blocks in a single batch of messages.
// Other blocks or non-blocks messages can come before or after the batch.
@ -185,7 +222,7 @@ impl Handler {
// https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
(
Handler::BlocksByHash {
mut hashes,
mut pending_hashes,
mut blocks,
},
Message::Block(block),
@ -194,13 +231,16 @@ impl Handler {
// - the block messages are sent in a single continuous batch
// - missing blocks are silently skipped
// (there is no `NotFound` message at the end of the batch)
if hashes.remove(&block.hash()) {
if pending_hashes.remove(&block.hash()) {
// we are in the middle of the continuous block messages
blocks.push(block);
if hashes.is_empty() {
if pending_hashes.is_empty() {
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash { hashes, blocks }
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
} else {
// We got a block we didn't ask for.
@ -216,20 +256,29 @@ impl Handler {
// But we keep the connection open, so the inbound service can process blocks
// from good peers (case 2).
ignored_msg = Some(Message::Block(block));
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
// TODO: is it really an error if we ask for a block hash, but the peer
// doesn't know it? Should we close the connection on that kind of error?
// Should we fake a NotFound response here? (#1515)
let items = hashes.iter().map(|h| InventoryHash::Block(*h)).collect();
Handler::Finished(Err(PeerError::NotFound(items)))
if !pending_hashes.is_empty() {
// if our peers start sending mixed solicited and unsolicited blocks,
// we should update this code to handle those responses
info!(
"unexpected block from peer: \
block responses should be sent in a continuous sequence. \
Using partial received blocks as the peer response"
);
}
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
}
}
// peers are allowed to return this response, but `zcashd` never does
(Handler::BlocksByHash { hashes, blocks }, Message::NotFound(items)) => {
(
Handler::BlocksByHash {
pending_hashes,
blocks,
},
Message::NotFound(items),
) => {
// assumptions:
// - the peer eventually returns a block or a `NotFound` entry
// for each hash
@ -247,24 +296,37 @@ impl Handler {
})
.cloned()
.collect();
if missing_blocks != hashes {
trace!(?items, ?missing_blocks, ?hashes);
// if these errors are noisy, we should replace them with debugs
error!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
}
if missing_blocks.len() != items.len() {
trace!(?items, ?missing_blocks, ?hashes);
error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
if missing_blocks != pending_hashes {
trace!(?items, ?missing_blocks, ?pending_hashes);
info!(
"unexpected notfound message from peer: \
all remaining block hashes should be listed in the notfound. \
Using partial received blocks as the peer response"
);
}
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
// TODO: is it really an error if we ask for a block hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(items)))
if missing_blocks.len() != items.len() {
trace!(?items, ?missing_blocks, ?pending_hashes);
info!(
"unexpected notfound message from peer: \
notfound contains duplicate hashes or non-block hashes. \
Using partial received blocks as the peer response"
);
}
if !pending_hashes.is_empty() {
// if our peers start sending mixed solicited and unsolicited blocks,
// we should update this code to handle those responses
info!(
"unexpected block from peer: \
block responses should be sent in a continuous sequence. \
Using partial received blocks as the peer response"
);
}
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
}
(Handler::FindBlocks, Message::Inv(items))
if items
@ -312,6 +374,18 @@ pub(super) enum State {
Failed,
}
/// Compact, log-friendly rendering of a connection [`State`].
///
/// While a response is pending, the output embeds the `Display` form of the
/// active handler; the other fields of that variant are not printed.
impl fmt::Display for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write straight into the formatter instead of allocating a
        // temporary `String` for each variant.
        match self {
            State::AwaitingRequest => f.write_str("AwaitingRequest"),
            State::AwaitingResponse { handler, .. } => {
                write!(f, "AwaitingResponse({})", handler)
            }
            State::Failed => f.write_str("Failed"),
        }
    }
}
/// The state associated with a peer connection.
pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
@ -457,7 +531,7 @@ where
// &mut self. This is a sign that we don't properly
// factor the state required for inbound and
// outbound requests.
let request_msg = match self.state {
let mut request_msg = match self.state {
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)),
@ -467,30 +541,44 @@ where
self.client_rx,
),
};
// Check whether the handler is finished
// processing messages and update the state.
self.state = match self.state {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => {
// Drop the un-consumed request message,
// because we can't process multiple messages at the same time.
info!(
new_request = %request_msg
.as_ref()
.map(|m| m.to_string())
.unwrap_or_else(|| "None".to_string()),
awaiting_response = %pending,
"ignoring new request while awaiting a response"
);
request_msg = None;
pending
},
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(msg).await;
} else {
// Otherwise, check whether the handler is finished
// processing messages and update the state.
self.state = match self.state {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => pending,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
}
}
Either::Left((Either::Right(_), _peer_fut)) => {
@ -673,7 +761,7 @@ where
AwaitingResponse {
handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()),
hashes,
pending_hashes: hashes,
},
tx,
span,
@ -858,9 +946,14 @@ where
Message::FilterLoad { .. }
| Message::FilterAdd { .. }
| Message::FilterClear { .. } => {
self.fail_with(PeerError::UnsupportedMessage(
"got BIP111 message without advertising NODE_BLOOM",
));
// # Security
//
// Zcash connections are not authenticated, so malicious nodes can send fake messages,
// with connected peers' IP addresses in the IP header.
//
// Since we can't verify their source, Zebra needs to ignore unexpected messages,
// because closing the connection could cause a denial of service or eclipse attack.
debug!("got BIP111 message without advertising NODE_BLOOM");
return;
}
// Zebra crawls the network proactively, to prevent

View File

@ -5,8 +5,6 @@ use thiserror::Error;
use tracing_error::TracedError;
use zebra_chain::serialization::SerializationError;
use crate::protocol::external::InventoryHash;
/// A wrapper around `Arc<PeerError>` that implements `Error`.
#[derive(Error, Debug, Clone)]
#[error(transparent)]
@ -46,17 +44,6 @@ pub enum PeerError {
/// to shed load.
#[error("Internal services over capacity")]
Overloaded,
// TODO: stop closing connections on these errors (#2107)
// log info or debug logs instead
//
/// A peer sent us a message we don't support.
#[error("Remote peer sent an unsupported message type: {0}")]
UnsupportedMessage(&'static str),
/// We requested data that the peer couldn't find.
#[error("Remote peer could not find items: {0:?}")]
NotFound(Vec<InventoryHash>),
}
/// A shared error slot for peer errors.

View File

@ -428,7 +428,19 @@ impl Decoder for Codec {
b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len),
b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len),
b"filterclear\0" => self.read_filterclear(&mut body_reader),
_ => return Err(Parse("unknown command")),
_ => {
let command_string = String::from_utf8_lossy(&command);
// # Security
//
// Zcash connections are not authenticated, so malicious nodes can
// send fake messages, with connected peers' IP addresses in the IP header.
//
// Since we can't verify their source, Zebra needs to ignore unexpected messages,
// because closing the connection could cause a denial of service or eclipse attack.
debug!(?command, %command_string, "unknown message command from peer");
return Ok(None);
}
}
// We need Ok(Some(msg)) to signal that we're done decoding.
// This is also convenient for tracing the parse result.

View File

@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::{collections::HashSet, fmt};
use zebra_chain::{
block,
@ -181,3 +181,36 @@ pub enum Request {
/// Returns [`Response::TransactionIds`](super::Response::TransactionIds).
MempoolTransactionIds,
}
/// Compact, log-friendly rendering of a [`Request`].
///
/// Payloads are never printed: collection-carrying variants show only their
/// element counts, and an optional `stop` value is shown as `Some` or `None`.
impl fmt::Display for Request {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write straight into the formatter instead of allocating a
        // temporary `String` for each variant.
        match self {
            Request::Peers => f.write_str("Peers"),
            Request::Ping(_) => f.write_str("Ping"),
            Request::BlocksByHash(hashes) => {
                write!(f, "BlocksByHash {{ hashes: {} }}", hashes.len())
            }
            Request::TransactionsById(ids) => {
                write!(f, "TransactionsById {{ ids: {} }}", ids.len())
            }
            Request::FindBlocks { known_blocks, stop } => write!(
                f,
                "FindBlocks {{ known_blocks: {}, stop: {} }}",
                known_blocks.len(),
                if stop.is_some() { "Some" } else { "None" },
            ),
            Request::FindHeaders { known_blocks, stop } => write!(
                f,
                "FindHeaders {{ known_blocks: {}, stop: {} }}",
                known_blocks.len(),
                if stop.is_some() { "Some" } else { "None" },
            ),
            Request::PushTransaction(_) => f.write_str("PushTransaction"),
            Request::AdvertiseTransactionIds(ids) => {
                write!(f, "AdvertiseTransactionIds {{ ids: {} }}", ids.len())
            }
            Request::AdvertiseBlock(_) => f.write_str("AdvertiseBlock"),
            Request::MempoolTransactionIds => f.write_str("MempoolTransactionIds"),
        }
    }
}

View File

@ -5,7 +5,7 @@ use zebra_chain::{
use crate::meta_addr::MetaAddr;
use std::sync::Arc;
use std::{fmt, sync::Arc};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
@ -46,3 +46,24 @@ pub enum Response {
/// v5 transactions use a witnessed transaction ID.
TransactionIds(Vec<UnminedTxId>),
}
/// Compact, log-friendly rendering of a [`Response`].
///
/// Payloads are never printed: each collection-carrying variant shows only
/// the number of items it holds.
impl fmt::Display for Response {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write straight into the formatter instead of allocating a
        // temporary `String` for each variant.
        match self {
            Response::Nil => f.write_str("Nil"),
            Response::Peers(peers) => write!(f, "Peers {{ peers: {} }}", peers.len()),
            Response::Blocks(blocks) => write!(f, "Blocks {{ blocks: {} }}", blocks.len()),
            Response::BlockHashes(hashes) => {
                write!(f, "BlockHashes {{ hashes: {} }}", hashes.len())
            }
            Response::BlockHeaders(headers) => {
                write!(f, "BlockHeaders {{ headers: {} }}", headers.len())
            }
            Response::Transactions(transactions) => {
                write!(f, "Transactions {{ transactions: {} }}", transactions.len())
            }
            Response::TransactionIds(ids) => {
                write!(f, "TransactionIds {{ ids: {} }}", ids.len())
            }
        }
    }
}