network: clean up message-as-request translation

This commit is contained in:
Henry de Valence 2020-09-18 20:31:41 -07:00
parent 4b35fea492
commit 430176dd0d
1 changed files with 117 additions and 106 deletions

View File

@ -483,11 +483,15 @@ where
// context (namely, the work of processing the inbound msg as a request) // context (namely, the work of processing the inbound msg as a request)
#[instrument(skip(self))] #[instrument(skip(self))]
async fn handle_message_as_request(&mut self, msg: Message) { async fn handle_message_as_request(&mut self, msg: Message) {
// XXX(hdevalence) -- using multiple match statements here let req = match msg {
// prevents us from having exhaustiveness checking. Message::Ping(nonce) => {
trace!(?msg); trace!(?nonce, "responding to heartbeat");
// These messages are transport-related, handle them separately: if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
match msg { self.fail_with(e.into());
}
return;
}
// These messages shouldn't be sent outside of a handshake.
Message::Version { .. } => { Message::Version { .. } => {
self.fail_with(PeerError::DuplicateHandshake); self.fail_with(PeerError::DuplicateHandshake);
return; return;
@ -496,118 +500,99 @@ where
self.fail_with(PeerError::DuplicateHandshake); self.fail_with(PeerError::DuplicateHandshake);
return; return;
} }
Message::Ping(nonce) => { // These messages should already be handled as a response if they
trace!(?nonce, "responding to heartbeat"); // could be a response, so if we see them here, they were either
match self.peer_tx.send(Message::Pong(nonce)).await { // sent unsolicited, or we've failed to handle messages correctly.
Ok(()) => {} Message::Reject { .. } => {
Err(e) => self.fail_with(e.into()), trace!("rejecting unsolicited reject message");
}
return;
}
_ => {}
}
// Per BIP-011, since we don't advertise NODE_BLOOM, we MUST
// disconnect from this peer immediately.
match msg {
Message::FilterLoad { .. }
| Message::FilterAdd { .. }
| Message::FilterClear { .. } => {
self.fail_with(PeerError::UnsupportedMessage); self.fail_with(PeerError::UnsupportedMessage);
return; return;
} }
_ => {} Message::NotFound { .. } => {
} trace!("rejecting unsolicited notfound message");
self.fail_with(PeerError::UnsupportedMessage);
// Interpret `msg` as a request from the remote peer to our node, return;
// and try to construct an appropriate request object. }
let req = match msg { Message::Pong(_) => {
trace!("rejecting unsolicited pong message");
self.fail_with(PeerError::UnsupportedMessage);
return;
}
Message::Block(_) => {
trace!("rejecting unsolicited block message");
self.fail_with(PeerError::UnsupportedMessage);
return;
}
Message::Headers(_) => {
trace!("rejecting unsolicited headers message");
self.fail_with(PeerError::UnsupportedMessage);
return;
}
// These messages should never be sent by peers.
Message::FilterLoad { .. }
| Message::FilterAdd { .. }
| Message::FilterClear { .. } => {
trace!("got BIP11 message without NODE_BLOOM");
self.fail_with(PeerError::UnsupportedMessage);
return;
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(_) => { Message::Addr(_) => {
debug!("ignoring unsolicited addr message"); trace!("ignoring unsolicited addr message");
None return;
} }
Message::GetAddr => Some(Request::Peers), Message::Tx(transaction) => Request::PushTransaction(transaction),
Message::GetData(items) Message::Inv(items) => match &items[..] {
if items // We don't expect to be advertised multiple blocks at a time,
.iter() // so we ignore any advertisements of multiple blocks.
.all(|item| matches!(item, InventoryHash::Block(_))) => [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash),
{ [InventoryHash::Tx(_), rest @ ..]
Some(Request::BlocksByHash( if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) =>
items {
.iter() Request::TransactionsByHash(transaction_hashes(&items))
.map(|item| {
if let InventoryHash::Block(hash) = item {
*hash
} else {
unreachable!("already checked all items are InventoryHash::Block")
}
})
.collect(),
))
}
Message::GetData(items)
if items
.iter()
.all(|item| matches!(item, InventoryHash::Tx(_))) =>
{
Some(Request::TransactionsByHash(
items
.iter()
.map(|item| {
if let InventoryHash::Tx(hash) = item {
*hash
} else {
unreachable!("already checked all items are InventoryHash::Tx")
}
})
.collect(),
))
}
Message::GetData(items) => {
debug!(?items, "could not interpret getdata message");
None
}
Message::Tx(transaction) => Some(Request::PushTransaction(transaction)),
// We don't expect to be advertised multiple blocks at a time,
// so we ignore any advertisements of multiple blocks.
Message::Inv(items)
if items.len() == 1 && matches!(items[0], InventoryHash::Block(_)) =>
{
if let InventoryHash::Block(hash) = items[0] {
Some(Request::AdvertiseBlock(hash))
} else {
unreachable!("already checked we got a single block hash");
} }
} _ => {
// This match arm is terrible, because we have to check that all the items debug!(?items, "ignoring unrecognized inv message");
// are the correct kind and *then* convert them all. self.fail_with(PeerError::UnsupportedMessage);
Message::Inv(items) return;
if items }
.iter() },
.all(|item| matches!(item, InventoryHash::Tx(_))) => Message::GetData(items) => match &items[..] {
{ [InventoryHash::Block(_), rest @ ..]
Some(Request::AdvertiseTransactions( if rest
items
.iter() .iter()
.map(|item| { .all(|item| matches!(item, InventoryHash::Block(_))) =>
if let InventoryHash::Tx(hash) = item { {
*hash Request::BlocksByHash(block_hashes(&items))
} else { }
unreachable!("already checked all items are InventoryHash::Tx") [InventoryHash::Tx(_), rest @ ..]
} if rest.iter().all(|item| matches!(item, InventoryHash::Tx(_))) =>
}) {
.collect(), Request::TransactionsByHash(transaction_hashes(&items))
)) }
_ => {
trace!(?items, "ignoring getdata with mixed item types");
self.fail_with(PeerError::UnsupportedMessage);
return;
}
},
Message::GetAddr => Request::Peers,
Message::GetBlocks { .. } => {
debug!("ignoring unimplemented getblocks message");
return;
} }
_ => { Message::GetHeaders { .. } => {
debug!("unhandled message type"); debug!("ignoring unimplemented getheaders message");
None return;
}
Message::Mempool => {
debug!("ignoring unimplemented mempool message");
return;
} }
}; };
if let Some(req) = req { self.drive_peer_request(req).await
self.drive_peer_request(req).await
}
} }
/// Given a `req` originating from the peer, drive it to completion and send /// Given a `req` originating from the peer, drive it to completion and send
@ -672,3 +657,29 @@ where
} }
} }
} }
fn transaction_hashes(items: &[InventoryHash]) -> HashSet<transaction::Hash> {
items
.iter()
.filter_map(|item| {
if let InventoryHash::Tx(hash) = item {
Some(*hash)
} else {
None
}
})
.collect()
}
fn block_hashes(items: &[InventoryHash]) -> HashSet<block::Hash> {
items
.iter()
.filter_map(|item| {
if let InventoryHash::Block(hash) = item {
Some(*hash)
} else {
None
}
})
.collect()
}