From 5424e1d8ba67067de41d73c68cf87c856b6150d9 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 18 Feb 2021 11:18:32 +1000 Subject: [PATCH] Fix candidate set address state handling (#1709) Design: - Add a `PeerAddrState` to each `MetaAddr` - Use a single peer set for all peers, regardless of state - Implement time-based liveness as an `AddressBook` method, rather than a `PeerAddrState` variant - Delete `AddressBook.by_state` Implementation: - Simplify `AddressBook` changes using `update` and `take` modifier methods - Simplify the `AddressBook` iterator implementation, replacing it with methods that are more obviously correct - Consistently collect peer set metrics Documentation: - Expand and update the peer set documentation We can optimise later, but for now we want simple code that is more obviously correct. --- zebra-network/src/address_book.rs | 268 ++++++++++++-------- zebra-network/src/lib.rs | 1 + zebra-network/src/meta_addr.rs | 155 +++++++++-- zebra-network/src/peer/handshake.rs | 3 +- zebra-network/src/peer_set/candidate_set.rs | 208 ++++++++------- 5 files changed, 427 insertions(+), 208 deletions(-) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 69e97430..d39c09b3 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -10,17 +10,16 @@ use std::{ use chrono::{DateTime, Utc}; use tracing::Span; -use crate::{ - constants, - types::{MetaAddr, PeerServices}, -}; +use crate::{constants, types::MetaAddr, PeerAddrState}; /// A database of peers, their advertised services, and information on when they /// were last seen. #[derive(Debug)] pub struct AddressBook { - by_addr: HashMap, PeerServices)>, - by_time: BTreeSet, + /// Each known peer address has a matching `MetaAddr` + by_addr: HashMap, + + /// The span for operations on this address book. span: Span, } @@ -28,34 +27,27 @@ pub struct AddressBook { impl AddressBook { /// Construct an `AddressBook` with the given [`tracing::Span`]. 
pub fn new(span: Span) -> AddressBook { - AddressBook { + let constructor_span = span.clone(); + let _guard = constructor_span.enter(); + + let new_book = AddressBook { by_addr: HashMap::default(), - by_time: BTreeSet::default(), span, - } + }; + + new_book.update_metrics(); + new_book } /// Get the contents of `self` in random order with sanitized timestamps. pub fn sanitized(&self) -> Vec { use rand::seq::SliceRandom; + let _guard = self.span.enter(); let mut peers = self.peers().map(MetaAddr::sanitize).collect::>(); peers.shuffle(&mut rand::thread_rng()); peers } - /// Check consistency of the address book invariants or panic, doing work - /// quadratic in the address book size. - #[cfg(test)] - fn assert_consistency(&self) { - for (a, (t, s)) in self.by_addr.iter() { - for meta in self.by_time.iter().filter(|meta| meta.addr == *a) { - if meta.last_seen != *t || meta.services != *s { - panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s); - } - } - } - } - /// Returns true if the address book has an entry for `addr`. pub fn contains_addr(&self, addr: &SocketAddr) -> bool { let _guard = self.span.enter(); @@ -65,131 +57,205 @@ impl AddressBook { /// Returns the entry corresponding to `addr`, or `None` if it does not exist. pub fn get_by_addr(&self, addr: SocketAddr) -> Option { let _guard = self.span.enter(); - let (last_seen, services) = self.by_addr.get(&addr).cloned()?; - Some(MetaAddr { - addr, - last_seen, - services, - }) + self.by_addr.get(&addr).cloned() } /// Add `new` to the address book, updating the previous entry if `new` is /// more recent or discarding `new` if it is stale. + /// + /// ## Note + /// + /// All changes should go through `update` or `take`, to ensure accurate metrics. 
pub fn update(&mut self, new: MetaAddr) { let _guard = self.span.enter(); trace!( ?new, - data.total = self.by_time.len(), - data.recent = (self.by_time.len() - self.disconnected_peers().count()), + total_peers = self.by_addr.len(), + recent_peers = self.recently_live_peers().count(), ); - #[cfg(test)] - self.assert_consistency(); if let Some(prev) = self.get_by_addr(new.addr) { if prev.last_seen > new.last_seen { return; - } else { - self.by_time - .take(&prev) - .expect("cannot have by_addr entry without by_time entry"); } } - self.by_time.insert(new); - self.by_addr.insert(new.addr, (new.last_seen, new.services)); - #[cfg(test)] - self.assert_consistency(); + self.by_addr.insert(new.addr, new); + self.update_metrics(); + } + + /// Removes the entry with `addr`, returning it if it exists + /// + /// ## Note + /// + /// All changes should go through `update` or `take`, to ensure accurate metrics. + fn take(&mut self, removed_addr: SocketAddr) -> Option { + let _guard = self.span.enter(); + trace!( + ?removed_addr, + total_peers = self.by_addr.len(), + recent_peers = self.recently_live_peers().count(), + ); + + if let Some(entry) = self.by_addr.remove(&removed_addr) { + self.update_metrics(); + Some(entry) + } else { + None + } } /// Compute a cutoff time that can determine whether an entry /// in an address book being updated with peer message timestamps - /// represents a known-disconnected peer or a potentially-connected peer. + /// represents a likely-dead (or hung) peer, or a potentially-connected peer. /// /// [`constants::LIVE_PEER_DURATION`] represents the time interval in which - /// we are guaranteed to receive at least one message from a peer or close - /// the connection. Therefore, if the last-seen timestamp is older than - /// [`constants::LIVE_PEER_DURATION`] ago, we know we must have disconnected - /// from it. Otherwise, we could potentially be connected to it. 
- fn cutoff_time() -> DateTime { + /// we should receive at least one message from a peer, or close the + /// connection. Therefore, if the last-seen timestamp is older than + /// [`constants::LIVE_PEER_DURATION`] ago, we know we should have + /// disconnected from it. Otherwise, we could potentially be connected to it. + fn liveness_cutoff_time() -> DateTime { // chrono uses signed durations while stdlib uses unsigned durations use chrono::Duration as CD; Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap() } - /// Used for range bounds, see cutoff_time - fn cutoff_meta() -> MetaAddr { - use std::net::{IpAddr, Ipv4Addr}; - MetaAddr { - last_seen: AddressBook::cutoff_time(), - // The ordering on MetaAddrs is newest-first, then arbitrary, - // so any fields will do here. - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - services: PeerServices::default(), - } - } - - /// Returns true if the given [`SocketAddr`] could potentially be connected - /// to a node feeding timestamps into this address book. - pub fn is_potentially_connected(&self, addr: &SocketAddr) -> bool { + /// Returns true if the given [`SocketAddr`] has recently sent us a message. + pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool { let _guard = self.span.enter(); match self.by_addr.get(addr) { None => false, - Some((ref last_seen, _)) => last_seen > &AddressBook::cutoff_time(), + // NeverAttempted, Failed, and AttemptPending peers should never be live + Some(peer) => { + peer.last_connection_state == PeerAddrState::Responded + && peer.last_seen > AddressBook::liveness_cutoff_time() + } } } - /// Return an iterator over all peers, ordered from most recently seen to - /// least recently seen. + /// Returns true if the given [`SocketAddr`] is pending a reconnection + /// attempt. 
+ pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool { + let _guard = self.span.enter(); + match self.by_addr.get(addr) { + None => false, + Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending, + } + } + + /// Returns true if the given [`SocketAddr`] might be connected to a node + /// feeding timestamps into this address book. + pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool { + self.recently_live_addr(addr) || self.pending_reconnection_addr(addr) + } + + /// Return an iterator over all peers. + /// + /// Returns peers in reconnection attempt order, then maybe-connected peers + /// (recently live or attempt pending) in an arbitrary order. pub fn peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_time.iter().rev().cloned() + self.reconnection_peers() + .chain(self.maybe_connected_peers()) } - /// Return an iterator over peers known to be disconnected, ordered from most - /// recently seen to least recently seen. - pub fn disconnected_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that are due for a reconnection attempt, + /// in reconnection attempt order. + pub fn reconnection_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - use std::ops::Bound::{Excluded, Unbounded}; - self.by_time - .range((Excluded(Self::cutoff_meta()), Unbounded)) - .rev() + // TODO: optimise, if needed, or get rid of older peers + + // Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet + self.by_addr + .values() + .filter(move |peer| !self.maybe_connected_addr(&peer.addr)) + .collect::>() + .into_iter() .cloned() } - /// Return an iterator over peers that could potentially be connected, ordered from most - /// recently seen to least recently seen. - pub fn potentially_connected_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over all the peers in `state`, in arbitrary order. 
+ pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator + '_ { let _guard = self.span.enter(); - use std::ops::Bound::{Included, Unbounded}; - self.by_time - .range((Unbounded, Included(Self::cutoff_meta()))) - .rev() + self.by_addr + .values() + .filter(move |peer| peer.last_connection_state == state) .cloned() } - /// Returns an iterator that drains entries from the address book, removing - /// them in order from most recent to least recent. - pub fn drain_newest(&'_ mut self) -> impl Iterator + '_ { - Drain { - book: self, - newest_first: true, - } + /// Return an iterator over peers that might be connected, in arbitrary + /// order. + pub fn maybe_connected_peers(&'_ self) -> impl Iterator + '_ { + let _guard = self.span.enter(); + + self.by_addr + .values() + .filter(move |peer| self.maybe_connected_addr(&peer.addr)) + .cloned() } - /// Returns an iterator that drains entries from the address book, removing - /// them in order from least recent to most recent. - pub fn drain_oldest(&'_ mut self) -> impl Iterator + '_ { - Drain { - book: self, - newest_first: false, - } + /// Return an iterator over peers we've seen recently, in arbitrary order. + pub fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + let _guard = self.span.enter(); + + self.by_addr + .values() + .filter(move |peer| self.recently_live_addr(&peer.addr)) + .cloned() + } + + /// Returns an iterator that drains entries from the address book. + /// + /// Removes entries in reconnection attempt then arbitrary order, + /// see [`AddressBook::peers`] for details. + pub fn drain(&'_ mut self) -> impl Iterator + '_ { + Drain { book: self } } /// Returns the number of entries in this address book. pub fn len(&self) -> usize { - self.by_time.len() + self.by_addr.len() + } + + /// Update the metrics for this address book. 
+ fn update_metrics(&self) { + let _guard = self.span.enter(); + + let responded = self.state_peers(PeerAddrState::Responded).count(); + let never_attempted = self.state_peers(PeerAddrState::NeverAttempted).count(); + let failed = self.state_peers(PeerAddrState::Failed).count(); + let pending = self.state_peers(PeerAddrState::AttemptPending).count(); + + let recently_live = self.recently_live_peers().count(); + let recently_stopped_responding = responded + .checked_sub(recently_live) + .expect("all recently live peers must have responded"); + + // TODO: rename to address_book.responded.recently_live + metrics::gauge!("candidate_set.recently_live", recently_live as f64); + // TODO: rename to address_book.responded.stopped_responding + metrics::gauge!( + "candidate_set.disconnected", + recently_stopped_responding as f64 + ); + + // TODO: rename to address_book.[state_name] + metrics::gauge!("candidate_set.responded", responded as f64); + metrics::gauge!("candidate_set.gossiped", never_attempted as f64); + metrics::gauge!("candidate_set.failed", failed as f64); + metrics::gauge!("candidate_set.pending", pending as f64); + + debug!( + %recently_live, + %recently_stopped_responding, + %responded, + %never_attempted, + %failed, + %pending, + "address book peers" + ); } } @@ -206,23 +272,13 @@ impl Extend for AddressBook { struct Drain<'a> { book: &'a mut AddressBook, - newest_first: bool, } impl<'a> Iterator for Drain<'a> { type Item = MetaAddr; fn next(&mut self) -> Option { - let next_item = if self.newest_first { - *self.book.by_time.iter().next()? - } else { - *self.book.by_time.iter().rev().next()? 
- }; - self.book.by_time.remove(&next_item); - self.book - .by_addr - .remove(&next_item.addr) - .expect("cannot have by_time entry without by_addr entry"); - Some(next_item) + let next_item_addr = self.book.peers().next()?.addr; + self.book.take(next_item_addr) } } diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index 75f15ffd..fbcdee69 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -79,6 +79,7 @@ pub use crate::{ address_book::AddressBook, config::Config, isolated::connect_isolated, + meta_addr::PeerAddrState, peer_set::init, policies::{RetryErrors, RetryLimit}, protocol::internal::{Request, Response}, diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index a122a10f..9a066c59 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -15,6 +15,70 @@ use zebra_chain::serialization::{ use crate::protocol::types::PeerServices; +/// Peer connection state, based on our interactions with the peer. +/// +/// Zebra also tracks how recently a peer has sent us messages, and derives peer +/// liveness based on the current time. This derived state is tracked using +/// [`AddressBook::maybe_connected_peers`] and +/// [`AddressBook::reconnection_peers`]. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum PeerAddrState { + /// The peer has sent us a valid message. + /// + /// Peers remain in this state, even if they stop responding to requests. + /// (Peer liveness is derived from the `last_seen` timestamp, and the current + /// time.) + Responded, + + /// The peer's address has just been fetched from a DNS seeder, or via peer + /// gossip, but we haven't attempted to connect to it yet. + NeverAttempted, + + /// The peer's TCP connection failed, or the peer sent us an unexpected + /// Zcash protocol message, so we failed the connection. + Failed, + + /// We just started a connection attempt to this peer. 
+ AttemptPending, +} + +impl Default for PeerAddrState { + fn default() -> Self { + PeerAddrState::NeverAttempted + } +} + +impl Ord for PeerAddrState { + /// `PeerAddrState`s are sorted in approximate reconnection attempt + /// order, ignoring liveness. + /// + /// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details. + fn cmp(&self, other: &Self) -> Ordering { + use PeerAddrState::*; + match (self, other) { + (Responded, Responded) + | (NeverAttempted, NeverAttempted) + | (Failed, Failed) + | (AttemptPending, AttemptPending) => Ordering::Equal, + // We reconnect to `Responded` peers that have stopped sending messages, + // then `NeverAttempted` peers, then `Failed` peers + (Responded, _) => Ordering::Less, + (_, Responded) => Ordering::Greater, + (NeverAttempted, _) => Ordering::Less, + (_, NeverAttempted) => Ordering::Greater, + (Failed, _) => Ordering::Less, + (_, Failed) => Ordering::Greater, + // AttemptPending is covered by the other cases + } + } +} + +impl PartialOrd for PeerAddrState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// An address with metadata on its advertised services and last-seen time. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Network_address) @@ -22,10 +86,41 @@ use crate::protocol::types::PeerServices; pub struct MetaAddr { /// The peer's address. pub addr: SocketAddr, + /// The services advertised by the peer. 
+ /// + /// The exact meaning depends on `last_connection_state`: + /// - `Responded`: the services advertised by this peer, the last time we + /// performed a handshake with it + /// - `NeverAttempted`: the unverified services provided by the remote peer + /// that sent us this address + /// - `Failed` or `AttemptPending`: unverified services via another peer, + /// or services advertised in a previous handshake + /// + /// ## Security + /// + /// `services` from `NeverAttempted` peers may be invalid due to outdated + /// records, older peer versions, or buggy or malicious peers. pub services: PeerServices, - /// When the peer was last seen. + + /// The last time we interacted with this peer. + /// + /// The exact meaning depends on `last_connection_state`: + /// - `Responded`: the last time we processed a message from this peer + /// - `NeverAttempted`: the unverified time provided by the remote peer + /// that sent us this address + /// - `Failed`: the last time we marked the peer as failed + /// - `AttemptPending`: the last time we queued the peer for a reconnection + /// attempt + /// + /// ## Security + /// + /// `last_seen` times from `NeverAttempted` peers may be invalid due to + /// clock skew, or buggy or malicious peers. pub last_seen: DateTime, + + /// The outcome of our most recent communication attempt with this peer. + pub last_connection_state: PeerAddrState, } impl MetaAddr { @@ -34,29 +129,48 @@ impl MetaAddr { let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS; let ts = self.last_seen.timestamp(); self.last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0); + self.last_connection_state = Default::default(); self } } impl Ord for MetaAddr { - /// `MetaAddr`s are sorted newest-first, and then in an arbitrary - /// but determinate total order. + /// `MetaAddr`s are sorted in approximate reconnection attempt order, but + /// with `Responded` peers sorted first as a group. 
+ /// + /// This order should not be used for reconnection attempts: use + /// [`AddressBook::reconnection_peers`] instead. + /// + /// See [`CandidateSet`] for more details. fn cmp(&self, other: &Self) -> Ordering { - let newest_first = self.last_seen.cmp(&other.last_seen).reverse(); - newest_first.then_with(|| { + use std::net::IpAddr::{V4, V6}; + use PeerAddrState::*; + + let oldest_first = self.last_seen.cmp(&other.last_seen); + let newest_first = oldest_first.reverse(); + + let connection_state = self.last_connection_state.cmp(&other.last_connection_state); + let reconnection_time = match self.last_connection_state { + Responded => oldest_first, + NeverAttempted => newest_first, + Failed => oldest_first, + AttemptPending => oldest_first, + }; + let ip_numeric = match (self.addr.ip(), other.addr.ip()) { + (V4(a), V4(b)) => a.octets().cmp(&b.octets()), + (V6(a), V6(b)) => a.octets().cmp(&b.octets()), + (V4(_), V6(_)) => Ordering::Less, + (V6(_), V4(_)) => Ordering::Greater, + }; + + connection_state + .then(reconnection_time) // The remainder is meaningless as an ordering, but required so that we // have a total order on `MetaAddr` values: self and other must compare // as Ordering::Equal iff they are equal. - use std::net::IpAddr::{V4, V6}; - match (self.addr.ip(), other.addr.ip()) { - (V4(a), V4(b)) => a.octets().cmp(&b.octets()), - (V6(a), V6(b)) => a.octets().cmp(&b.octets()), - (V4(_), V6(_)) => Ordering::Less, - (V6(_), V4(_)) => Ordering::Greater, - } + .then(ip_numeric) .then(self.addr.port().cmp(&other.addr.port())) .then(self.services.bits().cmp(&other.services.bits())) - }) } } @@ -82,6 +196,7 @@ impl ZcashDeserialize for MetaAddr { // Discard unknown service bits. 
services: PeerServices::from_bits_truncate(reader.read_u64::()?), addr: reader.read_socket_addr()?, + last_connection_state: Default::default(), }) } } @@ -94,16 +209,26 @@ mod tests { fn sanitize_truncates_timestamps() { zebra_test::init(); + let services = PeerServices::default(); + let addr = "127.0.0.1:8233".parse().unwrap(); + let entry = MetaAddr { - services: PeerServices::default(), - addr: "127.0.0.1:8233".parse().unwrap(), + services, + addr, last_seen: Utc.timestamp(1_573_680_222, 0), + last_connection_state: PeerAddrState::Responded, } .sanitize(); + // We want the sanitized timestamp to be a multiple of the truncation interval. assert_eq!( entry.last_seen.timestamp() % crate::constants::TIMESTAMP_TRUNCATION_SECONDS, 0 ); + // We want the state to be the default + assert_eq!(entry.last_connection_state, Default::default()); + // We want the other fields to be unmodified + assert_eq!(entry.addr, addr); + assert_eq!(entry.services, services); } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index a7ffd641..2d859724 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -27,7 +27,7 @@ use crate::{ internal::{Request, Response}, }, types::MetaAddr, - BoxError, Config, + BoxError, Config, PeerAddrState, }; use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError}; @@ -390,6 +390,7 @@ where addr, services: remote_services, last_seen: Utc::now(), + last_connection_state: PeerAddrState::Responded, }) .await; } diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 0b7e8cbd..14d22e75 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -3,60 +3,73 @@ use std::sync::{Arc, Mutex}; use chrono::Utc; use futures::stream::{FuturesUnordered, StreamExt}; use tower::{Service, ServiceExt}; -use tracing::Level; -use crate::{types::MetaAddr, AddressBook, BoxError, Request, 
Response}; +use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Response}; -/// The `CandidateSet` maintains a pool of candidate peers. +/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts. /// -/// It divides the set of all possible candidate peers into three disjoint subsets: +/// It divides the set of all possible candidate peers into disjoint subsets, +/// using the `PeerAddrState`: /// -/// 1. Disconnected peers, which we previously connected to but are not currently connected to; -/// 2. Gossiped peers, which we learned about from other peers but have never connected to; -/// 3. Failed peers, to whom we attempted to connect but were unable to. +/// 1. `Responded` peers, which we previously connected to. If we have not received +/// any messages from a `Responded` peer within a cutoff time, we assume that it +/// has disconnected or hung, and attempt reconnection; +/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS +/// seeder, but have never connected to; +/// 3. `Failed` peers, to whom we attempted to connect but were unable to; +/// 4. `AttemptPending` peers, which we've recently queued for reconnection. 
/// /// ```ascii,no_run -/// ┌─────────────────┐ -/// │ PeerSet │ -/// │GetPeers Requests│ -/// └─────────────────┘ +/// ┌──────────────────┐ +/// │ PeerSet │ +/// │GetPeers Responses│ +/// └──────────────────┘ /// │ /// │ /// │ /// │ /// ▼ -/// ┌─────────────┐ filter by Λ filter by -/// │ PeerSet │!contains_addr╱ ╲ !contains_addr -/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐ -/// │ └─────────────┘ ╲ ╱ │ -/// │ │ V │ -/// │ │disconnected_peers │ │ -/// │ ▼ │ │ -/// │ Λ filter by │ │ -/// │ ╱ ╲ !contains_addr │ │ -/// │ ▕ ▏◀───────────────────┼──────────────────────┤ -/// │ ╲ ╱ │ │ -/// │ V │ │ -/// │ │ │ │ -/// │┌────────┼──────────────────────┼──────────────────────┼────────┐ -/// ││ ▼ ▼ │ │ -/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ -/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │ -/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐ -/// ││ │ AddressBook │ │ AddressBook │ │ │ ││ -/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││ -/// ││ │ │ │ ││ -/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││ -/// ││ │ │ │ ││ -/// ││ ├──────────────────────┴──────────────────────┘ ││ -/// ││ │ disjoint candidate sets ││ -/// │└────────┼──────────────────────────────────────────────────────┘│ -/// │ ▼ │ -/// │ Λ │ -/// │ ╱ ╲ filter by │ -/// └──────▶▕ ▏!is_potentially_connected │ -/// ╲ ╱ │ -/// V │ +/// filter by Λ +/// !contains_addr ╱ ╲ +/// ┌────────────────────────────▶▕ ▏ +/// │ ╲ ╱ +/// │ V +/// │ │ +/// │ │ +/// │ │ +/// │ │ +/// │ │ +/// │ │ +/// │ │ +/// │ │ +/// ├───────────────────────────────┼───────────────────────────────┐ +/// │ PeerSet AddressBook ▼ │ +/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │ +/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │ +/// │ │Disconnected │ │ Peers │ │ Peers │◀┼┐ +/// │ │ `Responded` │ │ │ │ │ ││ +/// │ │ Peers │ │ │ │ │ ││ +/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││ +/// │ │ │ │ ││ +/// │ #1 oldest_first #2 newest_first #3 oldest_first ││ +/// │ │ │ │ 
││ +/// │ ├──────────────────────┴──────────────────────┘ ││ +/// │ │ disjoint `PeerAddrState`s ││ +/// ├────────┼──────────────────────────────────────────────────────┘│ +/// │ ▼ │ +/// │ Λ │ +/// │ ╱ ╲ filter by │ +/// └─────▶▕ ▏!maybe_connected_addr │ +/// ╲ ╱ to remove live │ +/// V `Responded` peers │ +/// │ │ +/// │ │ +/// ▼ │ +/// ┌────────────────┐ │ +/// │`AttemptPending`│ │ +/// │ Peers │ │ +/// │ │ │ +/// └────────────────┘ │ /// │ │ /// │ │ /// ▼ │ @@ -73,11 +86,20 @@ use crate::{types::MetaAddr, AddressBook, BoxError, Request, Response}; /// │peer::Client│ /// │to Discover │ /// └────────────┘ +/// │ +/// │ +/// ▼ +/// ┌───────────────────────────────────────┐ +/// │ every time we receive a peer message: │ +/// │ * update state to `Responded` │ +/// │ * update last_seen to now() │ +/// └───────────────────────────────────────┘ +/// /// ``` +// TODO: +// * draw arrow from the "peer message" box into the `Responded` state box +// * make the "disjoint states" box include `AttemptPending` pub(super) struct CandidateSet { - pub(super) disconnected: AddressBook, - pub(super) gossiped: AddressBook, - pub(super) failed: AddressBook, pub(super) peer_set: Arc>, pub(super) peer_service: S, } @@ -87,16 +109,28 @@ where S: Service, S::Future: Send + 'static, { + /// Uses `peer_set` and `peer_service` to manage a [`CandidateSet`] of peers. pub fn new(peer_set: Arc>, peer_service: S) -> CandidateSet { CandidateSet { - disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")), - gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")), - failed: AddressBook::new(span!(Level::TRACE, "failed peers")), peer_set, peer_service, } } + /// Update the peer set from the network. + /// + /// - Ask a few live `Responded` peers to send us more peers. + /// - Process all completed peer responses, adding new peers in the + /// `NeverAttempted` state. 
+ /// + /// ## Correctness + /// + /// The handshaker sets up the peer message receiver so it also sends a + /// `Responded` peer address update. + /// + /// `report_failed` puts peers into the `Failed` state. + /// + /// `next` puts peers into the `AttemptPending` state. pub async fn update(&mut self) -> Result<(), BoxError> { // Opportunistically crawl the network on every update call to ensure // we're actively fetching peers. Continue independently of whether we @@ -113,60 +147,62 @@ where responses.push(self.peer_service.call(Request::Peers)); } while let Some(rsp) = responses.next().await { - if let Ok(Response::Peers(addrs)) = rsp { - let addr_len = addrs.len(); - let prev_len = self.gossiped.len(); + if let Ok(Response::Peers(rsp_addrs)) = rsp { // Filter new addresses to ensure that gossiped addresses are actually new - let failed = &self.failed; let peer_set = &self.peer_set; - let new_addrs = addrs - .into_iter() - .filter(|meta| !failed.contains_addr(&meta.addr)) - .filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr)); - self.gossiped.extend(new_addrs); + let new_addrs = rsp_addrs + .iter() + .filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr)) + .collect::>(); trace!( - addr_len, - new_addrs = self.gossiped.len() - prev_len, + ?rsp_addrs, + new_addr_count = ?new_addrs.len(), "got response to GetPeers" ); + // New addresses are deserialized in the `NeverAttempted` state + peer_set + .lock() + .unwrap() + .extend(new_addrs.into_iter().cloned()); } else { trace!("got error in GetPeers request"); } } - // Determine whether any known peers have recently disconnected. - let failed = &self.failed; - let peer_set = &self.peer_set; - self.disconnected.extend( - peer_set - .lock() - .expect("mutex must be unpoisoned") - .disconnected_peers() - .filter(|meta| failed.contains_addr(&meta.addr)), - ); - Ok(()) } + /// Returns the next candidate for a connection attempt, if any are available. 
+ /// + /// Returns peers in this order: + /// - oldest `Responded` that are not live + /// - newest `NeverAttempted` + /// - oldest `Failed` + /// + /// Skips `AttemptPending` peers and live `Responded` peers. + /// + /// ## Correctness + /// + /// `AttemptPending` peers will become `Responded` if they respond, or + /// become `Failed` if they time out or provide a bad response. + /// + /// Live `Responded` peers will stay live if they keep responding, or + /// become a reconnection candidate if they stop responding. pub fn next(&mut self) -> Option { - metrics::gauge!("candidate_set.disconnected", self.disconnected.len() as f64); - metrics::gauge!("candidate_set.gossiped", self.gossiped.len() as f64); - metrics::gauge!("candidate_set.failed", self.failed.len() as f64); - debug!( - disconnected_peers = self.disconnected.len(), - gossiped_peers = self.gossiped.len(), - failed_peers = self.failed.len() - ); - let guard = self.peer_set.lock().unwrap(); - self.disconnected - .drain_oldest() - .chain(self.gossiped.drain_newest()) - .chain(self.failed.drain_oldest()) - .find(|meta| !guard.is_potentially_connected(&meta.addr)) + let mut peer_set_guard = self.peer_set.lock().unwrap(); + let mut reconnect = peer_set_guard.reconnection_peers().next()?; + + reconnect.last_seen = Utc::now(); + reconnect.last_connection_state = PeerAddrState::AttemptPending; + peer_set_guard.update(reconnect); + + Some(reconnect) } + /// Mark `addr` as a failed peer. pub fn report_failed(&mut self, mut addr: MetaAddr) { addr.last_seen = Utc::now(); - self.failed.update(addr); + addr.last_connection_state = PeerAddrState::Failed; + self.peer_set.lock().unwrap().update(addr); } }