From 96c88093482bc1a2b3437f5239dd9036c647a21c Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Tue, 1 Sep 2020 14:28:54 -0700 Subject: [PATCH] Implement Inventory Tracking RFC (#963) * Add .cargo to the gitignore file * Implement Inventory Tracking RFC * checkpoint * wire together the inventory registry * add comment documenting condition * make inventory registry optional --- zebra-chain/src/transaction/hash.rs | 2 +- zebra-network/src/peer/handshake.rs | 39 +++++++++++- zebra-network/src/peer_set.rs | 2 + zebra-network/src/peer_set/initialize.rs | 8 ++- .../src/peer_set/inventory_registry.rs | 60 ++++++++++++++++++ zebra-network/src/peer_set/set.rs | 63 ++++++++++++++----- zebra-network/src/protocol/external/inv.rs | 2 +- 7 files changed, 154 insertions(+), 22 deletions(-) create mode 100644 zebra-network/src/peer_set/inventory_registry.rs diff --git a/zebra-chain/src/transaction/hash.rs b/zebra-chain/src/transaction/hash.rs index 9aa85e4b..322bf40b 100644 --- a/zebra-chain/src/transaction/hash.rs +++ b/zebra-chain/src/transaction/hash.rs @@ -13,7 +13,7 @@ use super::Transaction; /// /// TODO: I'm pretty sure this is also a SHA256d hash but I haven't /// confirmed it yet. 
-#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] #[cfg_attr(test, derive(Arbitrary))] pub struct Hash(pub [u8; 32]); diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 77f881c8..f0bcd05f 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -12,7 +12,7 @@ use futures::{ channel::{mpsc, oneshot}, prelude::*, }; -use tokio::net::TcpStream; +use tokio::{net::TcpStream, sync::broadcast}; use tokio_util::codec::Framed; use tower::Service; use tracing::{span, Level}; @@ -23,7 +23,7 @@ use zebra_chain::block; use crate::{ constants, protocol::{ - external::{types::*, Codec, Message}, + external::{types::*, Codec, InventoryHash, Message}, internal::{Request, Response}, }, types::MetaAddr, @@ -39,6 +39,7 @@ pub struct Handshake { config: Config, inbound_service: S, timestamp_collector: mpsc::Sender, + inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, nonces: Arc>>, user_agent: String, our_services: PeerServices, @@ -52,6 +53,7 @@ pub struct Builder { our_services: Option, user_agent: Option, relay: Option, + inv_collector: Option>, } impl Builder @@ -71,6 +73,15 @@ where self } + /// Provide a channel for registering inventory advertisements. Optional. + pub fn with_inventory_collector( + mut self, + inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, + ) -> Self { + self.inv_collector = Some(inv_collector); + self + } + /// Provide a hook for timestamp collection. Optional. /// /// If this is unset, timestamps will not be collected. 
@@ -111,6 +122,10 @@ where let inbound_service = self .inbound_service .ok_or("did not specify inbound service")?; + let inv_collector = self.inv_collector.unwrap_or_else(|| { + let (tx, _) = broadcast::channel(100); + tx + }); let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| { // No timestamp collector was passed, so create a stub channel. // Dropping the receiver means sends will fail, but we don't care. @@ -124,6 +139,7 @@ where Ok(Handshake { config, inbound_service, + inv_collector, timestamp_collector, nonces, user_agent, @@ -150,6 +166,7 @@ where user_agent: None, our_services: None, relay: None, + inv_collector: None, } } } @@ -181,6 +198,7 @@ where let nonces = self.nonces.clone(); let inbound_service = self.inbound_service.clone(); let timestamp_collector = self.timestamp_collector.clone(); + let inv_collector = self.inv_collector.clone(); let network = self.config.network; let our_addr = self.config.listen_addr; let user_agent = self.user_agent.clone(); @@ -374,6 +392,23 @@ where msg } }) + .then(move |msg| { + let inv_collector = inv_collector.clone(); + async move { + if let Ok(Message::Inv(hashes)) = &msg { + // We reject inventory messages with more than one + // item because they are most likely replies to a + // query rather than a newly gossiped block. 
+ // + // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring + if hashes.len() == 1 { + let hash = hashes[0]; + let _ = inv_collector.send((hash, addr)); + } + } + msg + } + }) .boxed(); use super::connection; diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 4c39df4c..412d7caf 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -1,9 +1,11 @@ mod candidate_set; mod initialize; +mod inventory_registry; mod set; mod unready_service; use candidate_set::CandidateSet; +use inventory_registry::InventoryRegistry; use set::PeerSet; pub use initialize::init; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 4980153b..a87819a2 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -14,7 +14,10 @@ use futures::{ sink::SinkExt, stream::{FuturesUnordered, StreamExt}, }; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::broadcast, +}; use tower::{ buffer::Buffer, discover::{Change, ServiceStream}, @@ -49,6 +52,7 @@ where S::Future: Send + 'static, { let (address_book, timestamp_collector) = TimestampCollector::spawn(); + let (inv_sender, inv_receiver) = broadcast::channel(100); // Construct services that handle inbound handshakes and perform outbound // handshakes. 
These use the same handshake service internally to detect @@ -61,6 +65,7 @@ where let hs = peer::Handshake::builder() .with_config(config.clone()) .with_inbound_service(inbound_service) + .with_inventory_collector(inv_sender) .with_timestamp_collector(timestamp_collector) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) @@ -93,6 +98,7 @@ where ), demand_tx.clone(), handle_rx, + inv_receiver, ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs new file mode 100644 index 00000000..9adc2a0b --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -0,0 +1,60 @@ +use crate::{protocol::external::InventoryHash, BoxedStdError}; +use futures::Stream; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{ + sync::broadcast, + time::{self, Interval}, +}; + +#[derive(Debug)] +pub struct InventoryRegistry { + current: HashMap>, + prev: HashMap>, + /// Stream of incoming inventory hashes to register + inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, + interval: Interval, +} + +impl InventoryRegistry { + pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self { + Self { + current: Default::default(), + prev: Default::default(), + inv_stream, + interval: time::interval(Duration::from_secs(75)), + } + } + + pub fn peers(&self, hash: &InventoryHash) -> impl Iterator { + let prev = self.prev.get(hash).into_iter(); + let current = self.current.get(hash).into_iter(); + + prev.chain(current).flatten() + } + + pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxedStdError> { + while let Poll::Ready(_) = self.interval.poll_tick(cx) { + self.rotate(); + } + + while let Poll::Ready(Some((hash, addr))) = Pin::new(&mut 
self.inv_stream).poll_next(cx)? { + self.register(hash, addr) + } + + Ok(()) + } + + fn register(&mut self, hash: InventoryHash, addr: SocketAddr) { + self.current.entry(hash).or_default().insert(addr); + } + + fn rotate(&mut self) { + self.prev = std::mem::take(&mut self.current); + } +} diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index f8fbd554..59c3f9eb 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::{ collections::HashMap, convert::TryInto, @@ -14,7 +15,7 @@ use futures::{ stream::FuturesUnordered, }; use indexmap::IndexMap; -use tokio::sync::oneshot::error::TryRecvError; +use tokio::sync::{broadcast, oneshot::error::TryRecvError}; use tokio::task::JoinHandle; use tower::{ discover::{Change, Discover}, @@ -23,11 +24,17 @@ use tower::{ use tower_load::Load; use crate::{ - protocol::internal::{Request, Response}, + protocol::{ + external::InventoryHash, + internal::{Request, Response}, + }, BoxedStdError, }; -use super::unready_service::{Error as UnreadyError, UnreadyService}; +use super::{ + unready_service::{Error as UnreadyError, UnreadyService}, + InventoryRegistry, +}; /// A [`tower::Service`] that abstractly represents "the rest of the network". 
/// @@ -71,7 +78,7 @@ use super::unready_service::{Error as UnreadyError, UnreadyService}; /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf pub struct PeerSet where - D: Discover, + D: Discover, { discover: D, ready_services: IndexMap, @@ -88,12 +95,12 @@ where /// These guards are checked for errors as part of `poll_ready` which lets /// the `PeerSet` propagate errors from background tasks back to the user guards: futures::stream::FuturesUnordered>>, + inventory_registry: InventoryRegistry, } impl PeerSet where - D: Discover + Unpin, - D::Key: Clone + Debug, + D: Discover + Unpin, D::Service: Service + Load, D::Error: Into, >::Error: Into + 'static, @@ -105,6 +112,7 @@ where discover: D, demand_signal: mpsc::Sender<()>, handle_rx: tokio::sync::oneshot::Receiver>>>, + inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, ) -> Self { Self { discover, @@ -115,6 +123,7 @@ where demand_signal, guards: futures::stream::FuturesUnordered::new(), handle_rx, + inventory_registry: InventoryRegistry::new(inv_stream), } } @@ -160,7 +169,7 @@ where fn push_unready(&mut self, key: D::Key, svc: D::Service) { let (tx, rx) = oneshot::channel(); - self.cancel_handles.insert(key.clone(), tx); + self.cancel_handles.insert(key, tx); self.unready_services.push(UnreadyService { key: Some(key), service: Some(svc), @@ -250,12 +259,38 @@ where let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); svc.load() } + + fn best_peer_for(&mut self, req: &Request) -> (SocketAddr, D::Service) { + if let Request::BlocksByHash(hashes) = req { + for hash in hashes.iter() { + let mut peers = self.inventory_registry.peers(&(*hash).into()); + if let Some(index) = peers.find_map(|addr| self.ready_services.get_index_of(addr)) { + return self + .ready_services + .swap_remove_index(index) + .expect("found index must be valid"); + } + } + } + + self.default_peer() + } + + fn default_peer(&mut self) -> (SocketAddr, D::Service) { + let index = self + 
.next_idx + .take() + .expect("ready service must have valid preselected index"); + + self.ready_services + .swap_remove_index(index) + .expect("preselected index must be valid") + } } impl Service for PeerSet where - D: Discover + Unpin, - D::Key: Clone + Debug + ToString, + D: Discover + Unpin, D::Service: Service + Load, D::Error: Into, >::Error: Into + 'static, @@ -271,6 +306,7 @@ where self.check_for_background_errors(cx)?; // Process peer discovery updates. let _ = self.poll_discover(cx)?; + self.inventory_registry.poll_inventory(cx)?; // Poll unready services to drive them to readiness. self.poll_unready(cx); @@ -325,14 +361,7 @@ where } fn call(&mut self, req: Request) -> Self::Future { - let index = self - .next_idx - .take() - .expect("ready service must have valid preselected index"); - let (key, mut svc) = self - .ready_services - .swap_remove_index(index) - .expect("preselected index must be valid"); + let (key, mut svc) = self.best_peer_for(&req); // XXX add a dimension tagging request metrics by type metrics::counter!( diff --git a/zebra-network/src/protocol/external/inv.rs b/zebra-network/src/protocol/external/inv.rs index d85401a6..9a558402 100644 --- a/zebra-network/src/protocol/external/inv.rs +++ b/zebra-network/src/protocol/external/inv.rs @@ -20,7 +20,7 @@ use zebra_chain::{ /// container, so we do not use that term to avoid confusion with `Vec`. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Inventory_Vectors) -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum InventoryHash { /// An error. ///