Implement Inventory Tracking RFC (#963)

* Add .cargo to the gitignore file

* Implement Inventory Tracking RFC

* checkpoint

* wire together the inventory registry

* add comment documenting condition

* make inventory registry optional
This commit is contained in:
Jane Lusby 2020-09-01 14:28:54 -07:00 committed by GitHub
parent f91b91b6d8
commit 96c8809348
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 154 additions and 22 deletions

View File

@ -13,7 +13,7 @@ use super::Transaction;
/// ///
/// TODO: I'm pretty sure this is also a SHA256d hash but I haven't /// TODO: I'm pretty sure this is also a SHA256d hash but I haven't
/// confirmed it yet. /// confirmed it yet.
#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] #[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
#[cfg_attr(test, derive(Arbitrary))] #[cfg_attr(test, derive(Arbitrary))]
pub struct Hash(pub [u8; 32]); pub struct Hash(pub [u8; 32]);

View File

@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
prelude::*, prelude::*,
}; };
use tokio::net::TcpStream; use tokio::{net::TcpStream, sync::broadcast};
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use tower::Service; use tower::Service;
use tracing::{span, Level}; use tracing::{span, Level};
@ -23,7 +23,7 @@ use zebra_chain::block;
use crate::{ use crate::{
constants, constants,
protocol::{ protocol::{
external::{types::*, Codec, Message}, external::{types::*, Codec, InventoryHash, Message},
internal::{Request, Response}, internal::{Request, Response},
}, },
types::MetaAddr, types::MetaAddr,
@ -39,6 +39,7 @@ pub struct Handshake<S> {
config: Config, config: Config,
inbound_service: S, inbound_service: S,
timestamp_collector: mpsc::Sender<MetaAddr>, timestamp_collector: mpsc::Sender<MetaAddr>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<Mutex<HashSet<Nonce>>>, nonces: Arc<Mutex<HashSet<Nonce>>>,
user_agent: String, user_agent: String,
our_services: PeerServices, our_services: PeerServices,
@ -52,6 +53,7 @@ pub struct Builder<S> {
our_services: Option<PeerServices>, our_services: Option<PeerServices>,
user_agent: Option<String>, user_agent: Option<String>,
relay: Option<bool>, relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
} }
impl<S> Builder<S> impl<S> Builder<S>
@ -71,6 +73,15 @@ where
self self
} }
/// Provide a channel for registering inventory advertisements. Optional.
pub fn with_inventory_collector(
mut self,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
) -> Self {
self.inv_collector = Some(inv_collector);
self
}
/// Provide a hook for timestamp collection. Optional. /// Provide a hook for timestamp collection. Optional.
/// ///
/// If this is unset, timestamps will not be collected. /// If this is unset, timestamps will not be collected.
@ -111,6 +122,10 @@ where
let inbound_service = self let inbound_service = self
.inbound_service .inbound_service
.ok_or("did not specify inbound service")?; .ok_or("did not specify inbound service")?;
let inv_collector = self.inv_collector.unwrap_or_else(|| {
let (tx, _) = broadcast::channel(100);
tx
});
let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| { let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| {
// No timestamp collector was passed, so create a stub channel. // No timestamp collector was passed, so create a stub channel.
// Dropping the receiver means sends will fail, but we don't care. // Dropping the receiver means sends will fail, but we don't care.
@ -124,6 +139,7 @@ where
Ok(Handshake { Ok(Handshake {
config, config,
inbound_service, inbound_service,
inv_collector,
timestamp_collector, timestamp_collector,
nonces, nonces,
user_agent, user_agent,
@ -150,6 +166,7 @@ where
user_agent: None, user_agent: None,
our_services: None, our_services: None,
relay: None, relay: None,
inv_collector: None,
} }
} }
} }
@ -181,6 +198,7 @@ where
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone(); let inbound_service = self.inbound_service.clone();
let timestamp_collector = self.timestamp_collector.clone(); let timestamp_collector = self.timestamp_collector.clone();
let inv_collector = self.inv_collector.clone();
let network = self.config.network; let network = self.config.network;
let our_addr = self.config.listen_addr; let our_addr = self.config.listen_addr;
let user_agent = self.user_agent.clone(); let user_agent = self.user_agent.clone();
@ -374,6 +392,23 @@ where
msg msg
} }
}) })
.then(move |msg| {
let inv_collector = inv_collector.clone();
async move {
if let Ok(Message::Inv(hashes)) = &msg {
// We reject inventory messages with more than one
// item because they are most likely replies to a
// query rather than a newly gosipped block.
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
if hashes.len() == 1 {
let hash = hashes[0];
let _ = inv_collector.send((hash, addr));
}
}
msg
}
})
.boxed(); .boxed();
use super::connection; use super::connection;

View File

@ -1,9 +1,11 @@
mod candidate_set; mod candidate_set;
mod initialize; mod initialize;
mod inventory_registry;
mod set; mod set;
mod unready_service; mod unready_service;
use candidate_set::CandidateSet; use candidate_set::CandidateSet;
use inventory_registry::InventoryRegistry;
use set::PeerSet; use set::PeerSet;
pub use initialize::init; pub use initialize::init;

View File

@ -14,7 +14,10 @@ use futures::{
sink::SinkExt, sink::SinkExt,
stream::{FuturesUnordered, StreamExt}, stream::{FuturesUnordered, StreamExt},
}; };
use tokio::net::{TcpListener, TcpStream}; use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
};
use tower::{ use tower::{
buffer::Buffer, buffer::Buffer,
discover::{Change, ServiceStream}, discover::{Change, ServiceStream},
@ -49,6 +52,7 @@ where
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
let (address_book, timestamp_collector) = TimestampCollector::spawn(); let (address_book, timestamp_collector) = TimestampCollector::spawn();
let (inv_sender, inv_receiver) = broadcast::channel(100);
// Construct services that handle inbound handshakes and perform outbound // Construct services that handle inbound handshakes and perform outbound
// handshakes. These use the same handshake service internally to detect // handshakes. These use the same handshake service internally to detect
@ -61,6 +65,7 @@ where
let hs = peer::Handshake::builder() let hs = peer::Handshake::builder()
.with_config(config.clone()) .with_config(config.clone())
.with_inbound_service(inbound_service) .with_inbound_service(inbound_service)
.with_inventory_collector(inv_sender)
.with_timestamp_collector(timestamp_collector) .with_timestamp_collector(timestamp_collector)
.with_advertised_services(PeerServices::NODE_NETWORK) .with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string()) .with_user_agent(crate::constants::USER_AGENT.to_string())
@ -93,6 +98,7 @@ where
), ),
demand_tx.clone(), demand_tx.clone(),
handle_rx, handle_rx,
inv_receiver,
); );
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

View File

@ -0,0 +1,60 @@
use crate::{protocol::external::InventoryHash, BoxedStdError};
use futures::Stream;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::{
sync::broadcast,
time::{self, Interval},
};
#[derive(Debug)]
pub struct InventoryRegistry {
current: HashMap<InventoryHash, HashSet<SocketAddr>>,
prev: HashMap<InventoryHash, HashSet<SocketAddr>>,
/// Stream of incoming inventory hashes to
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
interval: Interval,
}
impl InventoryRegistry {
pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self {
Self {
current: Default::default(),
prev: Default::default(),
inv_stream,
interval: time::interval(Duration::from_secs(75)),
}
}
pub fn peers(&self, hash: &InventoryHash) -> impl Iterator<Item = &SocketAddr> {
let prev = self.prev.get(hash).into_iter();
let current = self.current.get(hash).into_iter();
prev.chain(current).flatten()
}
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxedStdError> {
while let Poll::Ready(_) = self.interval.poll_tick(cx) {
self.rotate();
}
while let Poll::Ready(Some((hash, addr))) = Pin::new(&mut self.inv_stream).poll_next(cx)? {
self.register(hash, addr)
}
Ok(())
}
fn register(&mut self, hash: InventoryHash, addr: SocketAddr) {
self.current.entry(hash).or_default().insert(addr);
}
fn rotate(&mut self) {
self.prev = std::mem::take(&mut self.current);
}
}

View File

@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::{ use std::{
collections::HashMap, collections::HashMap,
convert::TryInto, convert::TryInto,
@ -14,7 +15,7 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use indexmap::IndexMap; use indexmap::IndexMap;
use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{broadcast, oneshot::error::TryRecvError};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
@ -23,11 +24,17 @@ use tower::{
use tower_load::Load; use tower_load::Load;
use crate::{ use crate::{
protocol::internal::{Request, Response}, protocol::{
external::InventoryHash,
internal::{Request, Response},
},
BoxedStdError, BoxedStdError,
}; };
use super::unready_service::{Error as UnreadyError, UnreadyService}; use super::{
unready_service::{Error as UnreadyError, UnreadyService},
InventoryRegistry,
};
/// A [`tower::Service`] that abstractly represents "the rest of the network". /// A [`tower::Service`] that abstractly represents "the rest of the network".
/// ///
@ -71,7 +78,7 @@ use super::unready_service::{Error as UnreadyError, UnreadyService};
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
pub struct PeerSet<D> pub struct PeerSet<D>
where where
D: Discover, D: Discover<Key = SocketAddr>,
{ {
discover: D, discover: D,
ready_services: IndexMap<D::Key, D::Service>, ready_services: IndexMap<D::Key, D::Service>,
@ -88,12 +95,12 @@ where
/// These guards are checked for errors as part of `poll_ready` which lets /// These guards are checked for errors as part of `poll_ready` which lets
/// the `PeerSet` propagate errors from background tasks back to the user /// the `PeerSet` propagate errors from background tasks back to the user
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxedStdError>>>, guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxedStdError>>>,
inventory_registry: InventoryRegistry,
} }
impl<D> PeerSet<D> impl<D> PeerSet<D>
where where
D: Discover + Unpin, D: Discover<Key = SocketAddr> + Unpin,
D::Key: Clone + Debug,
D::Service: Service<Request, Response = Response> + Load, D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxedStdError>, D::Error: Into<BoxedStdError>,
<D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static, <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
@ -105,6 +112,7 @@ where
discover: D, discover: D,
demand_signal: mpsc::Sender<()>, demand_signal: mpsc::Sender<()>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>, handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>,
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
) -> Self { ) -> Self {
Self { Self {
discover, discover,
@ -115,6 +123,7 @@ where
demand_signal, demand_signal,
guards: futures::stream::FuturesUnordered::new(), guards: futures::stream::FuturesUnordered::new(),
handle_rx, handle_rx,
inventory_registry: InventoryRegistry::new(inv_stream),
} }
} }
@ -160,7 +169,7 @@ where
fn push_unready(&mut self, key: D::Key, svc: D::Service) { fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.cancel_handles.insert(key.clone(), tx); self.cancel_handles.insert(key, tx);
self.unready_services.push(UnreadyService { self.unready_services.push(UnreadyService {
key: Some(key), key: Some(key),
service: Some(svc), service: Some(svc),
@ -250,12 +259,38 @@ where
let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
svc.load() svc.load()
} }
fn best_peer_for(&mut self, req: &Request) -> (SocketAddr, D::Service) {
if let Request::BlocksByHash(hashes) = req {
for hash in hashes.iter() {
let mut peers = self.inventory_registry.peers(&(*hash).into());
if let Some(index) = peers.find_map(|addr| self.ready_services.get_index_of(addr)) {
return self
.ready_services
.swap_remove_index(index)
.expect("found index must be valid");
}
}
}
self.default_peer()
}
fn default_peer(&mut self) -> (SocketAddr, D::Service) {
let index = self
.next_idx
.take()
.expect("ready service must have valid preselected index");
self.ready_services
.swap_remove_index(index)
.expect("preselected index must be valid")
}
} }
impl<D> Service<Request> for PeerSet<D> impl<D> Service<Request> for PeerSet<D>
where where
D: Discover + Unpin, D: Discover<Key = SocketAddr> + Unpin,
D::Key: Clone + Debug + ToString,
D::Service: Service<Request, Response = Response> + Load, D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxedStdError>, D::Error: Into<BoxedStdError>,
<D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static, <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
@ -271,6 +306,7 @@ where
self.check_for_background_errors(cx)?; self.check_for_background_errors(cx)?;
// Process peer discovery updates. // Process peer discovery updates.
let _ = self.poll_discover(cx)?; let _ = self.poll_discover(cx)?;
self.inventory_registry.poll_inventory(cx)?;
// Poll unready services to drive them to readiness. // Poll unready services to drive them to readiness.
self.poll_unready(cx); self.poll_unready(cx);
@ -325,14 +361,7 @@ where
} }
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
let index = self let (key, mut svc) = self.best_peer_for(&req);
.next_idx
.take()
.expect("ready service must have valid preselected index");
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("preselected index must be valid");
// XXX add a dimension tagging request metrics by type // XXX add a dimension tagging request metrics by type
metrics::counter!( metrics::counter!(

View File

@ -20,7 +20,7 @@ use zebra_chain::{
/// container, so we do not use that term to avoid confusion with `Vec<T>`. /// container, so we do not use that term to avoid confusion with `Vec<T>`.
/// ///
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Inventory_Vectors) /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Inventory_Vectors)
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum InventoryHash { pub enum InventoryHash {
/// An error. /// An error.
/// ///