From 98502d618149e456e97e8c6c716f806875564963 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 7 Feb 2022 09:05:52 +1000 Subject: [PATCH] 1. Create an API for a missing inventory registry, but don't register any missing inventory yet (#3255) * feat(network): create an API for registering missing inventory, but don't use it yet * feat(constraint): implement AtLeastOne::iter_mut() * refactor(network): add InventoryStatus::marker() method to remove associated data * fix(network): prefer current inventory, and missing inventory statuses * fix(network): if an inventory rotation is missed, delay future rotations * fix(network): don't immediately rotate a new empty inventory registry * fix(network): assert that only expected inventory variants are stored in the registry * test(network): add a basic empty inventory registry test Also adds an inventory registry update future, which makes it easier to call from an async context. * refactor(network): add a convenience API for new InventoryChanges * feat(network): improve inventory registry logging and metrics * test(network): make sure advertised and missing inventory is correctly registered * test(network): check that missing inventory is preferred over advertised * test(network): check that current inventory is preferred over previous * test(network): check peer set routes inv requests to advertised peers * refactor(network): make the InventoryChange API more flexible Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- zebra-chain/src/serialization/constraint.rs | 36 ++- zebra-network/src/peer/handshake.rs | 47 ++- zebra-network/src/peer_set.rs | 1 + .../src/peer_set/inventory_registry.rs | 299 ++++++++++++++++-- .../src/peer_set/inventory_registry/tests.rs | 3 + .../inventory_registry/tests/vectors.rs | 198 ++++++++++++ .../src/peer_set/inventory_registry/update.rs | 36 +++ zebra-network/src/peer_set/set.rs | 8 +- zebra-network/src/peer_set/set/tests.rs | 38 ++- .../src/peer_set/set/tests/vectors.rs | 167 +++++++++- zebra-network/src/protocol/external/inv.rs | 1 - 11 files changed, 771 insertions(+), 63 deletions(-) create mode 100644 zebra-network/src/peer_set/inventory_registry/tests.rs create mode 100644 zebra-network/src/peer_set/inventory_registry/tests/vectors.rs create mode 100644 zebra-network/src/peer_set/inventory_registry/update.rs diff --git a/zebra-chain/src/serialization/constraint.rs b/zebra-chain/src/serialization/constraint.rs index 3c9effa6..262d4e37 100644 --- a/zebra-chain/src/serialization/constraint.rs +++ b/zebra-chain/src/serialization/constraint.rs @@ -147,7 +147,7 @@ where } } -// Deref (but not DerefMut, because that could break the constraint) +// Deref and AsRef (but not DerefMut or AsMut, because that could break the constraint) impl Deref for AtLeastOne { type Target = Vec; @@ -157,6 +157,12 @@ impl Deref for AtLeastOne { } } +impl AsRef<[T]> for AtLeastOne { + fn as_ref(&self) -> &[T] { + self.inner.as_ref() + } +} + // Extracting one or more items impl From> for Vec { @@ -165,7 +171,35 @@ impl From> for Vec { } } +// `IntoIterator` for `T` and `&mut T`, because iterators can't remove items + +impl IntoIterator for AtLeastOne { + type Item = T; + + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> std::vec::IntoIter { + self.inner.into_iter() + } +} + impl AtLeastOne { + /// Returns an iterator that allows modifying each value. + pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, T> { + self.inner.iter_mut() + } +} + +impl AtLeastOne { + /// Returns a new `AtLeastOne`, containing a single `item`. + /// + /// Skips the `TrustedPreallocate` memory denial of service checks. + /// (`TrustedPreallocate` can not defend against a single item + /// that causes a denial of service by itself.) + pub fn from_one(item: T) -> AtLeastOne { + AtLeastOne { inner: vec![item] } + } + /// Returns a reference to the inner vector. pub fn as_vec(&self) -> &Vec { &self.inner diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index fa8244c6..d8fb5f8f 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -38,7 +38,7 @@ use crate::{ CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError, MinimumPeerVersion, PeerError, }, - peer_set::ConnectionTracker, + peer_set::{ConnectionTracker, InventoryChange}, protocol::{ external::{types::*, AddrInVersion, Codec, InventoryHash, Message}, internal::{Request, Response}, @@ -68,7 +68,7 @@ where inbound_service: S, address_book_updater: tokio::sync::mpsc::Sender, - inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, + inv_collector: broadcast::Sender, minimum_peer_version: MinimumPeerVersion, nonces: Arc>>, @@ -349,7 +349,7 @@ where inbound_service: Option, address_book_updater: Option>, - inv_collector: Option>, + inv_collector: Option>, latest_chain_tip: C, } @@ -377,7 +377,7 @@ where /// to look up peers that have specific inventory. pub fn with_inventory_collector( mut self, - inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, + inv_collector: broadcast::Sender, ) -> Self { self.inv_collector = Some(inv_collector); self @@ -930,24 +930,37 @@ where // // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring // - // TODO: zcashd has a bug where it merges queued inv messages of - // the same or different types. So Zebra should split small - // merged inv messages into separate inv messages. (#1768) + // Note: zcashd has a bug where it merges queued inv messages of + // the same or different types. Zebra compensates by sending `notfound` + // responses to the inv collector. (#2156, #1768) + // + // (We can't split `inv`s, because that fills the inventory registry + // with useless entries that the whole network has, making it large and slow.) match hashes.as_slice() { [hash @ InventoryHash::Block(_)] => { debug!(?hash, "registering gossiped block inventory for peer"); - let _ = inv_collector.send((*hash, transient_addr)); + + // The peer set and inv collector use the peer's remote + // address as an identifier + let _ = inv_collector.send(InventoryChange::new_advertised( + *hash, + transient_addr, + )); } [hashes @ ..] => { - for hash in hashes { - if let Some(unmined_tx_id) = hash.unmined_tx_id() { - debug!(?unmined_tx_id, "registering unmined transaction inventory for peer"); - // The peer set and inv collector use the peer's remote - // address as an identifier - let _ = inv_collector.send((*hash, transient_addr)); - } else { - trace!(?hash, "ignoring non-transaction inventory hash in multi-hash list") - } + let hashes = + hashes.iter().filter(|hash| hash.unmined_tx_id().is_some()); + + debug!( + ?hashes, + "registering unmined transaction inventory for peer" + ); + + if let Some(change) = InventoryChange::new_advertised_multi( + hashes, + transient_addr, + ) { + let _ = inv_collector.send(change); } } } diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 04a2476d..8ef53db4 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -6,6 +6,7 @@ mod set; mod unready_service; pub(crate) use candidate_set::CandidateSet; +pub(crate) use inventory_registry::InventoryChange; pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker}; use inventory_registry::InventoryRegistry; diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index 291179e9..c16ae9f2 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -3,7 +3,8 @@ //! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, + convert::TryInto, net::SocketAddr, pin::Pin, task::{Context, Poll}, @@ -11,31 +12,71 @@ use std::{ }; use futures::{FutureExt, Stream, StreamExt}; -use tokio::{sync::broadcast, time}; +use tokio::{ + sync::broadcast, + time::{self, Instant}, +}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream}; +use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne}; + use crate::{protocol::external::InventoryHash, BoxError}; -/// An Inventory Registry for tracking recent inventory advertisements by peer. +use self::update::Update; + +use InventoryStatus::*; + +pub mod update; + +#[cfg(test)] +mod tests; + +/// A peer inventory status change, used in the inventory status channel. +pub type InventoryChange = InventoryStatus<(AtLeastOne, SocketAddr)>; + +/// An internal marker used in inventory status hash maps. +type InventoryMarker = InventoryStatus<()>; + +/// A generic peer inventory status. +/// +/// `Advertised` is used for inventory that peers claim to have, +/// and `Missing` is used for inventory they didn't provide when we requested it. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub enum InventoryStatus { + /// An advertised inventory hash. + /// + /// For performance reasons, advertisements should only be tracked + /// for hashes that are rare on the network. + /// So Zebra only tracks single-block inventory messages. + Advertised(T), + + /// An inventory hash rejected by a peer. + /// + /// For security reasons, all `notfound` rejections should be tracked. + /// This also helps with performance, if the hash is rare on the network. + #[allow(dead_code)] + Missing(T), +} + +/// An Inventory Registry for tracking recent inventory advertisements and missing inventory. /// /// For more details please refer to the [RFC]. /// /// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html pub struct InventoryRegistry { - /// Map tracking the inventory advertisements from the current interval - /// period - current: HashMap>, - /// Map tracking inventory advertisements from the previous interval period - prev: HashMap>, - /// Stream of incoming inventory hashes to register + /// Map tracking the latest inventory status from the current interval + /// period. + current: HashMap>, + + /// Map tracking inventory statuses from the previous interval period. + prev: HashMap>, + + /// Stream of incoming inventory statuses to register. inv_stream: Pin< - Box< - dyn Stream> - + Send - + 'static, - >, + Box> + Send + 'static>, >, - /// Interval tracking how frequently we should rotate our maps + + /// Interval tracking when we should next rotate our maps. interval: IntervalStream, } @@ -48,24 +89,176 @@ impl std::fmt::Debug for InventoryRegistry { } } +impl InventoryChange { + /// Returns a new advertised inventory change from a single hash. + pub fn new_advertised(hash: InventoryHash, peer: SocketAddr) -> Self { + InventoryStatus::Advertised((AtLeastOne::from_one(hash), peer)) + } + + /// Returns a new missing inventory change from a single hash. + #[allow(dead_code)] + pub fn new_missing(hash: InventoryHash, peer: SocketAddr) -> Self { + InventoryStatus::Missing((AtLeastOne::from_one(hash), peer)) + } + + /// Returns a new advertised multiple inventory change, if `hashes` contains at least one change. + pub fn new_advertised_multi<'a>( + hashes: impl IntoIterator, + peer: SocketAddr, + ) -> Option { + let hashes: Vec = hashes.into_iter().copied().collect(); + let hashes = hashes.try_into().ok(); + + hashes.map(|hashes| InventoryStatus::Advertised((hashes, peer))) + } + + /// Returns a new missing multiple inventory change, if `hashes` contains at least one change. + #[allow(dead_code)] + pub fn new_missing_multi<'a>( + hashes: impl IntoIterator, + peer: SocketAddr, + ) -> Option { + let hashes: Vec = hashes.into_iter().copied().collect(); + let hashes = hashes.try_into().ok(); + + hashes.map(|hashes| InventoryStatus::Missing((hashes, peer))) + } +} + +impl InventoryStatus { + /// Returns true if the inventory item was advertised. + #[allow(dead_code)] + pub fn is_advertised(&self) -> bool { + matches!(self, Advertised(_)) + } + + /// Returns true if the inventory item was missing. + #[allow(dead_code)] + pub fn is_missing(&self) -> bool { + matches!(self, Missing(_)) + } + + /// Get the advertised inventory item, if present. + pub fn advertised(&self) -> Option { + if let Advertised(item) = self { + Some(item.clone()) + } else { + None + } + } + + /// Get the rejected inventory item, if present. + #[allow(dead_code)] + pub fn missing(&self) -> Option { + if let Missing(item) = self { + Some(item.clone()) + } else { + None + } + } + + /// Get the inner item, regardless of status. + pub fn inner(&self) -> T { + match self { + Advertised(item) | Missing(item) => item.clone(), + } + } + + /// Get a marker for the status, without any associated data. + pub fn marker(&self) -> InventoryMarker { + self.as_ref().map(|_inner| ()) + } + + /// Maps an `InventoryStatus` to `InventoryStatus` by applying a function to a contained value. + pub fn map U>(self, f: F) -> InventoryStatus { + // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#829 + match self { + Advertised(item) => Advertised(f(item)), + Missing(item) => Missing(f(item)), + } + } + + /// Converts from `&InventoryStatus` to `InventoryStatus<&T>`. + pub fn as_ref(&self) -> InventoryStatus<&T> { + match self { + Advertised(item) => Advertised(item), + Missing(item) => Missing(item), + } + } +} + impl InventoryRegistry { - /// Returns an Inventory Registry - pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self { + /// Returns a new Inventory Registry for `inv_stream`. + pub fn new(inv_stream: broadcast::Receiver) -> Self { + let interval = Duration::from_secs( + POST_BLOSSOM_POW_TARGET_SPACING + .try_into() + .expect("non-negative"), + ); + + // Don't do an immediate rotation, current and prev are already empty. + let mut interval = tokio::time::interval_at(Instant::now() + interval, interval); + // SECURITY: if the rotation time is late, delay future rotations by the same amount + interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + Self { current: Default::default(), prev: Default::default(), inv_stream: BroadcastStream::new(inv_stream).boxed(), - interval: IntervalStream::new(time::interval(Duration::from_secs(75))), + interval: IntervalStream::new(interval), } } - /// Returns an iterator over addrs of peers that have recently advertised - /// having `hash` in their inventory. - pub fn peers(&self, hash: &InventoryHash) -> impl Iterator { - let prev = self.prev.get(hash).into_iter(); - let current = self.current.get(hash).into_iter(); + /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory. + pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator { + self.status_peers(hash) + .filter_map(|addr_status| addr_status.advertised()) + } - prev.chain(current).flatten() + /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory. + #[allow(dead_code)] + pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator { + self.status_peers(hash) + .filter_map(|addr_status| addr_status.missing()) + } + + /// Returns an iterator over peer inventory statuses for `hash`. + /// + /// Prefers current statuses to previously rotated statuses for the same peer. + pub fn status_peers( + &self, + hash: InventoryHash, + ) -> impl Iterator> { + let prev = self.prev.get(&hash); + let current = self.current.get(&hash); + + // # Security + // + // Prefer `current` statuses for the same peer over previously rotated statuses. + // This makes sure Zebra is using the most up-to-date network information. + let prev = prev + .into_iter() + .flatten() + .filter(move |(addr, _status)| !self.has_current_status(hash, **addr)); + let current = current.into_iter().flatten(); + + current + .chain(prev) + .map(|(addr, status)| status.map(|()| addr)) + } + + /// Returns true if there is a current status entry for `hash` and `addr`. + pub fn has_current_status(&self, hash: InventoryHash, addr: SocketAddr) -> bool { + self.current + .get(&hash) + .and_then(|current| current.get(&addr)) + .is_some() + } + + /// Returns a future that polls once for new registry updates. + #[allow(dead_code)] + pub fn update(&mut self) -> Update { + Update::new(self) } /// Drive periodic inventory tasks @@ -75,6 +268,7 @@ impl InventoryRegistry { /// - rotates HashMaps based on interval events /// - drains the inv_stream channel and registers all advertised inventory pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { + // Correctness: Registers the current task for wakeup when the timer next becomes ready. while Pin::new(&mut self.interval).poll_next(cx).is_ready() { self.rotate(); } @@ -97,10 +291,14 @@ impl InventoryRegistry { // failure of the peer set. while let Poll::Ready(channel_result) = self.inv_stream.next().poll_unpin(cx) { match channel_result { - Some(Ok((hash, addr))) => self.register(hash, addr), + Some(Ok(change)) => self.register(change), Some(Err(BroadcastStreamRecvError::Lagged(count))) => { metrics::counter!("pool.inventory.dropped", 1); - tracing::debug!(count, "dropped lagged inventory advertisements"); + metrics::counter!("pool.inventory.dropped.messages", count); + + // If this message happens a lot, we should improve inventory registry performance, + // or poll the registry or peer set in a separate task. + info!(count, "dropped lagged inventory advertisements"); } // This indicates all senders, including the one in the handshaker, // have been dropped, which really is a permanent failure. @@ -111,9 +309,52 @@ impl InventoryRegistry { Ok(()) } - /// Record that the given inventory `hash` is available from the peer `addr` - fn register(&mut self, hash: InventoryHash, addr: SocketAddr) { - self.current.entry(hash).or_default().insert(addr); + /// Record the given inventory `change` for the peer `addr`. + /// + /// `Missing` markers are not updated until the registry rotates, for security reasons. + fn register(&mut self, change: InventoryChange) { + let new_status = change.marker(); + let (invs, addr) = change.inner(); + + for inv in invs { + use InventoryHash::*; + assert!( + matches!(inv, Block(_) | Tx(_) | Wtx(_)), + "unexpected inventory type: {:?} from peer: {:?}", + inv, + addr, + ); + + let current = self.current.entry(inv).or_default(); + + // # Security + // + // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries, + // and funnel multiple failing requests to themselves. + if let Some(old_status) = current.get(&addr) { + if old_status.is_missing() && new_status.is_advertised() { + debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status"); + continue; + } + + debug!( + ?new_status, + ?old_status, + ?addr, + ?inv, + "keeping both new and old status" + ); + } + + let replaced_status = current.insert(addr, new_status); + debug!( + ?new_status, + ?replaced_status, + ?addr, + ?inv, + "inserted new status" + ); + } } /// Replace the prev HashMap with current's and replace current with an empty diff --git a/zebra-network/src/peer_set/inventory_registry/tests.rs b/zebra-network/src/peer_set/inventory_registry/tests.rs new file mode 100644 index 00000000..e83d591b --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry/tests.rs @@ -0,0 +1,3 @@ +//! Tests for the inventory registry. + +mod vectors; diff --git a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs new file mode 100644 index 00000000..f54970b6 --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs @@ -0,0 +1,198 @@ +//! Fixed test vectors for the inventory registry. + +use tokio::sync::broadcast; + +use zebra_chain::block; + +use crate::{ + peer_set::{ + inventory_registry::{InventoryRegistry, InventoryStatus}, + InventoryChange, + }, + protocol::external::InventoryHash, +}; + +/// The number of changes that can be pending in the inventory channel, before it starts lagging. +/// +/// Lagging drops messages, so tests should avoid filling the channel. +pub const MAX_PENDING_CHANGES: usize = 32; + +/// Check an empty inventory registry works as expected. +#[tokio::test] +async fn inv_registry_empty_ok() { + let fake_hash = InventoryHash::Error; + + let (mut inv_registry, _inv_stream_tx) = new_inv_registry(); + + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + assert_eq!(inv_registry.advertising_peers(fake_hash).count(), 0); + assert_eq!(inv_registry.missing_peers(fake_hash).count(), 0); +} + +/// Check inventory registration for one advertised hash/peer. +#[tokio::test] +async fn inv_registry_one_advertised_ok() { + let test_hash = InventoryHash::Block(block::Hash([0; 32])); + let test_peer = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + let test_change = InventoryStatus::new_advertised(test_hash, test_peer); + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let receiver_count = inv_stream_tx + .send(test_change) + .expect("unexpected failed inventory status send"); + assert_eq!(receiver_count, 1); + + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + assert_eq!( + inv_registry.advertising_peers(test_hash).next(), + Some(&test_peer), + ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); +} + +/// Check inventory registration for one missing hash/peer. +#[tokio::test] +async fn inv_registry_one_missing_ok() { + let test_hash = InventoryHash::Block(block::Hash([0; 32])); + let test_peer = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + let test_change = InventoryStatus::new_missing(test_hash, test_peer); + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let receiver_count = inv_stream_tx + .send(test_change) + .expect("unexpected failed inventory status send"); + assert_eq!(receiver_count, 1); + + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!( + inv_registry.missing_peers(test_hash).next(), + Some(&test_peer), + ); +} + +/// Check inventory registration for one hash/peer prefers missing over advertised. +#[tokio::test] +async fn inv_registry_prefer_missing_ok() { + inv_registry_prefer_missing_order(true).await; + inv_registry_prefer_missing_order(false).await; +} + +async fn inv_registry_prefer_missing_order(missing_first: bool) { + let test_hash = InventoryHash::Block(block::Hash([0; 32])); + let test_peer = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + + let missing_change = InventoryStatus::new_missing(test_hash, test_peer); + let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer); + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let changes = if missing_first { + [missing_change, advertised_change] + } else { + [advertised_change, missing_change] + }; + + for change in changes { + let receiver_count = inv_stream_tx + .send(change) + .expect("unexpected failed inventory status send"); + assert_eq!(receiver_count, 1); + } + + // TODO: also test with updates after each change + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!( + inv_registry.missing_peers(test_hash).next(), + Some(&test_peer), + ); +} + +/// Check inventory registration for one hash/peer prefers current over previous. +#[tokio::test] +async fn inv_registry_prefer_current_ok() { + inv_registry_prefer_current_order(true).await; + inv_registry_prefer_current_order(false).await; +} + +async fn inv_registry_prefer_current_order(missing_current: bool) { + let test_hash = InventoryHash::Block(block::Hash([0; 32])); + let test_peer = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + + let missing_change = InventoryStatus::new_missing(test_hash, test_peer); + let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer); + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let changes = if missing_current { + [advertised_change, missing_change] + } else { + [missing_change, advertised_change] + }; + + for change in changes { + // This rotation has no effect in the first loop iteration, because the registry is empty. + inv_registry.rotate(); + + let receiver_count = inv_stream_tx + .send(change) + .expect("unexpected failed inventory status send"); + assert_eq!(receiver_count, 1); + + // We must update after each change, so the rotation puts the first change in `prev`. + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + } + + if missing_current { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!( + inv_registry.missing_peers(test_hash).next(), + Some(&test_peer), + ); + } else { + assert_eq!( + inv_registry.advertising_peers(test_hash).next(), + Some(&test_peer), + ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + } +} + +/// Returns a newly initialised inventory registry, and a sender for its inventory channel. +fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender) { + let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES); + + let inv_registry = InventoryRegistry::new(inv_stream_rx); + + (inv_registry, inv_stream_tx) +} diff --git a/zebra-network/src/peer_set/inventory_registry/update.rs b/zebra-network/src/peer_set/inventory_registry/update.rs new file mode 100644 index 00000000..9ebedc55 --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry/update.rs @@ -0,0 +1,36 @@ +//! Inventory registry update future. + +use std::pin::Pin; + +use futures::{ + future::Future, + task::{Context, Poll}, +}; + +use crate::{peer_set::InventoryRegistry, BoxError}; + +/// Future for the [`update`](super::InventoryRegistry::update) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Update<'a> { + registry: &'a mut InventoryRegistry, +} + +impl Unpin for Update<'_> {} + +impl<'a> Update<'a> { + #[allow(dead_code)] + pub(super) fn new(registry: &'a mut InventoryRegistry) -> Self { + Self { registry } + } +} + +impl Future for Update<'_> { + type Output = Result<(), BoxError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // TODO: should the future wait until new changes arrive? + // or for the rotation timer? + Poll::Ready(self.registry.poll_inventory(cx)) + } +} diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 48f96baf..3277f5ad 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -124,7 +124,7 @@ use crate::{ peer::{LoadTrackedClient, MinimumPeerVersion}, peer_set::{ unready_service::{Error as UnreadyError, UnreadyService}, - InventoryRegistry, + InventoryChange, InventoryRegistry, }, protocol::{ external::InventoryHash, @@ -256,7 +256,7 @@ where /// - `handle_rx`: receives background task handles, /// monitors them to make sure they're still running, /// and shuts down all the tasks as soon as one task exits; - /// - `inv_stream`: receives inventory advertisements for peers, + /// - `inv_stream`: receives inventory changes from peers, /// allowing the peer set to direct inventory requests; /// - `address_book`: when peer set is busy, it logs address book diagnostics. pub fn new( @@ -264,7 +264,7 @@ where discover: D, demand_signal: mpsc::Sender, handle_rx: tokio::sync::oneshot::Receiver>>>, - inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, + inv_stream: broadcast::Receiver, address_metrics: watch::Receiver, minimum_peer_version: MinimumPeerVersion, ) -> Self { @@ -659,7 +659,7 @@ where ) -> >::Future { let inventory_peer_list = self .inventory_registry - .peers(&hash) + .advertising_peers(hash) .filter(|&key| self.ready_services.contains_key(key)) .copied() .collect(); diff --git a/zebra-network/src/peer_set/set/tests.rs b/zebra-network/src/peer_set/set/tests.rs index 269df2c8..06633c6c 100644 --- a/zebra-network/src/peer_set/set/tests.rs +++ b/zebra-network/src/peer_set/set/tests.rs @@ -19,12 +19,11 @@ use zebra_chain::{ parameters::{Network, NetworkUpgrade}, }; -use super::MorePeers; use crate::{ address_book::AddressMetrics, peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion}, - peer_set::PeerSet, - protocol::external::{types::Version, InventoryHash}, + peer_set::{set::MorePeers, InventoryChange, PeerSet}, + protocol::external::types::Version, AddressBook, Config, }; @@ -113,7 +112,7 @@ struct PeerSetBuilder { discover: Option, demand_signal: Option>, handle_rx: Option>>>>, - inv_stream: Option>, + inv_stream: Option>, address_book: Option>>, minimum_peer_version: Option>, } @@ -207,7 +206,7 @@ pub struct PeerSetGuard { background_tasks_sender: Option>>>>, demand_receiver: Option>, - inventory_sender: Option>, + inventory_sender: Option>, address_book: Option>>, } @@ -217,6 +216,31 @@ impl PeerSetGuard { PeerSetGuard::default() } + /// Return a mutable reference to the background tasks sender, if present. + #[allow(dead_code)] + pub fn background_tasks_sender( + &mut self, + ) -> &mut Option>>>> { + &mut self.background_tasks_sender + } + + /// Return a mutable reference to the background tasks sender, if present. + #[allow(dead_code)] + pub fn demand_receiver(&mut self) -> &mut Option> { + &mut self.demand_receiver + } + + /// Return a mutable reference to the background tasks sender, if present. + pub fn inventory_sender(&mut self) -> &mut Option> { + &mut self.inventory_sender + } + + /// Return a mutable reference to the background tasks sender, if present. + #[allow(dead_code)] + pub fn address_book(&mut self) -> &mut Option>> { + &mut self.address_book + } + /// Create a dummy channel for the background tasks sent to the [`PeerSet`]. /// /// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be @@ -247,9 +271,7 @@ impl PeerSetGuard { /// /// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be /// passed to the [`PeerSet`] constructor. - pub fn create_inventory_receiver( - &mut self, - ) -> broadcast::Receiver<(InventoryHash, SocketAddr)> { + pub fn create_inventory_receiver(&mut self) -> broadcast::Receiver { let (sender, receiver) = broadcast::channel(1); self.inventory_sender = Some(sender); diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index 9d88ac4c..f4cdc55a 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -1,13 +1,18 @@ -use std::time::Duration; +use std::{iter, time::Duration}; + use tokio::time::timeout; use tower::{Service, ServiceExt}; -use zebra_chain::parameters::{Network, NetworkUpgrade}; +use zebra_chain::{ + block, + parameters::{Network, NetworkUpgrade}, +}; use super::{PeerSetBuilder, PeerVersions}; use crate::{ peer::{ClientRequest, MinimumPeerVersion}, - protocol::external::types::Version, + peer_set::inventory_registry::InventoryStatus, + protocol::external::{types::Version, InventoryHash}, Request, }; @@ -164,3 +169,159 @@ fn peer_set_ready_multiple_connections() { assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err()); }); } + +/// Check that a peer set with an empty inventory registry routes requests to a random ready peer. +#[test] +fn peer_set_route_inv_empty_registry() { + let test_hash = block::Hash([0; 32]); + + // Use two peers with the same version + let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy); + let peer_versions = PeerVersions { + peer_versions: vec![peer_version, peer_version], + }; + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + // Pause the runtime's timer so that it advances automatically. + // + // CORRECTNESS: This test does not depend on external resources that could really timeout, like + // real network connections. + tokio::time::pause(); + + // Get peers and client handles of them + let (discovered_peers, handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Make sure we have the right number of peers + assert_eq!(handles.len(), 2); + + runtime.block_on(async move { + // Build a peerset + let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Get peerset ready + let peer_ready = peer_set + .ready() + .await + .expect("peer set service is always ready"); + + // Check we have the right amount of ready services + assert_eq!(peer_ready.ready_services.len(), 2); + + // Send an inventory-based request + let sent_request = Request::BlocksByHash(iter::once(test_hash).collect()); + let _fut = peer_ready.call(sent_request.clone()); + + // Check that one of the clients received the request + let mut received_count = 0; + for mut handle in handles { + if let Some(ClientRequest { request, .. }) = + handle.try_to_receive_outbound_client_request().request() + { + assert_eq!(sent_request, request); + received_count += 1; + } + } + + assert_eq!(received_count, 1); + }); +} + +/// Check that a peer set routes inventory requests to a peer that has advertised that inventory. +#[test] +fn peer_set_route_inv_via_registry() { + let test_hash = block::Hash([0; 32]); + let test_inv = InventoryHash::Block(test_hash); + + // Hard-code the fixed test address created by mock_peer_discovery + // TODO: add peer test addresses to ClientTestHarness + let test_peer = "127.0.0.1:1" + .parse() + .expect("unexpected invalid peer address"); + + let test_change = InventoryStatus::new_advertised(test_inv, test_peer); + + // Use two peers with the same version + let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy); + let peer_versions = PeerVersions { + peer_versions: vec![peer_version, peer_version], + }; + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + // Pause the runtime's timer so that it advances automatically. + // + // CORRECTNESS: This test does not depend on external resources that could really timeout, like + // real network connections. + tokio::time::pause(); + + // Get peers and client handles of them + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Make sure we have the right number of peers + assert_eq!(handles.len(), 2); + + runtime.block_on(async move { + // Build a peerset + let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Advertise some inventory + peer_set_guard + .inventory_sender() + .as_mut() + .expect("unexpected missing inv sender") + .send(test_change) + .expect("unexpected dropped receiver"); + + // Get peerset ready + let peer_ready = peer_set + .ready() + .await + .expect("peer set service is always ready"); + + // Check we have the right amount of ready services + assert_eq!(peer_ready.ready_services.len(), 2); + + // Send an inventory-based request + let sent_request = Request::BlocksByHash(iter::once(test_hash).collect()); + let _fut = peer_ready.call(sent_request.clone()); + + // Check that the client that advertised the inventory received the request + let advertised_handle = &mut handles[0]; + + if let Some(ClientRequest { request, .. }) = advertised_handle + .try_to_receive_outbound_client_request() + .request() + { + assert_eq!(sent_request, request); + } else { + panic!("inv request not routed to advertised peer"); + } + + let other_handle = &mut handles[1]; + + assert!( + matches!( + other_handle + .try_to_receive_outbound_client_request() + .request(), + None + ), + "request routed to non-advertised peer", + ); + }); +} diff --git a/zebra-network/src/protocol/external/inv.rs b/zebra-network/src/protocol/external/inv.rs index ed4de5d3..ea483d88 100644 --- a/zebra-network/src/protocol/external/inv.rs +++ b/zebra-network/src/protocol/external/inv.rs @@ -50,7 +50,6 @@ pub enum InventoryHash { /// [auth_digest]: https://zips.z.cash/zip-0244#authorizing-data-commitment /// [zip239]: https://zips.z.cash/zip-0239 /// [bip339]: https://github.com/bitcoin/bips/blob/master/bip-0339.mediawiki - // TODO: Actually handle this variant once the mempool is implemented (#2449) Wtx(transaction::WtxId), }