From 424edfa4d9e80c372604aae9dd22e6972017d4bf Mon Sep 17 00:00:00 2001
From: teor
Date: Fri, 22 Oct 2021 11:26:04 +1000
Subject: [PATCH] Improve documentation and types in the PeerSet (#2925)

* Replace some unit tuples with named unit structs

This helps distinguish generic channels and make them type-safe.
Also tidy imports and documentation in `peer_set::set`.

* Link to the tower balance crate from docs

Co-authored-by: Alfredo Garcia
---
 zebra-network/src/peer_set/initialize.rs      |  19 +--
 zebra-network/src/peer_set/set.rs             | 122 ++++++++++--------
 zebra-network/src/peer_set/unready_service.rs |   6 +-
 3 files changed, 84 insertions(+), 63 deletions(-)

diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index 68312f1a..6822fc25 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -27,7 +27,7 @@ use crate::{
     constants,
     meta_addr::MetaAddr,
     peer::{self, HandshakeRequest, OutboundConnectorRequest},
-    peer_set::{ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
+    peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
     timestamp_collector::TimestampCollector,
     AddressBook, BoxError, Config, Request, Response,
 };
@@ -109,7 +109,9 @@ where
     // Create an mpsc channel for peer changes, with a generous buffer.
     let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
     // Create an mpsc channel for peerset demand signaling.
-    let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
+    let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
+
+    // Create a oneshot to send background task JoinHandles to the peer set
     let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
 
     // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
@@ -167,8 +169,9 @@ where
     );
     let _ = candidates.update_initial(active_initial_peer_count).await;
 
+    // TODO: reduce demand by `active_outbound_connections.update_count()` (#2902)
     for _ in 0..config.peerset_initial_target_size {
-        let _ = demand_tx.try_send(());
+        let _ = demand_tx.try_send(MorePeers);
     }
 
     let crawl_guard = tokio::spawn(
@@ -469,8 +472,8 @@ enum CrawlerAction {
 #[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
 async fn crawl_and_dial<C, S>(
     crawl_new_peer_interval: std::time::Duration,
-    mut demand_tx: mpsc::Sender<()>,
-    mut demand_rx: mpsc::Receiver<()>,
+    mut demand_tx: mpsc::Sender<MorePeers>,
+    mut demand_rx: mpsc::Receiver<MorePeers>,
     mut candidates: CandidateSet<S>,
     outbound_connector: C,
     mut peerset_tx: mpsc::Sender<PeerChange>,
@@ -579,7 +582,7 @@ where
                 // spawn independent tasks to avoid deadlocks
                 candidates.update().await?;
                 // Try to connect to a new peer.
-                let _ = demand_tx.try_send(());
+                let _ = demand_tx.try_send(MorePeers);
             }
             TimerCrawl { tick } => {
                 debug!(?tick, "crawling for more peers in response to the crawl timer");
@@ -586,7 +589,7 @@ where
                 // TODO: spawn independent tasks to avoid deadlocks
                 candidates.update().await?;
                 // Try to connect to a new peer.
-                let _ = demand_tx.try_send(());
+                let _ = demand_tx.try_send(MorePeers);
             }
             HandshakeConnected { peer_set_change } => {
                 if let Change::Insert(ref addr, _) = peer_set_change {
@@ -609,7 +612,7 @@ where
                 // The demand signal that was taken out of the queue
                 // to attempt to connect to the failed candidate never
                 // turned into a connection, so add it back:
-                let _ = demand_tx.try_send(());
+                let _ = demand_tx.try_send(MorePeers);
             }
         }
     }
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 485a3b68..d224d256 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -1,9 +1,53 @@
-use std::net::SocketAddr;
+//! Abstractions that represent "the rest of the network".
+//!
+//! # Implementation
+//!
+//! The [`PeerSet`] implementation is adapted from the one in the [Tower
+//! Balance][tower-balance] crate, and as described in that crate's documentation, it
+//!
+//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
+//! >
+//! > As described in the [Finagle Guide][finagle]:
+//! >
+//! > > The algorithm randomly picks two services from the set of ready endpoints and
+//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
+//! > > expect a manageable upper bound on the maximum load of any server.
+//! > >
+//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
+//! > > `n` is the number of servers in the cluster.
+//!
+//! This should work well for many network requests, but not all of them: some
+//! requests, e.g., a request for some particular inventory item, can only be
+//! made to a subset of connected peers, e.g., the ones that have recently
+//! advertised that inventory hash, and other requests require specialized logic
+//! (e.g., transaction diffusion).
+//!
+//! Implementing this specialized routing logic inside the `PeerSet` -- so that
+//! it continues to abstract away "the rest of the network" into one endpoint --
+//! is not a problem, as the `PeerSet` can simply maintain more information on
+//! its peers and route requests appropriately. However, there is a problem with
+//! maintaining accurate backpressure information, because the `Service` trait
+//! requires that service readiness is independent of the data in the request.
+//!
+//! For this reason, in the future, this code will probably be refactored to
+//! address this backpressure mismatch. One possibility is to refactor the code
+//! so that one entity holds and maintains the peer set and metadata on the
+//! peers, and each "backpressure category" of request is assigned to different
+//! `Service` impls with specialized `poll_ready()` implementations. Another
+//! less-elegant solution (which might be useful as an intermediate step for the
+//! inventory case) is to provide a way to borrow a particular backing service,
+//! say by address.
+//!
+//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
+//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
+//! [tower-balance]: https://crates.io/crates/tower-balance
+
 use std::{
     collections::HashMap,
     fmt::Debug,
     future::Future,
     marker::PhantomData,
+    net::SocketAddr,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
@@ -17,8 +61,10 @@ use futures::{
     stream::FuturesUnordered,
 };
 use indexmap::IndexMap;
-use tokio::sync::{broadcast, oneshot::error::TryRecvError};
-use tokio::task::JoinHandle;
+use tokio::{
+    sync::{broadcast, oneshot::error::TryRecvError},
+    task::JoinHandle,
+};
 use tower::{
     discover::{Change, Discover},
     load::Load,
@@ -26,6 +72,10 @@ use tower::{
 };
 
 use crate::{
+    peer_set::{
+        unready_service::{Error as UnreadyError, UnreadyService},
+        InventoryRegistry,
+    },
     protocol::{
         external::InventoryHash,
         internal::{Request, Response},
@@ -33,10 +83,17 @@ use crate::{
     AddressBook, BoxError,
 };
 
-use super::{
-    unready_service::{Error as UnreadyError, UnreadyService},
-    InventoryRegistry,
-};
+/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
+///
+/// In response to this signal, the crawler tries to open more peer connections.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct MorePeers;
+
+/// A signal sent by the [`PeerSet`] to cancel a [`Client`]'s current request or response.
+///
+/// When it receives this signal, the [`Client`] stops processing and exits.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct CancelClientWork;
 
 /// A [`tower::Service`] that abstractly represents "the rest of the network".
 ///
@@ -48,47 +105,6 @@ use super::{
 /// connections have an ephemeral local or proxy port.)
 ///
 /// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
-///
-/// # Implementation
-///
-/// This implementation is adapted from the one in `tower-balance`, and as
-/// described in that crate's documentation, it
-///
-/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
-/// >
-/// > As described in the [Finagle Guide][finagle]:
-/// >
-/// > > The algorithm randomly picks two services from the set of ready endpoints and
-/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
-/// > > expect a manageable upper bound on the maximum load of any server.
-/// > >
-/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
-/// > > `n` is the number of servers in the cluster.
-///
-/// This should work well for many network requests, but not all of them: some
-/// requests, e.g., a request for some particular inventory item, can only be
-/// made to a subset of connected peers, e.g., the ones that have recently
-/// advertised that inventory hash, and other requests require specialized logic
-/// (e.g., transaction diffusion).
-///
-/// Implementing this specialized routing logic inside the `PeerSet` -- so that
-/// it continues to abstract away "the rest of the network" into one endpoint --
-/// is not a problem, as the `PeerSet` can simply maintain more information on
-/// its peers and route requests appropriately. However, there is a problem with
-/// maintaining accurate backpressure information, because the `Service` trait
-/// requires that service readiness is independent of the data in the request.
-///
-/// For this reason, in the future, this code will probably be refactored to
-/// address this backpressure mismatch. One possibility is to refactor the code
-/// so that one entity holds and maintains the peer set and metadata on the
-/// peers, and each "backpressure category" of request is assigned to different
-/// `Service` impls with specialized `poll_ready()` implementations. Another
-/// less-elegant solution (which might be useful as an intermediate step for the
-/// inventory case) is to provide a way to borrow a particular backing service,
-/// say by address.
-///
-/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
-/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
 pub struct PeerSet<D>
 where
     D: Discover<Key = SocketAddr>,
@@ -99,9 +115,9 @@ where
     /// This means that every change to `ready_services` must invalidate or correct it.
     preselected_p2c_index: Option<usize>,
     ready_services: IndexMap<D::Key, D::Service>,
-    cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
+    cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
     unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
-    demand_signal: mpsc::Sender<()>,
+    demand_signal: mpsc::Sender<MorePeers>,
     /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
     ///
     /// The join handles passed into the PeerSet are used to populate the `guards` member
@@ -132,7 +148,7 @@ where
     /// Construct a peerset which uses `discover` internally.
     pub fn new(
         discover: D,
-        demand_signal: mpsc::Sender<()>,
+        demand_signal: mpsc::Sender<MorePeers>,
         handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
         inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
         address_book: Arc<std::sync::Mutex<AddressBook>>,
@@ -252,7 +268,7 @@ where
     fn remove(&mut self, key: &D::Key) {
         if self.take_ready_service(key).is_some() {
         } else if let Some(handle) = self.cancel_handles.remove(key) {
-            let _ = handle.send(());
+            let _ = handle.send(CancelClientWork);
         }
     }
 
@@ -484,7 +500,7 @@ where
         // If we waited here, the crawler could deadlock sending a request to
         // fetch more peers, because it also empties the channel.
         trace!("no ready services, sending demand signal");
-        let _ = self.demand_signal.try_send(());
+        let _ = self.demand_signal.try_send(MorePeers);
 
         // CORRECTNESS
         //
diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs
index d6a9f997..79ab37f1 100644
--- a/zebra-network/src/peer_set/unready_service.rs
+++ b/zebra-network/src/peer_set/unready_service.rs
@@ -10,6 +10,8 @@ use std::{
 use futures::{channel::oneshot, ready};
 use tower::Service;
 
+use crate::peer_set::set::CancelClientWork;
+
 /// A Future that becomes satisfied when an `S`-typed service is ready.
 ///
 /// May fail due to cancelation, i.e. if the service is removed from discovery.
@@ -18,7 +20,7 @@
 pub(super) struct UnreadyService<K, S, Req> {
     pub(super) key: Option<K>,
     #[pin]
-    pub(super) cancel: oneshot::Receiver<()>,
+    pub(super) cancel: oneshot::Receiver<CancelClientWork>,
     pub(super) service: Option<S>,
 
     pub(super) _req: PhantomData<Req>,
@@ -35,7 +37,7 @@ impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
 
-        if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
+        if let Poll::Ready(Ok(CancelClientWork)) = this.cancel.poll(cx) {
             let key = this.key.take().expect("polled after ready");
             return Poll::Ready(Err((key, Error::Canceled)));
         }
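
The core pattern in this patch -- replacing `()` signal payloads with named unit structs -- can be exercised in a standalone program. The sketch below is illustrative only, not Zebra code: the `MorePeers` name is taken from the patch, everything else (the binary, the buffer size) is hypothetical, and it assumes the `tokio` crate with its `sync` and `macros`/`rt` features:

    use tokio::sync::mpsc;

    /// A named unit struct documents what sending on the channel means, and
    /// stops a `Sender<MorePeers>` being confused with some other `Sender<()>`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct MorePeers;

    #[tokio::main]
    async fn main() {
        // Like the patch's demand channel: a generous buffer, and
        // `try_send` so signalling never blocks the caller.
        let (demand_tx, mut demand_rx) = mpsc::channel::<MorePeers>(100);
        let _ = demand_tx.try_send(MorePeers);

        // The receiver pattern-matches the signal by name.
        if let Some(MorePeers) = demand_rx.recv().await {
            println!("demand signal received: try to open more peer connections");
        }
    }

With `()` payloads, any unit sender can be wired to any unit receiver without complaint; the named struct turns that mix-up into a compile-time type error, which is the type-safety the commit message refers to.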
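The Power of Two Choices strategy quoted in the new `peer_set::set` module docs reduces to a single comparison per request. A minimal sketch, under invented names and an invented integer load metric (the real `PeerSet` preselects an index using `tower::load::Load` measurements, and samples the two candidates at random):

    /// Pick the less loaded of two distinct, randomly sampled ready peers.
    /// `loads[i]` stands in for the load metric of ready peer `i`.
    fn p2c_select(loads: &[u64], a: usize, b: usize) -> usize {
        if loads[a] <= loads[b] {
            a
        } else {
            b
        }
    }

    fn main() {
        let loads = [30, 7, 52, 11];
        // The two candidate indices would normally be drawn at random;
        // they are fixed here so the example is deterministic.
        let chosen = p2c_select(&loads, 0, 3);
        assert_eq!(chosen, 3); // peer 3 carries less load than peer 0
        println!("routing request to peer {chosen}");
    }

Repeating this comparison per request is what bounds the load variance by `ln(ln(n))`, per the Finagle guide quoted in the module docs.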
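`CancelClientWork` travels over a oneshot channel, as the `unready_service.rs` hunks show. A standalone sketch of that shape -- hypothetical apart from the struct name, and using `tokio`'s oneshot (which has a convenient `try_recv`) rather than the `futures` oneshot that `UnreadyService` actually polls:

    use tokio::sync::oneshot;

    #[derive(Copy, Clone, Debug)]
    struct CancelClientWork;

    #[tokio::main]
    async fn main() {
        let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelClientWork>();

        let client = tokio::spawn(async move {
            loop {
                // Check the cancel handle between work items, roughly as
                // `UnreadyService::poll` polls its `cancel` receiver first.
                if let Ok(CancelClientWork) = cancel_rx.try_recv() {
                    return "stopped processing and exited";
                }
                tokio::task::yield_now().await;
            }
        });

        // Removing a peer sends the signal, like `PeerSet::remove` does
        // with the peer's entry in `cancel_handles`.
        let _ = cancel_tx.send(CancelClientWork);
        println!("client: {}", client.await.unwrap());
    }

As with `MorePeers`, the payload costs nothing at runtime (it is zero-sized); it only adds a name and a type to what was previously an anonymous `()` wake-up.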