diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 8e957675..ff4545c9 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -30,6 +30,7 @@ use tracing_futures::Instrument; use zebra_chain::{ block::{self, Block}, serialization::SerializationError, + transaction::{self, Transaction}, }; use crate::{ @@ -47,12 +48,16 @@ pub(super) enum Handler { /// Indicates that the handler has finished processing the request. Finished(Result), Ping(Nonce), - GetPeers, - GetBlocksByHash { + Peers, + FindBlocks, + BlocksByHash { hashes: HashSet, blocks: Vec>, }, - FindBlocks, + TransactionsByHash { + hashes: HashSet, + transactions: Vec>, + }, } impl Handler { @@ -79,9 +84,37 @@ impl Handler { Ping(req_nonce) } } - (GetPeers, Message::Addr(addrs)) => Finished(Ok(Response::Peers(addrs))), + (Peers, Message::Addr(addrs)) => Finished(Ok(Response::Peers(addrs))), ( - GetBlocksByHash { + TransactionsByHash { + mut hashes, + mut transactions, + }, + Message::Tx(transaction), + ) => { + if hashes.remove(&transaction.hash()) { + transactions.push(transaction); + if hashes.is_empty() { + Finished(Ok(Response::Transactions(transactions))) + } else { + TransactionsByHash { + hashes, + transactions, + } + } + } else { + // This transaction isn't the one we asked for, + // but unsolicited transactions are OK, so leave + // for future handling. + ignored_msg = Some(Message::Tx(transaction)); + TransactionsByHash { + hashes, + transactions, + } + } + } + ( + BlocksByHash { mut hashes, mut blocks, }, @@ -92,9 +125,11 @@ impl Handler { if hashes.is_empty() { Finished(Ok(Response::Blocks(blocks))) } else { - GetBlocksByHash { hashes, blocks } + BlocksByHash { hashes, blocks } } } else { + // Blocks shouldn't be sent unsolicited, + // so fail the request if we got the wrong one. Finished(Err(PeerError::WrongBlock.into())) } } @@ -352,7 +387,7 @@ where .await .map_err(|e| e.into()) .map(|()| AwaitingResponse { - handler: Handler::GetPeers, + handler: Handler::Peers, tx, span, }), @@ -374,13 +409,28 @@ where .await .map_err(|e| e.into()) .map(|()| AwaitingResponse { - handler: Handler::GetBlocksByHash { + handler: Handler::BlocksByHash { blocks: Vec::with_capacity(hashes.len()), hashes, }, tx, span, }), + (AwaitingRequest, TransactionsByHash(hashes)) => self + .peer_tx + .send(Message::GetData( + hashes.iter().map(|h| (*h).into()).collect(), + )) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingResponse { + handler: Handler::TransactionsByHash { + transactions: Vec::with_capacity(hashes.len()), + hashes, + }, + tx, + span, + }), (AwaitingRequest, FindBlocks { known_blocks, stop }) => self .peer_tx .send(Message::GetBlocks { @@ -394,6 +444,32 @@ where tx, span, }), + (AwaitingRequest, PushTransaction(transaction)) => { + // Since we're not waiting for further messages, we need to + // send a response before dropping tx. + let _ = tx.send(Ok(Response::Nil)); + self.peer_tx + .send(Message::Tx(transaction)) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingRequest) + } + (AwaitingRequest, AdvertiseTransactions(hashes)) => { + let _ = tx.send(Ok(Response::Nil)); + self.peer_tx + .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingRequest) + } + (AwaitingRequest, AdvertiseBlock(hash)) => { + let _ = tx.send(Ok(Response::Nil)); + self.peer_tx + .send(Message::Inv(vec![hash.into()])) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingRequest) + } } { Ok(new_state) => { self.state = new_state; @@ -407,6 +483,8 @@ where // context (namely, the work of processing the inbound msg as a request) #[instrument(skip(self))] async fn handle_message_as_request(&mut self, msg: Message) { + // XXX(hdevalence) -- using multiple match statements here + // prevents us from having exhaustiveness checking. trace!(?msg); // These messages are transport-related, handle them separately: match msg { @@ -449,6 +527,78 @@ where None } Message::GetAddr => Some(Request::Peers), + Message::GetData(items) + if items + .iter() + .all(|item| matches!(item, InventoryHash::Block(_))) => + { + Some(Request::BlocksByHash( + items + .iter() + .map(|item| { + if let InventoryHash::Block(hash) = item { + *hash + } else { + unreachable!("already checked all items are InventoryHash::Block") + } + }) + .collect(), + )) + } + Message::GetData(items) + if items + .iter() + .all(|item| matches!(item, InventoryHash::Tx(_))) => + { + Some(Request::TransactionsByHash( + items + .iter() + .map(|item| { + if let InventoryHash::Tx(hash) = item { + *hash + } else { + unreachable!("already checked all items are InventoryHash::Tx") + } + }) + .collect(), + )) + } + Message::GetData(items) => { + debug!(?items, "could not interpret getdata message"); + None + } + Message::Tx(transaction) => Some(Request::PushTransaction(transaction)), + // We don't expect to be advertised multiple blocks at a time, + // so we ignore any advertisements of multiple blocks. + Message::Inv(items) + if items.len() == 1 && matches!(items[0], InventoryHash::Block(_)) => + { + if let InventoryHash::Block(hash) = items[0] { + Some(Request::AdvertiseBlock(hash)) + } else { + unreachable!("already checked we got a single block hash"); + } + } + // This match arm is terrible, because we have to check that all the items + // are the correct kind and *then* convert them all. + Message::Inv(items) + if items + .iter() + .all(|item| matches!(item, InventoryHash::Tx(_))) => + { + Some(Request::AdvertiseTransactions( + items + .iter() + .map(|item| { + if let InventoryHash::Tx(hash) = item { + *hash + } else { + unreachable!("already checked all items are InventoryHash::Tx") + } + }) + .collect(), + )) + } _ => { debug!("unhandled message type"); None @@ -494,6 +644,14 @@ where self.fail_with(e.into()); } } + Response::Transactions(transactions) => { + // Generate one tx message per transaction. + for transaction in transactions.into_iter() { + if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await { + self.fail_with(e.into()); + } + } + } Response::Blocks(blocks) => { // Generate one block message per block. for block in blocks.into_iter() { diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 59c3f9eb..eb44f0d7 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -11,6 +11,7 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, + future::TryFutureExt, prelude::*, stream::FuturesUnordered, }; @@ -260,31 +261,78 @@ where svc.load() } - fn best_peer_for(&mut self, req: &Request) -> (SocketAddr, D::Service) { - if let Request::BlocksByHash(hashes) = req { - for hash in hashes.iter() { - let mut peers = self.inventory_registry.peers(&(*hash).into()); - if let Some(index) = peers.find_map(|addr| self.ready_services.get_index_of(addr)) { - return self - .ready_services - .swap_remove_index(index) - .expect("found index must be valid"); - } - } - } - - self.default_peer() - } - - fn default_peer(&mut self) -> (SocketAddr, D::Service) { + /// Routes a request using P2C load-balancing. + fn route_p2c(&mut self, req: Request) -> >::Future { let index = self .next_idx .take() .expect("ready service must have valid preselected index"); - self.ready_services + let (key, mut svc) = self + .ready_services .swap_remove_index(index) - .expect("preselected index must be valid") + .expect("preselected index must be valid"); + + let fut = svc.call(req); + self.push_unready(key, svc); + fut.map_err(Into::into).boxed() + } + + /// Tries to route a request to a peer that advertised that inventory, + /// falling back to P2C if there is no ready peer. + fn route_inv( + &mut self, + req: Request, + hash: InventoryHash, + ) -> >::Future { + let candidate_index = self + .inventory_registry + .peers(&hash) + .find_map(|addr| self.ready_services.get_index_of(addr)); + + match candidate_index { + Some(index) => { + let (key, mut svc) = self + .ready_services + .swap_remove_index(index) + .expect("found index must be valid"); + tracing::debug!(?hash, ?key, "routing based on inventory"); + + let fut = svc.call(req); + self.push_unready(key, svc); + fut.map_err(Into::into).boxed() + } + None => { + tracing::debug!( + ?hash, + "could not find ready peer for inventory hash, falling back to p2c" + ); + self.route_p2c(req) + } + } + } + + // Routes a request to all ready peers, ignoring return values. + fn route_all(&mut self, req: Request) -> >::Future { + // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services, + // blocking us from passing &mut self to push_unready. + let ready_services = std::mem::take(&mut self.ready_services); + + let futs = FuturesUnordered::new(); + for (key, mut svc) in ready_services { + futs.push(svc.call(req.clone()).map_err(|_| ())); + self.push_unready(key, svc); + } + + async move { + let results = futs.collect::>>().await; + tracing::debug!( + ok.len = results.iter().filter(|r| r.is_ok()).count(), + err.len = results.iter().filter(|r| r.is_err()).count(), + ); + Ok(Response::Nil) + } + .boxed() } } @@ -361,19 +409,19 @@ where } fn call(&mut self, req: Request) -> Self::Future { - let (key, mut svc) = self.best_peer_for(&req); - - // XXX add a dimension tagging request metrics by type - metrics::counter!( - "outbound_requests", - 1, - "key" => key.to_string(), - ); - - let fut = svc.call(req); - self.push_unready(key, svc); - - use futures::future::TryFutureExt; - fut.map_err(Into::into).boxed() + match req { + // Only do inventory-aware routing on individual items. + Request::BlocksByHash(ref hashes) if hashes.len() == 1 => { + let hash = InventoryHash::from(*hashes.iter().next().unwrap()); + self.route_inv(req, hash) + } + Request::TransactionsByHash(ref hashes) if hashes.len() == 1 => { + let hash = InventoryHash::from(*hashes.iter().next().unwrap()); + self.route_inv(req, hash) + } + Request::AdvertiseTransactions(_) => self.route_all(req), + Request::AdvertiseBlock(_) => self.route_all(req), + _ => self.route_p2c(req), + } } } diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index 506e5112..deb4895f 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -1,6 +1,9 @@ -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; -use zebra_chain::block; +use zebra_chain::{ + block, + transaction::{self, Transaction}, +}; use super::super::types::Nonce; @@ -39,11 +42,30 @@ pub enum Request { /// didn't start with a `Vec` but with, e.g., an iterator, they can collect /// directly into a `HashSet` and save work. /// + /// If this requests a recently-advertised block, the peer set will make a + /// best-effort attempt to route the request to a peer that advertised the + /// block. This routing is only used for request sets of size 1. + /// Otherwise, it is routed using the normal load-balancing strategy. + /// /// # Returns /// /// Returns [`Response::Blocks`](super::Response::Blocks). BlocksByHash(HashSet), + /// Request transactions by hash. + /// + /// This uses a `HashSet` for the same reason as [`Request::BlocksByHash`]. + /// + /// If this requests a recently-advertised transaction, the peer set will + /// make a best-effort attempt to route the request to a peer that advertised + /// the transaction. This routing is only used for request sets of size 1. + /// Otherwise, it is routed using the normal load-balancing strategy. + /// + /// # Returns + /// + /// Returns [`Response::Transactions`](super::Response::Transactions). + TransactionsByHash(HashSet), + /// Request block hashes of subsequent blocks in the chain, giving hashes of /// known blocks. /// @@ -69,4 +91,50 @@ pub enum Request { /// Optionally, the last header to request. stop: Option, }, + + /// Push a transaction to a remote peer, without advertising it to them first. + /// + /// This is implemented by sending an unsolicited `tx` message. + /// + /// # Returns + /// + /// Returns [`Response::Nil`](super::Response::Nil). + PushTransaction(Arc), + + /// Advertise a set of transactions to all peers. + /// + /// This is intended to be used in Zebra with a single transaction at a time + /// (set of size 1), but multiple transactions are permitted because this is + /// how we interpret advertisements from Zcashd, which sometimes advertises + /// multiple transactions at once. + /// + /// This is implemented by sending an `inv` message containing the + /// transaction hash, allowing the remote peer to choose whether to download + /// it. Remote peers who choose to download the transaction will generate a + /// [`Request::TransactionsByHash`] against the "inbound" service passed to + /// [`zebra_network::init`]. + /// + /// The peer set routes this request specially, sending it to *every* + /// available peer. + /// + /// # Returns + /// + /// Returns [`Response::Nil`](super::Response::Nil). + AdvertiseTransactions(HashSet), + + /// Advertise a block to all peers. + /// + /// This is implemented by sending an `inv` message containing the + /// block hash, allowing the remote peer to choose whether to download + /// it. Remote peers who choose to download the transaction will generate a + /// [`Request::BlocksByHash`] against the "inbound" service passed to + /// [`zebra_network::init`]. + /// + /// The peer set routes this request specially, sending it to *every* + /// available peer. + /// + /// # Returns + /// + /// Returns [`Response::Nil`](super::Response::Nil). + AdvertiseBlock(block::Hash), } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 272e3a77..80bef665 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -1,4 +1,7 @@ -use zebra_chain::block::{self, Block}; +use zebra_chain::{ + block::{self, Block}, + transaction::Transaction, +}; use crate::meta_addr::MetaAddr; use std::sync::Arc; @@ -17,4 +20,7 @@ pub enum Response { /// A list of block hashes. BlockHashes(Vec), + + /// A list of transactions. + Transactions(Vec>), }