network: handle Message::NotFound as a response

This cleans up the response processing logic a little bit along the way,
but the overall division of responsibility should be better documented
in a future commit.
This commit is contained in:
Henry de Valence 2020-09-19 19:45:46 -07:00
parent 64905563d1
commit 4df5632752
2 changed files with 65 additions and 25 deletions

View File

@ -46,7 +46,8 @@ use super::{ClientRequest, ErrorSlot, PeerError, SharedPeerError};
pub(super) enum Handler {
/// Indicates that the handler has finished processing the request.
Finished(Result<Response, SharedPeerError>),
/// An error here is scoped to the request.
Finished(Result<Response, PeerError>),
Ping(Nonce),
Peers,
FindBlocks,
@ -66,17 +67,21 @@ impl Handler {
/// Try to handle `msg` as a response to a client request, possibly consuming
/// it in the process.
///
/// This function is where we statefully interpret Bitcoin/Zcash messages
/// into responses to messages in the internal request/response protocol.
/// This conversion is done by a sequence of (request, message) match arms,
/// each of which contains the conversion logic for that pair.
///
/// Taking ownership of the message means that we can pass ownership of its
/// contents to responses without additional copies. If the message is not
/// interpretable as a response, we return ownership to the caller.
///
/// Unexpected messages are left unprocessed, and may be rejected later.
fn process_message(&mut self, msg: Message) -> Option<Message> {
// This function is where we statefully interpret Bitcoin/Zcash messages
// into responses to messages in the internal request/response protocol.
// This conversion is done by a sequence of (request, message) match arms,
// each of which contains the conversion logic for that pair.
let mut ignored_msg = None;
// XXX can this be avoided?
let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));
*self = match (tmp_state, msg) {
(Handler::Ping(req_nonce), Message::Pong(rsp_nonce)) => {
if req_nonce == rsp_nonce {
@ -95,19 +100,36 @@ impl Handler {
) => {
if hashes.remove(&transaction.hash()) {
transactions.push(transaction);
if hashes.is_empty() {
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
Handler::TransactionsByHash {
hashes,
transactions,
}
}
} else {
// This transaction isn't the one we asked for,
// but unsolicited transactions are OK, so leave
// for future handling.
ignored_msg = Some(Message::Tx(transaction));
}
if hashes.is_empty() {
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
Handler::TransactionsByHash {
hashes,
transactions,
}
}
}
(
Handler::TransactionsByHash {
hashes,
transactions,
},
Message::NotFound(items),
) => {
let hash_in_hashes = |item: &InventoryHash| {
if let InventoryHash::Tx(hash) = item {
hashes.contains(hash)
} else {
false
}
};
if items.iter().all(hash_in_hashes) {
Handler::Finished(Err(PeerError::NotFound(items)))
} else {
ignored_msg = Some(Message::NotFound(items));
Handler::TransactionsByHash {
hashes,
transactions,
@ -123,15 +145,28 @@ impl Handler {
) => {
if hashes.remove(&block.hash()) {
blocks.push(block);
if hashes.is_empty() {
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash { hashes, blocks }
}
} else {
// Blocks shouldn't be sent unsolicited,
// so fail the request if we got the wrong one.
Handler::Finished(Err(PeerError::WrongBlock.into()))
ignored_msg = Some(Message::Block(block));
}
if hashes.is_empty() {
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash { hashes, blocks }
}
}
(Handler::BlocksByHash { hashes, blocks }, Message::NotFound(items)) => {
let hash_in_hashes = |item: &InventoryHash| {
if let InventoryHash::Block(hash) = item {
hashes.contains(hash)
} else {
false
}
};
if items.iter().all(hash_in_hashes) {
Handler::Finished(Err(PeerError::NotFound(items)))
} else {
ignored_msg = Some(Message::NotFound(items));
Handler::BlocksByHash { hashes, blocks }
}
}
(Handler::FindBlocks, Message::Inv(items))
@ -292,7 +327,7 @@ where
tx,
..
} => {
let _ = tx.send(response);
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => pending,

View File

@ -5,6 +5,8 @@ use thiserror::Error;
use tracing_error::TracedError;
use zebra_chain::serialization::SerializationError;
use crate::protocol::external::InventoryHash;
/// A wrapper around `Arc<PeerError>` that implements `Error`.
#[derive(Error, Debug, Clone)]
#[error(transparent)]
@ -57,6 +59,9 @@ pub enum PeerError {
/// The remote peer responded with a block we didn't ask for.
#[error("Remote peer responded with a block we didn't ask for.")]
WrongBlock,
/// We requested data that the peer couldn't find.
#[error("Remote peer could not find items: {0:?}")]
NotFound(Vec<InventoryHash>),
}
#[derive(Default, Clone)]