diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index abe17f11..2569aa2d 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -46,7 +46,8 @@ use super::{ClientRequest, ErrorSlot, PeerError, SharedPeerError}; pub(super) enum Handler { /// Indicates that the handler has finished processing the request. - Finished(Result), + /// An error here is scoped to the request. + Finished(Result), Ping(Nonce), Peers, FindBlocks, @@ -66,17 +67,21 @@ impl Handler { /// Try to handle `msg` as a response to a client request, possibly consuming /// it in the process. /// + /// This function is where we statefully interpret Bitcoin/Zcash messages + /// into responses to messages in the internal request/response protocol. + /// This conversion is done by a sequence of (request, message) match arms, + /// each of which contains the conversion logic for that pair. + /// /// Taking ownership of the message means that we can pass ownership of its /// contents to responses without additional copies. If the message is not /// interpretable as a response, we return ownership to the caller. + /// + /// Unexpected messages are left unprocessed, and may be rejected later. fn process_message(&mut self, msg: Message) -> Option { - // This function is where we statefully interpret Bitcoin/Zcash messages - // into responses to messages in the internal request/response protocol. - // This conversion is done by a sequence of (request, message) match arms, - // each of which contains the conversion logic for that pair. let mut ignored_msg = None; // XXX can this be avoided? let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil))); + *self = match (tmp_state, msg) { (Handler::Ping(req_nonce), Message::Pong(rsp_nonce)) => { if req_nonce == rsp_nonce { @@ -95,19 +100,36 @@ impl Handler { ) => { if hashes.remove(&transaction.hash()) { transactions.push(transaction); - if hashes.is_empty() { - Handler::Finished(Ok(Response::Transactions(transactions))) - } else { - Handler::TransactionsByHash { - hashes, - transactions, - } - } } else { - // This transaction isn't the one we asked for, - // but unsolicited transactions are OK, so leave - // for future handling. ignored_msg = Some(Message::Tx(transaction)); + } + if hashes.is_empty() { + Handler::Finished(Ok(Response::Transactions(transactions))) + } else { + Handler::TransactionsByHash { + hashes, + transactions, + } + } + } + ( + Handler::TransactionsByHash { + hashes, + transactions, + }, + Message::NotFound(items), + ) => { + let hash_in_hashes = |item: &InventoryHash| { + if let InventoryHash::Tx(hash) = item { + hashes.contains(hash) + } else { + false + } + }; + if items.iter().all(hash_in_hashes) { + Handler::Finished(Err(PeerError::NotFound(items))) + } else { + ignored_msg = Some(Message::NotFound(items)); Handler::TransactionsByHash { hashes, transactions, @@ -123,15 +145,28 @@ impl Handler { ) => { if hashes.remove(&block.hash()) { blocks.push(block); - if hashes.is_empty() { - Handler::Finished(Ok(Response::Blocks(blocks))) - } else { - Handler::BlocksByHash { hashes, blocks } - } } else { - // Blocks shouldn't be sent unsolicited, - // so fail the request if we got the wrong one. - Handler::Finished(Err(PeerError::WrongBlock.into())) + ignored_msg = Some(Message::Block(block)); + } + if hashes.is_empty() { + Handler::Finished(Ok(Response::Blocks(blocks))) + } else { + Handler::BlocksByHash { hashes, blocks } + } + } + (Handler::BlocksByHash { hashes, blocks }, Message::NotFound(items)) => { + let hash_in_hashes = |item: &InventoryHash| { + if let InventoryHash::Block(hash) = item { + hashes.contains(hash) + } else { + false + } + }; + if items.iter().all(hash_in_hashes) { + Handler::Finished(Err(PeerError::NotFound(items))) + } else { + ignored_msg = Some(Message::NotFound(items)); + Handler::BlocksByHash { hashes, blocks } } } (Handler::FindBlocks, Message::Inv(items)) @@ -292,7 +327,7 @@ where tx, .. } => { - let _ = tx.send(response); + let _ = tx.send(response.map_err(Into::into)); State::AwaitingRequest } pending @ State::AwaitingResponse { .. } => pending, diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 5398eb7e..2fd3df98 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -5,6 +5,8 @@ use thiserror::Error; use tracing_error::TracedError; use zebra_chain::serialization::SerializationError; +use crate::protocol::external::InventoryHash; + /// A wrapper around `Arc` that implements `Error`. #[derive(Error, Debug, Clone)] #[error(transparent)] @@ -57,6 +59,9 @@ pub enum PeerError { /// The remote peer responded with a block we didn't ask for. #[error("Remote peer responded with a block we didn't ask for.")] WrongBlock, + /// We requested data that the peer couldn't find. + #[error("Remote peer could not find items: {0:?}")] + NotFound(Vec), } #[derive(Default, Clone)]