network: implement transaction request handling. (#1016)

This commit makes several related changes to the network code:

- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
  `Transactions(Vec<Arc<Transaction>>)` response pair that allows
  fetching transactions from a remote peer;

- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
  unsolicited transaction to a remote peer;

- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
  that advertises transactions by hash to a remote peer;

- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
  by hash to a remote peer;

Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:

- `TransactionsByHash` generates a `getdata` message and collects the
  results, like the existing `BlocksByHash` request.

- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.

- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
  message, and return `Nil` immediately.

Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:

- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
  requests, depending on the content of the message;

- `tx` messages generate `PushTransaction` requests;

- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
  requests.

Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:

- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
  (used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
  `AdvertiseBlock` and `AdvertiseTransactions`).
This commit is contained in:
Henry de Valence 2020-09-08 10:16:29 -07:00 committed by GitHub
parent cad38415b2
commit 3f150eb16e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 324 additions and 44 deletions

View File

@ -30,6 +30,7 @@ use tracing_futures::Instrument;
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
serialization::SerializationError, serialization::SerializationError,
transaction::{self, Transaction},
}; };
use crate::{ use crate::{
@ -47,12 +48,16 @@ pub(super) enum Handler {
/// Indicates that the handler has finished processing the request. /// Indicates that the handler has finished processing the request.
Finished(Result<Response, SharedPeerError>), Finished(Result<Response, SharedPeerError>),
Ping(Nonce), Ping(Nonce),
GetPeers, Peers,
GetBlocksByHash { FindBlocks,
BlocksByHash {
hashes: HashSet<block::Hash>, hashes: HashSet<block::Hash>,
blocks: Vec<Arc<Block>>, blocks: Vec<Arc<Block>>,
}, },
FindBlocks, TransactionsByHash {
hashes: HashSet<transaction::Hash>,
transactions: Vec<Arc<Transaction>>,
},
} }
impl Handler { impl Handler {
@ -79,9 +84,37 @@ impl Handler {
Ping(req_nonce) Ping(req_nonce)
} }
} }
(GetPeers, Message::Addr(addrs)) => Finished(Ok(Response::Peers(addrs))), (Peers, Message::Addr(addrs)) => Finished(Ok(Response::Peers(addrs))),
( (
GetBlocksByHash { TransactionsByHash {
mut hashes,
mut transactions,
},
Message::Tx(transaction),
) => {
if hashes.remove(&transaction.hash()) {
transactions.push(transaction);
if hashes.is_empty() {
Finished(Ok(Response::Transactions(transactions)))
} else {
TransactionsByHash {
hashes,
transactions,
}
}
} else {
// This transaction isn't the one we asked for,
// but unsolicited transactions are OK, so leave
// for future handling.
ignored_msg = Some(Message::Tx(transaction));
TransactionsByHash {
hashes,
transactions,
}
}
}
(
BlocksByHash {
mut hashes, mut hashes,
mut blocks, mut blocks,
}, },
@ -92,9 +125,11 @@ impl Handler {
if hashes.is_empty() { if hashes.is_empty() {
Finished(Ok(Response::Blocks(blocks))) Finished(Ok(Response::Blocks(blocks)))
} else { } else {
GetBlocksByHash { hashes, blocks } BlocksByHash { hashes, blocks }
} }
} else { } else {
// Blocks shouldn't be sent unsolicited,
// so fail the request if we got the wrong one.
Finished(Err(PeerError::WrongBlock.into())) Finished(Err(PeerError::WrongBlock.into()))
} }
} }
@ -352,7 +387,7 @@ where
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse { .map(|()| AwaitingResponse {
handler: Handler::GetPeers, handler: Handler::Peers,
tx, tx,
span, span,
}), }),
@ -374,13 +409,28 @@ where
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse { .map(|()| AwaitingResponse {
handler: Handler::GetBlocksByHash { handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()), blocks: Vec::with_capacity(hashes.len()),
hashes, hashes,
}, },
tx, tx,
span, span,
}), }),
(AwaitingRequest, TransactionsByHash(hashes)) => self
.peer_tx
.send(Message::GetData(
hashes.iter().map(|h| (*h).into()).collect(),
))
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse {
handler: Handler::TransactionsByHash {
transactions: Vec::with_capacity(hashes.len()),
hashes,
},
tx,
span,
}),
(AwaitingRequest, FindBlocks { known_blocks, stop }) => self (AwaitingRequest, FindBlocks { known_blocks, stop }) => self
.peer_tx .peer_tx
.send(Message::GetBlocks { .send(Message::GetBlocks {
@ -394,6 +444,32 @@ where
tx, tx,
span, span,
}), }),
(AwaitingRequest, PushTransaction(transaction)) => {
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.peer_tx
.send(Message::Tx(transaction))
.await
.map_err(|e| e.into())
.map(|()| AwaitingRequest)
}
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
let _ = tx.send(Ok(Response::Nil));
self.peer_tx
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
.await
.map_err(|e| e.into())
.map(|()| AwaitingRequest)
}
(AwaitingRequest, AdvertiseBlock(hash)) => {
let _ = tx.send(Ok(Response::Nil));
self.peer_tx
.send(Message::Inv(vec![hash.into()]))
.await
.map_err(|e| e.into())
.map(|()| AwaitingRequest)
}
} { } {
Ok(new_state) => { Ok(new_state) => {
self.state = new_state; self.state = new_state;
@ -407,6 +483,8 @@ where
// context (namely, the work of processing the inbound msg as a request) // context (namely, the work of processing the inbound msg as a request)
#[instrument(skip(self))] #[instrument(skip(self))]
async fn handle_message_as_request(&mut self, msg: Message) { async fn handle_message_as_request(&mut self, msg: Message) {
// XXX(hdevalence) -- using multiple match statements here
// prevents us from having exhaustiveness checking.
trace!(?msg); trace!(?msg);
// These messages are transport-related, handle them separately: // These messages are transport-related, handle them separately:
match msg { match msg {
@ -449,6 +527,78 @@ where
None None
} }
Message::GetAddr => Some(Request::Peers), Message::GetAddr => Some(Request::Peers),
Message::GetData(items)
if items
.iter()
.all(|item| matches!(item, InventoryHash::Block(_))) =>
{
Some(Request::BlocksByHash(
items
.iter()
.map(|item| {
if let InventoryHash::Block(hash) = item {
*hash
} else {
unreachable!("already checked all items are InventoryHash::Block")
}
})
.collect(),
))
}
Message::GetData(items)
if items
.iter()
.all(|item| matches!(item, InventoryHash::Tx(_))) =>
{
Some(Request::TransactionsByHash(
items
.iter()
.map(|item| {
if let InventoryHash::Tx(hash) = item {
*hash
} else {
unreachable!("already checked all items are InventoryHash::Tx")
}
})
.collect(),
))
}
Message::GetData(items) => {
debug!(?items, "could not interpret getdata message");
None
}
Message::Tx(transaction) => Some(Request::PushTransaction(transaction)),
// We don't expect to be advertised multiple blocks at a time,
// so we ignore any advertisements of multiple blocks.
Message::Inv(items)
if items.len() == 1 && matches!(items[0], InventoryHash::Block(_)) =>
{
if let InventoryHash::Block(hash) = items[0] {
Some(Request::AdvertiseBlock(hash))
} else {
unreachable!("already checked we got a single block hash");
}
}
// This match arm is terrible, because we have to check that all the items
// are the correct kind and *then* convert them all.
Message::Inv(items)
if items
.iter()
.all(|item| matches!(item, InventoryHash::Tx(_))) =>
{
Some(Request::AdvertiseTransactions(
items
.iter()
.map(|item| {
if let InventoryHash::Tx(hash) = item {
*hash
} else {
unreachable!("already checked all items are InventoryHash::Tx")
}
})
.collect(),
))
}
_ => { _ => {
debug!("unhandled message type"); debug!("unhandled message type");
None None
@ -494,6 +644,14 @@ where
self.fail_with(e.into()); self.fail_with(e.into());
} }
} }
Response::Transactions(transactions) => {
// Generate one tx message per transaction.
for transaction in transactions.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e.into());
}
}
}
Response::Blocks(blocks) => { Response::Blocks(blocks) => {
// Generate one block message per block. // Generate one block message per block.
for block in blocks.into_iter() { for block in blocks.into_iter() {

View File

@ -11,6 +11,7 @@ use std::{
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future::TryFutureExt,
prelude::*, prelude::*,
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
@ -260,31 +261,78 @@ where
svc.load() svc.load()
} }
fn best_peer_for(&mut self, req: &Request) -> (SocketAddr, D::Service) { /// Routes a request using P2C load-balancing.
if let Request::BlocksByHash(hashes) = req { fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
for hash in hashes.iter() {
let mut peers = self.inventory_registry.peers(&(*hash).into());
if let Some(index) = peers.find_map(|addr| self.ready_services.get_index_of(addr)) {
return self
.ready_services
.swap_remove_index(index)
.expect("found index must be valid");
}
}
}
self.default_peer()
}
fn default_peer(&mut self) -> (SocketAddr, D::Service) {
let index = self let index = self
.next_idx .next_idx
.take() .take()
.expect("ready service must have valid preselected index"); .expect("ready service must have valid preselected index");
self.ready_services let (key, mut svc) = self
.ready_services
.swap_remove_index(index) .swap_remove_index(index)
.expect("preselected index must be valid") .expect("preselected index must be valid");
let fut = svc.call(req);
self.push_unready(key, svc);
fut.map_err(Into::into).boxed()
}
/// Tries to route a request to a peer that advertised that inventory,
/// falling back to P2C if there is no ready peer.
fn route_inv(
&mut self,
req: Request,
hash: InventoryHash,
) -> <Self as tower::Service<Request>>::Future {
let candidate_index = self
.inventory_registry
.peers(&hash)
.find_map(|addr| self.ready_services.get_index_of(addr));
match candidate_index {
Some(index) => {
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("found index must be valid");
tracing::debug!(?hash, ?key, "routing based on inventory");
let fut = svc.call(req);
self.push_unready(key, svc);
fut.map_err(Into::into).boxed()
}
None => {
tracing::debug!(
?hash,
"could not find ready peer for inventory hash, falling back to p2c"
);
self.route_p2c(req)
}
}
}
// Routes a request to all ready peers, ignoring return values.
fn route_all(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
// blocking us from passing &mut self to push_unready.
let ready_services = std::mem::take(&mut self.ready_services);
let futs = FuturesUnordered::new();
for (key, mut svc) in ready_services {
futs.push(svc.call(req.clone()).map_err(|_| ()));
self.push_unready(key, svc);
}
async move {
let results = futs.collect::<Vec<Result<_, _>>>().await;
tracing::debug!(
ok.len = results.iter().filter(|r| r.is_ok()).count(),
err.len = results.iter().filter(|r| r.is_err()).count(),
);
Ok(Response::Nil)
}
.boxed()
} }
} }
@ -361,19 +409,19 @@ where
} }
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
let (key, mut svc) = self.best_peer_for(&req); match req {
// Only do inventory-aware routing on individual items.
// XXX add a dimension tagging request metrics by type Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
metrics::counter!( let hash = InventoryHash::from(*hashes.iter().next().unwrap());
"outbound_requests", self.route_inv(req, hash)
1, }
"key" => key.to_string(), Request::TransactionsByHash(ref hashes) if hashes.len() == 1 => {
); let hash = InventoryHash::from(*hashes.iter().next().unwrap());
self.route_inv(req, hash)
let fut = svc.call(req); }
self.push_unready(key, svc); Request::AdvertiseTransactions(_) => self.route_all(req),
Request::AdvertiseBlock(_) => self.route_all(req),
use futures::future::TryFutureExt; _ => self.route_p2c(req),
fut.map_err(Into::into).boxed() }
} }
} }

View File

@ -1,6 +1,9 @@
use std::collections::HashSet; use std::{collections::HashSet, sync::Arc};
use zebra_chain::block; use zebra_chain::{
block,
transaction::{self, Transaction},
};
use super::super::types::Nonce; use super::super::types::Nonce;
@ -39,11 +42,30 @@ pub enum Request {
/// didn't start with a `Vec` but with, e.g., an iterator, they can collect /// didn't start with a `Vec` but with, e.g., an iterator, they can collect
/// directly into a `HashSet` and save work. /// directly into a `HashSet` and save work.
/// ///
/// If this requests a recently-advertised block, the peer set will make a
/// best-effort attempt to route the request to a peer that advertised the
/// block. This routing is only used for request sets of size 1.
/// Otherwise, it is routed using the normal load-balancing strategy.
///
/// # Returns /// # Returns
/// ///
/// Returns [`Response::Blocks`](super::Response::Blocks). /// Returns [`Response::Blocks`](super::Response::Blocks).
BlocksByHash(HashSet<block::Hash>), BlocksByHash(HashSet<block::Hash>),
/// Request transactions by hash.
///
/// This uses a `HashSet` for the same reason as [`Request::BlocksByHash`].
///
/// If this requests a recently-advertised transaction, the peer set will
/// make a best-effort attempt to route the request to a peer that advertised
/// the transaction. This routing is only used for request sets of size 1.
/// Otherwise, it is routed using the normal load-balancing strategy.
///
/// # Returns
///
/// Returns [`Response::Transactions`](super::Response::Transactions).
TransactionsByHash(HashSet<transaction::Hash>),
/// Request block hashes of subsequent blocks in the chain, giving hashes of /// Request block hashes of subsequent blocks in the chain, giving hashes of
/// known blocks. /// known blocks.
/// ///
@ -69,4 +91,50 @@ pub enum Request {
/// Optionally, the last header to request. /// Optionally, the last header to request.
stop: Option<block::Hash>, stop: Option<block::Hash>,
}, },
/// Push a transaction to a remote peer, without advertising it to them first.
///
/// This is implemented by sending an unsolicited `tx` message.
///
/// # Returns
///
/// Returns [`Response::Nil`](super::Response::Nil).
PushTransaction(Arc<Transaction>),
/// Advertise a set of transactions to all peers.
///
/// This is intended to be used in Zebra with a single transaction at a time
/// (set of size 1), but multiple transactions are permitted because this is
/// how we interpret advertisements from Zcashd, which sometimes advertises
/// multiple transactions at once.
///
/// This is implemented by sending an `inv` message containing the
/// transaction hash, allowing the remote peer to choose whether to download
/// it. Remote peers who choose to download the transaction will generate a
/// [`Request::TransactionsByHash`] against the "inbound" service passed to
/// [`zebra_network::init`].
///
/// The peer set routes this request specially, sending it to *every*
/// available peer.
///
/// # Returns
///
/// Returns [`Response::Nil`](super::Response::Nil).
AdvertiseTransactions(HashSet<transaction::Hash>),
/// Advertise a block to all peers.
///
/// This is implemented by sending an `inv` message containing the
/// block hash, allowing the remote peer to choose whether to download
/// it. Remote peers who choose to download the transaction will generate a
/// [`Request::BlocksByHash`] against the "inbound" service passed to
/// [`zebra_network::init`].
///
/// The peer set routes this request specially, sending it to *every*
/// available peer.
///
/// # Returns
///
/// Returns [`Response::Nil`](super::Response::Nil).
AdvertiseBlock(block::Hash),
} }

View File

@ -1,4 +1,7 @@
use zebra_chain::block::{self, Block}; use zebra_chain::{
block::{self, Block},
transaction::Transaction,
};
use crate::meta_addr::MetaAddr; use crate::meta_addr::MetaAddr;
use std::sync::Arc; use std::sync::Arc;
@ -17,4 +20,7 @@ pub enum Response {
/// A list of block hashes. /// A list of block hashes.
BlockHashes(Vec<block::Hash>), BlockHashes(Vec<block::Hash>),
/// A list of transactions.
Transactions(Vec<Arc<Transaction>>),
} }