diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index da037930..bc55a8a9 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -7,7 +7,7 @@ //! And it's unclear if these assumptions match the `zcashd` implementation. //! It should be refactored into a cleaner set of request/response pairs (#1515). -use std::{collections::HashSet, pin::Pin, sync::Arc}; +use std::{collections::HashSet, fmt, pin::Pin, sync::Arc}; use futures::{ future::{self, Either}, @@ -48,7 +48,7 @@ pub(super) enum Handler { FindBlocks, FindHeaders, BlocksByHash { - hashes: HashSet, + pending_hashes: HashSet, blocks: Vec>, }, TransactionsById { @@ -58,6 +58,39 @@ pub(super) enum Handler { MempoolTransactionIds, } +impl fmt::Display for Handler { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&match self { + Handler::Finished(Ok(response)) => format!("Finished({})", response), + Handler::Finished(Err(error)) => format!("Finished({})", error), + + Handler::Ping(_) => "Ping".to_string(), + Handler::Peers => "Peers".to_string(), + + Handler::FindBlocks => "FindBlocks".to_string(), + Handler::FindHeaders => "FindHeaders".to_string(), + Handler::BlocksByHash { + pending_hashes, + blocks, + } => format!( + "BlocksByHash {{ pending_hashes: {}, blocks: {} }}", + pending_hashes.len(), + blocks.len() + ), + + Handler::TransactionsById { + pending_ids, + transactions, + } => format!( + "TransactionsById {{ pending_ids: {}, transactions: {} }}", + pending_ids.len(), + transactions.len() + ), + Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(), + }) + } +} + impl Handler { /// Try to handle `msg` as a response to a client request, possibly consuming /// it in the process. @@ -127,19 +160,20 @@ impl Handler { // connection open, so the inbound service can process transactions from good // peers (case 2). ignored_msg = Some(Message::Tx(transaction)); - if !transactions.is_empty() { + + if !pending_ids.is_empty() { // if our peers start sending mixed solicited and unsolicited transactions, // we should update this code to handle those responses - error!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response"); - // TODO: does the caller need a list of missing transactions? (#1515) - Handler::Finished(Ok(Response::Transactions(transactions))) - } else { - // TODO: is it really an error if we ask for a transaction hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? - // Should we fake a NotFound response here? (#1515) - let missing_transaction_ids = pending_ids.iter().map(Into::into).collect(); - Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) + info!( + "unexpected transaction from peer: \ + transaction responses should be sent in a continuous sequence, \ + followed by notfound. \ + Using partial received transactions as the peer response" + ); } + + // TODO: does the caller need a list of missing transactions? (#1515) + Handler::Finished(Ok(Response::Transactions(transactions))) } } // `zcashd` peers actually return this response @@ -160,24 +194,27 @@ impl Handler { // hashes from the handler. If we're not in sync with the peer, we should return // what we got so far, and log an error. let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect(); + if missing_transaction_ids != pending_ids { trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids); - // if these errors are noisy, we should replace them with debugs - error!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response"); - } - if missing_transaction_ids.len() != missing_invs.len() { - trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids); - error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response"); + info!( + "unexpected notfound message from peer: \ + all remaining transaction hashes should be listed in the notfound. \ + Using partial received transactions as the peer response" + ); } - if !transactions.is_empty() { - // TODO: does the caller need a list of missing transactions? (#1515) - Handler::Finished(Ok(Response::Transactions(transactions))) - } else { - // TODO: is it really an error if we ask for a transaction hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? (#1515) - Handler::Finished(Err(PeerError::NotFound(missing_invs))) + if missing_transaction_ids.len() != missing_invs.len() { + trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids); + info!( + "unexpected notfound message from peer: \ + notfound contains duplicate hashes or non-transaction hashes. \ + Using partial received transactions as the peer response" + ); } + + // TODO: does the caller need a list of missing transactions? (#1515) + Handler::Finished(Ok(Response::Transactions(transactions))) } // `zcashd` returns requested blocks in a single batch of messages. // Other blocks or non-blocks messages can come before or after the batch. @@ -185,7 +222,7 @@ impl Handler { // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523 ( Handler::BlocksByHash { - mut hashes, + mut pending_hashes, mut blocks, }, Message::Block(block), @@ -194,13 +231,16 @@ impl Handler { // - the block messages are sent in a single continuous batch // - missing blocks are silently skipped // (there is no `NotFound` message at the end of the batch) - if hashes.remove(&block.hash()) { + if pending_hashes.remove(&block.hash()) { // we are in the middle of the continuous block messages blocks.push(block); - if hashes.is_empty() { + if pending_hashes.is_empty() { Handler::Finished(Ok(Response::Blocks(blocks))) } else { - Handler::BlocksByHash { hashes, blocks } + Handler::BlocksByHash { + pending_hashes, + blocks, + } } } else { // We got a block we didn't ask for. @@ -216,20 +256,29 @@ impl Handler { // But we keep the connection open, so the inbound service can process blocks // from good peers (case 2). ignored_msg = Some(Message::Block(block)); - if !blocks.is_empty() { - // TODO: does the caller need a list of missing blocks? (#1515) - Handler::Finished(Ok(Response::Blocks(blocks))) - } else { - // TODO: is it really an error if we ask for a block hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? - // Should we fake a NotFound response here? (#1515) - let items = hashes.iter().map(|h| InventoryHash::Block(*h)).collect(); - Handler::Finished(Err(PeerError::NotFound(items))) + + if !pending_hashes.is_empty() { + // if our peers start sending mixed solicited and unsolicited blocks, + // we should update this code to handle those responses + info!( + "unexpected block from peer: \ + block responses should be sent in a continuous sequence. \ + Using partial received blocks as the peer response" + ); } + + // TODO: does the caller need a list of missing blocks? (#1515) + Handler::Finished(Ok(Response::Blocks(blocks))) } } // peers are allowed to return this response, but `zcashd` never does - (Handler::BlocksByHash { hashes, blocks }, Message::NotFound(items)) => { + ( + Handler::BlocksByHash { + pending_hashes, + blocks, + }, + Message::NotFound(items), + ) => { // assumptions: // - the peer eventually returns a block or a `NotFound` entry // for each hash @@ -247,24 +296,37 @@ impl Handler { }) .cloned() .collect(); - if missing_blocks != hashes { - trace!(?items, ?missing_blocks, ?hashes); - // if these errors are noisy, we should replace them with debugs - error!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response"); - } - if missing_blocks.len() != items.len() { - trace!(?items, ?missing_blocks, ?hashes); - error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response"); + + if missing_blocks != pending_hashes { + trace!(?items, ?missing_blocks, ?pending_hashes); + info!( + "unexpected notfound message from peer: \ + all remaining block hashes should be listed in the notfound. \ + Using partial received blocks as the peer response" + ); } - if !blocks.is_empty() { - // TODO: does the caller need a list of missing blocks? (#1515) - Handler::Finished(Ok(Response::Blocks(blocks))) - } else { - // TODO: is it really an error if we ask for a block hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? (#1515) - Handler::Finished(Err(PeerError::NotFound(items))) + if missing_blocks.len() != items.len() { + trace!(?items, ?missing_blocks, ?pending_hashes); + info!( + "unexpected notfound message from peer: \ + notfound contains duplicate hashes or non-block hashes. \ + Using partial received blocks as the peer response" + ); } + + if !pending_hashes.is_empty() { + // if our peers start sending mixed solicited and unsolicited blocks, + // we should update this code to handle those responses + info!( + "unexpected block from peer: \ + block responses should be sent in a continuous sequence. \ + Using partial received blocks as the peer response" + ); + } + + // TODO: does the caller need a list of missing blocks? (#1515) + Handler::Finished(Ok(Response::Blocks(blocks))) } (Handler::FindBlocks, Message::Inv(items)) if items @@ -312,6 +374,18 @@ pub(super) enum State { Failed, } +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&match self { + State::AwaitingRequest => "AwaitingRequest".to_string(), + State::AwaitingResponse { handler, .. } => { + format!("AwaitingResponse({})", handler) + } + State::Failed => "Failed".to_string(), + }) + } +} + /// The state associated with a peer connection. pub struct Connection { /// The state of this connection's current request or response. @@ -457,7 +531,7 @@ where // &mut self. This is a sign that we don't properly // factor the state required for inbound and // outbound requests. - let request_msg = match self.state { + let mut request_msg = match self.state { State::AwaitingResponse { ref mut handler, .. } => span.in_scope(|| handler.process_message(peer_msg)), @@ -467,30 +541,44 @@ where self.client_rx, ), }; + + // Check whether the handler is finished + // processing messages and update the state. + self.state = match self.state { + State::AwaitingResponse { + handler: Handler::Finished(response), + tx, + .. + } => { + let _ = tx.send(response.map_err(Into::into)); + State::AwaitingRequest + } + pending @ State::AwaitingResponse { .. } => { + // Drop the un-consumed request message, + // because we can't process multiple messages at the same time. + info!( + new_request = %request_msg + .as_ref() + .map(|m| m.to_string()) + .unwrap_or_else(|| "None".to_string()), + awaiting_response = %pending, + "ignoring new request while awaiting a response" + ); + request_msg = None; + pending + }, + _ => unreachable!( + "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", + self.client_rx + ), + }; + // If the message was not consumed, check whether it // should be handled as a request. if let Some(msg) = request_msg { // do NOT instrument with the request span, this is // independent work self.handle_message_as_request(msg).await; - } else { - // Otherwise, check whether the handler is finished - // processing messages and update the state. - self.state = match self.state { - State::AwaitingResponse { - handler: Handler::Finished(response), - tx, - .. - } => { - let _ = tx.send(response.map_err(Into::into)); - State::AwaitingRequest - } - pending @ State::AwaitingResponse { .. } => pending, - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; } } Either::Left((Either::Right(_), _peer_fut)) => { @@ -673,7 +761,7 @@ where AwaitingResponse { handler: Handler::BlocksByHash { blocks: Vec::with_capacity(hashes.len()), - hashes, + pending_hashes: hashes, }, tx, span, @@ -858,9 +946,14 @@ where Message::FilterLoad { .. } | Message::FilterAdd { .. } | Message::FilterClear { .. } => { - self.fail_with(PeerError::UnsupportedMessage( - "got BIP111 message without advertising NODE_BLOOM", - )); + // # Security + // + // Zcash connections are not authenticated, so malicious nodes can send fake messages, + // with connected peers' IP addresses in the IP header. + // + // Since we can't verify their source, Zebra needs to ignore unexpected messages, + // because closing the connection could cause a denial of service or eclipse attack. + debug!("got BIP111 message without advertising NODE_BLOOM"); return; } // Zebra crawls the network proactively, to prevent diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 24e638ff..2bf1d3a7 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -5,8 +5,6 @@ use thiserror::Error; use tracing_error::TracedError; use zebra_chain::serialization::SerializationError; -use crate::protocol::external::InventoryHash; - /// A wrapper around `Arc` that implements `Error`. #[derive(Error, Debug, Clone)] #[error(transparent)] @@ -46,17 +44,6 @@ pub enum PeerError { /// to shed load. #[error("Internal services over capacity")] Overloaded, - - // TODO: stop closing connections on these errors (#2107) - // log info or debug logs instead - // - /// A peer sent us a message we don't support. - #[error("Remote peer sent an unsupported message type: {0}")] - UnsupportedMessage(&'static str), - - /// We requested data that the peer couldn't find. - #[error("Remote peer could not find items: {0:?}")] - NotFound(Vec), } /// A shared error slot for peer errors. diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index cea85ec5..af6438ee 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -428,7 +428,19 @@ impl Decoder for Codec { b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len), b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len), b"filterclear\0" => self.read_filterclear(&mut body_reader), - _ => return Err(Parse("unknown command")), + _ => { + let command_string = String::from_utf8_lossy(&command); + + // # Security + // + // Zcash connections are not authenticated, so malicious nodes can + // send fake messages, with connected peers' IP addresses in the IP header. + // + // Since we can't verify their source, Zebra needs to ignore unexpected messages, + // because closing the connection could cause a denial of service or eclipse attack. + debug!(?command, %command_string, "unknown message command from peer"); + return Ok(None); + } } // We need Ok(Some(msg)) to signal that we're done decoding. // This is also convenient for tracing the parse result. diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index 84d39afd..b0330ee1 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::{collections::HashSet, fmt}; use zebra_chain::{ block, @@ -181,3 +181,36 @@ pub enum Request { /// Returns [`Response::TransactionIds`](super::Response::TransactionIds). MempoolTransactionIds, } + +impl fmt::Display for Request { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&match self { + Request::Peers => "Peers".to_string(), + Request::Ping(_) => "Ping".to_string(), + + Request::BlocksByHash(hashes) => { + format!("BlocksByHash {{ hashes: {} }}", hashes.len()) + } + Request::TransactionsById(ids) => format!("TransactionsById {{ ids: {} }}", ids.len()), + + Request::FindBlocks { known_blocks, stop } => format!( + "FindBlocks {{ known_blocks: {}, stop: {} }}", + known_blocks.len(), + if stop.is_some() { "Some" } else { "None" }, + ), + Request::FindHeaders { known_blocks, stop } => format!( + "FindHeaders {{ known_blocks: {}, stop: {} }}", + known_blocks.len(), + if stop.is_some() { "Some" } else { "None" }, + ), + + Request::PushTransaction(_) => "PushTransaction".to_string(), + Request::AdvertiseTransactionIds(ids) => { + format!("AdvertiseTransactionIds {{ ids: {} }}", ids.len()) + } + + Request::AdvertiseBlock(_) => "AdvertiseBlock".to_string(), + Request::MempoolTransactionIds => "MempoolTransactionIds".to_string(), + }) + } +} diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 048fd948..b266d371 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -5,7 +5,7 @@ use zebra_chain::{ use crate::meta_addr::MetaAddr; -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; @@ -46,3 +46,24 @@ pub enum Response { /// v5 transactions use a witnessed transaction ID. TransactionIds(Vec), } + +impl fmt::Display for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&match self { + Response::Nil => "Nil".to_string(), + + Response::Peers(peers) => format!("Peers {{ peers: {} }}", peers.len()), + + Response::Blocks(blocks) => format!("Blocks {{ blocks: {} }}", blocks.len()), + Response::BlockHashes(hashes) => format!("BlockHashes {{ hashes: {} }}", hashes.len()), + Response::BlockHeaders(headers) => { + format!("BlockHeaders {{ headers: {} }}", headers.len()) + } + + Response::Transactions(transactions) => { + format!("Transactions {{ transactions: {} }}", transactions.len()) + } + Response::TransactionIds(ids) => format!("TransactionIds {{ ids: {} }}", ids.len()), + }) + } +}