Security: Limit reconnection rate to individual peers (#2275)
* Security: Limit reconnection rate to individual peers Reconnection Rate Limit the reconnection rate to each individual peer by applying the liveness cutoff to the attempt, responded, and failure time fields. If any field is recent, the peer is skipped. The new liveness cutoff skips any peers that have recently been attempted or failed. (Previously, the liveness check was only applied if the peer was in the `Responded` state, which could lead to repeated retries of `Failed` peers, particularly in small address books.) Reconnection Order Zebra prefers more useful peer states, then the earliest attempted, failed, and responded times, then the most recent gossiped last seen times. Before this change, Zebra took the most recent time in all the peer time fields, and used that time for liveness and ordering. This led to confusion between trusted and untrusted data, and success and failure times. Unlike the previous order, the new order: - tries all peers in each state, before re-trying any peer in that state, and - only checks the the gossiped untrusted last seen time if all other times are equal. * Preserve the later time if changes arrive out of order * Update CandidateSet::next documentation * Update CandidateSet state diagram * Fix variant names in comments * Explain why timestamps can be left out of MetaAddrChanges * Add a simple test for the individual peer retry limit * Only generate valid Arbitrary PeerServices values * Add an individual peer retry limit AddressBook and CandidateSet test * Stop deleting recently live addresses from the address book If we delete recently live addresses from the address book, we can get a new entry for them, and reconnect too rapidly. * Rename functions to match similar tokio API * Fix docs for service sorting * Clarify a comment * Cleanup a variable and comments * Remove blank lines in the CandidateSet state diagram * Add a multi-peer proptest that checks outbound attempt fairness * Fix a comment typo Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com> * Simplify time maths in MetaAddr * Create a Duration32 type to simplify calculations and comparisons * Rename variables for clarity * Split a string constant into multiple lines * Make constants match rustdoc order Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
parent
6396ac27d8
commit
4d22a0bae9
|
|
@ -17,7 +17,23 @@ pub struct DateTime32 {
|
|||
timestamp: u32,
|
||||
}
|
||||
|
||||
/// An unsigned time duration, represented by a 32-bit number of seconds.
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub struct Duration32 {
|
||||
seconds: u32,
|
||||
}
|
||||
|
||||
impl DateTime32 {
|
||||
/// The earliest possible `DateTime32` value.
|
||||
pub const MIN: DateTime32 = DateTime32 {
|
||||
timestamp: u32::MIN,
|
||||
};
|
||||
|
||||
/// The latest possible `DateTime32` value.
|
||||
pub const MAX: DateTime32 = DateTime32 {
|
||||
timestamp: u32::MAX,
|
||||
};
|
||||
|
||||
/// Returns the number of seconds since the UNIX epoch.
|
||||
pub fn timestamp(&self) -> u32 {
|
||||
self.timestamp
|
||||
|
|
@ -34,6 +50,57 @@ impl DateTime32 {
|
|||
.try_into()
|
||||
.expect("unexpected out of range chrono::DateTime")
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed between `earlier` and this time,
|
||||
/// or `None` if `earlier` is later than this time.
|
||||
pub fn checked_duration_since(&self, earlier: DateTime32) -> Option<Duration32> {
|
||||
self.timestamp
|
||||
.checked_sub(earlier.timestamp)
|
||||
.map(|seconds| Duration32 { seconds })
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed between `earlier` and this time,
|
||||
/// or zero if `earlier` is later than this time.
|
||||
pub fn saturating_duration_since(&self, earlier: DateTime32) -> Duration32 {
|
||||
Duration32 {
|
||||
seconds: self.timestamp.saturating_sub(earlier.timestamp),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed since this time,
|
||||
/// or if this time is in the future, returns `None`.
|
||||
pub fn checked_elapsed(&self) -> Option<Duration32> {
|
||||
DateTime32::now().checked_duration_since(*self)
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed since this time,
|
||||
/// or if this time is in the future, returns zero.
|
||||
pub fn saturating_elapsed(&self) -> Duration32 {
|
||||
DateTime32::now().saturating_duration_since(*self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Duration32 {
|
||||
/// The earliest possible `Duration32` value.
|
||||
pub const MIN: Duration32 = Duration32 { seconds: u32::MIN };
|
||||
|
||||
/// The latest possible `Duration32` value.
|
||||
pub const MAX: Duration32 = Duration32 { seconds: u32::MAX };
|
||||
|
||||
/// Returns the number of seconds.
|
||||
pub fn seconds(&self) -> u32 {
|
||||
self.seconds
|
||||
}
|
||||
|
||||
/// Returns the equivalent [`chrono::Duration`].
|
||||
pub fn to_chrono(self) -> chrono::Duration {
|
||||
self.into()
|
||||
}
|
||||
|
||||
/// Returns the equivalent [`std::time::Duration`].
|
||||
pub fn to_std(self) -> std::time::Duration {
|
||||
self.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for DateTime32 {
|
||||
|
|
@ -45,6 +112,15 @@ impl fmt::Debug for DateTime32 {
|
|||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Duration32 {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Duration32")
|
||||
.field("seconds", &self.seconds)
|
||||
.field("calendar", &chrono::Duration::from(*self))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u32> for DateTime32 {
|
||||
fn from(value: u32) -> Self {
|
||||
DateTime32 { timestamp: value }
|
||||
|
|
@ -70,6 +146,44 @@ impl From<&DateTime32> for chrono::DateTime<Utc> {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<u32> for Duration32 {
|
||||
fn from(value: u32) -> Self {
|
||||
Duration32 { seconds: value }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&u32> for Duration32 {
|
||||
fn from(value: &u32) -> Self {
|
||||
(*value).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Duration32> for chrono::Duration {
|
||||
fn from(value: Duration32) -> Self {
|
||||
// chrono::Duration is guaranteed to hold 32-bit values
|
||||
chrono::Duration::seconds(value.seconds.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Duration32> for chrono::Duration {
|
||||
fn from(value: &Duration32) -> Self {
|
||||
(*value).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Duration32> for std::time::Duration {
|
||||
fn from(value: Duration32) -> Self {
|
||||
// std::time::Duration is guaranteed to hold 32-bit values
|
||||
std::time::Duration::from_secs(value.seconds.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Duration32> for std::time::Duration {
|
||||
fn from(value: &Duration32) -> Self {
|
||||
(*value).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<chrono::DateTime<Utc>> for DateTime32 {
|
||||
type Error = TryFromIntError;
|
||||
|
||||
|
|
@ -94,6 +208,54 @@ impl TryFrom<&chrono::DateTime<Utc>> for DateTime32 {
|
|||
}
|
||||
}
|
||||
|
||||
impl TryFrom<chrono::Duration> for Duration32 {
|
||||
type Error = TryFromIntError;
|
||||
|
||||
/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
fn try_from(value: chrono::Duration) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
seconds: value.num_seconds().try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&chrono::Duration> for Duration32 {
|
||||
type Error = TryFromIntError;
|
||||
|
||||
/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
fn try_from(value: &chrono::Duration) -> Result<Self, Self::Error> {
|
||||
(*value).try_into()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<std::time::Duration> for Duration32 {
|
||||
type Error = TryFromIntError;
|
||||
|
||||
/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
fn try_from(value: std::time::Duration) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
seconds: value.as_secs().try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&std::time::Duration> for Duration32 {
|
||||
type Error = TryFromIntError;
|
||||
|
||||
/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
fn try_from(value: &std::time::Duration) -> Result<Self, Self::Error> {
|
||||
(*value).try_into()
|
||||
}
|
||||
}
|
||||
|
||||
impl ZcashSerialize for DateTime32 {
|
||||
fn zcash_serialize<W: std::io::Write>(&self, mut writer: W) -> Result<(), std::io::Error> {
|
||||
writer.write_u32::<LittleEndian>(self.timestamp)
|
||||
|
|
|
|||
|
|
@ -8,10 +8,9 @@ use std::{
|
|||
time::Instant,
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use tracing::Span;
|
||||
|
||||
use crate::{constants, meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};
|
||||
use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};
|
||||
|
||||
/// A database of peer listener addresses, their advertised services, and
|
||||
/// information on when they were last seen.
|
||||
|
|
@ -104,6 +103,28 @@ impl AddressBook {
|
|||
new_book
|
||||
}
|
||||
|
||||
/// Construct an [`AddressBook`] with the given [`Config`],
|
||||
/// [`tracing::Span`], and addresses.
|
||||
///
|
||||
/// This constructor can be used to break address book invariants,
|
||||
/// so it should only be used in tests.
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
pub fn new_with_addrs(
|
||||
config: &Config,
|
||||
span: Span,
|
||||
addrs: impl IntoIterator<Item = MetaAddr>,
|
||||
) -> AddressBook {
|
||||
let mut new_book = AddressBook::new(config, span);
|
||||
|
||||
let addrs = addrs
|
||||
.into_iter()
|
||||
.map(|meta_addr| (meta_addr.addr, meta_addr));
|
||||
new_book.by_addr.extend(addrs);
|
||||
|
||||
new_book.update_metrics();
|
||||
new_book
|
||||
}
|
||||
|
||||
/// Get the local listener address.
|
||||
pub fn get_local_listener(&self) -> MetaAddrChange {
|
||||
MetaAddr::new_local_listener(&self.local_listener)
|
||||
|
|
@ -140,6 +161,15 @@ impl AddressBook {
|
|||
///
|
||||
/// All changes should go through `update`, so that the address book
|
||||
/// only contains valid outbound addresses.
|
||||
///
|
||||
/// # Security
|
||||
///
|
||||
/// This function must apply every attempted, responded, and failed change
|
||||
/// to the address book. This prevents rapid reconnections to the same peer.
|
||||
///
|
||||
/// As an exception, this function can ignore all changes for specific
|
||||
/// [`SocketAddr`]s. Ignored addresses will never be used to connect to
|
||||
/// peers.
|
||||
pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
|
||||
let _guard = self.span.enter();
|
||||
|
||||
|
|
@ -155,19 +185,22 @@ impl AddressBook {
|
|||
);
|
||||
|
||||
if let Some(updated) = updated {
|
||||
// If a node that we are directly connected to has changed to a client,
|
||||
// remove it from the address book.
|
||||
if updated.is_direct_client() && previous.is_some() {
|
||||
std::mem::drop(_guard);
|
||||
self.take(updated.addr);
|
||||
// Ignore invalid outbound addresses.
|
||||
// (Inbound connections can be monitored via Zebra's metrics.)
|
||||
if !updated.address_is_valid_for_outbound() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Never add unspecified addresses or client services.
|
||||
// Ignore invalid outbound services and other info,
|
||||
// but only if the peer has never been attempted.
|
||||
//
|
||||
// Communication with these addresses can be monitored via Zebra's
|
||||
// metrics. (The address book is for valid peer addresses.)
|
||||
if !updated.is_valid_for_outbound() {
|
||||
// Otherwise, if we got the info directly from the peer,
|
||||
// store it in the address book, so we know not to reconnect.
|
||||
//
|
||||
// TODO: delete peers with invalid info when they get too old (#1873)
|
||||
if !updated.last_known_info_is_valid_for_outbound()
|
||||
&& updated.last_connection_state.is_never_attempted()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
|
|
@ -202,21 +235,6 @@ impl AddressBook {
|
|||
}
|
||||
}
|
||||
|
||||
/// Compute a cutoff time that can determine whether an entry
|
||||
/// in an address book being updated with peer message timestamps
|
||||
/// represents a likely-dead (or hung) peer, or a potentially-connected peer.
|
||||
///
|
||||
/// [`constants::LIVE_PEER_DURATION`] represents the time interval in which
|
||||
/// we should receive at least one message from a peer, or close the
|
||||
/// connection. Therefore, if the last-seen timestamp is older than
|
||||
/// [`constants::LIVE_PEER_DURATION`] ago, we know we should have
|
||||
/// disconnected from it. Otherwise, we could potentially be connected to it.
|
||||
fn liveness_cutoff_time() -> DateTime<Utc> {
|
||||
Utc::now()
|
||||
- chrono::Duration::from_std(constants::LIVE_PEER_DURATION)
|
||||
.expect("unexpectedly large constant")
|
||||
}
|
||||
|
||||
/// Returns true if the given [`SocketAddr`] has recently sent us a message.
|
||||
pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool {
|
||||
let _guard = self.span.enter();
|
||||
|
|
@ -224,8 +242,7 @@ impl AddressBook {
|
|||
None => false,
|
||||
// NeverAttempted, Failed, and AttemptPending peers should never be live
|
||||
Some(peer) => {
|
||||
peer.last_connection_state == PeerAddrState::Responded
|
||||
&& peer.get_last_seen() > AddressBook::liveness_cutoff_time()
|
||||
peer.last_connection_state == PeerAddrState::Responded && peer.was_recently_live()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -240,12 +257,6 @@ impl AddressBook {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns true if the given [`SocketAddr`] might be connected to a node
|
||||
/// feeding timestamps into this address book.
|
||||
pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool {
|
||||
self.recently_live_addr(addr) || self.pending_reconnection_addr(addr)
|
||||
}
|
||||
|
||||
/// Return an iterator over all peers.
|
||||
///
|
||||
/// Returns peers in reconnection attempt order, then recently live peers in
|
||||
|
|
@ -266,7 +277,7 @@ impl AddressBook {
|
|||
// Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet
|
||||
self.by_addr
|
||||
.values()
|
||||
.filter(move |peer| !self.maybe_connected_addr(&peer.addr))
|
||||
.filter(|peer| peer.is_ready_for_attempt())
|
||||
.collect::<BTreeSet<_>>()
|
||||
.into_iter()
|
||||
.cloned()
|
||||
|
|
@ -289,7 +300,7 @@ impl AddressBook {
|
|||
|
||||
self.by_addr
|
||||
.values()
|
||||
.filter(move |peer| self.maybe_connected_addr(&peer.addr))
|
||||
.filter(|peer| !peer.is_ready_for_attempt())
|
||||
.cloned()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
use std::{
|
||||
cmp::{Ord, Ordering},
|
||||
convert::TryInto,
|
||||
io::{Read, Write},
|
||||
net::SocketAddr,
|
||||
time::Instant,
|
||||
|
|
@ -9,13 +10,15 @@ use std::{
|
|||
|
||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
|
||||
use chrono::Duration;
|
||||
use zebra_chain::serialization::{
|
||||
DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate, WriteZcashExt,
|
||||
ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
|
||||
};
|
||||
|
||||
use crate::protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices};
|
||||
use crate::{
|
||||
constants,
|
||||
protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices},
|
||||
};
|
||||
|
||||
use MetaAddrChange::*;
|
||||
use PeerAddrState::*;
|
||||
|
|
@ -243,34 +246,7 @@ pub enum MetaAddrChange {
|
|||
},
|
||||
}
|
||||
|
||||
// TODO: remove this use in a follow-up PR
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
impl MetaAddr {
|
||||
/// Returns the maximum time among all the time fields.
|
||||
///
|
||||
/// This function exists to replicate an old Zebra bug.
|
||||
/// TODO: remove this function in a follow-up PR
|
||||
pub(crate) fn get_last_seen(&self) -> DateTime<Utc> {
|
||||
let latest_seen = self
|
||||
.untrusted_last_seen
|
||||
.max(self.last_response)
|
||||
.map(DateTime32::to_chrono);
|
||||
|
||||
// At this point it's pretty obvious why we want to get rid of this code
|
||||
let latest_try = self
|
||||
.last_attempt
|
||||
.max(self.last_failure)
|
||||
.map(|latest_try| Instant::now().checked_duration_since(latest_try))
|
||||
.flatten()
|
||||
.map(Duration::from_std)
|
||||
.map(Result::ok)
|
||||
.flatten()
|
||||
.map(|before_now| Utc::now() - before_now);
|
||||
|
||||
latest_seen.or(latest_try).unwrap_or(chrono::MIN_DATETIME)
|
||||
}
|
||||
|
||||
/// Returns a new `MetaAddr`, based on the deserialized fields from a
|
||||
/// gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message.
|
||||
pub fn new_gossiped_meta_addr(
|
||||
|
|
@ -320,7 +296,7 @@ impl MetaAddr {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MetaAddrChange::UpdateConnectionAttempt`] for a peer that we
|
||||
/// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we
|
||||
/// want to make an outbound connection to.
|
||||
pub fn new_reconnect(addr: &SocketAddr) -> MetaAddrChange {
|
||||
UpdateAttempt { addr: *addr }
|
||||
|
|
@ -335,7 +311,7 @@ impl MetaAddr {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MetaAddrChange::NewLocalListener`] for our own listener address.
|
||||
/// Returns a [`MetaAddrChange::NewLocal`] for our own listener address.
|
||||
pub fn new_local_listener(addr: &SocketAddr) -> MetaAddrChange {
|
||||
NewLocal { addr: *addr }
|
||||
}
|
||||
|
|
@ -422,19 +398,84 @@ impl MetaAddr {
|
|||
self.last_failure
|
||||
}
|
||||
|
||||
/// Is this address a directly connected client?
|
||||
pub fn is_direct_client(&self) -> bool {
|
||||
match self.last_connection_state {
|
||||
Responded => !self.services.contains(PeerServices::NODE_NETWORK),
|
||||
NeverAttemptedGossiped | NeverAttemptedAlternate | Failed | AttemptPending => false,
|
||||
/// Have we had any recently messages from this peer?
|
||||
///
|
||||
/// Returns `true` if the peer is likely connected and responsive in the peer
|
||||
/// set.
|
||||
///
|
||||
/// [`constants::LIVE_PEER_DURATION`] represents the time interval in which
|
||||
/// we should receive at least one message from a peer, or close the
|
||||
/// connection. Therefore, if the last-seen timestamp is older than
|
||||
/// [`constants::LIVE_PEER_DURATION`] ago, we know we should have
|
||||
/// disconnected from it. Otherwise, we could potentially be connected to it.
|
||||
pub fn was_recently_live(&self) -> bool {
|
||||
if let Some(last_response) = self.last_response {
|
||||
// Recent times and future times are considered live
|
||||
last_response.saturating_elapsed()
|
||||
<= constants::LIVE_PEER_DURATION
|
||||
.try_into()
|
||||
.expect("unexpectedly large constant")
|
||||
} else {
|
||||
// If there has never been any response, it can't possibly be live
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Is this address valid for outbound connections?
|
||||
pub fn is_valid_for_outbound(&self) -> bool {
|
||||
self.services.contains(PeerServices::NODE_NETWORK)
|
||||
&& !self.addr.ip().is_unspecified()
|
||||
&& self.addr.port() != 0
|
||||
/// Have we recently attempted an outbound connection to this peer?
|
||||
///
|
||||
/// Returns `true` if this peer was recently attempted, or has a connection
|
||||
/// attempt in progress.
|
||||
pub fn was_recently_attempted(&self) -> bool {
|
||||
if let Some(last_attempt) = self.last_attempt {
|
||||
// Recent times and future times are considered live.
|
||||
// Instants are monotonic, so `now` should always be later than `last_attempt`,
|
||||
// except for synthetic data in tests.
|
||||
Instant::now().saturating_duration_since(last_attempt) <= constants::LIVE_PEER_DURATION
|
||||
} else {
|
||||
// If there has never been any attempt, it can't possibly be live
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Have we recently had a failed connection to this peer?
|
||||
///
|
||||
/// Returns `true` if this peer has recently failed.
|
||||
pub fn was_recently_failed(&self) -> bool {
|
||||
if let Some(last_failure) = self.last_failure {
|
||||
// Recent times and future times are considered live
|
||||
Instant::now().saturating_duration_since(last_failure) <= constants::LIVE_PEER_DURATION
|
||||
} else {
|
||||
// If there has never been any failure, it can't possibly be recent
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Is this address ready for a new outbound connection attempt?
|
||||
pub fn is_ready_for_attempt(&self) -> bool {
|
||||
self.last_known_info_is_valid_for_outbound()
|
||||
&& !self.was_recently_live()
|
||||
&& !self.was_recently_attempted()
|
||||
&& !self.was_recently_failed()
|
||||
}
|
||||
|
||||
/// Is the [`SocketAddr`] we have for this peer valid for outbound
|
||||
/// connections?
|
||||
///
|
||||
/// Since the addresses in the address book are unique, this check can be
|
||||
/// used to permanently reject entire [`MetaAddr`]s.
|
||||
pub fn address_is_valid_for_outbound(&self) -> bool {
|
||||
!self.addr.ip().is_unspecified() && self.addr.port() != 0
|
||||
}
|
||||
|
||||
/// Is the last known information for this peer valid for outbound
|
||||
/// connections?
|
||||
///
|
||||
/// The last known info might be outdated or untrusted, so this check can
|
||||
/// only be used to:
|
||||
/// - reject `NeverAttempted...` [`MetaAddrChange`]s, and
|
||||
/// - temporarily stop outbound connections to a [`MetaAddr`].
|
||||
pub fn last_known_info_is_valid_for_outbound(&self) -> bool {
|
||||
self.services.contains(PeerServices::NODE_NETWORK) && self.address_is_valid_for_outbound()
|
||||
}
|
||||
|
||||
/// Return a sanitized version of this `MetaAddr`, for sending to a remote peer.
|
||||
|
|
@ -528,6 +569,9 @@ impl MetaAddrChange {
|
|||
NewGossiped { .. } => None,
|
||||
NewAlternate { .. } => None,
|
||||
NewLocal { .. } => None,
|
||||
// Attempt changes are applied before we start the handshake to the
|
||||
// peer address. So the attempt time is a lower bound for the actual
|
||||
// handshake time.
|
||||
UpdateAttempt { .. } => Some(Instant::now()),
|
||||
UpdateResponded { .. } => None,
|
||||
UpdateFailed { .. } => None,
|
||||
|
|
@ -541,6 +585,11 @@ impl MetaAddrChange {
|
|||
NewAlternate { .. } => None,
|
||||
NewLocal { .. } => None,
|
||||
UpdateAttempt { .. } => None,
|
||||
// If there is a large delay applying this change, then:
|
||||
// - the peer might stay in the `AttemptPending` state for longer,
|
||||
// - we might send outdated last seen times to our peers, and
|
||||
// - the peer will appear to be live for longer, delaying future
|
||||
// reconnection attempts.
|
||||
UpdateResponded { .. } => Some(DateTime32::now()),
|
||||
UpdateFailed { .. } => None,
|
||||
}
|
||||
|
|
@ -554,6 +603,11 @@ impl MetaAddrChange {
|
|||
NewLocal { .. } => None,
|
||||
UpdateAttempt { .. } => None,
|
||||
UpdateResponded { .. } => None,
|
||||
// If there is a large delay applying this change, then:
|
||||
// - the peer might stay in the `AttemptPending` or `Responded`
|
||||
// states for longer, and
|
||||
// - the peer will appear to be used for longer, delaying future
|
||||
// reconnection attempts.
|
||||
UpdateFailed { .. } => Some(Instant::now()),
|
||||
}
|
||||
}
|
||||
|
|
@ -563,7 +617,7 @@ impl MetaAddrChange {
|
|||
match self {
|
||||
NewGossiped { .. } => NeverAttemptedGossiped,
|
||||
NewAlternate { .. } => NeverAttemptedAlternate,
|
||||
// local listeners get sanitized, so the exact value doesn't matter
|
||||
// local listeners get sanitized, so the state doesn't matter here
|
||||
NewLocal { .. } => NeverAttemptedGossiped,
|
||||
UpdateAttempt { .. } => AttemptPending,
|
||||
UpdateResponded { .. } => Responded,
|
||||
|
|
@ -606,18 +660,31 @@ impl MetaAddrChange {
|
|||
|
||||
if change_to_never_attempted {
|
||||
if previous_has_been_attempted {
|
||||
// Security: ignore never attempted changes once we have made an attempt
|
||||
// Existing entry has been attempted, change is NeverAttempted
|
||||
// - ignore the change
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Ignore NeverAttempted changes once we have made an attempt,
|
||||
// so malicious peers can't keep changing our peer connection order.
|
||||
None
|
||||
} else {
|
||||
// never attempted to never attempted update: preserve original values
|
||||
// Existing entry and change are both NeverAttempted
|
||||
// - preserve original values of all fields
|
||||
// - but replace None with Some
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Preserve the original field values for NeverAttempted peers,
|
||||
// so malicious peers can't keep changing our peer connection order.
|
||||
Some(MetaAddr {
|
||||
addr: self.addr(),
|
||||
// TODO: or(self.untrusted_services()) when services become optional
|
||||
services: previous.services,
|
||||
// Security: only update the last seen time if it is missing
|
||||
untrusted_last_seen: previous
|
||||
.untrusted_last_seen
|
||||
.or_else(|| self.untrusted_last_seen()),
|
||||
// The peer has not been attempted, so these fields must be None
|
||||
last_response: None,
|
||||
last_attempt: None,
|
||||
last_failure: None,
|
||||
|
|
@ -625,15 +692,25 @@ impl MetaAddrChange {
|
|||
})
|
||||
}
|
||||
} else {
|
||||
// any to attempt, responded, or failed: prefer newer values
|
||||
// Existing entry and change are both Attempt, Responded, Failed
|
||||
// - ignore changes to earlier times
|
||||
// - update the services from the change
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Ignore changes to earlier times. This enforces the peer
|
||||
// connection timeout, even if changes are applied out of order.
|
||||
Some(MetaAddr {
|
||||
addr: self.addr(),
|
||||
services: self.untrusted_services().unwrap_or(previous.services),
|
||||
// we don't modify the last seen field at all
|
||||
// only NeverAttempted changes can modify the last seen field
|
||||
untrusted_last_seen: previous.untrusted_last_seen,
|
||||
last_response: self.last_response().or(previous.last_response),
|
||||
last_attempt: self.last_attempt().or(previous.last_attempt),
|
||||
last_failure: self.last_failure().or(previous.last_failure),
|
||||
// Since Some(time) is always greater than None, `max` prefers:
|
||||
// - the latest time if both are Some
|
||||
// - Some(time) if the other is None
|
||||
last_response: self.last_response().max(previous.last_response),
|
||||
last_attempt: self.last_attempt().max(previous.last_attempt),
|
||||
last_failure: self.last_failure().max(previous.last_failure),
|
||||
last_connection_state: self.peer_addr_state(),
|
||||
})
|
||||
}
|
||||
|
|
@ -656,32 +733,82 @@ impl Ord for MetaAddr {
|
|||
use std::net::IpAddr::{V4, V6};
|
||||
use Ordering::*;
|
||||
|
||||
let oldest_first = self.get_last_seen().cmp(&other.get_last_seen());
|
||||
let newest_first = oldest_first.reverse();
|
||||
// First, try states that are more likely to work
|
||||
let more_reliable_state = self.last_connection_state.cmp(&other.last_connection_state);
|
||||
|
||||
let connection_state = self.last_connection_state.cmp(&other.last_connection_state);
|
||||
let reconnection_time = match self.last_connection_state {
|
||||
Responded => oldest_first,
|
||||
NeverAttemptedGossiped => newest_first,
|
||||
NeverAttemptedAlternate => newest_first,
|
||||
Failed => oldest_first,
|
||||
AttemptPending => oldest_first,
|
||||
};
|
||||
let ip_numeric = match (self.addr.ip(), other.addr.ip()) {
|
||||
// # Security and Correctness
|
||||
//
|
||||
// Prioritise older attempt times, so we try all peers in each state,
|
||||
// before re-trying any of them. This avoids repeatedly reconnecting to
|
||||
// peers that aren't working.
|
||||
//
|
||||
// Using the internal attempt time for peer ordering also minimises the
|
||||
// amount of information `Addrs` responses leak about Zebra's retry order.
|
||||
|
||||
// If the states are the same, try peers that we haven't tried for a while.
|
||||
//
|
||||
// Each state change updates a specific time field, and
|
||||
// None is less than Some(T),
|
||||
// so the resulting ordering for each state is:
|
||||
// - Responded: oldest attempts first (attempt times are required and unique)
|
||||
// - NeverAttempted...: recent gossiped times first (all other times are None)
|
||||
// - Failed: oldest attempts first (attempt times are required and unique)
|
||||
// - AttemptPending: oldest attempts first (attempt times are required and unique)
|
||||
//
|
||||
// We also compare the other local times, because:
|
||||
// - seed peers may not have an attempt time, and
|
||||
// - updates can be applied to the address book in any order.
|
||||
let older_attempt = self.last_attempt.cmp(&other.last_attempt);
|
||||
let older_failure = self.last_failure.cmp(&other.last_failure);
|
||||
let older_response = self.last_response.cmp(&other.last_response);
|
||||
|
||||
// # Security
|
||||
//
|
||||
// Compare local times before untrusted gossiped times and services.
|
||||
// This gives malicious peers less influence over our peer connection
|
||||
// order.
|
||||
|
||||
// If all local times are None, try peers that other peers have seen more recently
|
||||
let newer_untrusted_last_seen = self
|
||||
.untrusted_last_seen
|
||||
.cmp(&other.untrusted_last_seen)
|
||||
.reverse();
|
||||
|
||||
// Finally, prefer numerically larger service bit patterns
|
||||
//
|
||||
// As of June 2021, Zebra only recognises the NODE_NETWORK bit.
|
||||
// When making outbound connections, Zebra skips non-nodes.
|
||||
// So this comparison will have no impact until Zebra implements
|
||||
// more service features.
|
||||
//
|
||||
// TODO: order services by usefulness, not bit pattern values
|
||||
// Security: split gossiped and direct services
|
||||
let larger_services = self.services.cmp(&other.services);
|
||||
|
||||
// The remaining comparisons are meaningless for peer connection priority.
|
||||
// But they are required so that we have a total order on `MetaAddr` values:
|
||||
// self and other must compare as Equal iff they are equal.
|
||||
|
||||
// As a tie-breaker, compare ip and port numerically
|
||||
//
|
||||
// Since SocketAddrs are unique in the address book, these comparisons
|
||||
// guarantee a total, unique order.
|
||||
let ip_tie_breaker = match (self.addr.ip(), other.addr.ip()) {
|
||||
(V4(a), V4(b)) => a.octets().cmp(&b.octets()),
|
||||
(V6(a), V6(b)) => a.octets().cmp(&b.octets()),
|
||||
(V4(_), V6(_)) => Less,
|
||||
(V6(_), V4(_)) => Greater,
|
||||
};
|
||||
let port_tie_breaker = self.addr.port().cmp(&other.addr.port());
|
||||
|
||||
connection_state
|
||||
.then(reconnection_time)
|
||||
// The remainder is meaningless as an ordering, but required so that we
|
||||
// have a total order on `MetaAddr` values: self and other must compare
|
||||
// as Equal iff they are equal.
|
||||
.then(ip_numeric)
|
||||
.then(self.addr.port().cmp(&other.addr.port()))
|
||||
.then(self.services.bits().cmp(&other.services.bits()))
|
||||
more_reliable_state
|
||||
.then(older_attempt)
|
||||
.then(older_failure)
|
||||
.then(older_response)
|
||||
.then(newer_untrusted_last_seen)
|
||||
.then(larger_services)
|
||||
.then(ip_tie_breaker)
|
||||
.then(port_tie_breaker)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ impl MetaAddrChange {
|
|||
if change
|
||||
.into_new_meta_addr()
|
||||
.expect("unexpected invalid alternate change")
|
||||
.is_valid_for_outbound()
|
||||
.last_known_info_is_valid_for_outbound()
|
||||
{
|
||||
Some(change)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -1,13 +1,35 @@
|
|||
//! Randomised property tests for MetaAddr.
|
||||
|
||||
use super::check;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert::{TryFrom, TryInto},
|
||||
env,
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*};
|
||||
|
||||
use proptest::prelude::*;
|
||||
use proptest::{collection::vec, prelude::*};
|
||||
use tokio::{runtime::Runtime, time::Instant};
|
||||
use tower::service_fn;
|
||||
use tracing::Span;
|
||||
|
||||
use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize};
|
||||
|
||||
use super::check;
|
||||
use crate::{
|
||||
constants::LIVE_PEER_DURATION,
|
||||
meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*},
|
||||
peer_set::candidate_set::CandidateSet,
|
||||
AddressBook, Config,
|
||||
};
|
||||
|
||||
/// The number of test cases to use for proptest that have verbose failures.
|
||||
///
|
||||
/// Set this to the default number of proptest cases, unless you're debugging a
|
||||
/// failure.
|
||||
const DEFAULT_VERBOSE_TEST_PROPTEST_CASES: u32 = 256;
|
||||
|
||||
proptest! {
|
||||
/// Make sure that the sanitize function reduces time and state metadata
|
||||
/// leaks.
|
||||
|
|
@ -150,7 +172,7 @@ proptest! {
|
|||
prop_assert_eq!(
|
||||
&addr_bytes,
|
||||
&addr_bytes2,
|
||||
"unexpected round-trip bytes mismatch: original addr: {:?}, bytes: {:?}, deserialized addr: {:?}, bytes: {:?}",
|
||||
"unexpected double-serialization round-trip mismatch with original addr: {:?}, bytes: {:?}, deserialized addr: {:?}, bytes: {:?}",
|
||||
sanitized_addr,
|
||||
hex::encode(&addr_bytes),
|
||||
deserialized_addr,
|
||||
|
|
@ -159,7 +181,7 @@ proptest! {
|
|||
|
||||
}
|
||||
|
||||
/// Make sure that `[MetaAddrChange]`s:
|
||||
/// Make sure that [`MetaAddrChange`]s:
|
||||
/// - do not modify the last seen time, unless it was None, and
|
||||
/// - only modify the services after a response or failure.
|
||||
#[test]
|
||||
|
|
@ -188,4 +210,211 @@ proptest! {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Make sure that [`MetaAddr`]s do not get retried more than once per
|
||||
/// [`LIVE_PEER_DURATION`], regardless of the [`MetaAddrChange`]s that are
|
||||
/// applied to them.
|
||||
///
|
||||
/// This is the simple version of the test, which checks [`MetaAddr`]s by
|
||||
/// themselves. It detects bugs in [`MetaAddr`]s, even if there are
|
||||
/// compensating bugs in the [`CandidateSet`] or [`AddressBook`].
|
||||
#[test]
|
||||
fn individual_peer_retry_limit_meta_addr(
|
||||
(mut addr, changes) in MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE)
|
||||
) {
|
||||
zebra_test::init();
|
||||
|
||||
let mut attempt_count: usize = 0;
|
||||
|
||||
for change in changes {
|
||||
while addr.is_ready_for_attempt() {
|
||||
attempt_count += 1;
|
||||
// Assume that this test doesn't last longer than LIVE_PEER_DURATION
|
||||
prop_assert!(attempt_count <= 1);
|
||||
|
||||
// Simulate an attempt
|
||||
addr = MetaAddr::new_reconnect(&addr.addr)
|
||||
.apply_to_meta_addr(addr)
|
||||
.expect("unexpected invalid attempt");
|
||||
}
|
||||
|
||||
// If `change` is invalid for the current MetaAddr state, skip it.
|
||||
if let Some(changed_addr) = change.apply_to_meta_addr(addr) {
|
||||
assert_eq!(changed_addr.addr, addr.addr);
|
||||
addr = changed_addr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
// These tests can produce a lot of debug output, so we use a smaller number of cases by default.
|
||||
// Set the PROPTEST_CASES env var to override this default.
|
||||
#![proptest_config(proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(DEFAULT_VERBOSE_TEST_PROPTEST_CASES)))]
|
||||
|
||||
/// Make sure that [`MetaAddr`]s do not get retried more than once per
|
||||
/// [`LIVE_PEER_DURATION`], regardless of the [`MetaAddrChange`]s that are
|
||||
/// applied to a single peer's entries in the [`AddressBook`].
|
||||
///
|
||||
/// This is the complex version of the test, which checks [`MetaAddr`],
|
||||
/// [`CandidateSet`] and [`AddressBook`] together.
|
||||
#[test]
|
||||
fn individual_peer_retry_limit_candidate_set(
|
||||
(addr, changes) in MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE)
|
||||
) {
|
||||
zebra_test::init();
|
||||
|
||||
// Run the test for this many simulated live peer durations
|
||||
const LIVE_PEER_INTERVALS: u32 = 3;
|
||||
// Run the test for this much simulated time
|
||||
let overall_test_time: Duration = LIVE_PEER_DURATION * LIVE_PEER_INTERVALS;
|
||||
// Advance the clock by this much for every peer change
|
||||
let peer_change_interval: Duration = overall_test_time / MAX_ADDR_CHANGE.try_into().unwrap();
|
||||
|
||||
assert!(
|
||||
u32::try_from(MAX_ADDR_CHANGE).unwrap() >= 3 * LIVE_PEER_INTERVALS,
|
||||
"there are enough changes for good test coverage",
|
||||
);
|
||||
|
||||
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
|
||||
let _guard = runtime.enter();
|
||||
|
||||
// Only put valid addresses in the address book.
|
||||
// This means some tests will start with an empty address book.
|
||||
let addrs = if addr.last_known_info_is_valid_for_outbound() {
|
||||
Some(addr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(&Config::default(), Span::none(), addrs)));
|
||||
let peer_service = service_fn(|_| async { unreachable!("Service should not be called") });
|
||||
let mut candidate_set = CandidateSet::new(address_book.clone(), peer_service);
|
||||
|
||||
runtime.block_on(async move {
|
||||
tokio::time::pause();
|
||||
|
||||
// The earliest time we can have a valid next attempt for this peer
|
||||
let earliest_next_attempt = Instant::now() + LIVE_PEER_DURATION;
|
||||
|
||||
// The number of attempts for this peer in the last LIVE_PEER_DURATION
|
||||
let mut attempt_count: usize = 0;
|
||||
|
||||
for (i, change) in changes.into_iter().enumerate() {
|
||||
while let Some(candidate_addr) = candidate_set.next().await {
|
||||
assert_eq!(candidate_addr.addr, addr.addr);
|
||||
|
||||
attempt_count += 1;
|
||||
assert!(
|
||||
attempt_count <= 1,
|
||||
"candidate: {:?}, change: {}, now: {:?}, earliest next attempt: {:?}, \
|
||||
attempts: {}, live peer interval limit: {}, test time limit: {:?}, \
|
||||
peer change interval: {:?}, original addr was in address book: {}",
|
||||
candidate_addr,
|
||||
i,
|
||||
Instant::now(),
|
||||
earliest_next_attempt,
|
||||
attempt_count,
|
||||
LIVE_PEER_INTERVALS,
|
||||
overall_test_time,
|
||||
peer_change_interval,
|
||||
addr.last_known_info_is_valid_for_outbound(),
|
||||
);
|
||||
}
|
||||
|
||||
// If `change` is invalid for the current MetaAddr state,
|
||||
// multiple intervals will elapse between actual changes to
|
||||
// the MetaAddr in the AddressBook.
|
||||
address_book.clone().lock().unwrap().update(change);
|
||||
|
||||
tokio::time::advance(peer_change_interval).await;
|
||||
if Instant::now() >= earliest_next_attempt {
|
||||
attempt_count = 0;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Make sure that all disconnected [`MetaAddr`]s are retried once, before
|
||||
/// any are retried twice.
|
||||
///
|
||||
/// This is the simple version of the test, which checks [`MetaAddr`]s by
|
||||
/// themselves. It detects bugs in [`MetaAddr`]s, even if there are
|
||||
/// compensating bugs in the [`CandidateSet`] or [`AddressBook`].
|
||||
//
|
||||
// TODO: write a similar test using the AddressBook and CandidateSet
|
||||
#[test]
|
||||
fn multiple_peer_retry_order_meta_addr(
|
||||
addr_changes_lists in vec(
|
||||
MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE),
|
||||
2..MAX_ADDR_CHANGE
|
||||
),
|
||||
) {
|
||||
zebra_test::init();
|
||||
|
||||
// Run the test for this many simulated live peer durations
|
||||
const LIVE_PEER_INTERVALS: u32 = 3;
|
||||
// Run the test for this much simulated time
|
||||
let overall_test_time: Duration = LIVE_PEER_DURATION * LIVE_PEER_INTERVALS;
|
||||
// Advance the clock by this much for every peer change
|
||||
let peer_change_interval: Duration = overall_test_time / MAX_ADDR_CHANGE.try_into().unwrap();
|
||||
|
||||
assert!(
|
||||
u32::try_from(MAX_ADDR_CHANGE).unwrap() >= 3 * LIVE_PEER_INTERVALS,
|
||||
"there are enough changes for good test coverage",
|
||||
);
|
||||
|
||||
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
|
||||
let _guard = runtime.enter();
|
||||
|
||||
let attempt_counts = runtime.block_on(async move {
|
||||
tokio::time::pause();
|
||||
|
||||
// The current attempt counts for each peer in this interval
|
||||
let mut attempt_counts: HashMap<SocketAddr, u32> = HashMap::new();
|
||||
|
||||
// The most recent address info for each peer
|
||||
let mut addrs: HashMap<SocketAddr, MetaAddr> = HashMap::new();
|
||||
|
||||
for change_index in 0..MAX_ADDR_CHANGE {
|
||||
for (addr, changes) in addr_changes_lists.iter() {
|
||||
let addr = addrs.entry(addr.addr).or_insert(*addr);
|
||||
let change = changes.get(change_index);
|
||||
|
||||
while addr.is_ready_for_attempt() {
|
||||
*attempt_counts.entry(addr.addr).or_default() += 1;
|
||||
assert!(*attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1);
|
||||
|
||||
// Simulate an attempt
|
||||
*addr = MetaAddr::new_reconnect(&addr.addr)
|
||||
.apply_to_meta_addr(*addr)
|
||||
.expect("unexpected invalid attempt");
|
||||
}
|
||||
|
||||
// If `change` is invalid for the current MetaAddr state, skip it.
|
||||
// If we've run out of changes for this addr, do nothing.
|
||||
if let Some(changed_addr) = change
|
||||
.map(|change| change.apply_to_meta_addr(*addr))
|
||||
.flatten() {
|
||||
assert_eq!(changed_addr.addr, addr.addr);
|
||||
*addr = changed_addr;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::advance(peer_change_interval).await;
|
||||
}
|
||||
|
||||
attempt_counts
|
||||
});
|
||||
|
||||
let min_attempts = attempt_counts.values().min();
|
||||
let max_attempts = attempt_counts.values().max();
|
||||
if let (Some(&min_attempts), Some(&max_attempts)) = (min_attempts, max_attempts) {
|
||||
prop_assert!(max_attempts >= min_attempts);
|
||||
prop_assert!(max_attempts - min_attempts <= 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
mod candidate_set;
|
||||
pub(crate) mod candidate_set;
|
||||
mod initialize;
|
||||
mod inventory_registry;
|
||||
mod set;
|
||||
mod unready_service;
|
||||
|
||||
use candidate_set::CandidateSet;
|
||||
pub(crate) use candidate_set::CandidateSet;
|
||||
use inventory_registry::InventoryRegistry;
|
||||
use set::PeerSet;
|
||||
|
||||
|
|
|
|||
|
|
@ -11,101 +11,107 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response
|
|||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts.
|
||||
/// The [`CandidateSet`] manages outbound peer connection attempts.
|
||||
/// Successful connections become peers in the [`PeerSet`].
|
||||
///
|
||||
/// It divides the set of all possible candidate peers into disjoint subsets,
|
||||
/// using the `PeerAddrState`:
|
||||
/// The candidate set divides the set of all possible outbound peers into
|
||||
/// disjoint subsets, using the [`PeerAddrState`]:
|
||||
///
|
||||
/// 1. `Responded` peers, which we previously had inbound or outbound connections
|
||||
/// to. If we have not received any messages from a `Responded` peer within a
|
||||
/// cutoff time, we assume that it has disconnected or hung, and attempt
|
||||
/// reconnection;
|
||||
/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS
|
||||
/// seeder, but have never connected to;
|
||||
/// 3. `Failed` peers, to whom we attempted to connect but were unable to;
|
||||
/// 4. `AttemptPending` peers, which we've recently queued for reconnection.
|
||||
/// 1. [`Responded`] peers, which we have had an outbound connection to.
|
||||
/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers
|
||||
/// but have never connected to.
|
||||
/// 3. [`NeverAttemptedAlternate`] peers, canonical addresses which we learned
|
||||
/// from the [`Version`] messages of inbound and outbound connections,
|
||||
/// but have never connected to.
|
||||
/// 4. [`Failed`] peers, which failed a connection attempt, or had an error
|
||||
/// during an outbound connection.
|
||||
/// 5. [`AttemptPending`] peers, which we've recently queued for a connection.
|
||||
///
|
||||
/// Never attempted peers are always available for connection.
|
||||
///
|
||||
/// If a peer's attempted, responded, or failure time is recent
|
||||
/// (within the liveness limit), we avoid reconnecting to it.
|
||||
/// Otherwise, we assume that it has disconnected or hung,
|
||||
/// and attempt reconnection.
|
||||
///
|
||||
/// ```ascii,no_run
|
||||
/// ┌──────────────────┐
|
||||
/// │ PeerSet │
|
||||
/// │GetPeers Responses│
|
||||
/// └──────────────────┘
|
||||
/// │
|
||||
/// │
|
||||
/// │
|
||||
/// │
|
||||
/// ▼
|
||||
/// filter by Λ
|
||||
/// !contains_addr ╱ ╲
|
||||
/// ┌────────────────────────────▶▕ ▏
|
||||
/// │ ╲ ╱
|
||||
/// │ V
|
||||
/// │ │
|
||||
/// │ │
|
||||
/// │ │
|
||||
/// │ ┌──────────────────┐ │
|
||||
/// │ │ Inbound │ │
|
||||
/// │ │ Peer Connections │ │
|
||||
/// │ Config / DNS │
|
||||
/// ┌───────────│ Seed │───────────┐
|
||||
/// │ │ Addresses │ │
|
||||
/// │ └──────────────────┘ │
|
||||
/// │ │ │
|
||||
/// ├──────────┼────────────────────┼───────────────────────────────┐
|
||||
/// │ PeerSet ▼ AddressBook ▼ │
|
||||
/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │
|
||||
/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │
|
||||
/// │ │Disconnected │ │ Peers │ │ Peers │◀┼┐
|
||||
/// │ │ `Responded` │ │ │ │ │ ││
|
||||
/// │ │ Peers │ │ │ │ │ ││
|
||||
/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││
|
||||
/// │ │ │ │ ││
|
||||
/// │ #1 oldest_first #2 newest_first #3 oldest_first ││
|
||||
/// │ │ │ │ ││
|
||||
/// │ ├──────────────────────┴──────────────────────┘ ││
|
||||
/// │ │ disjoint `PeerAddrState`s ││
|
||||
/// ├────────┼──────────────────────────────────────────────────────┘│
|
||||
/// │ │ untrusted_last_seen │
|
||||
/// │ │ is unknown │
|
||||
/// ▼ │ ▼
|
||||
/// ┌──────────────────┐ │ ┌──────────────────┐
|
||||
/// │ Handshake │ │ │ Peer Set │
|
||||
/// │ Canonical │──────────┼──────────│ Gossiped │
|
||||
/// │ Addresses │ │ │ Addresses │
|
||||
/// └──────────────────┘ │ └──────────────────┘
|
||||
/// untrusted_last_seen │ provides
|
||||
/// set to now │ untrusted_last_seen
|
||||
/// ▼
|
||||
/// Λ if attempted, responded, or failed:
|
||||
/// ╱ ╲ ignore gossiped info
|
||||
/// ▕ ▏ otherwise, if never attempted:
|
||||
/// ╲ ╱ skip updates to existing fields
|
||||
/// V
|
||||
/// ┌───────────────────────────────┼───────────────────────────────┐
|
||||
/// │ AddressBook │ │
|
||||
/// │ disjoint `PeerAddrState`s ▼ │
|
||||
/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │
|
||||
/// │ │ `Responded` │ │`NeverAttemptedGossiped` │ │ `Failed` │ │
|
||||
/// ┌┼▶│ Peers │ │`NeverAttemptedAlternate`│ │ Peers │◀┼┐
|
||||
/// ││ │ │ │ Peers │ │ │ ││
|
||||
/// ││ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││
|
||||
/// ││ │ │ │ ││
|
||||
/// ││ #1 oldest_first #2 newest_first #3 oldest_first ││
|
||||
/// ││ ├──────────────────────┴──────────────────────┘ ││
|
||||
/// ││ ▼ ││
|
||||
/// ││ Λ ││
|
||||
/// ││ ╱ ╲ filter by ││
|
||||
/// ││ ▕ ▏ is_ready_for_attempt ││
|
||||
/// ││ ╲ ╱ to remove recent `Responded`, ││
|
||||
/// ││ V `AttemptPending`, and `Failed` peers ││
|
||||
/// ││ │ ││
|
||||
/// ││ │ try outbound connection, ││
|
||||
/// ││ ▼ update last_attempt to now() ││
|
||||
/// ││┌────────────────┐ ││
|
||||
/// │││`AttemptPending`│ ││
|
||||
/// │││ Peers │ ││
|
||||
/// ││└────────────────┘ ││
|
||||
/// │└────────┼──────────────────────────────────────────────────────┘│
|
||||
/// │ ▼ │
|
||||
/// │ Λ │
|
||||
/// │ ╱ ╲ filter by │
|
||||
/// └─────▶▕ ▏!is_potentially_connected │
|
||||
/// ╲ ╱ to remove live │
|
||||
/// V `Responded` peers │
|
||||
/// │ ╱ ╲ │
|
||||
/// │ ▕ ▏─────────────────────────────────────────────────────┘
|
||||
/// │ ╲ ╱ connection failed, update last_failure to now()
|
||||
/// │ V
|
||||
/// │ │
|
||||
/// │ Try outbound connection │
|
||||
/// ▼ │
|
||||
/// ┌────────────────┐ │
|
||||
/// │`AttemptPending`│ │
|
||||
/// │ Peers │ │
|
||||
/// │ │ │
|
||||
/// └────────────────┘ │
|
||||
/// │ │ connection succeeded
|
||||
/// │ ▼
|
||||
/// │ ┌────────────┐
|
||||
/// │ │ send │
|
||||
/// │ │peer::Client│
|
||||
/// │ │to Discover │
|
||||
/// │ └────────────┘
|
||||
/// │ │
|
||||
/// │ │
|
||||
/// ▼ │
|
||||
/// Λ │
|
||||
/// ╱ ╲ │
|
||||
/// ▕ ▏─────────────────────────────────────────────────────┘
|
||||
/// ╲ ╱ connection failed, update last_seen to now()
|
||||
/// V
|
||||
/// │
|
||||
/// │
|
||||
/// ▼
|
||||
/// ┌────────────┐
|
||||
/// │ send │
|
||||
/// │peer::Client│
|
||||
/// │to Discover │
|
||||
/// └────────────┘
|
||||
/// │
|
||||
/// │
|
||||
/// ▼
|
||||
/// ┌───────────────────────────────────────┐
|
||||
/// │ every time we receive a peer message: │
|
||||
/// │ * update state to `Responded` │
|
||||
/// │ * update last_seen to now() │
|
||||
/// │ ▼
|
||||
/// │┌───────────────────────────────────────┐
|
||||
/// ││ every time we receive a peer message: │
|
||||
/// └│ * update state to `Responded` │
|
||||
/// │ * update last_response to now() │
|
||||
/// └───────────────────────────────────────┘
|
||||
///
|
||||
/// ```
|
||||
// TODO:
|
||||
// * draw arrow from the "peer message" box into the `Responded` state box
|
||||
// * make the "disjoint states" box include `AttemptPending`
|
||||
pub(super) struct CandidateSet<S> {
|
||||
// * show all possible transitions between Attempt/Responded/Failed,
|
||||
// except Failed -> Responded is invalid, must go through Attempt
|
||||
// * for now, seed peers go straight to handshaking and responded,
|
||||
// but we'll fix that once we add the Seed state
|
||||
// When we add the Seed state:
|
||||
// * show that seed peers that transition to other never attempted
|
||||
// states are already in the address book
|
||||
pub(crate) struct CandidateSet<S> {
|
||||
pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
|
||||
pub(super) peer_service: S,
|
||||
wait_next_handshake: Sleep,
|
||||
|
|
@ -271,12 +277,10 @@ where
|
|||
|
||||
/// Returns the next candidate for a connection attempt, if any are available.
|
||||
///
|
||||
/// Returns peers in this order:
|
||||
/// - oldest `Responded` that are not live
|
||||
/// - newest `NeverAttempted`
|
||||
/// - oldest `Failed`
|
||||
/// Returns peers in reconnection order, based on
|
||||
/// [`AddressBook::reconnection_peers`].
|
||||
///
|
||||
/// Skips `AttemptPending` peers and live `Responded` peers.
|
||||
/// Skips peers that have recently been active, attempted, or failed.
|
||||
///
|
||||
/// ## Correctness
|
||||
///
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use proptest::{arbitrary::any, arbitrary::Arbitrary, prelude::*};
|
||||
|
||||
use super::InventoryHash;
|
||||
use super::{types::PeerServices, InventoryHash};
|
||||
|
||||
use zebra_chain::{block, transaction};
|
||||
|
||||
|
|
@ -52,3 +52,16 @@ impl Arbitrary for InventoryHash {
|
|||
|
||||
type Strategy = BoxedStrategy<Self>;
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
impl Arbitrary for PeerServices {
|
||||
type Parameters = ();
|
||||
|
||||
fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
|
||||
any::<u64>()
|
||||
.prop_map(PeerServices::from_bits_truncate)
|
||||
.boxed()
|
||||
}
|
||||
|
||||
type Strategy = BoxedStrategy<Self>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,7 +79,6 @@ bitflags! {
|
|||
/// Note that bits 24-31 are reserved for temporary experiments; other
|
||||
/// service bits should be allocated via the ZIP process.
|
||||
#[derive(Default)]
|
||||
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
||||
pub struct PeerServices: u64 {
|
||||
/// NODE_NETWORK means that the node is a full node capable of serving
|
||||
/// blocks, as opposed to a light client that makes network requests but
|
||||
|
|
|
|||
Loading…
Reference in New Issue