From b1832ce5939ca7cbbea09d875abb33c0217cfe84 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 21 Oct 2019 15:24:17 -0700 Subject: [PATCH] Initial work to add a crawl-and-dial task. This responds to peerset demand by connecting to additional peers. Co-authored-by: Deirdre Connolly --- zebra-network/src/address_book.rs | 5 + zebra-network/src/config.rs | 3 + zebra-network/src/peer_set.rs | 196 ++++++++++++++++++-- zebra-network/src/peer_set/candidate_set.rs | 83 +++++++++ zebra-network/src/peer_set/set.rs | 12 +- zebrad/src/commands/connect.rs | 40 +--- 6 files changed, 279 insertions(+), 60 deletions(-) create mode 100644 zebra-network/src/peer_set/candidate_set.rs diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index bf1aa481..5efc63ee 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -150,6 +150,11 @@ impl AddressBook { newest_first: false, } } + + /// Returns the number of entries in this address book. + pub fn len(&self) -> usize { + self.by_time.len() + } } impl Extend for AddressBook { diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 19c0e21f..f5d8f4ce 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -20,6 +20,8 @@ pub struct Config { pub ewma_default_rtt: Duration, /// The decay time for the exponentially-weighted moving average response time. pub ewma_decay_time: Duration, + /// The outgoing request buffer size for the peer set. + pub peerset_request_buffer_size: usize, } impl Default for Config { @@ -33,6 +35,7 @@ impl Default for Config { initial_peers: Vec::new(), ewma_default_rtt: Duration::from_secs(1), ewma_decay_time: Duration::from_secs(60), + peerset_request_buffer_size: 1, } } } diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 81c1652d..3b95d241 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -3,13 +3,6 @@ // Portions of this submodule were adapted from tower-balance, // which is (c) 2019 Tower Contributors (MIT licensed). -mod discover; -mod set; -mod unready_service; - -pub use discover::PeerDiscover; -pub use set::PeerSet; - use std::{ net::SocketAddr, pin::Pin, @@ -38,6 +31,15 @@ use crate::{ AddressBook, BoxedStdError, Config, Request, Response, }; +mod candidate_set; +mod discover; +mod set; +mod unready_service; + +use candidate_set::CandidateSet; +pub use discover::PeerDiscover; +pub use set::PeerSet; + /// A type alias for a boxed [`tower::Service`] used to process [`Request`]s into [`Response`]s. pub type BoxedZebraService = Box< dyn Service< @@ -52,7 +54,13 @@ pub type BoxedZebraService = Box< type PeerChange = Result, BoxedStdError>; /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. -pub fn init(config: Config, inbound_service: S) -> (BoxedZebraService, Arc>) +pub fn init( + config: Config, + inbound_service: S, +) -> ( + impl Service + Send + Clone + 'static, + Arc>, +) where S: Service + Clone + Send + 'static, S::Future: Send + 'static, @@ -65,18 +73,26 @@ where // Create an mpsc channel for peer changes, with a generous buffer. let (peerset_tx, peerset_rx) = mpsc::channel::(100); + // Create an mpsc channel for peerset demand signaling. + let (demand_tx, demand_rx) = mpsc::channel::<()>(100); // Connect the rx end to a PeerSet, wrapping new peers in load instruments. - let peer_set = PeerSet::new(PeakEwmaDiscover::new( - ServiceStream::new( - // ServiceStream interprets an error as stream termination, - // so discard any errored connections... - peerset_rx.filter(|result| future::ready(result.is_ok())), + let peer_set = Buffer::new( + PeerSet::new( + PeakEwmaDiscover::new( + ServiceStream::new( + // ServiceStream interprets an error as stream termination, + // so discard any errored connections... + peerset_rx.filter(|result| future::ready(result.is_ok())), + ), + config.ewma_default_rtt, + config.ewma_decay_time, + NoInstrument, + ), + demand_tx, ), - config.ewma_default_rtt, - config.ewma_decay_time, - NoInstrument, - )); + config.peerset_request_buffer_size, + ); // Connect the tx end to the 3 peer sources: @@ -89,7 +105,12 @@ where // 2. Incoming peer connections, via a listener. tokio::spawn( - listen(config.listen_addr, peer_connector, peerset_tx).map(|result| { + listen( + config.listen_addr, + peer_connector.clone(), + peerset_tx.clone(), + ) + .map(|result| { if let Err(e) = result { error!(%e); } @@ -97,6 +118,20 @@ where ); // 3. Outgoing peers we connect to in response to load. + tokio::spawn( + crawl_and_dial( + demand_rx, + peer_set.clone(), + address_book.clone(), + peer_connector, + peerset_tx, + ) + .map(|result| { + if let Err(e) = result { + error!(%e); + } + }), + ); (Box::new(peer_set), address_book) } @@ -159,3 +194,128 @@ where } } } + +/// Given a channel that signals a need for new peers, try to connect to a peer +/// and send the resulting `PeerClient` through a channel. +/// +/// ```ascii,no_run +/// ┌─────────────────┐ +/// │ PeerSet │ +/// │GetPeers Requests│ +/// └─────────────────┘ +/// │ +/// │ +/// │ +/// │ +/// ▼ +/// ┌─────────────┐ filter by Λ filter by +/// │ PeerSet │!contains_addr╱ ╲ !contains_addr +/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐ +/// │ └─────────────┘ ╲ ╱ │ +/// │ │ V │ +/// │ │disconnected_peers │ │ +/// │ ▼ │ │ +/// │ Λ filter by │ │ +/// │ ╱ ╲ !contains_addr │ │ +/// │ ▕ ▏◀───────────────────┼──────────────────────┤ +/// │ ╲ ╱ │ │ +/// │ V │ │ +/// │ │ │ │ +/// │┌────────┼──────────────────────┼──────────────────────┼────────┐ +/// ││ ▼ ▼ │ │ +/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │ +/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐ +/// ││ │ AddressBook │ │ AddressBook │ │ │ ││ +/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││ +/// ││ │ │ │ ││ +/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││ +/// ││ │ │ │ ││ +/// ││ ├──────────────────────┴──────────────────────┘ ││ +/// ││ │ disjoint candidate sets ││ +/// │└────────┼──────────────────────────────────────────────────────┘│ +/// │ ▼ │ +/// │ Λ │ +/// │ ╱ ╲ filter by │ +/// └──────▶▕ ▏!is_potentially_connected │ +/// ╲ ╱ │ +/// V │ +/// │ │ +/// │ │ +/// ▼ │ +/// Λ │ +/// ╱ ╲ │ +/// ▕ ▏─────────────────────────────────────────────────────┘ +/// ╲ ╱ connection failed, update last_seen to now() +/// V +/// │ +/// │ +/// ▼ +/// ┌────────────┐ +/// │ send │ +/// │ PeerClient │ +/// │to Discover │ +/// └────────────┘ +/// ``` +#[instrument(skip( + demand_signal, + peer_set_service, + peer_set_address_book, + peer_connector, + success_tx +))] +async fn crawl_and_dial( + mut demand_signal: mpsc::Receiver<()>, + peer_set_service: S, + peer_set_address_book: Arc>, + mut peer_connector: C, + mut success_tx: mpsc::Sender, +) -> Result<(), BoxedStdError> +where + C: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + C::Future: Send + 'static, + S: Service, + S::Future: Send + 'static, +{ + let mut candidates = CandidateSet { + disconnected: AddressBook::default(), + gossiped: AddressBook::default(), + failed: AddressBook::default(), + peer_set: peer_set_address_book.clone(), + peer_service: peer_set_service, + }; + + // XXX instead of just responding to demand, we could respond to demand *or* + // to a interval timer (to continuously grow the peer set). + while let Some(()) = demand_signal.next().await { + debug!("Got demand signal from peer set"); + loop { + candidates.update().await?; + // If we were unable to get a candidate, keep looping to crawl more. + let addr = match candidates.next() { + Some(candidate) => candidate.addr, + None => continue, + }; + + // Check that we have not connected to the candidate since it was + // pulled into the candidate set. + if peer_set_address_book + .lock() + .unwrap() + .is_potentially_connected(&addr) + { + continue; + }; + + if let Ok(stream) = TcpStream::connect(addr).await { + peer_connector.ready().await?; + if let Ok(client) = peer_connector.call((stream, addr)).await { + debug!("Successfully dialed new peer, sending to peerset"); + success_tx.send(Ok(Change::Insert(addr, client))).await?; + break; + } + } + } + } + Ok(()) +} diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs new file mode 100644 index 00000000..f38ff7c6 --- /dev/null +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -0,0 +1,83 @@ +use std::sync::{Arc, Mutex}; + +use chrono::{TimeZone, Utc}; +use futures::stream::{FuturesUnordered, Stream, StreamExt}; +use tower::{Service, ServiceExt}; + +use crate::{types::MetaAddr, AddressBook, BoxedStdError, Request, Response}; + +pub(super) struct CandidateSet { + pub(super) disconnected: AddressBook, + pub(super) gossiped: AddressBook, + pub(super) failed: AddressBook, + pub(super) peer_set: Arc>, + pub(super) peer_service: S, +} + +impl CandidateSet +where + S: Service, + S::Future: Send + 'static, +{ + pub async fn update(&mut self) -> Result<(), BoxedStdError> { + // Opportunistically crawl the network on every update call to ensure + // we're actively fetching peers. Continue independently of whether we + // actually receive any peers, but always ask the network for more. + // Because requests are load-balanced across existing peers, we can make + // multiple requests concurrently, which will be randomly assigned to + // existing peers, but we don't make too many because update may be + // called while the peer set is already loaded. + let mut responses = FuturesUnordered::new(); + for _ in 0..2usize { + self.peer_service.ready().await?; + responses.push(self.peer_service.call(Request::GetPeers)); + } + while let Some(rsp) = responses.next().await { + if let Ok(Response::Peers(addrs)) = rsp { + let addr_len = addrs.len(); + let prev_len = self.gossiped.len(); + // Filter new addresses to ensure that gossiped + let failed = &self.failed; + let peer_set = &self.peer_set; + let new_addrs = addrs + .into_iter() + .filter(|meta| failed.contains_addr(&meta.addr)) + .filter(|meta| peer_set.lock().unwrap().contains_addr(&meta.addr)); + self.gossiped.extend(new_addrs); + trace!( + addr_len, + new_addrs = self.gossiped.len() - prev_len, + "got response to GetPeers" + ); + } else { + trace!("got error in GetPeers request"); + } + } + + // Determine whether any known peers have recently disconnected. + let failed = &self.failed; + let peer_set = &self.peer_set; + self.disconnected.extend( + peer_set + .lock() + .expect("mutex must be unpoisoned") + .disconnected_peers() + .filter(|meta| failed.contains_addr(&meta.addr)), + ); + + Ok(()) + } + + pub fn next(&mut self) -> Option { + self.disconnected + .drain_oldest() + .chain(self.gossiped.drain_newest()) + .chain(self.failed.drain_oldest()) + .next() + } + + pub fn report_failed(&mut self, mut addr: MetaAddr) { + addr.last_seen = Utc::now(); + self.failed.update(addr); + } +} diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 27c05a30..60b5dc20 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -7,7 +7,10 @@ use std::{ task::{Context, Poll}, }; -use futures::{channel::oneshot, stream::FuturesUnordered}; +use futures::{ + channel::{mpsc, oneshot}, + stream::FuturesUnordered, +}; use indexmap::IndexMap; use tokio::prelude::*; use tower::{ @@ -76,6 +79,7 @@ where cancel_handles: HashMap>, unready_services: FuturesUnordered>, next_idx: Option, + demand_signal: mpsc::Sender<()>, } impl PeerSet @@ -89,13 +93,14 @@ where ::Metric: Debug, { /// Construct a peerset which uses `discover` internally. - pub fn new(discover: D) -> Self { + pub fn new(discover: D, demand_signal: mpsc::Sender<()>) -> Self { Self { discover, ready_services: IndexMap::new(), cancel_handles: HashMap::new(), unready_services: FuturesUnordered::new(), next_idx: None, + demand_signal, } } @@ -264,7 +269,8 @@ where self.next_idx = self.select_next_ready_index(); if self.next_idx.is_none() { - trace!("no ready services, returning Poll::Pending"); + trace!("no ready services, sending demand signal"); + let _ = self.demand_signal.try_send(()); return Poll::Pending; } } diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index 5da2876c..e0d8b8f1 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -68,45 +68,7 @@ impl ConnectCmd { let mut config = app_config().network.clone(); - // Until we finish fleshing out the peerset -- particularly - // pulling more peers -- we don't want to start with a single - // initial peer. So make a throwaway connection to the first, - // extract a list of addresses, and discard everything else. - // All the setup is kept in a sub-scope so we know we're not reusing it. - // - // Later, this should turn into initial_peers = vec![self.addr]; - config.initial_peers = { - use tokio::net::TcpStream; - use zebra_network::should_be_private::{PeerConnector, TimestampCollector}; - - let (_, collector) = TimestampCollector::spawn(); - let mut pc = Buffer::new( - PeerConnector::new(config.clone(), node.clone(), collector), - 1, - ); - - let tcp_stream = TcpStream::connect(self.addr).await?; - pc.ready() - .await - .map_err(failure::Error::from_boxed_compat)?; - let mut client = pc - .call((tcp_stream, self.addr)) - .await - .map_err(failure::Error::from_boxed_compat)?; - - client.ready().await?; - - let addrs = match client.call(Request::GetPeers).await? { - Response::Peers(addrs) => addrs, - _ => bail!("Got wrong response type"), - }; - info!( - addrs.len = addrs.len(), - "got addresses from first connected peer" - ); - - addrs.into_iter().map(|meta| meta.addr).collect::>() - }; + config.initial_peers = vec![self.addr]; let (mut peer_set, address_book) = zebra_network::init(config, node);