diff --git a/zebra-chain/src/serialization/date_time.rs b/zebra-chain/src/serialization/date_time.rs index a2883dce..ce01035a 100644 --- a/zebra-chain/src/serialization/date_time.rs +++ b/zebra-chain/src/serialization/date_time.rs @@ -17,7 +17,23 @@ pub struct DateTime32 { timestamp: u32, } +/// An unsigned time duration, represented by a 32-bit number of seconds. +#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct Duration32 { + seconds: u32, +} + impl DateTime32 { + /// The earliest possible `DateTime32` value. + pub const MIN: DateTime32 = DateTime32 { + timestamp: u32::MIN, + }; + + /// The latest possible `DateTime32` value. + pub const MAX: DateTime32 = DateTime32 { + timestamp: u32::MAX, + }; + /// Returns the number of seconds since the UNIX epoch. pub fn timestamp(&self) -> u32 { self.timestamp @@ -34,6 +50,57 @@ impl DateTime32 { .try_into() .expect("unexpected out of range chrono::DateTime") } + + /// Returns the number of seconds elapsed between `earlier` and this time, + /// or `None` if `earlier` is later than this time. + pub fn checked_duration_since(&self, earlier: DateTime32) -> Option<Duration32> { + self.timestamp + .checked_sub(earlier.timestamp) + .map(|seconds| Duration32 { seconds }) + } + + /// Returns the number of seconds elapsed between `earlier` and this time, + /// or zero if `earlier` is later than this time. + pub fn saturating_duration_since(&self, earlier: DateTime32) -> Duration32 { + Duration32 { + seconds: self.timestamp.saturating_sub(earlier.timestamp), + } + } + + /// Returns the number of seconds elapsed since this time, + /// or if this time is in the future, returns `None`. + pub fn checked_elapsed(&self) -> Option<Duration32> { + DateTime32::now().checked_duration_since(*self) + } + + /// Returns the number of seconds elapsed since this time, + /// or if this time is in the future, returns zero. 
+ pub fn saturating_elapsed(&self) -> Duration32 { + DateTime32::now().saturating_duration_since(*self) + } +} + +impl Duration32 { + /// The earliest possible `Duration32` value. + pub const MIN: Duration32 = Duration32 { seconds: u32::MIN }; + + /// The latest possible `Duration32` value. + pub const MAX: Duration32 = Duration32 { seconds: u32::MAX }; + + /// Returns the number of seconds. + pub fn seconds(&self) -> u32 { + self.seconds + } + + /// Returns the equivalent [`chrono::Duration`]. + pub fn to_chrono(self) -> chrono::Duration { + self.into() + } + + /// Returns the equivalent [`std::time::Duration`]. + pub fn to_std(self) -> std::time::Duration { + self.into() + } } impl fmt::Debug for DateTime32 { @@ -45,6 +112,15 @@ impl fmt::Debug for DateTime32 { } } +impl fmt::Debug for Duration32 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Duration32") + .field("seconds", &self.seconds) + .field("calendar", &chrono::Duration::from(*self)) + .finish() + } +} + impl From<u32> for DateTime32 { fn from(value: u32) -> Self { DateTime32 { timestamp: value } @@ -70,6 +146,44 @@ impl From<&DateTime32> for chrono::DateTime<Utc> { } } +impl From<u32> for Duration32 { + fn from(value: u32) -> Self { + Duration32 { seconds: value } + } +} + +impl From<&u32> for Duration32 { + fn from(value: &u32) -> Self { + (*value).into() + } +} + +impl From<Duration32> for chrono::Duration { + fn from(value: Duration32) -> Self { + // chrono::Duration is guaranteed to hold 32-bit values + chrono::Duration::seconds(value.seconds.into()) + } +} + +impl From<&Duration32> for chrono::Duration { + fn from(value: &Duration32) -> Self { + (*value).into() + } +} + +impl From<Duration32> for std::time::Duration { + fn from(value: Duration32) -> Self { + // std::time::Duration is guaranteed to hold 32-bit values + std::time::Duration::from_secs(value.seconds.into()) + } +} + +impl From<&Duration32> for std::time::Duration { + fn from(value: &Duration32) -> Self { + (*value).into() + } +} + impl 
TryFrom<chrono::DateTime<Utc>> for DateTime32 { type Error = TryFromIntError; @@ -94,6 +208,54 @@ impl TryFrom<&chrono::DateTime<Utc>> for DateTime32 { } } +impl TryFrom<chrono::Duration> for Duration32 { + type Error = TryFromIntError; + + /// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds. + /// + /// Conversion fails if the number of seconds is outside the `u32` range. + fn try_from(value: chrono::Duration) -> Result<Self, Self::Error> { + Ok(Self { + seconds: value.num_seconds().try_into()?, + }) + } +} + +impl TryFrom<&chrono::Duration> for Duration32 { + type Error = TryFromIntError; + + /// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds. + /// + /// Conversion fails if the number of seconds is outside the `u32` range. + fn try_from(value: &chrono::Duration) -> Result<Self, Self::Error> { + (*value).try_into() + } +} + +impl TryFrom<std::time::Duration> for Duration32 { + type Error = TryFromIntError; + + /// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds. + /// + /// Conversion fails if the number of seconds is outside the `u32` range. + fn try_from(value: std::time::Duration) -> Result<Self, Self::Error> { + Ok(Self { + seconds: value.as_secs().try_into()?, + }) + } +} + +impl TryFrom<&std::time::Duration> for Duration32 { + type Error = TryFromIntError; + + /// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds. + /// + /// Conversion fails if the number of seconds is outside the `u32` range. 
+ fn try_from(value: &std::time::Duration) -> Result<Self, Self::Error> { + (*value).try_into() + } +} + impl ZcashSerialize for DateTime32 { fn zcash_serialize<W: io::Write>(&self, mut writer: W) -> Result<(), std::io::Error> { writer.write_u32::<LittleEndian>(self.timestamp) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 28afe14e..decf9f06 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -8,10 +8,9 @@ use std::{ time::Instant, }; -use chrono::{DateTime, Utc}; use tracing::Span; -use crate::{constants, meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState}; +use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState}; /// A database of peer listener addresses, their advertised services, and /// information on when they were last seen. @@ -104,6 +103,28 @@ impl AddressBook { new_book } + /// Construct an [`AddressBook`] with the given [`Config`], + /// [`tracing::Span`], and addresses. + /// + /// This constructor can be used to break address book invariants, + /// so it should only be used in tests. + #[cfg(any(test, feature = "proptest-impl"))] + pub fn new_with_addrs( + config: &Config, + span: Span, + addrs: impl IntoIterator<Item = MetaAddr>, + ) -> AddressBook { + let mut new_book = AddressBook::new(config, span); + + let addrs = addrs + .into_iter() + .map(|meta_addr| (meta_addr.addr, meta_addr)); + new_book.by_addr.extend(addrs); + + new_book.update_metrics(); + new_book + } + /// Get the local listener address. pub fn get_local_listener(&self) -> MetaAddrChange { MetaAddr::new_local_listener(&self.local_listener) @@ -140,6 +161,15 @@ impl AddressBook { /// /// All changes should go through `update`, so that the address book /// only contains valid outbound addresses. + /// + /// # Security + /// + /// This function must apply every attempted, responded, and failed change + /// to the address book. This prevents rapid reconnections to the same peer. 
+ /// + /// As an exception, this function can ignore all changes for specific + /// [`SocketAddr`]s. Ignored addresses will never be used to connect to + /// peers. pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> { let _guard = self.span.enter(); @@ -155,19 +185,22 @@ ); if let Some(updated) = updated { - // If a node that we are directly connected to has changed to a client, - // remove it from the address book. - if updated.is_direct_client() && previous.is_some() { - std::mem::drop(_guard); - self.take(updated.addr); + // Ignore invalid outbound addresses. + // (Inbound connections can be monitored via Zebra's metrics.) + if !updated.address_is_valid_for_outbound() { return None; } - // Never add unspecified addresses or client services. + // Ignore invalid outbound services and other info, + // but only if the peer has never been attempted. // - // Communication with these addresses can be monitored via Zebra's - // metrics. (The address book is for valid peer addresses.) - if !updated.is_valid_for_outbound() { + // Otherwise, if we got the info directly from the peer, + // store it in the address book, so we know not to reconnect. + // + // TODO: delete peers with invalid info when they get too old (#1873) + if !updated.last_known_info_is_valid_for_outbound() + && updated.last_connection_state.is_never_attempted() + { return None; } @@ -202,21 +235,6 @@ } } - /// Compute a cutoff time that can determine whether an entry - /// in an address book being updated with peer message timestamps - /// represents a likely-dead (or hung) peer, or a potentially-connected peer. - /// - /// [`constants::LIVE_PEER_DURATION`] represents the time interval in which - /// we should receive at least one message from a peer, or close the - /// connection. Therefore, if the last-seen timestamp is older than - /// [`constants::LIVE_PEER_DURATION`] ago, we know we should have - /// disconnected from it. 
Otherwise, we could potentially be connected to it. - fn liveness_cutoff_time() -> DateTime { - Utc::now() - - chrono::Duration::from_std(constants::LIVE_PEER_DURATION) - .expect("unexpectedly large constant") - } - /// Returns true if the given [`SocketAddr`] has recently sent us a message. pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool { let _guard = self.span.enter(); @@ -224,8 +242,7 @@ impl AddressBook { None => false, // NeverAttempted, Failed, and AttemptPending peers should never be live Some(peer) => { - peer.last_connection_state == PeerAddrState::Responded - && peer.get_last_seen() > AddressBook::liveness_cutoff_time() + peer.last_connection_state == PeerAddrState::Responded && peer.was_recently_live() } } } @@ -240,12 +257,6 @@ impl AddressBook { } } - /// Returns true if the given [`SocketAddr`] might be connected to a node - /// feeding timestamps into this address book. - pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool { - self.recently_live_addr(addr) || self.pending_reconnection_addr(addr) - } - /// Return an iterator over all peers. 
/// /// Returns peers in reconnection attempt order, then recently live peers in @@ -266,7 +277,7 @@ impl AddressBook { // Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet self.by_addr .values() - .filter(move |peer| !self.maybe_connected_addr(&peer.addr)) + .filter(|peer| peer.is_ready_for_attempt()) .collect::<BTreeSet<_>>() .into_iter() .cloned() @@ -289,7 +300,7 @@ impl AddressBook { self.by_addr .values() - .filter(move |peer| self.maybe_connected_addr(&peer.addr)) + .filter(|peer| !peer.is_ready_for_attempt()) .cloned() } diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index bff058eb..e565261a 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -2,6 +2,7 @@ use std::{ cmp::{Ord, Ordering}, + convert::TryInto, io::{Read, Write}, net::SocketAddr, time::Instant, @@ -9,13 +10,15 @@ use std::{ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use chrono::Duration; use zebra_chain::serialization::{ DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate, WriteZcashExt, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize, }; -use crate::protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices}; +use crate::{ + constants, + protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices}, +}; use MetaAddrChange::*; use PeerAddrState::*; @@ -243,34 +246,7 @@ pub enum MetaAddrChange { }, } -// TODO: remove this use in a follow-up PR -use chrono::{DateTime, Utc}; - impl MetaAddr { - /// Returns the maximum time among all the time fields. - /// - /// This function exists to replicate an old Zebra bug. 
- /// TODO: remove this function in a follow-up PR - pub(crate) fn get_last_seen(&self) -> DateTime { - let latest_seen = self - .untrusted_last_seen - .max(self.last_response) - .map(DateTime32::to_chrono); - - // At this point it's pretty obvious why we want to get rid of this code - let latest_try = self - .last_attempt - .max(self.last_failure) - .map(|latest_try| Instant::now().checked_duration_since(latest_try)) - .flatten() - .map(Duration::from_std) - .map(Result::ok) - .flatten() - .map(|before_now| Utc::now() - before_now); - - latest_seen.or(latest_try).unwrap_or(chrono::MIN_DATETIME) - } - /// Returns a new `MetaAddr`, based on the deserialized fields from a /// gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message. pub fn new_gossiped_meta_addr( @@ -320,7 +296,7 @@ impl MetaAddr { } } - /// Returns a [`MetaAddrChange::UpdateConnectionAttempt`] for a peer that we + /// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we /// want to make an outbound connection to. pub fn new_reconnect(addr: &SocketAddr) -> MetaAddrChange { UpdateAttempt { addr: *addr } @@ -335,7 +311,7 @@ impl MetaAddr { } } - /// Returns a [`MetaAddrChange::NewLocalListener`] for our own listener address. + /// Returns a [`MetaAddrChange::NewLocal`] for our own listener address. pub fn new_local_listener(addr: &SocketAddr) -> MetaAddrChange { NewLocal { addr: *addr } } @@ -422,19 +398,84 @@ impl MetaAddr { self.last_failure } - /// Is this address a directly connected client? - pub fn is_direct_client(&self) -> bool { - match self.last_connection_state { - Responded => !self.services.contains(PeerServices::NODE_NETWORK), - NeverAttemptedGossiped | NeverAttemptedAlternate | Failed | AttemptPending => false, + /// Have we had any recently messages from this peer? + /// + /// Returns `true` if the peer is likely connected and responsive in the peer + /// set. 
+ /// + /// [`constants::LIVE_PEER_DURATION`] represents the time interval in which + /// we should receive at least one message from a peer, or close the + /// connection. Therefore, if the last-seen timestamp is older than + /// [`constants::LIVE_PEER_DURATION`] ago, we know we should have + /// disconnected from it. Otherwise, we could potentially be connected to it. + pub fn was_recently_live(&self) -> bool { + if let Some(last_response) = self.last_response { + // Recent times and future times are considered live + last_response.saturating_elapsed() + <= constants::LIVE_PEER_DURATION + .try_into() + .expect("unexpectedly large constant") + } else { + // If there has never been any response, it can't possibly be live + false } } - /// Is this address valid for outbound connections? - pub fn is_valid_for_outbound(&self) -> bool { - self.services.contains(PeerServices::NODE_NETWORK) - && !self.addr.ip().is_unspecified() - && self.addr.port() != 0 + /// Have we recently attempted an outbound connection to this peer? + /// + /// Returns `true` if this peer was recently attempted, or has a connection + /// attempt in progress. + pub fn was_recently_attempted(&self) -> bool { + if let Some(last_attempt) = self.last_attempt { + // Recent times and future times are considered live. + // Instants are monotonic, so `now` should always be later than `last_attempt`, + // except for synthetic data in tests. + Instant::now().saturating_duration_since(last_attempt) <= constants::LIVE_PEER_DURATION + } else { + // If there has never been any attempt, it can't possibly be live + false + } + } + + /// Have we recently had a failed connection to this peer? + /// + /// Returns `true` if this peer has recently failed. 
+ pub fn was_recently_failed(&self) -> bool { + if let Some(last_failure) = self.last_failure { + // Recent times and future times are considered live + Instant::now().saturating_duration_since(last_failure) <= constants::LIVE_PEER_DURATION + } else { + // If there has never been any failure, it can't possibly be recent + false + } + } + + /// Is this address ready for a new outbound connection attempt? + pub fn is_ready_for_attempt(&self) -> bool { + self.last_known_info_is_valid_for_outbound() + && !self.was_recently_live() + && !self.was_recently_attempted() + && !self.was_recently_failed() + } + + /// Is the [`SocketAddr`] we have for this peer valid for outbound + /// connections? + /// + /// Since the addresses in the address book are unique, this check can be + /// used to permanently reject entire [`MetaAddr`]s. + pub fn address_is_valid_for_outbound(&self) -> bool { + !self.addr.ip().is_unspecified() && self.addr.port() != 0 + } + + /// Is the last known information for this peer valid for outbound + /// connections? + /// + /// The last known info might be outdated or untrusted, so this check can + /// only be used to: + /// - reject `NeverAttempted...` [`MetaAddrChange`]s, and + /// - temporarily stop outbound connections to a [`MetaAddr`]. + pub fn last_known_info_is_valid_for_outbound(&self) -> bool { + self.services.contains(PeerServices::NODE_NETWORK) && self.address_is_valid_for_outbound() } /// Return a sanitized version of this `MetaAddr`, for sending to a remote peer. @@ -528,6 +569,9 @@ impl MetaAddrChange { NewGossiped { .. } => None, NewAlternate { .. } => None, NewLocal { .. } => None, + // Attempt changes are applied before we start the handshake to the + // peer address. So the attempt time is a lower bound for the actual + // handshake time. UpdateAttempt { .. } => Some(Instant::now()), UpdateResponded { .. } => None, UpdateFailed { .. } => None, @@ -541,6 +585,11 @@ impl MetaAddrChange { NewAlternate { .. } => None, NewLocal { .. 
} => None, UpdateAttempt { .. } => None, + // If there is a large delay applying this change, then: + // - the peer might stay in the `AttemptPending` state for longer, + // - we might send outdated last seen times to our peers, and + // - the peer will appear to be live for longer, delaying future + // reconnection attempts. UpdateResponded { .. } => Some(DateTime32::now()), UpdateFailed { .. } => None, } @@ -554,6 +603,11 @@ impl MetaAddrChange { NewLocal { .. } => None, UpdateAttempt { .. } => None, UpdateResponded { .. } => None, + // If there is a large delay applying this change, then: + // - the peer might stay in the `AttemptPending` or `Responded` + // states for longer, and + // - the peer will appear to be used for longer, delaying future + // reconnection attempts. UpdateFailed { .. } => Some(Instant::now()), } } @@ -563,7 +617,7 @@ impl MetaAddrChange { match self { NewGossiped { .. } => NeverAttemptedGossiped, NewAlternate { .. } => NeverAttemptedAlternate, - // local listeners get sanitized, so the exact value doesn't matter + // local listeners get sanitized, so the state doesn't matter here NewLocal { .. } => NeverAttemptedGossiped, UpdateAttempt { .. } => AttemptPending, UpdateResponded { .. } => Responded, @@ -606,18 +660,31 @@ impl MetaAddrChange { if change_to_never_attempted { if previous_has_been_attempted { - // Security: ignore never attempted changes once we have made an attempt + // Existing entry has been attempted, change is NeverAttempted + // - ignore the change + // + // # Security + // + // Ignore NeverAttempted changes once we have made an attempt, + // so malicious peers can't keep changing our peer connection order. 
None } else { - // never attempted to never attempted update: preserve original values + // Existing entry and change are both NeverAttempted + // - preserve original values of all fields + // - but replace None with Some + // + // # Security + // + // Preserve the original field values for NeverAttempted peers, + // so malicious peers can't keep changing our peer connection order. Some(MetaAddr { addr: self.addr(), // TODO: or(self.untrusted_services()) when services become optional services: previous.services, - // Security: only update the last seen time if it is missing untrusted_last_seen: previous .untrusted_last_seen .or_else(|| self.untrusted_last_seen()), + // The peer has not been attempted, so these fields must be None last_response: None, last_attempt: None, last_failure: None, @@ -625,15 +692,25 @@ impl MetaAddrChange { }) } } else { - // any to attempt, responded, or failed: prefer newer values + // Existing entry and change are both Attempt, Responded, Failed + // - ignore changes to earlier times + // - update the services from the change + // + // # Security + // + // Ignore changes to earlier times. This enforces the peer + // connection timeout, even if changes are applied out of order. 
Some(MetaAddr { addr: self.addr(), services: self.untrusted_services().unwrap_or(previous.services), - // we don't modify the last seen field at all + // only NeverAttempted changes can modify the last seen field untrusted_last_seen: previous.untrusted_last_seen, - last_response: self.last_response().or(previous.last_response), - last_attempt: self.last_attempt().or(previous.last_attempt), - last_failure: self.last_failure().or(previous.last_failure), + // Since Some(time) is always greater than None, `max` prefers: + // - the latest time if both are Some + // - Some(time) if the other is None + last_response: self.last_response().max(previous.last_response), + last_attempt: self.last_attempt().max(previous.last_attempt), + last_failure: self.last_failure().max(previous.last_failure), last_connection_state: self.peer_addr_state(), }) } @@ -656,32 +733,82 @@ impl Ord for MetaAddr { use std::net::IpAddr::{V4, V6}; use Ordering::*; - let oldest_first = self.get_last_seen().cmp(&other.get_last_seen()); - let newest_first = oldest_first.reverse(); + // First, try states that are more likely to work + let more_reliable_state = self.last_connection_state.cmp(&other.last_connection_state); - let connection_state = self.last_connection_state.cmp(&other.last_connection_state); - let reconnection_time = match self.last_connection_state { - Responded => oldest_first, - NeverAttemptedGossiped => newest_first, - NeverAttemptedAlternate => newest_first, - Failed => oldest_first, - AttemptPending => oldest_first, - }; - let ip_numeric = match (self.addr.ip(), other.addr.ip()) { + // # Security and Correctness + // + // Prioritise older attempt times, so we try all peers in each state, + // before re-trying any of them. This avoids repeatedly reconnecting to + // peers that aren't working. + // + // Using the internal attempt time for peer ordering also minimises the + // amount of information `Addrs` responses leak about Zebra's retry order. 
+ + // If the states are the same, try peers that we haven't tried for a while. + // + // Each state change updates a specific time field, and + // None is less than Some(T), + // so the resulting ordering for each state is: + // - Responded: oldest attempts first (attempt times are required and unique) + // - NeverAttempted...: recent gossiped times first (all other times are None) + // - Failed: oldest attempts first (attempt times are required and unique) + // - AttemptPending: oldest attempts first (attempt times are required and unique) + // + // We also compare the other local times, because: + // - seed peers may not have an attempt time, and + // - updates can be applied to the address book in any order. + let older_attempt = self.last_attempt.cmp(&other.last_attempt); + let older_failure = self.last_failure.cmp(&other.last_failure); + let older_response = self.last_response.cmp(&other.last_response); + + // # Security + // + // Compare local times before untrusted gossiped times and services. + // This gives malicious peers less influence over our peer connection + // order. + + // If all local times are None, try peers that other peers have seen more recently + let newer_untrusted_last_seen = self + .untrusted_last_seen + .cmp(&other.untrusted_last_seen) + .reverse(); + + // Finally, prefer numerically larger service bit patterns + // + // As of June 2021, Zebra only recognises the NODE_NETWORK bit. + // When making outbound connections, Zebra skips non-nodes. + // So this comparison will have no impact until Zebra implements + // more service features. + // + // TODO: order services by usefulness, not bit pattern values + // Security: split gossiped and direct services + let larger_services = self.services.cmp(&other.services); + + // The remaining comparisons are meaningless for peer connection priority. + // But they are required so that we have a total order on `MetaAddr` values: + // self and other must compare as Equal iff they are equal. 
+ + // As a tie-breaker, compare ip and port numerically + // + // Since SocketAddrs are unique in the address book, these comparisons + // guarantee a total, unique order. + let ip_tie_breaker = match (self.addr.ip(), other.addr.ip()) { (V4(a), V4(b)) => a.octets().cmp(&b.octets()), (V6(a), V6(b)) => a.octets().cmp(&b.octets()), (V4(_), V6(_)) => Less, (V6(_), V4(_)) => Greater, }; + let port_tie_breaker = self.addr.port().cmp(&other.addr.port()); - connection_state - .then(reconnection_time) - // The remainder is meaningless as an ordering, but required so that we - // have a total order on `MetaAddr` values: self and other must compare - // as Equal iff they are equal. - .then(ip_numeric) - .then(self.addr.port().cmp(&other.addr.port())) - .then(self.services.bits().cmp(&other.services.bits())) + more_reliable_state + .then(older_attempt) + .then(older_failure) + .then(older_response) + .then(newer_untrusted_last_seen) + .then(larger_services) + .then(ip_tie_breaker) + .then(port_tie_breaker) } } diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index c4737cd1..90f6a9aa 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -87,7 +87,7 @@ impl MetaAddrChange { if change .into_new_meta_addr() .expect("unexpected invalid alternate change") - .is_valid_for_outbound() + .last_known_info_is_valid_for_outbound() { Some(change) } else { diff --git a/zebra-network/src/meta_addr/tests/prop.rs b/zebra-network/src/meta_addr/tests/prop.rs index 074d6d60..efc57d11 100644 --- a/zebra-network/src/meta_addr/tests/prop.rs +++ b/zebra-network/src/meta_addr/tests/prop.rs @@ -1,13 +1,35 @@ //! Randomised property tests for MetaAddr. 
-use super::check; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, + env, + net::SocketAddr, + sync::Arc, + time::Duration, +}; -use crate::meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*}; - -use proptest::prelude::*; +use proptest::{collection::vec, prelude::*}; +use tokio::{runtime::Runtime, time::Instant}; +use tower::service_fn; +use tracing::Span; use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; +use super::check; +use crate::{ + constants::LIVE_PEER_DURATION, + meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*}, + peer_set::candidate_set::CandidateSet, + AddressBook, Config, +}; + +/// The number of test cases to use for proptest that have verbose failures. +/// +/// Set this to the default number of proptest cases, unless you're debugging a +/// failure. +const DEFAULT_VERBOSE_TEST_PROPTEST_CASES: u32 = 256; + proptest! { /// Make sure that the sanitize function reduces time and state metadata /// leaks. @@ -150,7 +172,7 @@ proptest! { prop_assert_eq!( &addr_bytes, &addr_bytes2, - "unexpected round-trip bytes mismatch: original addr: {:?}, bytes: {:?}, deserialized addr: {:?}, bytes: {:?}", + "unexpected double-serialization round-trip mismatch with original addr: {:?}, bytes: {:?}, deserialized addr: {:?}, bytes: {:?}", sanitized_addr, hex::encode(&addr_bytes), deserialized_addr, @@ -159,7 +181,7 @@ proptest! { } - /// Make sure that `[MetaAddrChange]`s: + /// Make sure that [`MetaAddrChange`]s: /// - do not modify the last seen time, unless it was None, and /// - only modify the services after a response or failure. #[test] @@ -188,4 +210,211 @@ proptest! { } } } + + /// Make sure that [`MetaAddr`]s do not get retried more than once per + /// [`LIVE_PEER_DURATION`], regardless of the [`MetaAddrChange`]s that are + /// applied to them. + /// + /// This is the simple version of the test, which checks [`MetaAddr`]s by + /// themselves. 
It detects bugs in [`MetaAddr`]s, even if there are + /// compensating bugs in the [`CandidateSet`] or [`AddressBook`]. + #[test] + fn individual_peer_retry_limit_meta_addr( + (mut addr, changes) in MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE) + ) { + zebra_test::init(); + + let mut attempt_count: usize = 0; + + for change in changes { + while addr.is_ready_for_attempt() { + attempt_count += 1; + // Assume that this test doesn't last longer than LIVE_PEER_DURATION + prop_assert!(attempt_count <= 1); + + // Simulate an attempt + addr = MetaAddr::new_reconnect(&addr.addr) + .apply_to_meta_addr(addr) + .expect("unexpected invalid attempt"); + } + + // If `change` is invalid for the current MetaAddr state, skip it. + if let Some(changed_addr) = change.apply_to_meta_addr(addr) { + assert_eq!(changed_addr.addr, addr.addr); + addr = changed_addr; + } + } + } +} + +proptest! { + // These tests can produce a lot of debug output, so we use a smaller number of cases by default. + // Set the PROPTEST_CASES env var to override this default. + #![proptest_config(proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_VERBOSE_TEST_PROPTEST_CASES)))] + + /// Make sure that [`MetaAddr`]s do not get retried more than once per + /// [`LIVE_PEER_DURATION`], regardless of the [`MetaAddrChange`]s that are + /// applied to a single peer's entries in the [`AddressBook`]. + /// + /// This is the complex version of the test, which checks [`MetaAddr`], + /// [`CandidateSet`] and [`AddressBook`] together. 
+ #[test] + fn individual_peer_retry_limit_candidate_set( + (addr, changes) in MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE) + ) { + zebra_test::init(); + + // Run the test for this many simulated live peer durations + const LIVE_PEER_INTERVALS: u32 = 3; + // Run the test for this much simulated time + let overall_test_time: Duration = LIVE_PEER_DURATION * LIVE_PEER_INTERVALS; + // Advance the clock by this much for every peer change + let peer_change_interval: Duration = overall_test_time / MAX_ADDR_CHANGE.try_into().unwrap(); + + assert!( + u32::try_from(MAX_ADDR_CHANGE).unwrap() >= 3 * LIVE_PEER_INTERVALS, + "there are enough changes for good test coverage", + ); + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + // Only put valid addresses in the address book. + // This means some tests will start with an empty address book. + let addrs = if addr.last_known_info_is_valid_for_outbound() { + Some(addr) + } else { + None + }; + + let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(&Config::default(), Span::none(), addrs))); + let peer_service = service_fn(|_| async { unreachable!("Service should not be called") }); + let mut candidate_set = CandidateSet::new(address_book.clone(), peer_service); + + runtime.block_on(async move { + tokio::time::pause(); + + // The earliest time we can have a valid next attempt for this peer + let earliest_next_attempt = Instant::now() + LIVE_PEER_DURATION; + + // The number of attempts for this peer in the last LIVE_PEER_DURATION + let mut attempt_count: usize = 0; + + for (i, change) in changes.into_iter().enumerate() { + while let Some(candidate_addr) = candidate_set.next().await { + assert_eq!(candidate_addr.addr, addr.addr); + + attempt_count += 1; + assert!( + attempt_count <= 1, + "candidate: {:?}, change: {}, now: {:?}, earliest next attempt: {:?}, \ + attempts: {}, live peer interval limit: {}, test time limit: {:?}, \ + peer 
change interval: {:?}, original addr was in address book: {}", + candidate_addr, + i, + Instant::now(), + earliest_next_attempt, + attempt_count, + LIVE_PEER_INTERVALS, + overall_test_time, + peer_change_interval, + addr.last_known_info_is_valid_for_outbound(), + ); + } + + // If `change` is invalid for the current MetaAddr state, + // multiple intervals will elapse between actual changes to + // the MetaAddr in the AddressBook. + address_book.clone().lock().unwrap().update(change); + + tokio::time::advance(peer_change_interval).await; + if Instant::now() >= earliest_next_attempt { + attempt_count = 0; + } + } + }); + } + + /// Make sure that all disconnected [`MetaAddr`]s are retried once, before + /// any are retried twice. + /// + /// This is the simple version of the test, which checks [`MetaAddr`]s by + /// themselves. It detects bugs in [`MetaAddr`]s, even if there are + /// compensating bugs in the [`CandidateSet`] or [`AddressBook`]. + // + // TODO: write a similar test using the AddressBook and CandidateSet + #[test] + fn multiple_peer_retry_order_meta_addr( + addr_changes_lists in vec( + MetaAddrChange::addr_changes_strategy(MAX_ADDR_CHANGE), + 2..MAX_ADDR_CHANGE + ), + ) { + zebra_test::init(); + + // Run the test for this many simulated live peer durations + const LIVE_PEER_INTERVALS: u32 = 3; + // Run the test for this much simulated time + let overall_test_time: Duration = LIVE_PEER_DURATION * LIVE_PEER_INTERVALS; + // Advance the clock by this much for every peer change + let peer_change_interval: Duration = overall_test_time / MAX_ADDR_CHANGE.try_into().unwrap(); + + assert!( + u32::try_from(MAX_ADDR_CHANGE).unwrap() >= 3 * LIVE_PEER_INTERVALS, + "there are enough changes for good test coverage", + ); + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let attempt_counts = runtime.block_on(async move { + tokio::time::pause(); + + // The current attempt counts for each peer in this interval + 
let mut attempt_counts: HashMap = HashMap::new(); + + // The most recent address info for each peer + let mut addrs: HashMap = HashMap::new(); + + for change_index in 0..MAX_ADDR_CHANGE { + for (addr, changes) in addr_changes_lists.iter() { + let addr = addrs.entry(addr.addr).or_insert(*addr); + let change = changes.get(change_index); + + while addr.is_ready_for_attempt() { + *attempt_counts.entry(addr.addr).or_default() += 1; + assert!(*attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1); + + // Simulate an attempt + *addr = MetaAddr::new_reconnect(&addr.addr) + .apply_to_meta_addr(*addr) + .expect("unexpected invalid attempt"); + } + + // If `change` is invalid for the current MetaAddr state, skip it. + // If we've run out of changes for this addr, do nothing. + if let Some(changed_addr) = change + .map(|change| change.apply_to_meta_addr(*addr)) + .flatten() { + assert_eq!(changed_addr.addr, addr.addr); + *addr = changed_addr; + } + } + + tokio::time::advance(peer_change_interval).await; + } + + attempt_counts + }); + + let min_attempts = attempt_counts.values().min(); + let max_attempts = attempt_counts.values().max(); + if let (Some(&min_attempts), Some(&max_attempts)) = (min_attempts, max_attempts) { + prop_assert!(max_attempts >= min_attempts); + prop_assert!(max_attempts - min_attempts <= 1); + } + } } diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 412d7caf..c5c4da43 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -1,10 +1,10 @@ -mod candidate_set; +pub(crate) mod candidate_set; mod initialize; mod inventory_registry; mod set; mod unready_service; -use candidate_set::CandidateSet; +pub(crate) use candidate_set::CandidateSet; use inventory_registry::InventoryRegistry; use set::PeerSet; diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 5b51c539..cc4ac3ff 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ 
b/zebra-network/src/peer_set/candidate_set.rs @@ -11,101 +11,107 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response #[cfg(test)] mod tests; -/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts. +/// The [`CandidateSet`] manages outbound peer connection attempts. +/// Successful connections become peers in the [`PeerSet`]. /// -/// It divides the set of all possible candidate peers into disjoint subsets, -/// using the `PeerAddrState`: +/// The candidate set divides the set of all possible outbound peers into +/// disjoint subsets, using the [`PeerAddrState`]: /// -/// 1. `Responded` peers, which we previously had inbound or outbound connections -/// to. If we have not received any messages from a `Responded` peer within a -/// cutoff time, we assume that it has disconnected or hung, and attempt -/// reconnection; -/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS -/// seeder, but have never connected to; -/// 3. `Failed` peers, to whom we attempted to connect but were unable to; -/// 4. `AttemptPending` peers, which we've recently queued for reconnection. +/// 1. [`Responded`] peers, which we have had an outbound connection to. +/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers +/// but have never connected to. +/// 3. [`NeverAttemptedAlternate`] peers, canonical addresses which we learned +/// from the [`Version`] messages of inbound and outbound connections, +/// but have never connected to. +/// 4. [`Failed`] peers, which failed a connection attempt, or had an error +/// during an outbound connection. +/// 5. [`AttemptPending`] peers, which we've recently queued for a connection. +/// +/// Never attempted peers are always available for connection. +/// +/// If a peer's attempted, responded, or failure time is recent +/// (within the liveness limit), we avoid reconnecting to it. 
+/// Otherwise, we assume that it has disconnected or hung, +/// and attempt reconnection. /// /// ```ascii,no_run /// ┌──────────────────┐ -/// │ PeerSet │ -/// │GetPeers Responses│ -/// └──────────────────┘ -/// │ -/// │ -/// │ -/// │ +/// │ Config / DNS │ +/// ┌───────────│ Seed │───────────┐ +/// │ │ Addresses │ │ +/// │ └──────────────────┘ │ +/// │ │ untrusted_last_seen │ +/// │ │ is unknown │ +/// ▼ │ ▼ +/// ┌──────────────────┐ │ ┌──────────────────┐ +/// │ Handshake │ │ │ Peer Set │ +/// │ Canonical │──────────┼──────────│ Gossiped │ +/// │ Addresses │ │ │ Addresses │ +/// └──────────────────┘ │ └──────────────────┘ +/// untrusted_last_seen │ provides +/// set to now │ untrusted_last_seen /// ▼ -/// filter by Λ -/// !contains_addr ╱ ╲ -/// ┌────────────────────────────▶▕ ▏ -/// │ ╲ ╱ -/// │ V -/// │ │ -/// │ │ -/// │ │ -/// │ ┌──────────────────┐ │ -/// │ │ Inbound │ │ -/// │ │ Peer Connections │ │ -/// │ └──────────────────┘ │ -/// │ │ │ -/// ├──────────┼────────────────────┼───────────────────────────────┐ -/// │ PeerSet ▼ AddressBook ▼ │ -/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │ -/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │ -/// │ │Disconnected │ │ Peers │ │ Peers │◀┼┐ -/// │ │ `Responded` │ │ │ │ │ ││ -/// │ │ Peers │ │ │ │ │ ││ -/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││ -/// │ │ │ │ ││ -/// │ #1 oldest_first #2 newest_first #3 oldest_first ││ -/// │ │ │ │ ││ -/// │ ├──────────────────────┴──────────────────────┘ ││ -/// │ │ disjoint `PeerAddrState`s ││ -/// ├────────┼──────────────────────────────────────────────────────┘│ -/// │ ▼ │ -/// │ Λ │ -/// │ ╱ ╲ filter by │ -/// └─────▶▕ ▏!is_potentially_connected │ -/// ╲ ╱ to remove live │ -/// V `Responded` peers │ -/// │ │ -/// │ Try outbound connection │ -/// ▼ │ -/// ┌────────────────┐ │ -/// │`AttemptPending`│ │ -/// │ Peers │ │ -/// │ │ │ -/// └────────────────┘ │ -/// │ │ -/// │ │ -/// ▼ │ -/// Λ │ -/// ╱ ╲ │ -/// ▕ 
▏─────────────────────────────────────────────────────┘ -/// ╲ ╱ connection failed, update last_seen to now() -/// V -/// │ -/// │ -/// ▼ -/// ┌────────────┐ -/// │ send │ -/// │peer::Client│ -/// │to Discover │ -/// └────────────┘ -/// │ -/// │ -/// ▼ -/// ┌───────────────────────────────────────┐ -/// │ every time we receive a peer message: │ -/// │ * update state to `Responded` │ -/// │ * update last_seen to now() │ +/// Λ if attempted, responded, or failed: +/// ╱ ╲ ignore gossiped info +/// ▕ ▏ otherwise, if never attempted: +/// ╲ ╱ skip updates to existing fields +/// V +/// ┌───────────────────────────────┼───────────────────────────────┐ +/// │ AddressBook │ │ +/// │ disjoint `PeerAddrState`s ▼ │ +/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │ +/// │ │ `Responded` │ │`NeverAttemptedGossiped` │ │ `Failed` │ │ +/// ┌┼▶│ Peers │ │`NeverAttemptedAlternate`│ │ Peers │◀┼┐ +/// ││ │ │ │ Peers │ │ │ ││ +/// ││ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││ +/// ││ │ │ │ ││ +/// ││ #1 oldest_first #2 newest_first #3 oldest_first ││ +/// ││ ├──────────────────────┴──────────────────────┘ ││ +/// ││ ▼ ││ +/// ││ Λ ││ +/// ││ ╱ ╲ filter by ││ +/// ││ ▕ ▏ is_ready_for_attempt ││ +/// ││ ╲ ╱ to remove recent `Responded`, ││ +/// ││ V `AttemptPending`, and `Failed` peers ││ +/// ││ │ ││ +/// ││ │ try outbound connection, ││ +/// ││ ▼ update last_attempt to now() ││ +/// ││┌────────────────┐ ││ +/// │││`AttemptPending`│ ││ +/// │││ Peers │ ││ +/// ││└────────────────┘ ││ +/// │└────────┼──────────────────────────────────────────────────────┘│ +/// │ ▼ │ +/// │ Λ │ +/// │ ╱ ╲ │ +/// │ ▕ ▏─────────────────────────────────────────────────────┘ +/// │ ╲ ╱ connection failed, update last_failure to now() +/// │ V +/// │ │ +/// │ │ connection succeeded +/// │ ▼ +/// │ ┌────────────┐ +/// │ │ send │ +/// │ │peer::Client│ +/// │ │to Discover │ +/// │ └────────────┘ +/// │ │ +/// │ ▼ +/// │┌───────────────────────────────────────┐ +/// ││ every 
time we receive a peer message: │ +/// └│ * update state to `Responded` │ +/// │ * update last_response to now() │ /// └───────────────────────────────────────┘ -/// /// ``` // TODO: -// * draw arrow from the "peer message" box into the `Responded` state box -// * make the "disjoint states" box include `AttemptPending` -pub(super) struct CandidateSet { +// * show all possible transitions between Attempt/Responded/Failed, +// except Failed -> Responded is invalid, must go through Attempt +// * for now, seed peers go straight to handshaking and responded, +// but we'll fix that once we add the Seed state +// When we add the Seed state: +// * show that seed peers that transition to other never attempted +// states are already in the address book +pub(crate) struct CandidateSet { pub(super) address_book: Arc>, pub(super) peer_service: S, wait_next_handshake: Sleep, @@ -271,12 +277,10 @@ where /// Returns the next candidate for a connection attempt, if any are available. /// - /// Returns peers in this order: - /// - oldest `Responded` that are not live - /// - newest `NeverAttempted` - /// - oldest `Failed` + /// Returns peers in reconnection order, based on + /// [`AddressBook::reconnection_peers`]. /// - /// Skips `AttemptPending` peers and live `Responded` peers. + /// Skips peers that have recently been active, attempted, or failed. 
/// /// ## Correctness /// diff --git a/zebra-network/src/protocol/external/arbitrary.rs b/zebra-network/src/protocol/external/arbitrary.rs index 3173350e..e736f409 100644 --- a/zebra-network/src/protocol/external/arbitrary.rs +++ b/zebra-network/src/protocol/external/arbitrary.rs @@ -1,6 +1,6 @@ use proptest::{arbitrary::any, arbitrary::Arbitrary, prelude::*}; -use super::InventoryHash; +use super::{types::PeerServices, InventoryHash}; use zebra_chain::{block, transaction}; @@ -52,3 +52,16 @@ impl Arbitrary for InventoryHash { type Strategy = BoxedStrategy; } + +#[cfg(any(test, feature = "proptest-impl"))] +impl Arbitrary for PeerServices { + type Parameters = (); + + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + any::() + .prop_map(PeerServices::from_bits_truncate) + .boxed() + } + + type Strategy = BoxedStrategy; +} diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs index 0a5658f0..3f9ecb5e 100644 --- a/zebra-network/src/protocol/external/types.rs +++ b/zebra-network/src/protocol/external/types.rs @@ -79,7 +79,6 @@ bitflags! { /// Note that bits 24-31 are reserved for temporary experiments; other /// service bits should be allocated via the ZIP process. #[derive(Default)] - #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct PeerServices: u64 { /// NODE_NETWORK means that the node is a full node capable of serving /// blocks, as opposed to a light client that makes network requests but