diff --git a/Cargo.toml b/Cargo.toml
index 04c64569..3a2057c8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,3 +9,7 @@ members = [
     "zebra-client",
     "zebrad",
 ]
+
+[patch.crates-io]
+# Required because we pull tower-load from git
+tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml
index 4954d037..4b7dfbab 100644
--- a/zebra-network/Cargo.toml
+++ b/zebra-network/Cargo.toml
@@ -15,6 +15,10 @@ byteorder = "1.3"
 chrono = "0.4"
 failure = "0.1"
 serde = { version = "1", features = ["serde_derive"] }
+pin-project = "0.4"
+# indexmap has rayon support for parallel iteration,
+# which we don't use, so disable it to drop the dependencies.
+indexmap = { version = "1.2", default-features = false }
 tokio = "=0.2.0-alpha.6"
 futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
 
@@ -23,5 +27,6 @@ tracing = "0.1"
 tracing-futures = { version = "0.1", features = ["tokio-alpha"], default-features = false }
 tower = "=0.3.0-alpha.2"
+tower-load = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x"}
 
 zebra-chain = { path = "../zebra-chain" }
diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs
index 2e302b7f..cc006747 100644
--- a/zebra-network/src/lib.rs
+++ b/zebra-network/src/lib.rs
@@ -2,6 +2,8 @@
 #![deny(missing_docs)]
 
+#[macro_use]
+extern crate pin_project;
 #[macro_use]
 extern crate serde;
 #[macro_use]
@@ -30,4 +32,5 @@ pub mod protocol;
 pub mod constants;
 pub mod meta_addr;
 pub mod peer;
+pub mod peer_set;
 pub mod timestamp_collector;
diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs
index 8d402dd9..e3993e72 100644
--- a/zebra-network/src/peer.rs
+++ b/zebra-network/src/peer.rs
@@ -1,11 +1,15 @@
 //! Peer handling.
 
 /// Handles outbound requests from our node to the network.
-pub mod client;
+mod client;
 /// Asynchronously connects to peers.
-pub mod connector;
+mod connector;
 /// Handles inbound requests from the network to our node.
-pub mod server;
+mod server;
+
+pub use client::PeerClient;
+pub use connector::PeerConnector;
+pub use server::PeerServer;
 
 /// An error related to a peer connection.
 #[derive(Fail, Debug, Clone)]
@@ -20,3 +24,10 @@ impl From<failure::Error> for PeerError {
         PeerError::Inner(std::sync::Arc::new(e))
     }
 }
+
+// XXX hack
+impl Into<crate::BoxedStdError> for PeerError {
+    fn into(self) -> crate::BoxedStdError {
+        Box::new(format_err!("dropped error info").compat())
+    }
+}
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index 47949676..560d7d57 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -35,7 +35,8 @@ pub(super) struct ClientRequest(
 impl Service<Request> for PeerClient {
     type Response = Response;
     type Error = PeerError;
-    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         if let Err(_) = ready!(self.server_tx.poll_ready(cx)) {
diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs
new file mode 100644
index 00000000..a87e2c68
--- /dev/null
+++ b/zebra-network/src/peer_set.rs
@@ -0,0 +1,11 @@
+//! A peer set whose size is dynamically determined by resource constraints.
+
+// Portions of this submodule were adapted from tower-balance,
+// which is (c) 2019 Tower Contributors (MIT licensed).
+
+mod discover;
+mod set;
+mod unready_service;
+
+pub use discover::PeerDiscover;
+pub use set::PeerSet;
diff --git a/zebra-network/src/peer_set/discover.rs b/zebra-network/src/peer_set/discover.rs
new file mode 100644
index 00000000..add4a33c
--- /dev/null
+++ b/zebra-network/src/peer_set/discover.rs
@@ -0,0 +1,36 @@
+use std::{
+    net::SocketAddr,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use failure::Error;
+use tokio::prelude::*;
+use tower::discover::{Change, Discover};
+
+use crate::peer::PeerClient;
+
+/// A [`tower::discover::Discover`] implementation to report new `PeerClient`s.
+///
+/// Because the `PeerClient` and `PeerServer` are always created together, either
+/// by a `PeerConnector` or a `PeerListener`, and the `PeerServer` spawns a task
+/// owned by Tokio, we only need to manage the `PeerClient` handles.
+pub struct PeerDiscover {
+    // add fields;
+}
+
+impl Discover for PeerDiscover {
+    type Key = SocketAddr;
+    type Service = PeerClient;
+    type Error = Error;
+
+    fn poll_discover(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
+        // Do stuff and possibly produce new peers??
+
+        // Change enum has insert and delete variants, but we only need to consider inserts here..
+        unimplemented!();
+    }
+}
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
new file mode 100644
index 00000000..5b7af917
--- /dev/null
+++ b/zebra-network/src/peer_set/set.rs
@@ -0,0 +1,288 @@
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    marker::PhantomData,
+    net::SocketAddr,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures::{channel::oneshot, stream::FuturesUnordered};
+use indexmap::IndexMap;
+use tokio::prelude::*;
+use tower::{
+    discover::{Change, Discover},
+    Service,
+};
+use tower_load::Load;
+
+use crate::{
+    peer::{PeerClient, PeerError},
+    protocol::internal::{Request, Response},
+    BoxedStdError,
+};
+
+use super::{
+    unready_service::{Error as UnreadyError, UnreadyService},
+    PeerDiscover,
+};
+
+/// A [`tower::Service`] that abstractly represents "the rest of the network".
+///
+/// This implementation is adapted from the one in `tower-balance`, and as
+/// described in that crate's documentation, it
+///
+/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
+/// >
+/// > As described in the [Finagle Guide][finagle]:
+/// >
+/// > > The algorithm randomly picks two services from the set of ready endpoints and
+/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
+/// > > expect a manageable upper bound on the maximum load of any server.
+/// > >
+/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
+/// > > `n` is the number of servers in the cluster.
+///
+/// This should work well for many network requests, but not all of them: some
+/// requests, e.g., a request for some particular inventory item, can only be
+/// made to a subset of connected peers, e.g., the ones that have recently
+/// advertised that inventory hash, and other requests require specialized logic
+/// (e.g., transaction diffusion).
+///
+/// Implementing this specialized routing logic inside the `PeerSet` -- so that
+/// it continues to abstract away "the rest of the network" into one endpoint --
+/// is not a problem, as the `PeerSet` can simply maintain more information on
+/// its peers and route requests appropriately. However, there is a problem with
+/// maintaining accurate backpressure information, because the `Service` trait
+/// requires that service readiness is independent of the data in the request.
+///
+/// For this reason, in the future, this code will probably be refactored to
+/// address this backpressure mismatch. One possibility is to refactor the code
+/// so that one entity holds and maintains the peer set and metadata on the
+/// peers, and each "backpressure category" of request is assigned to different
+/// `Service` impls with specialized `poll_ready()` implementations. Another
+/// less-elegant solution (which might be useful as an intermediate step for the
+/// inventory case) is to provide a way to borrow a particular backing service,
+/// say by address.
+///
+/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
+/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
+pub struct PeerSet<D>
+where
+    D: Discover,
+{
+    discover: D,
+    ready_services: IndexMap<D::Key, D::Service>,
+    cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
+    unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
+    next_idx: Option<usize>,
+}
+
+impl<D> PeerSet<D>
+where
+    D: Discover + Unpin,
+    D::Key: Clone + Debug,
+    D::Service: Service<Request, Response = Response> + Load,
+    D::Error: Into<BoxedStdError>,
+    <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
+    <D::Service as Service<Request>>::Future: Send + 'static,
+    <D::Service as Load>::Metric: Debug,
+{
+    /// Construct a peerset which uses `discover` internally.
+    pub fn new(discover: D) -> Self {
+        Self {
+            discover,
+            ready_services: IndexMap::new(),
+            cancel_handles: HashMap::new(),
+            unready_services: FuturesUnordered::new(),
+            next_idx: None,
+        }
+    }
+
+    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedStdError>> {
+        use futures::ready;
+        loop {
+            match ready!(Pin::new(&mut self.discover).poll_discover(cx)).map_err(Into::into)? {
+                Change::Remove(key) => {
+                    trace!(?key, "got Change::Remove from Discover");
+                    self.remove(&key);
+                }
+                Change::Insert(key, svc) => {
+                    trace!(?key, "got Change::Insert from Discover");
+                    self.remove(&key);
+                    self.push_unready(key, svc);
+                }
+            }
+        }
+    }
+
+    fn remove(&mut self, key: &D::Key) {
+        // Remove key from either the set of ready services,
+        // or else from the set of unready services.
+        if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) {
+            // swap_remove perturbs the position of the last element of
+            // ready_services, so we may have invalidated self.next_idx, in
+            // which case we need to fix it. Specifically, swap_remove swaps the
+            // position of the removee and the last element, then drops the
+            // removee from the end, so we compare the active and removed indices:
+            let len = self.ready_services.len();
+            self.next_idx = match self.next_idx {
+                None => None,                   // No active index
+                Some(j) if j == i => None,      // We removed j
+                Some(j) if j == len => Some(i), // We swapped i and j
+                Some(j) => Some(j),             // We swapped an unrelated service.
+            };
+            // No Heisenservices: they must be ready or unready.
+            debug_assert!(!self.cancel_handles.contains_key(key));
+        } else if let Some(handle) = self.cancel_handles.remove(key) {
+            let _ = handle.send(());
+        }
+    }
+
+    fn push_unready(&mut self, key: D::Key, svc: D::Service) {
+        let (tx, rx) = oneshot::channel();
+        self.cancel_handles.insert(key.clone(), tx);
+        self.unready_services.push(UnreadyService {
+            key: Some(key),
+            service: Some(svc),
+            cancel: rx,
+            _req: PhantomData,
+        });
+    }
+
+    fn poll_unready(&mut self, cx: &mut Context<'_>) {
+        loop {
+            match Pin::new(&mut self.unready_services).poll_next(cx) {
+                Poll::Pending | Poll::Ready(None) => return,
+                Poll::Ready(Some(Ok((key, svc)))) => {
+                    trace!(?key, "service became ready");
+                    let _cancel = self.cancel_handles.remove(&key);
+                    debug_assert!(_cancel.is_some(), "missing cancel handle");
+                    self.ready_services.insert(key, svc);
+                }
+                Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => {
+                    debug_assert!(!self.cancel_handles.contains_key(&key))
+                }
+                Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => {
+                    let error = e.into();
+                    debug!(%error, "service failed while unready, dropped");
+                    let _cancel = self.cancel_handles.remove(&key);
+                    debug_assert!(_cancel.is_some(), "missing cancel handle");
+                }
+            }
+        }
+    }
+
+    /// Performs P2C on inner services to select a ready service.
+    fn select_next_ready_index(&mut self) -> Option<usize> {
+        match self.ready_services.len() {
+            0 => None,
+            1 => Some(0),
+            len => {
+                // XXX avoid relying on rand complexity
+                let (a, b) = {
+                    let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
+                    (idxs.index(0), idxs.index(1))
+                };
+
+                let a_load = self.ready_index_load(a);
+                let b_load = self.ready_index_load(b);
+
+                let selected = if a_load <= b_load { a } else { b };
+
+                trace!(a.idx = a, a.load = ?a_load, b.idx = b, b.load = ?b_load, selected, "selected service by p2c");
+
+                Some(selected)
+            }
+        }
+    }
+
+    /// Accesses a ready endpoint by index and returns its current load.
+    fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
+        let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
+        svc.load()
+    }
+}
+
+impl<D> Service<Request> for PeerSet<D>
+where
+    D: Discover + Unpin,
+    D::Key: Clone + Debug,
+    D::Service: Service<Request, Response = Response> + Load,
+    D::Error: Into<BoxedStdError>,
+    <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
+    <D::Service as Service<Request>>::Future: Send + 'static,
+    <D::Service as Load>::Metric: Debug,
+{
+    type Response = Response;
+    type Error = BoxedStdError;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        // Process peer discovery updates.
+        let _ = self.poll_discover(cx)?;
+
+        // Poll unready services to drive them to readiness.
+        self.poll_unready(cx);
+        trace!(
+            num_ready = self.ready_services.len(),
+            num_unready = self.unready_services.len(),
+        );
+
+        loop {
+            // Re-check that the pre-selected service is ready, in case
+            // something has happened since (e.g., it failed, peer closed
+            // connection, ...)
+            if let Some(index) = self.next_idx {
+                let (key, service) = self
+                    .ready_services
+                    .get_index_mut(index)
+                    .expect("preselected index must be valid");
+                trace!(preselected_index = index, ?key);
+                match service.poll_ready(cx) {
+                    Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
+                    Poll::Pending => {
+                        trace!("preselected service is no longer ready");
+                        let (key, service) = self
+                            .ready_services
+                            .swap_remove_index(index)
+                            .expect("preselected index must be valid");
+                        self.push_unready(key, service);
+                    }
+                    Poll::Ready(Err(e)) => {
+                        let error = e.into();
+                        trace!(%error, "preselected service failed, dropping it");
+                        self.ready_services
+                            .swap_remove_index(index)
+                            .expect("preselected index must be valid");
+                    }
+                }
+            }
+
+            trace!("preselected service was not ready, reselecting");
+            self.next_idx = self.select_next_ready_index();
+
+            if self.next_idx.is_none() {
+                trace!("no ready services, returning Poll::Pending");
+                return Poll::Pending;
+            }
+        }
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        let index = self
+            .next_idx
+            .take()
+            .expect("ready service must have valid preselected index");
+        let (key, mut svc) = self
+            .ready_services
+            .swap_remove_index(index)
+            .expect("preselected index must be valid");
+
+        let fut = svc.call(req);
+        self.push_unready(key, svc);
+
+        use futures::future::TryFutureExt;
+        fut.map_err(Into::into).boxed()
+    }
+}
diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs
new file mode 100644
index 00000000..f471f2e5
--- /dev/null
+++ b/zebra-network/src/peer_set/unready_service.rs
@@ -0,0 +1,57 @@
+// Adapted from tower-balance
+
+use std::{
+    marker::PhantomData,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures::{channel::oneshot, ready};
+use tokio::prelude::*;
+use tower::Service;
+
+/// A Future that becomes satisfied when an `S`-typed service is ready.
+///
+/// May fail due to cancelation, i.e. if the service is removed from discovery.
+#[pin_project]
+#[derive(Debug)]
+pub(super) struct UnreadyService<K, S, Req> {
+    pub(super) key: Option<K>,
+    #[pin]
+    pub(super) cancel: oneshot::Receiver<()>,
+    pub(super) service: Option<S>,
+
+    pub(super) _req: PhantomData<Req>,
+}
+
+pub(super) enum Error<E> {
+    Inner(E),
+    Canceled,
+}
+
+impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
+    type Output = Result<(K, S), (K, Error<S::Error>)>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+
+        if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
+            let key = this.key.take().expect("polled after ready");
+            return Poll::Ready(Err((key, Error::Canceled)));
+        }
+
+        let res = ready!(this
+            .service
+            .as_mut()
+            .expect("poll after ready")
+            .poll_ready(cx));
+
+        let key = this.key.take().expect("polled after ready");
+        let svc = this.service.take().expect("polled after ready");
+
+        match res {
+            Ok(()) => Poll::Ready(Ok((key, svc))),
+            Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
+        }
+    }
+}
diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index e12e5c93..b1c38d8a 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -25,6 +25,7 @@ tracing-log = "0.1"
 hyper = "=0.13.0-alpha.4"
 tower = "=0.3.0-alpha.2"
+tower-load = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x"}
 
 zebra-chain = { path = "../zebra-chain" }
 zebra-network = { path = "../zebra-network" }
diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs
index 3eb158b1..b5fb7c46 100644
--- a/zebrad/src/commands/connect.rs
+++ b/zebrad/src/commands/connect.rs
@@ -52,7 +52,8 @@ impl Runnable for ConnectCmd {
 impl ConnectCmd {
     async fn connect(&self) -> Result<(), failure::Error> {
         use zebra_network::{
-            peer::connector::PeerConnector,
+            peer::PeerConnector,
+            peer_set::PeerSet,
             protocol::internal::{Request, Response},
             timestamp_collector::TimestampCollector,
             Network,
@@ -79,8 +80,69 @@ impl ConnectCmd {
         let mut client = pc.call(self.addr.clone()).await?;
 
         client.ready().await?;
-        let rsp = client.call(Request::GetPeers).await?;
-        info!(?rsp);
+
+        let addrs = match client.call(Request::GetPeers).await? {
+            Response::Peers(addrs) => addrs,
+            _ => bail!("Got wrong response type"),
+        };
+        info!(
+            addrs.len = addrs.len(),
+            "got addresses from first connected peer"
+        );
+
+        use failure::Error;
+        use futures::{
+            future,
+            stream::{FuturesUnordered, StreamExt},
+        };
+        use std::time::Duration;
+        use tower::discover::{Change, ServiceStream};
+        use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
+
+        // construct a stream of services
+        let client_stream = PeakEwmaDiscover::new(
+            ServiceStream::new(
+                addrs
+                    .into_iter()
+                    .map(|meta| {
+                        let svc_fut = pc.call(meta.addr);
+                        async move { Ok::<_, Error>(Change::Insert(meta.addr, svc_fut.await?)) }
+                    })
+                    .collect::<FuturesUnordered<_>>()
+                    // Discard any errored connections...
+                    .filter(|result| future::ready(result.is_ok())),
+            ),
+            Duration::from_secs(1),  // default rtt estimate
+            Duration::from_secs(60), // decay time
+            NoInstrument,
+        );
+
+        info!("finished constructing discover");
+
+        let mut peer_set = PeerSet::new(client_stream);
+
+        info!("waiting for peer_set ready");
+        peer_set.ready().await.map_err(Error::from_boxed_compat)?;
+
+        info!("peer_set became ready, constructing addr requests");
+
+        let mut addr_reqs = FuturesUnordered::new();
+        for i in 0..10usize {
+            info!(i, "awaiting peer_set ready");
+            peer_set.ready().await.map_err(Error::from_boxed_compat)?;
+            info!(i, "calling peer_set");
+            addr_reqs.push(peer_set.call(Request::GetPeers));
+        }
+
+        let mut all_addrs = Vec::new();
+        while let Some(Ok(Response::Peers(addrs))) = addr_reqs.next().await {
+            info!(
+                all_addrs.len = all_addrs.len(),
+                addrs.len = addrs.len(),
+                "got address response"
+            );
+            all_addrs.extend(addrs);
+        }
 
         loop {
             // empty loop ensures we don't exit the application,