Fix candidate set address state handling (#1709)
Design: - Add a `PeerAddrState` to each `MetaAddr` - Use a single peer set for all peers, regardless of state - Implement time-based liveness as an `AddressBook` method, rather than a `PeerAddrState` variant - Delete `AddressBook.by_state` Implementation: - Simplify `AddressBook` changes using `update` and `take` modifier methods - Simplify the `AddressBook` iterator implementation, replacing it with methods that are more obviously correct - Consistently collect peer set metrics Documentation: - Expand and update the peer set documentation We can optimise later, but for now we want simple code that is more obviously correct.
This commit is contained in:
parent
579bd4a368
commit
5424e1d8ba
|
|
@ -10,17 +10,16 @@ use std::{
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
use crate::{
|
use crate::{constants, types::MetaAddr, PeerAddrState};
|
||||||
constants,
|
|
||||||
types::{MetaAddr, PeerServices},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// A database of peers, their advertised services, and information on when they
|
/// A database of peers, their advertised services, and information on when they
|
||||||
/// were last seen.
|
/// were last seen.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AddressBook {
|
pub struct AddressBook {
|
||||||
by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
|
/// Each known peer address has a matching `MetaAddr`
|
||||||
by_time: BTreeSet<MetaAddr>,
|
by_addr: HashMap<SocketAddr, MetaAddr>,
|
||||||
|
|
||||||
|
/// The span for operations on this address book.
|
||||||
span: Span,
|
span: Span,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -28,34 +27,27 @@ pub struct AddressBook {
|
||||||
impl AddressBook {
|
impl AddressBook {
|
||||||
/// Construct an `AddressBook` with the given [`tracing::Span`].
|
/// Construct an `AddressBook` with the given [`tracing::Span`].
|
||||||
pub fn new(span: Span) -> AddressBook {
|
pub fn new(span: Span) -> AddressBook {
|
||||||
AddressBook {
|
let constructor_span = span.clone();
|
||||||
|
let _guard = constructor_span.enter();
|
||||||
|
|
||||||
|
let new_book = AddressBook {
|
||||||
by_addr: HashMap::default(),
|
by_addr: HashMap::default(),
|
||||||
by_time: BTreeSet::default(),
|
|
||||||
span,
|
span,
|
||||||
}
|
};
|
||||||
|
|
||||||
|
new_book.update_metrics();
|
||||||
|
new_book
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the contents of `self` in random order with sanitized timestamps.
|
/// Get the contents of `self` in random order with sanitized timestamps.
|
||||||
pub fn sanitized(&self) -> Vec<MetaAddr> {
|
pub fn sanitized(&self) -> Vec<MetaAddr> {
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
let _guard = self.span.enter();
|
||||||
let mut peers = self.peers().map(MetaAddr::sanitize).collect::<Vec<_>>();
|
let mut peers = self.peers().map(MetaAddr::sanitize).collect::<Vec<_>>();
|
||||||
peers.shuffle(&mut rand::thread_rng());
|
peers.shuffle(&mut rand::thread_rng());
|
||||||
peers
|
peers
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check consistency of the address book invariants or panic, doing work
|
|
||||||
/// quadratic in the address book size.
|
|
||||||
#[cfg(test)]
|
|
||||||
fn assert_consistency(&self) {
|
|
||||||
for (a, (t, s)) in self.by_addr.iter() {
|
|
||||||
for meta in self.by_time.iter().filter(|meta| meta.addr == *a) {
|
|
||||||
if meta.last_seen != *t || meta.services != *s {
|
|
||||||
panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns true if the address book has an entry for `addr`.
|
/// Returns true if the address book has an entry for `addr`.
|
||||||
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
|
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
|
|
@ -65,131 +57,205 @@ impl AddressBook {
|
||||||
/// Returns the entry corresponding to `addr`, or `None` if it does not exist.
|
/// Returns the entry corresponding to `addr`, or `None` if it does not exist.
|
||||||
pub fn get_by_addr(&self, addr: SocketAddr) -> Option<MetaAddr> {
|
pub fn get_by_addr(&self, addr: SocketAddr) -> Option<MetaAddr> {
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
let (last_seen, services) = self.by_addr.get(&addr).cloned()?;
|
self.by_addr.get(&addr).cloned()
|
||||||
Some(MetaAddr {
|
|
||||||
addr,
|
|
||||||
last_seen,
|
|
||||||
services,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add `new` to the address book, updating the previous entry if `new` is
|
/// Add `new` to the address book, updating the previous entry if `new` is
|
||||||
/// more recent or discarding `new` if it is stale.
|
/// more recent or discarding `new` if it is stale.
|
||||||
|
///
|
||||||
|
/// ## Note
|
||||||
|
///
|
||||||
|
/// All changes should go through `update` or `take`, to ensure accurate metrics.
|
||||||
pub fn update(&mut self, new: MetaAddr) {
|
pub fn update(&mut self, new: MetaAddr) {
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
trace!(
|
trace!(
|
||||||
?new,
|
?new,
|
||||||
data.total = self.by_time.len(),
|
total_peers = self.by_addr.len(),
|
||||||
data.recent = (self.by_time.len() - self.disconnected_peers().count()),
|
recent_peers = self.recently_live_peers().count(),
|
||||||
);
|
);
|
||||||
#[cfg(test)]
|
|
||||||
self.assert_consistency();
|
|
||||||
|
|
||||||
if let Some(prev) = self.get_by_addr(new.addr) {
|
if let Some(prev) = self.get_by_addr(new.addr) {
|
||||||
if prev.last_seen > new.last_seen {
|
if prev.last_seen > new.last_seen {
|
||||||
return;
|
return;
|
||||||
} else {
|
|
||||||
self.by_time
|
|
||||||
.take(&prev)
|
|
||||||
.expect("cannot have by_addr entry without by_time entry");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.by_time.insert(new);
|
|
||||||
self.by_addr.insert(new.addr, (new.last_seen, new.services));
|
|
||||||
|
|
||||||
#[cfg(test)]
|
self.by_addr.insert(new.addr, new);
|
||||||
self.assert_consistency();
|
self.update_metrics();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes the entry with `addr`, returning it if it exists
|
||||||
|
///
|
||||||
|
/// ## Note
|
||||||
|
///
|
||||||
|
/// All changes should go through `update` or `take`, to ensure accurate metrics.
|
||||||
|
fn take(&mut self, removed_addr: SocketAddr) -> Option<MetaAddr> {
|
||||||
|
let _guard = self.span.enter();
|
||||||
|
trace!(
|
||||||
|
?removed_addr,
|
||||||
|
total_peers = self.by_addr.len(),
|
||||||
|
recent_peers = self.recently_live_peers().count(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(entry) = self.by_addr.remove(&removed_addr) {
|
||||||
|
self.update_metrics();
|
||||||
|
Some(entry)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute a cutoff time that can determine whether an entry
|
/// Compute a cutoff time that can determine whether an entry
|
||||||
/// in an address book being updated with peer message timestamps
|
/// in an address book being updated with peer message timestamps
|
||||||
/// represents a known-disconnected peer or a potentially-connected peer.
|
/// represents a likely-dead (or hung) peer, or a potentially-connected peer.
|
||||||
///
|
///
|
||||||
/// [`constants::LIVE_PEER_DURATION`] represents the time interval in which
|
/// [`constants::LIVE_PEER_DURATION`] represents the time interval in which
|
||||||
/// we are guaranteed to receive at least one message from a peer or close
|
/// we should receive at least one message from a peer, or close the
|
||||||
/// the connection. Therefore, if the last-seen timestamp is older than
|
/// connection. Therefore, if the last-seen timestamp is older than
|
||||||
/// [`constants::LIVE_PEER_DURATION`] ago, we know we must have disconnected
|
/// [`constants::LIVE_PEER_DURATION`] ago, we know we should have
|
||||||
/// from it. Otherwise, we could potentially be connected to it.
|
/// disconnected from it. Otherwise, we could potentially be connected to it.
|
||||||
fn cutoff_time() -> DateTime<Utc> {
|
fn liveness_cutoff_time() -> DateTime<Utc> {
|
||||||
// chrono uses signed durations while stdlib uses unsigned durations
|
// chrono uses signed durations while stdlib uses unsigned durations
|
||||||
use chrono::Duration as CD;
|
use chrono::Duration as CD;
|
||||||
Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap()
|
Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Used for range bounds, see cutoff_time
|
/// Returns true if the given [`SocketAddr`] has recently sent us a message.
|
||||||
fn cutoff_meta() -> MetaAddr {
|
pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool {
|
||||||
use std::net::{IpAddr, Ipv4Addr};
|
|
||||||
MetaAddr {
|
|
||||||
last_seen: AddressBook::cutoff_time(),
|
|
||||||
// The ordering on MetaAddrs is newest-first, then arbitrary,
|
|
||||||
// so any fields will do here.
|
|
||||||
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
|
||||||
services: PeerServices::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns true if the given [`SocketAddr`] could potentially be connected
|
|
||||||
/// to a node feeding timestamps into this address book.
|
|
||||||
pub fn is_potentially_connected(&self, addr: &SocketAddr) -> bool {
|
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
match self.by_addr.get(addr) {
|
match self.by_addr.get(addr) {
|
||||||
None => false,
|
None => false,
|
||||||
Some((ref last_seen, _)) => last_seen > &AddressBook::cutoff_time(),
|
// NeverAttempted, Failed, and AttemptPending peers should never be live
|
||||||
|
Some(peer) => {
|
||||||
|
peer.last_connection_state == PeerAddrState::Responded
|
||||||
|
&& peer.last_seen > AddressBook::liveness_cutoff_time()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an iterator over all peers, ordered from most recently seen to
|
/// Returns true if the given [`SocketAddr`] is pending a reconnection
|
||||||
/// least recently seen.
|
/// attempt.
|
||||||
|
pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool {
|
||||||
|
let _guard = self.span.enter();
|
||||||
|
match self.by_addr.get(addr) {
|
||||||
|
None => false,
|
||||||
|
Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the given [`SocketAddr`] might be connected to a node
|
||||||
|
/// feeding timestamps into this address book.
|
||||||
|
pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool {
|
||||||
|
self.recently_live_addr(addr) || self.pending_reconnection_addr(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return an iterator over all peers.
|
||||||
|
///
|
||||||
|
/// Returns peers in reconnection attempt order, then recently live peers in
|
||||||
|
/// an arbitrary order.
|
||||||
pub fn peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
pub fn peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
self.by_time.iter().rev().cloned()
|
self.reconnection_peers()
|
||||||
|
.chain(self.maybe_connected_peers())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an iterator over peers known to be disconnected, ordered from most
|
/// Return an iterator over peers that are due for a reconnection attempt,
|
||||||
/// recently seen to least recently seen.
|
/// in reconnection attempt order.
|
||||||
pub fn disconnected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
pub fn reconnection_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
use std::ops::Bound::{Excluded, Unbounded};
|
|
||||||
|
|
||||||
self.by_time
|
// TODO: optimise, if needed, or get rid of older peers
|
||||||
.range((Excluded(Self::cutoff_meta()), Unbounded))
|
|
||||||
.rev()
|
// Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet
|
||||||
|
self.by_addr
|
||||||
|
.values()
|
||||||
|
.filter(move |peer| !self.maybe_connected_addr(&peer.addr))
|
||||||
|
.collect::<BTreeSet<_>>()
|
||||||
|
.into_iter()
|
||||||
.cloned()
|
.cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an iterator over peers that could potentially be connected, ordered from most
|
/// Return an iterator over all the peers in `state`, in arbitrary order.
|
||||||
/// recently seen to least recently seen.
|
pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
pub fn potentially_connected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
|
||||||
let _guard = self.span.enter();
|
let _guard = self.span.enter();
|
||||||
use std::ops::Bound::{Included, Unbounded};
|
|
||||||
|
|
||||||
self.by_time
|
self.by_addr
|
||||||
.range((Unbounded, Included(Self::cutoff_meta())))
|
.values()
|
||||||
.rev()
|
.filter(move |peer| peer.last_connection_state == state)
|
||||||
.cloned()
|
.cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator that drains entries from the address book, removing
|
/// Return an iterator over peers that might be connected, in arbitrary
|
||||||
/// them in order from most recent to least recent.
|
/// order.
|
||||||
pub fn drain_newest(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
|
pub fn maybe_connected_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
Drain {
|
let _guard = self.span.enter();
|
||||||
book: self,
|
|
||||||
newest_first: true,
|
self.by_addr
|
||||||
}
|
.values()
|
||||||
|
.filter(move |peer| self.maybe_connected_addr(&peer.addr))
|
||||||
|
.cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator that drains entries from the address book, removing
|
/// Return an iterator over peers we've seen recently, in arbitrary order.
|
||||||
/// them in order from least recent to most recent.
|
pub fn recently_live_peers(&'_ self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
pub fn drain_oldest(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
|
let _guard = self.span.enter();
|
||||||
Drain {
|
|
||||||
book: self,
|
self.by_addr
|
||||||
newest_first: false,
|
.values()
|
||||||
}
|
.filter(move |peer| self.recently_live_addr(&peer.addr))
|
||||||
|
.cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an iterator that drains entries from the address book.
|
||||||
|
///
|
||||||
|
/// Removes entries in reconnection attempt then arbitrary order,
|
||||||
|
/// see [`peers`] for details.
|
||||||
|
pub fn drain(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||||
|
Drain { book: self }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of entries in this address book.
|
/// Returns the number of entries in this address book.
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.by_time.len()
|
self.by_addr.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the metrics for this address book.
|
||||||
|
fn update_metrics(&self) {
|
||||||
|
let _guard = self.span.enter();
|
||||||
|
|
||||||
|
let responded = self.state_peers(PeerAddrState::Responded).count();
|
||||||
|
let never_attempted = self.state_peers(PeerAddrState::NeverAttempted).count();
|
||||||
|
let failed = self.state_peers(PeerAddrState::Failed).count();
|
||||||
|
let pending = self.state_peers(PeerAddrState::AttemptPending).count();
|
||||||
|
|
||||||
|
let recently_live = self.recently_live_peers().count();
|
||||||
|
let recently_stopped_responding = responded
|
||||||
|
.checked_sub(recently_live)
|
||||||
|
.expect("all recently live peers must have responded");
|
||||||
|
|
||||||
|
// TODO: rename to address_book.responded.recently_live
|
||||||
|
metrics::gauge!("candidate_set.recently_live", recently_live as f64);
|
||||||
|
// TODO: rename to address_book.responded.stopped_responding
|
||||||
|
metrics::gauge!(
|
||||||
|
"candidate_set.disconnected",
|
||||||
|
recently_stopped_responding as f64
|
||||||
|
);
|
||||||
|
|
||||||
|
// TODO: rename to address_book.[state_name]
|
||||||
|
metrics::gauge!("candidate_set.responded", responded as f64);
|
||||||
|
metrics::gauge!("candidate_set.gossiped", never_attempted as f64);
|
||||||
|
metrics::gauge!("candidate_set.failed", failed as f64);
|
||||||
|
metrics::gauge!("candidate_set.pending", pending as f64);
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
%recently_live,
|
||||||
|
%recently_stopped_responding,
|
||||||
|
%responded,
|
||||||
|
%never_attempted,
|
||||||
|
%failed,
|
||||||
|
%pending,
|
||||||
|
"address book peers"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -206,23 +272,13 @@ impl Extend<MetaAddr> for AddressBook {
|
||||||
|
|
||||||
struct Drain<'a> {
|
struct Drain<'a> {
|
||||||
book: &'a mut AddressBook,
|
book: &'a mut AddressBook,
|
||||||
newest_first: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Iterator for Drain<'a> {
|
impl<'a> Iterator for Drain<'a> {
|
||||||
type Item = MetaAddr;
|
type Item = MetaAddr;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
let next_item = if self.newest_first {
|
let next_item_addr = self.book.peers().next()?.addr;
|
||||||
*self.book.by_time.iter().next()?
|
self.book.take(next_item_addr)
|
||||||
} else {
|
|
||||||
*self.book.by_time.iter().rev().next()?
|
|
||||||
};
|
|
||||||
self.book.by_time.remove(&next_item);
|
|
||||||
self.book
|
|
||||||
.by_addr
|
|
||||||
.remove(&next_item.addr)
|
|
||||||
.expect("cannot have by_time entry without by_addr entry");
|
|
||||||
Some(next_item)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -79,6 +79,7 @@ pub use crate::{
|
||||||
address_book::AddressBook,
|
address_book::AddressBook,
|
||||||
config::Config,
|
config::Config,
|
||||||
isolated::connect_isolated,
|
isolated::connect_isolated,
|
||||||
|
meta_addr::PeerAddrState,
|
||||||
peer_set::init,
|
peer_set::init,
|
||||||
policies::{RetryErrors, RetryLimit},
|
policies::{RetryErrors, RetryLimit},
|
||||||
protocol::internal::{Request, Response},
|
protocol::internal::{Request, Response},
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,70 @@ use zebra_chain::serialization::{
|
||||||
|
|
||||||
use crate::protocol::types::PeerServices;
|
use crate::protocol::types::PeerServices;
|
||||||
|
|
||||||
|
/// Peer connection state, based on our interactions with the peer.
|
||||||
|
///
|
||||||
|
/// Zebra also tracks how recently a peer has sent us messages, and derives peer
|
||||||
|
/// liveness based on the current time. This derived state is tracked using
|
||||||
|
/// [`AddressBook::maybe_connected_peers`] and
|
||||||
|
/// [`AddressBook::reconnection_peers`].
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||||
|
pub enum PeerAddrState {
|
||||||
|
/// The peer has sent us a valid message.
|
||||||
|
///
|
||||||
|
/// Peers remain in this state, even if they stop responding to requests.
|
||||||
|
/// (Peer liveness is derived from the `last_seen` timestamp, and the current
|
||||||
|
/// time.)
|
||||||
|
Responded,
|
||||||
|
|
||||||
|
/// The peer's address has just been fetched from a DNS seeder, or via peer
|
||||||
|
/// gossip, but we haven't attempted to connect to it yet.
|
||||||
|
NeverAttempted,
|
||||||
|
|
||||||
|
/// The peer's TCP connection failed, or the peer sent us an unexpected
|
||||||
|
/// Zcash protocol message, so we failed the connection.
|
||||||
|
Failed,
|
||||||
|
|
||||||
|
/// We just started a connection attempt to this peer.
|
||||||
|
AttemptPending,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PeerAddrState {
|
||||||
|
fn default() -> Self {
|
||||||
|
PeerAddrState::NeverAttempted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for PeerAddrState {
|
||||||
|
/// `PeerAddrState`s are sorted in approximate reconnection attempt
|
||||||
|
/// order, ignoring liveness.
|
||||||
|
///
|
||||||
|
/// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details.
|
||||||
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
|
use PeerAddrState::*;
|
||||||
|
match (self, other) {
|
||||||
|
(Responded, Responded)
|
||||||
|
| (NeverAttempted, NeverAttempted)
|
||||||
|
| (Failed, Failed)
|
||||||
|
| (AttemptPending, AttemptPending) => Ordering::Equal,
|
||||||
|
// We reconnect to `Responded` peers that have stopped sending messages,
|
||||||
|
// then `NeverAttempted` peers, then `Failed` peers
|
||||||
|
(Responded, _) => Ordering::Less,
|
||||||
|
(_, Responded) => Ordering::Greater,
|
||||||
|
(NeverAttempted, _) => Ordering::Less,
|
||||||
|
(_, NeverAttempted) => Ordering::Greater,
|
||||||
|
(Failed, _) => Ordering::Less,
|
||||||
|
(_, Failed) => Ordering::Greater,
|
||||||
|
// AttemptPending is covered by the other cases
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for PeerAddrState {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||||
|
Some(self.cmp(other))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An address with metadata on its advertised services and last-seen time.
|
/// An address with metadata on its advertised services and last-seen time.
|
||||||
///
|
///
|
||||||
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Network_address)
|
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Network_address)
|
||||||
|
|
@ -22,10 +86,41 @@ use crate::protocol::types::PeerServices;
|
||||||
pub struct MetaAddr {
|
pub struct MetaAddr {
|
||||||
/// The peer's address.
|
/// The peer's address.
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
|
||||||
/// The services advertised by the peer.
|
/// The services advertised by the peer.
|
||||||
|
///
|
||||||
|
/// The exact meaning depends on `last_connection_state`:
|
||||||
|
/// - `Responded`: the services advertised by this peer, the last time we
|
||||||
|
/// performed a handshake with it
|
||||||
|
/// - `NeverAttempted`: the unverified services provided by the remote peer
|
||||||
|
/// that sent us this address
|
||||||
|
/// - `Failed` or `AttemptPending`: unverified services via another peer,
|
||||||
|
/// or services advertised in a previous handshake
|
||||||
|
///
|
||||||
|
/// ## Security
|
||||||
|
///
|
||||||
|
/// `services` from `NeverAttempted` peers may be invalid due to outdated
|
||||||
|
/// records, older peer versions, or buggy or malicious peers.
|
||||||
pub services: PeerServices,
|
pub services: PeerServices,
|
||||||
/// When the peer was last seen.
|
|
||||||
|
/// The last time we interacted with this peer.
|
||||||
|
///
|
||||||
|
/// The exact meaning depends on `last_connection_state`:
|
||||||
|
/// - `Responded`: the last time we processed a message from this peer
|
||||||
|
/// - `NeverAttempted`: the unverified time provided by the remote peer
|
||||||
|
/// that sent us this address
|
||||||
|
/// - `Failed`: the last time we marked the peer as failed
|
||||||
|
/// - `AttemptPending`: the last time we queued the peer for a reconnection
|
||||||
|
/// attempt
|
||||||
|
///
|
||||||
|
/// ## Security
|
||||||
|
///
|
||||||
|
/// `last_seen` times from `NeverAttempted` peers may be invalid due to
|
||||||
|
/// clock skew, or buggy or malicious peers.
|
||||||
pub last_seen: DateTime<Utc>,
|
pub last_seen: DateTime<Utc>,
|
||||||
|
|
||||||
|
/// The outcome of our most recent communication attempt with this peer.
|
||||||
|
pub last_connection_state: PeerAddrState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetaAddr {
|
impl MetaAddr {
|
||||||
|
|
@ -34,29 +129,48 @@ impl MetaAddr {
|
||||||
let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS;
|
let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS;
|
||||||
let ts = self.last_seen.timestamp();
|
let ts = self.last_seen.timestamp();
|
||||||
self.last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
|
self.last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
|
||||||
|
self.last_connection_state = Default::default();
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ord for MetaAddr {
|
impl Ord for MetaAddr {
|
||||||
/// `MetaAddr`s are sorted newest-first, and then in an arbitrary
|
/// `MetaAddr`s are sorted in approximate reconnection attempt order, but
|
||||||
/// but determinate total order.
|
/// with `Responded` peers sorted first as a group.
|
||||||
|
///
|
||||||
|
/// This order should not be used for reconnection attempts: use
|
||||||
|
/// [`AddressBook::reconnection_peers`] instead.
|
||||||
|
///
|
||||||
|
/// See [`CandidateSet`] for more details.
|
||||||
fn cmp(&self, other: &Self) -> Ordering {
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
let newest_first = self.last_seen.cmp(&other.last_seen).reverse();
|
use std::net::IpAddr::{V4, V6};
|
||||||
newest_first.then_with(|| {
|
use PeerAddrState::*;
|
||||||
|
|
||||||
|
let oldest_first = self.last_seen.cmp(&other.last_seen);
|
||||||
|
let newest_first = oldest_first.reverse();
|
||||||
|
|
||||||
|
let connection_state = self.last_connection_state.cmp(&other.last_connection_state);
|
||||||
|
let reconnection_time = match self.last_connection_state {
|
||||||
|
Responded => oldest_first,
|
||||||
|
NeverAttempted => newest_first,
|
||||||
|
Failed => oldest_first,
|
||||||
|
AttemptPending => oldest_first,
|
||||||
|
};
|
||||||
|
let ip_numeric = match (self.addr.ip(), other.addr.ip()) {
|
||||||
|
(V4(a), V4(b)) => a.octets().cmp(&b.octets()),
|
||||||
|
(V6(a), V6(b)) => a.octets().cmp(&b.octets()),
|
||||||
|
(V4(_), V6(_)) => Ordering::Less,
|
||||||
|
(V6(_), V4(_)) => Ordering::Greater,
|
||||||
|
};
|
||||||
|
|
||||||
|
connection_state
|
||||||
|
.then(reconnection_time)
|
||||||
// The remainder is meaningless as an ordering, but required so that we
|
// The remainder is meaningless as an ordering, but required so that we
|
||||||
// have a total order on `MetaAddr` values: self and other must compare
|
// have a total order on `MetaAddr` values: self and other must compare
|
||||||
// as Ordering::Equal iff they are equal.
|
// as Ordering::Equal iff they are equal.
|
||||||
use std::net::IpAddr::{V4, V6};
|
.then(ip_numeric)
|
||||||
match (self.addr.ip(), other.addr.ip()) {
|
|
||||||
(V4(a), V4(b)) => a.octets().cmp(&b.octets()),
|
|
||||||
(V6(a), V6(b)) => a.octets().cmp(&b.octets()),
|
|
||||||
(V4(_), V6(_)) => Ordering::Less,
|
|
||||||
(V6(_), V4(_)) => Ordering::Greater,
|
|
||||||
}
|
|
||||||
.then(self.addr.port().cmp(&other.addr.port()))
|
.then(self.addr.port().cmp(&other.addr.port()))
|
||||||
.then(self.services.bits().cmp(&other.services.bits()))
|
.then(self.services.bits().cmp(&other.services.bits()))
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -82,6 +196,7 @@ impl ZcashDeserialize for MetaAddr {
|
||||||
// Discard unknown service bits.
|
// Discard unknown service bits.
|
||||||
services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
|
services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
|
||||||
addr: reader.read_socket_addr()?,
|
addr: reader.read_socket_addr()?,
|
||||||
|
last_connection_state: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -94,16 +209,26 @@ mod tests {
|
||||||
fn sanitize_truncates_timestamps() {
|
fn sanitize_truncates_timestamps() {
|
||||||
zebra_test::init();
|
zebra_test::init();
|
||||||
|
|
||||||
|
let services = PeerServices::default();
|
||||||
|
let addr = "127.0.0.1:8233".parse().unwrap();
|
||||||
|
|
||||||
let entry = MetaAddr {
|
let entry = MetaAddr {
|
||||||
services: PeerServices::default(),
|
services,
|
||||||
addr: "127.0.0.1:8233".parse().unwrap(),
|
addr,
|
||||||
last_seen: Utc.timestamp(1_573_680_222, 0),
|
last_seen: Utc.timestamp(1_573_680_222, 0),
|
||||||
|
last_connection_state: PeerAddrState::Responded,
|
||||||
}
|
}
|
||||||
.sanitize();
|
.sanitize();
|
||||||
|
|
||||||
// We want the sanitized timestamp to be a multiple of the truncation interval.
|
// We want the sanitized timestamp to be a multiple of the truncation interval.
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
entry.last_seen.timestamp() % crate::constants::TIMESTAMP_TRUNCATION_SECONDS,
|
entry.last_seen.timestamp() % crate::constants::TIMESTAMP_TRUNCATION_SECONDS,
|
||||||
0
|
0
|
||||||
);
|
);
|
||||||
|
// We want the state to be the default
|
||||||
|
assert_eq!(entry.last_connection_state, Default::default());
|
||||||
|
// We want the other fields to be unmodified
|
||||||
|
assert_eq!(entry.addr, addr);
|
||||||
|
assert_eq!(entry.services, services);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ use crate::{
|
||||||
internal::{Request, Response},
|
internal::{Request, Response},
|
||||||
},
|
},
|
||||||
types::MetaAddr,
|
types::MetaAddr,
|
||||||
BoxError, Config,
|
BoxError, Config, PeerAddrState,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
|
use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
|
||||||
|
|
@ -390,6 +390,7 @@ where
|
||||||
addr,
|
addr,
|
||||||
services: remote_services,
|
services: remote_services,
|
||||||
last_seen: Utc::now(),
|
last_seen: Utc::now(),
|
||||||
|
last_connection_state: PeerAddrState::Responded,
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,60 +3,73 @@ use std::sync::{Arc, Mutex};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
use tracing::Level;
|
|
||||||
|
|
||||||
use crate::{types::MetaAddr, AddressBook, BoxError, Request, Response};
|
use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Response};
|
||||||
|
|
||||||
/// The `CandidateSet` maintains a pool of candidate peers.
|
/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts.
|
||||||
///
|
///
|
||||||
/// It divides the set of all possible candidate peers into three disjoint subsets:
|
/// It divides the set of all possible candidate peers into disjoint subsets,
|
||||||
|
/// using the `PeerAddrState`:
|
||||||
///
|
///
|
||||||
/// 1. Disconnected peers, which we previously connected to but are not currently connected to;
|
/// 1. `Responded` peers, which we previously connected to. If we have not received
|
||||||
/// 2. Gossiped peers, which we learned about from other peers but have never connected to;
|
/// any messages from a `Responded` peer within a cutoff time, we assume that it
|
||||||
/// 3. Failed peers, to whom we attempted to connect but were unable to.
|
/// has disconnected or hung, and attempt reconnection;
|
||||||
|
/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS
|
||||||
|
/// seeder, but have never connected to;
|
||||||
|
/// 3. `Failed` peers, to whom we attempted to connect but were unable to;
|
||||||
|
/// 4. `AttemptPending` peers, which we've recently queued for reconnection.
|
||||||
///
|
///
|
||||||
/// ```ascii,no_run
|
/// ```ascii,no_run
|
||||||
/// ┌─────────────────┐
|
/// ┌──────────────────┐
|
||||||
/// │ PeerSet │
|
/// │ PeerSet │
|
||||||
/// │GetPeers Requests│
|
/// │GetPeers Responses│
|
||||||
/// └─────────────────┘
|
/// └──────────────────┘
|
||||||
/// │
|
/// │
|
||||||
/// │
|
/// │
|
||||||
/// │
|
/// │
|
||||||
/// │
|
/// │
|
||||||
/// ▼
|
/// ▼
|
||||||
/// ┌─────────────┐ filter by Λ filter by
|
/// filter by Λ
|
||||||
/// │ PeerSet │!contains_addr╱ ╲ !contains_addr
|
/// !contains_addr ╱ ╲
|
||||||
/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐
|
/// ┌────────────────────────────▶▕ ▏
|
||||||
/// │ └─────────────┘ ╲ ╱ │
|
/// │ ╲ ╱
|
||||||
/// │ │ V │
|
/// │ V
|
||||||
/// │ │disconnected_peers │ │
|
/// │ │
|
||||||
/// │ ▼ │ │
|
/// │ │
|
||||||
/// │ Λ filter by │ │
|
/// │ │
|
||||||
/// │ ╱ ╲ !contains_addr │ │
|
/// │ │
|
||||||
/// │ ▕ ▏◀───────────────────┼──────────────────────┤
|
/// │ │
|
||||||
/// │ ╲ ╱ │ │
|
/// │ │
|
||||||
/// │ V │ │
|
/// │ │
|
||||||
/// │ │ │ │
|
/// │ │
|
||||||
/// │┌────────┼──────────────────────┼──────────────────────┼────────┐
|
/// ├───────────────────────────────┼───────────────────────────────┐
|
||||||
/// ││ ▼ ▼ │ │
|
/// │ PeerSet AddressBook ▼ │
|
||||||
/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │
|
||||||
/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │
|
/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │
|
||||||
/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐
|
/// │ │Disconnected │ │ Peers │ │ Peers │◀┼┐
|
||||||
/// ││ │ AddressBook │ │ AddressBook │ │ │ ││
|
/// │ │ `Responded` │ │ │ │ │ ││
|
||||||
/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││
|
/// │ │ Peers │ │ │ │ │ ││
|
||||||
/// ││ │ │ │ ││
|
/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││
|
||||||
/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││
|
/// │ │ │ │ ││
|
||||||
/// ││ │ │ │ ││
|
/// │ #1 oldest_first #2 newest_first #3 oldest_first ││
|
||||||
/// ││ ├──────────────────────┴──────────────────────┘ ││
|
/// │ │ │ │ ││
|
||||||
/// ││ │ disjoint candidate sets ││
|
/// │ ├──────────────────────┴──────────────────────┘ ││
|
||||||
/// │└────────┼──────────────────────────────────────────────────────┘│
|
/// │ │ disjoint `PeerAddrState`s ││
|
||||||
/// │ ▼ │
|
/// ├────────┼──────────────────────────────────────────────────────┘│
|
||||||
/// │ Λ │
|
/// │ ▼ │
|
||||||
/// │ ╱ ╲ filter by │
|
/// │ Λ │
|
||||||
/// └──────▶▕ ▏!is_potentially_connected │
|
/// │ ╱ ╲ filter by │
|
||||||
/// ╲ ╱ │
|
/// └─────▶▕ ▏!is_potentially_connected │
|
||||||
/// V │
|
/// ╲ ╱ to remove live │
|
||||||
|
/// V `Responded` peers │
|
||||||
|
/// │ │
|
||||||
|
/// │ │
|
||||||
|
/// ▼ │
|
||||||
|
/// ┌────────────────┐ │
|
||||||
|
/// │`AttemptPending`│ │
|
||||||
|
/// │ Peers │ │
|
||||||
|
/// │ │ │
|
||||||
|
/// └────────────────┘ │
|
||||||
/// │ │
|
/// │ │
|
||||||
/// │ │
|
/// │ │
|
||||||
/// ▼ │
|
/// ▼ │
|
||||||
|
|
@ -73,11 +86,20 @@ use crate::{types::MetaAddr, AddressBook, BoxError, Request, Response};
|
||||||
/// │peer::Client│
|
/// │peer::Client│
|
||||||
/// │to Discover │
|
/// │to Discover │
|
||||||
/// └────────────┘
|
/// └────────────┘
|
||||||
|
/// │
|
||||||
|
/// │
|
||||||
|
/// ▼
|
||||||
|
/// ┌───────────────────────────────────────┐
|
||||||
|
/// │ every time we receive a peer message: │
|
||||||
|
/// │ * update state to `Responded` │
|
||||||
|
/// │ * update last_seen to now() │
|
||||||
|
/// └───────────────────────────────────────┘
|
||||||
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
// TODO:
|
||||||
|
// * draw arrow from the "peer message" box into the `Responded` state box
|
||||||
|
// * make the "disjoint states" box include `AttemptPending`
|
||||||
pub(super) struct CandidateSet<S> {
|
pub(super) struct CandidateSet<S> {
|
||||||
pub(super) disconnected: AddressBook,
|
|
||||||
pub(super) gossiped: AddressBook,
|
|
||||||
pub(super) failed: AddressBook,
|
|
||||||
pub(super) peer_set: Arc<Mutex<AddressBook>>,
|
pub(super) peer_set: Arc<Mutex<AddressBook>>,
|
||||||
pub(super) peer_service: S,
|
pub(super) peer_service: S,
|
||||||
}
|
}
|
||||||
|
|
@ -87,16 +109,28 @@ where
|
||||||
S: Service<Request, Response = Response, Error = BoxError>,
|
S: Service<Request, Response = Response, Error = BoxError>,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
|
/// Uses `peer_set` and `peer_service` to manage a [`CandidateSet`] of peers.
|
||||||
pub fn new(peer_set: Arc<Mutex<AddressBook>>, peer_service: S) -> CandidateSet<S> {
|
pub fn new(peer_set: Arc<Mutex<AddressBook>>, peer_service: S) -> CandidateSet<S> {
|
||||||
CandidateSet {
|
CandidateSet {
|
||||||
disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")),
|
|
||||||
gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")),
|
|
||||||
failed: AddressBook::new(span!(Level::TRACE, "failed peers")),
|
|
||||||
peer_set,
|
peer_set,
|
||||||
peer_service,
|
peer_service,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the peer set from the network.
|
||||||
|
///
|
||||||
|
/// - Ask a few live `Responded` peers to send us more peers.
|
||||||
|
/// - Process all completed peer responses, adding new peers in the
|
||||||
|
/// `NeverAttempted` state.
|
||||||
|
///
|
||||||
|
/// ## Correctness
|
||||||
|
///
|
||||||
|
/// The handshaker sets up the peer message receiver so it also sends a
|
||||||
|
/// `Responded` peer address update.
|
||||||
|
///
|
||||||
|
/// `report_failed` puts peers into the `Failed` state.
|
||||||
|
///
|
||||||
|
/// `next` puts peers into the `AttemptPending` state.
|
||||||
pub async fn update(&mut self) -> Result<(), BoxError> {
|
pub async fn update(&mut self) -> Result<(), BoxError> {
|
||||||
// Opportunistically crawl the network on every update call to ensure
|
// Opportunistically crawl the network on every update call to ensure
|
||||||
// we're actively fetching peers. Continue independently of whether we
|
// we're actively fetching peers. Continue independently of whether we
|
||||||
|
|
@ -113,60 +147,62 @@ where
|
||||||
responses.push(self.peer_service.call(Request::Peers));
|
responses.push(self.peer_service.call(Request::Peers));
|
||||||
}
|
}
|
||||||
while let Some(rsp) = responses.next().await {
|
while let Some(rsp) = responses.next().await {
|
||||||
if let Ok(Response::Peers(addrs)) = rsp {
|
if let Ok(Response::Peers(rsp_addrs)) = rsp {
|
||||||
let addr_len = addrs.len();
|
|
||||||
let prev_len = self.gossiped.len();
|
|
||||||
// Filter new addresses to ensure that gossiped addresses are actually new
|
// Filter new addresses to ensure that gossiped addresses are actually new
|
||||||
let failed = &self.failed;
|
|
||||||
let peer_set = &self.peer_set;
|
let peer_set = &self.peer_set;
|
||||||
let new_addrs = addrs
|
let new_addrs = rsp_addrs
|
||||||
.into_iter()
|
.iter()
|
||||||
.filter(|meta| !failed.contains_addr(&meta.addr))
|
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
|
||||||
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr));
|
.collect::<Vec<_>>();
|
||||||
self.gossiped.extend(new_addrs);
|
|
||||||
trace!(
|
trace!(
|
||||||
addr_len,
|
?rsp_addrs,
|
||||||
new_addrs = self.gossiped.len() - prev_len,
|
new_addr_count = ?new_addrs.len(),
|
||||||
"got response to GetPeers"
|
"got response to GetPeers"
|
||||||
);
|
);
|
||||||
|
// New addresses are deserialized in the `NeverAttempted` state
|
||||||
|
peer_set
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.extend(new_addrs.into_iter().cloned());
|
||||||
} else {
|
} else {
|
||||||
trace!("got error in GetPeers request");
|
trace!("got error in GetPeers request");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine whether any known peers have recently disconnected.
|
|
||||||
let failed = &self.failed;
|
|
||||||
let peer_set = &self.peer_set;
|
|
||||||
self.disconnected.extend(
|
|
||||||
peer_set
|
|
||||||
.lock()
|
|
||||||
.expect("mutex must be unpoisoned")
|
|
||||||
.disconnected_peers()
|
|
||||||
.filter(|meta| failed.contains_addr(&meta.addr)),
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the next candidate for a connection attempt, if any are available.
|
||||||
|
///
|
||||||
|
/// Returns peers in this order:
|
||||||
|
/// - oldest `Responded` that are not live
|
||||||
|
/// - newest `NeverAttempted`
|
||||||
|
/// - oldest `Failed`
|
||||||
|
///
|
||||||
|
/// Skips `AttemptPending` peers and live `Responded` peers.
|
||||||
|
///
|
||||||
|
/// ## Correctness
|
||||||
|
///
|
||||||
|
/// `AttemptPending` peers will become `Responded` if they respond, or
|
||||||
|
/// become `Failed` if they time out or provide a bad response.
|
||||||
|
///
|
||||||
|
/// Live `Responded` peers will stay live if they keep responding, or
|
||||||
|
/// become a reconnection candidate if they stop responding.
|
||||||
pub fn next(&mut self) -> Option<MetaAddr> {
|
pub fn next(&mut self) -> Option<MetaAddr> {
|
||||||
metrics::gauge!("candidate_set.disconnected", self.disconnected.len() as f64);
|
let mut peer_set_guard = self.peer_set.lock().unwrap();
|
||||||
metrics::gauge!("candidate_set.gossiped", self.gossiped.len() as f64);
|
let mut reconnect = peer_set_guard.reconnection_peers().next()?;
|
||||||
metrics::gauge!("candidate_set.failed", self.failed.len() as f64);
|
|
||||||
debug!(
|
reconnect.last_seen = Utc::now();
|
||||||
disconnected_peers = self.disconnected.len(),
|
reconnect.last_connection_state = PeerAddrState::AttemptPending;
|
||||||
gossiped_peers = self.gossiped.len(),
|
peer_set_guard.update(reconnect);
|
||||||
failed_peers = self.failed.len()
|
|
||||||
);
|
Some(reconnect)
|
||||||
let guard = self.peer_set.lock().unwrap();
|
|
||||||
self.disconnected
|
|
||||||
.drain_oldest()
|
|
||||||
.chain(self.gossiped.drain_newest())
|
|
||||||
.chain(self.failed.drain_oldest())
|
|
||||||
.find(|meta| !guard.is_potentially_connected(&meta.addr))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Mark `addr` as a failed peer.
|
||||||
pub fn report_failed(&mut self, mut addr: MetaAddr) {
|
pub fn report_failed(&mut self, mut addr: MetaAddr) {
|
||||||
addr.last_seen = Utc::now();
|
addr.last_seen = Utc::now();
|
||||||
self.failed.update(addr);
|
addr.last_connection_state = PeerAddrState::Failed;
|
||||||
|
self.peer_set.lock().unwrap().update(addr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue