fix(network): Ignore out of order Address Book changes, unless they are concurrent (#6717)
* Ignore out of order Address Book changes, and restructure the function * Handle concurrent changes using the connection state machine order * Handle out of order changes correctly * Pass times through the call stack so they are consistent in tests * Add time arguments to tests * Fix tests that were broken by the address order checks * fastmod wall_ local_ zebra* * cargo fmt --all * Fix a bug in the concurrent change check * Test all the new apply and skip checks for address changes * Document more edge cases and increase the concurrency time slightly * Simplify enum ordering matches * Fix comment typos Co-authored-by: Arya <aryasolhi@gmail.com> --------- Co-authored-by: Arya <aryasolhi@gmail.com>
This commit is contained in:
parent
56c9116649
commit
8af4e572c9
|
|
@ -14,7 +14,7 @@ use ordered_map::OrderedMap;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
use zebra_chain::parameters::Network;
|
use zebra_chain::{parameters::Network, serialization::DateTime32};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
constants,
|
constants,
|
||||||
|
|
@ -228,10 +228,11 @@ impl AddressBook {
|
||||||
/// Get the local listener address.
|
/// Get the local listener address.
|
||||||
///
|
///
|
||||||
/// This address contains minimal state, but it is not sanitized.
|
/// This address contains minimal state, but it is not sanitized.
|
||||||
pub fn local_listener_meta_addr(&self) -> MetaAddr {
|
pub fn local_listener_meta_addr(&self, now: chrono::DateTime<Utc>) -> MetaAddr {
|
||||||
|
let now: DateTime32 = now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
MetaAddr::new_local_listener_change(self.local_listener)
|
MetaAddr::new_local_listener_change(self.local_listener)
|
||||||
.into_new_meta_addr()
|
.local_listener_into_new_meta_addr(now)
|
||||||
.expect("unexpected invalid new local listener addr")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the local listener [`SocketAddr`].
|
/// Get the local listener [`SocketAddr`].
|
||||||
|
|
@ -249,7 +250,7 @@ impl AddressBook {
|
||||||
// Unconditionally add our local listener address to the advertised peers,
|
// Unconditionally add our local listener address to the advertised peers,
|
||||||
// to replace any self-connection failures. The address book and change
|
// to replace any self-connection failures. The address book and change
|
||||||
// constructors make sure that the SocketAddr is canonical.
|
// constructors make sure that the SocketAddr is canonical.
|
||||||
let local_listener = self.local_listener_meta_addr();
|
let local_listener = self.local_listener_meta_addr(now);
|
||||||
peers.insert(local_listener.addr, local_listener);
|
peers.insert(local_listener.addr, local_listener);
|
||||||
|
|
||||||
// Then sanitize and shuffle
|
// Then sanitize and shuffle
|
||||||
|
|
@ -313,7 +314,7 @@ impl AddressBook {
|
||||||
let instant_now = Instant::now();
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
|
||||||
let updated = change.apply_to_meta_addr(previous);
|
let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now);
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
?change,
|
?change,
|
||||||
|
|
|
||||||
|
|
@ -90,6 +90,24 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
|
||||||
/// nodes, and on testnet.
|
/// nodes, and on testnet.
|
||||||
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3);
|
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3);
|
||||||
|
|
||||||
|
/// The maximum time difference for two address book changes to be considered concurrent.
|
||||||
|
///
|
||||||
|
/// This prevents simultaneous or nearby important changes or connection progress
|
||||||
|
/// being overridden by less important changes.
|
||||||
|
///
|
||||||
|
/// This timeout should be less than:
|
||||||
|
/// - the [peer reconnection delay](MIN_PEER_RECONNECTION_DELAY), and
|
||||||
|
/// - the [peer keepalive/heartbeat interval](HEARTBEAT_INTERVAL).
|
||||||
|
///
|
||||||
|
/// But more than:
|
||||||
|
/// - the amount of time between connection events and address book updates,
|
||||||
|
/// even under heavy load (in tests, we have observed delays up to 500ms),
|
||||||
|
/// - the delay between an outbound connection failing,
|
||||||
|
/// and the [CandidateSet](crate::peer_set::CandidateSet) registering the failure, and
|
||||||
|
/// - the delay between the application closing a connection,
|
||||||
|
/// and any remaining positive changes from the peer.
|
||||||
|
pub const CONCURRENT_ADDRESS_CHANGE_PERIOD: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
/// We expect to receive a message from a live peer at least once in this time duration.
|
/// We expect to receive a message from a live peer at least once in this time duration.
|
||||||
///
|
///
|
||||||
/// This is the sum of:
|
/// This is the sum of:
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
//! An address-with-metadata type used in Bitcoin networking.
|
//! An address-with-metadata type used in Bitcoin networking.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
cmp::{Ord, Ordering},
|
cmp::{max, Ord, Ordering},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -79,6 +79,38 @@ impl PeerAddrState {
|
||||||
AttemptPending | Responded | Failed => false,
|
AttemptPending | Responded | Failed => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the typical connection state machine order of `self` and `other`.
|
||||||
|
/// Partially ordered states are sorted in connection attempt order.
|
||||||
|
///
|
||||||
|
/// See [`MetaAddrChange::apply_to_meta_addr()`] for more details.
|
||||||
|
fn connection_state_order(&self, other: &Self) -> Ordering {
|
||||||
|
use Ordering::*;
|
||||||
|
match (self, other) {
|
||||||
|
_ if self == other => Equal,
|
||||||
|
// Peers start in one of the "never attempted" states,
|
||||||
|
// then typically progress towards a "responded" or "failed" state.
|
||||||
|
//
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Prefer gossiped addresses to alternate addresses,
|
||||||
|
// so that peers can't replace the addresses of other peers.
|
||||||
|
// (This is currently checked explicitly by the address update code,
|
||||||
|
// but we respect the same order here as a precaution.)
|
||||||
|
(NeverAttemptedAlternate, _) => Less,
|
||||||
|
(_, NeverAttemptedAlternate) => Greater,
|
||||||
|
(NeverAttemptedGossiped, _) => Less,
|
||||||
|
(_, NeverAttemptedGossiped) => Greater,
|
||||||
|
(AttemptPending, _) => Less,
|
||||||
|
(_, AttemptPending) => Greater,
|
||||||
|
(Responded, _) => Less,
|
||||||
|
(_, Responded) => Greater,
|
||||||
|
// These patterns are redundant, but Rust doesn't assume that `==` is reflexive,
|
||||||
|
// so the first is still required (but unreachable).
|
||||||
|
(Failed, _) => Less,
|
||||||
|
//(_, Failed) => Greater,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// non-test code should explicitly specify the peer address state
|
// non-test code should explicitly specify the peer address state
|
||||||
|
|
@ -100,11 +132,7 @@ impl Ord for PeerAddrState {
|
||||||
fn cmp(&self, other: &Self) -> Ordering {
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
use Ordering::*;
|
use Ordering::*;
|
||||||
match (self, other) {
|
match (self, other) {
|
||||||
(Responded, Responded)
|
_ if self == other => Equal,
|
||||||
| (Failed, Failed)
|
|
||||||
| (NeverAttemptedGossiped, NeverAttemptedGossiped)
|
|
||||||
| (NeverAttemptedAlternate, NeverAttemptedAlternate)
|
|
||||||
| (AttemptPending, AttemptPending) => Equal,
|
|
||||||
// We reconnect to `Responded` peers that have stopped sending messages,
|
// We reconnect to `Responded` peers that have stopped sending messages,
|
||||||
// then `NeverAttempted` peers, then `Failed` peers
|
// then `NeverAttempted` peers, then `Failed` peers
|
||||||
(Responded, _) => Less,
|
(Responded, _) => Less,
|
||||||
|
|
@ -115,7 +143,10 @@ impl Ord for PeerAddrState {
|
||||||
(_, NeverAttemptedAlternate) => Greater,
|
(_, NeverAttemptedAlternate) => Greater,
|
||||||
(Failed, _) => Less,
|
(Failed, _) => Less,
|
||||||
(_, Failed) => Greater,
|
(_, Failed) => Greater,
|
||||||
// AttemptPending is covered by the other cases
|
// These patterns are redundant, but Rust doesn't assume that `==` is reflexive,
|
||||||
|
// so the first is still required (but unreachable).
|
||||||
|
(AttemptPending, _) => Less,
|
||||||
|
//(_, AttemptPending) => Greater,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -195,6 +226,9 @@ pub struct MetaAddr {
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||||
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
||||||
pub enum MetaAddrChange {
|
pub enum MetaAddrChange {
|
||||||
|
// TODO:
|
||||||
|
// - split the common `addr` field into an outer struct
|
||||||
|
//
|
||||||
/// Creates a `MetaAddr` for an initial peer.
|
/// Creates a `MetaAddr` for an initial peer.
|
||||||
NewInitial {
|
NewInitial {
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
|
|
@ -694,7 +728,7 @@ impl MetaAddrChange {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the untrusted last seen time for this change, if available.
|
/// Return the untrusted last seen time for this change, if available.
|
||||||
pub fn untrusted_last_seen(&self) -> Option<DateTime32> {
|
pub fn untrusted_last_seen(&self, now: DateTime32) -> Option<DateTime32> {
|
||||||
match self {
|
match self {
|
||||||
NewInitial { .. } => None,
|
NewInitial { .. } => None,
|
||||||
NewGossiped {
|
NewGossiped {
|
||||||
|
|
@ -703,15 +737,34 @@ impl MetaAddrChange {
|
||||||
} => Some(*untrusted_last_seen),
|
} => Some(*untrusted_last_seen),
|
||||||
NewAlternate { .. } => None,
|
NewAlternate { .. } => None,
|
||||||
// We know that our local listener is available
|
// We know that our local listener is available
|
||||||
NewLocal { .. } => Some(DateTime32::now()),
|
NewLocal { .. } => Some(now),
|
||||||
UpdateAttempt { .. } => None,
|
UpdateAttempt { .. } => None,
|
||||||
UpdateResponded { .. } => None,
|
UpdateResponded { .. } => None,
|
||||||
UpdateFailed { .. } => None,
|
UpdateFailed { .. } => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// # Concurrency
|
||||||
|
//
|
||||||
|
// We assign a time to each change when it is applied to the address book by either the
|
||||||
|
// address book updater or candidate set tasks. This is the time that the change was received
|
||||||
|
// from the updater channel, rather than the time that the message was read from the peer
|
||||||
|
// connection.
|
||||||
|
//
|
||||||
|
// Since the connection tasks run concurrently in an unspecified order, and the address book
|
||||||
|
// updater runs in a separate thread, these times are almost always very similar. If Zebra's
|
||||||
|
// address book is under load, we should use lower rate-limits for new inbound or outbound
|
||||||
|
// connections, disconnections, peer gossip crawls, or peer `UpdateResponded` updates.
|
||||||
|
//
|
||||||
|
// TODO:
|
||||||
|
// - move the time API calls from `impl MetaAddrChange` `last_*()` methods:
|
||||||
|
// - if they impact performance, call them once in the address book updater task,
|
||||||
|
// then apply them to all the waiting changes
|
||||||
|
// - otherwise, move them to the `impl MetaAddrChange` `new_*()` methods,
|
||||||
|
// so they are called in the connection tasks
|
||||||
|
//
|
||||||
/// Return the last attempt for this change, if available.
|
/// Return the last attempt for this change, if available.
|
||||||
pub fn last_attempt(&self) -> Option<Instant> {
|
pub fn last_attempt(&self, now: Instant) -> Option<Instant> {
|
||||||
match self {
|
match self {
|
||||||
NewInitial { .. } => None,
|
NewInitial { .. } => None,
|
||||||
NewGossiped { .. } => None,
|
NewGossiped { .. } => None,
|
||||||
|
|
@ -720,14 +773,14 @@ impl MetaAddrChange {
|
||||||
// Attempt changes are applied before we start the handshake to the
|
// Attempt changes are applied before we start the handshake to the
|
||||||
// peer address. So the attempt time is a lower bound for the actual
|
// peer address. So the attempt time is a lower bound for the actual
|
||||||
// handshake time.
|
// handshake time.
|
||||||
UpdateAttempt { .. } => Some(Instant::now()),
|
UpdateAttempt { .. } => Some(now),
|
||||||
UpdateResponded { .. } => None,
|
UpdateResponded { .. } => None,
|
||||||
UpdateFailed { .. } => None,
|
UpdateFailed { .. } => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the last response for this change, if available.
|
/// Return the last response for this change, if available.
|
||||||
pub fn last_response(&self) -> Option<DateTime32> {
|
pub fn last_response(&self, now: DateTime32) -> Option<DateTime32> {
|
||||||
match self {
|
match self {
|
||||||
NewInitial { .. } => None,
|
NewInitial { .. } => None,
|
||||||
NewGossiped { .. } => None,
|
NewGossiped { .. } => None,
|
||||||
|
|
@ -739,13 +792,13 @@ impl MetaAddrChange {
|
||||||
// - we might send outdated last seen times to our peers, and
|
// - we might send outdated last seen times to our peers, and
|
||||||
// - the peer will appear to be live for longer, delaying future
|
// - the peer will appear to be live for longer, delaying future
|
||||||
// reconnection attempts.
|
// reconnection attempts.
|
||||||
UpdateResponded { .. } => Some(DateTime32::now()),
|
UpdateResponded { .. } => Some(now),
|
||||||
UpdateFailed { .. } => None,
|
UpdateFailed { .. } => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the last failure for this change, if available.
|
/// Return the last failure for this change, if available.
|
||||||
pub fn last_failure(&self) -> Option<Instant> {
|
pub fn last_failure(&self, now: Instant) -> Option<Instant> {
|
||||||
match self {
|
match self {
|
||||||
NewInitial { .. } => None,
|
NewInitial { .. } => None,
|
||||||
NewGossiped { .. } => None,
|
NewGossiped { .. } => None,
|
||||||
|
|
@ -758,7 +811,7 @@ impl MetaAddrChange {
|
||||||
// states for longer, and
|
// states for longer, and
|
||||||
// - the peer will appear to be used for longer, delaying future
|
// - the peer will appear to be used for longer, delaying future
|
||||||
// reconnection attempts.
|
// reconnection attempts.
|
||||||
UpdateFailed { .. } => Some(Instant::now()),
|
UpdateFailed { .. } => Some(now),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -776,93 +829,212 @@ impl MetaAddrChange {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If this change can create a new `MetaAddr`, return that address.
|
/// Returns the corresponding `MetaAddr` for this change.
|
||||||
pub fn into_new_meta_addr(self) -> Option<MetaAddr> {
|
pub fn into_new_meta_addr(self, instant_now: Instant, local_now: DateTime32) -> MetaAddr {
|
||||||
Some(MetaAddr {
|
MetaAddr {
|
||||||
addr: self.addr(),
|
addr: self.addr(),
|
||||||
services: self.untrusted_services(),
|
services: self.untrusted_services(),
|
||||||
untrusted_last_seen: self.untrusted_last_seen(),
|
untrusted_last_seen: self.untrusted_last_seen(local_now),
|
||||||
last_response: self.last_response(),
|
last_response: self.last_response(local_now),
|
||||||
last_attempt: self.last_attempt(),
|
last_attempt: self.last_attempt(instant_now),
|
||||||
last_failure: self.last_failure(),
|
last_failure: self.last_failure(instant_now),
|
||||||
last_connection_state: self.peer_addr_state(),
|
last_connection_state: self.peer_addr_state(),
|
||||||
})
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the corresponding [`MetaAddr`] for a local listener change.
|
||||||
|
///
|
||||||
|
/// This method exists so we don't have to provide an unused [`Instant`] to get a local
|
||||||
|
/// listener `MetaAddr`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// If this change is not a [`MetaAddrChange::NewLocal`].
|
||||||
|
pub fn local_listener_into_new_meta_addr(self, local_now: DateTime32) -> MetaAddr {
|
||||||
|
assert!(matches!(self, MetaAddrChange::NewLocal { .. }));
|
||||||
|
|
||||||
|
MetaAddr {
|
||||||
|
addr: self.addr(),
|
||||||
|
services: self.untrusted_services(),
|
||||||
|
untrusted_last_seen: self.untrusted_last_seen(local_now),
|
||||||
|
last_response: self.last_response(local_now),
|
||||||
|
last_attempt: None,
|
||||||
|
last_failure: None,
|
||||||
|
last_connection_state: self.peer_addr_state(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply this change to a previous `MetaAddr` from the address book,
|
/// Apply this change to a previous `MetaAddr` from the address book,
|
||||||
/// producing a new or updated `MetaAddr`.
|
/// producing a new or updated `MetaAddr`.
|
||||||
///
|
///
|
||||||
/// If the change isn't valid for the `previous` address, returns `None`.
|
/// If the change isn't valid for the `previous` address, returns `None`.
|
||||||
pub fn apply_to_meta_addr(&self, previous: impl Into<Option<MetaAddr>>) -> Option<MetaAddr> {
|
#[allow(clippy::unwrap_in_result)]
|
||||||
if let Some(previous) = previous.into() {
|
pub fn apply_to_meta_addr(
|
||||||
assert_eq!(previous.addr, self.addr(), "unexpected addr mismatch");
|
&self,
|
||||||
|
previous: impl Into<Option<MetaAddr>>,
|
||||||
|
instant_now: Instant,
|
||||||
|
chrono_now: chrono::DateTime<Utc>,
|
||||||
|
) -> Option<MetaAddr> {
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let previous_has_been_attempted = !previous.last_connection_state.is_never_attempted();
|
let Some(previous) = previous.into() else {
|
||||||
let change_to_never_attempted = self
|
|
||||||
.into_new_meta_addr()
|
|
||||||
.map(|meta_addr| meta_addr.last_connection_state.is_never_attempted())
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
if change_to_never_attempted {
|
|
||||||
if previous_has_been_attempted {
|
|
||||||
// Existing entry has been attempted, change is NeverAttempted
|
|
||||||
// - ignore the change
|
|
||||||
//
|
|
||||||
// # Security
|
|
||||||
//
|
|
||||||
// Ignore NeverAttempted changes once we have made an attempt,
|
|
||||||
// so malicious peers can't keep changing our peer connection order.
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
// Existing entry and change are both NeverAttempted
|
|
||||||
// - preserve original values of all fields
|
|
||||||
// - but replace None with Some
|
|
||||||
//
|
|
||||||
// # Security
|
|
||||||
//
|
|
||||||
// Preserve the original field values for NeverAttempted peers,
|
|
||||||
// so malicious peers can't keep changing our peer connection order.
|
|
||||||
Some(MetaAddr {
|
|
||||||
addr: self.addr(),
|
|
||||||
services: previous.services.or_else(|| self.untrusted_services()),
|
|
||||||
untrusted_last_seen: previous
|
|
||||||
.untrusted_last_seen
|
|
||||||
.or_else(|| self.untrusted_last_seen()),
|
|
||||||
// The peer has not been attempted, so these fields must be None
|
|
||||||
last_response: None,
|
|
||||||
last_attempt: None,
|
|
||||||
last_failure: None,
|
|
||||||
last_connection_state: self.peer_addr_state(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Existing entry and change are both Attempt, Responded, Failed
|
|
||||||
// - ignore changes to earlier times
|
|
||||||
// - update the services from the change
|
|
||||||
//
|
|
||||||
// # Security
|
|
||||||
//
|
|
||||||
// Ignore changes to earlier times. This enforces the peer
|
|
||||||
// connection timeout, even if changes are applied out of order.
|
|
||||||
Some(MetaAddr {
|
|
||||||
addr: self.addr(),
|
|
||||||
// We want up-to-date services, even if they have fewer bits,
|
|
||||||
// or they are applied out of order.
|
|
||||||
services: self.untrusted_services().or(previous.services),
|
|
||||||
// Only NeverAttempted changes can modify the last seen field
|
|
||||||
untrusted_last_seen: previous.untrusted_last_seen,
|
|
||||||
// Since Some(time) is always greater than None, `max` prefers:
|
|
||||||
// - the latest time if both are Some
|
|
||||||
// - Some(time) if the other is None
|
|
||||||
last_response: self.last_response().max(previous.last_response),
|
|
||||||
last_attempt: self.last_attempt().max(previous.last_attempt),
|
|
||||||
last_failure: self.last_failure().max(previous.last_failure),
|
|
||||||
last_connection_state: self.peer_addr_state(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// no previous: create a new entry
|
// no previous: create a new entry
|
||||||
self.into_new_meta_addr()
|
return Some(self.into_new_meta_addr(instant_now, local_now));
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(previous.addr, self.addr(), "unexpected addr mismatch");
|
||||||
|
|
||||||
|
let instant_previous = max(previous.last_attempt, previous.last_failure);
|
||||||
|
let local_previous = previous.last_response;
|
||||||
|
|
||||||
|
// Is this change potentially concurrent with the previous change?
|
||||||
|
//
|
||||||
|
// Since we're using saturating arithmetic, one of each pair of less than comparisons
|
||||||
|
// will always be true, because subtraction saturates to zero.
|
||||||
|
let change_is_concurrent = instant_previous
|
||||||
|
.map(|instant_previous| {
|
||||||
|
instant_previous.saturating_duration_since(instant_now)
|
||||||
|
< constants::CONCURRENT_ADDRESS_CHANGE_PERIOD
|
||||||
|
&& instant_now.saturating_duration_since(instant_previous)
|
||||||
|
< constants::CONCURRENT_ADDRESS_CHANGE_PERIOD
|
||||||
|
})
|
||||||
|
.unwrap_or_default()
|
||||||
|
|| local_previous
|
||||||
|
.map(|local_previous| {
|
||||||
|
local_previous.saturating_duration_since(local_now).to_std()
|
||||||
|
< constants::CONCURRENT_ADDRESS_CHANGE_PERIOD
|
||||||
|
&& local_now.saturating_duration_since(local_previous).to_std()
|
||||||
|
< constants::CONCURRENT_ADDRESS_CHANGE_PERIOD
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
let change_is_out_of_order = instant_previous
|
||||||
|
.map(|instant_previous| instant_previous > instant_now)
|
||||||
|
.unwrap_or_default()
|
||||||
|
|| local_previous
|
||||||
|
.map(|local_previous| local_previous > local_now)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
// Is this change typically from a connection state that has more progress?
|
||||||
|
let connection_has_more_progress = self
|
||||||
|
.peer_addr_state()
|
||||||
|
.connection_state_order(&previous.last_connection_state)
|
||||||
|
== Ordering::Greater;
|
||||||
|
|
||||||
|
let previous_has_been_attempted = !previous.last_connection_state.is_never_attempted();
|
||||||
|
let change_to_never_attempted = self.peer_addr_state().is_never_attempted();
|
||||||
|
|
||||||
|
// Invalid changes
|
||||||
|
|
||||||
|
if change_to_never_attempted && previous_has_been_attempted {
|
||||||
|
// Existing entry has been attempted, change is NeverAttempted
|
||||||
|
// - ignore the change
|
||||||
|
//
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Ignore NeverAttempted changes once we have made an attempt,
|
||||||
|
// so malicious peers can't keep changing our peer connection order.
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if change_is_out_of_order && !change_is_concurrent {
|
||||||
|
// Change is significantly out of order: ignore it.
|
||||||
|
//
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Ignore changes that arrive out of order, if they are far enough apart.
|
||||||
|
// This enforces the peer connection retry interval.
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if change_is_concurrent && !connection_has_more_progress {
|
||||||
|
// Change is close together in time, and it would revert the connection to an earlier
|
||||||
|
// state.
|
||||||
|
//
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// If the changes might have been concurrent, ignore connection states with less
|
||||||
|
// progress.
|
||||||
|
//
|
||||||
|
// ## Sources of Concurrency
|
||||||
|
//
|
||||||
|
// If two changes happen close together, the async scheduler can run their change
|
||||||
|
// send and apply code in any order. This includes the code that records the time of
|
||||||
|
// the change. So even if a failure happens after a response message, the failure time
|
||||||
|
// can be recorded before the response time code is run.
|
||||||
|
//
|
||||||
|
// Some machines and OSes have limited time resolution, so we can't guarantee that
|
||||||
|
// two messages on the same connection will always have different times. There are
|
||||||
|
// also known bugs impacting monotonic times which make them go backwards or stay
|
||||||
|
// equal. For wall clock times, clock skew is an expected event, particularly with
|
||||||
|
// network time server updates.
|
||||||
|
//
|
||||||
|
// Also, the application can fail a connection independently and simultaneously
|
||||||
|
// (or slightly before) a positive update from that peer connection. We want the
|
||||||
|
// application change to take priority in the address book, because the connection
|
||||||
|
// state machine also prioritises failures over any other peer messages.
|
||||||
|
//
|
||||||
|
// ## Resolution
|
||||||
|
//
|
||||||
|
// In these cases, we want to apply the failure, then ignore any nearby changes that
|
||||||
|
// reset the address book entry to a more appealing state. This prevents peers from
|
||||||
|
// sending updates right before failing a connection, in order to make themselves more
|
||||||
|
// likely to get a reconnection.
|
||||||
|
//
|
||||||
|
// The connection state machine order is used so that state transitions which are
|
||||||
|
// typically close together are preserved. These transitions are:
|
||||||
|
// - NeverAttempted*->AttemptPending->(Responded|Failed)
|
||||||
|
// - Responded->Failed
|
||||||
|
//
|
||||||
|
// State transitions like (Responded|Failed)->AttemptPending only happen after the
|
||||||
|
// reconnection timeout, so they will never be considered concurrent.
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Valid changes
|
||||||
|
|
||||||
|
if change_to_never_attempted && !previous_has_been_attempted {
|
||||||
|
// Existing entry and change are both NeverAttempted
|
||||||
|
// - preserve original values of all fields
|
||||||
|
// - but replace None with Some
|
||||||
|
//
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Preserve the original field values for NeverAttempted peers,
|
||||||
|
// so malicious peers can't keep changing our peer connection order.
|
||||||
|
Some(MetaAddr {
|
||||||
|
addr: self.addr(),
|
||||||
|
services: previous.services.or_else(|| self.untrusted_services()),
|
||||||
|
untrusted_last_seen: previous
|
||||||
|
.untrusted_last_seen
|
||||||
|
.or_else(|| self.untrusted_last_seen(local_now)),
|
||||||
|
// The peer has not been attempted, so these fields must be None
|
||||||
|
last_response: None,
|
||||||
|
last_attempt: None,
|
||||||
|
last_failure: None,
|
||||||
|
last_connection_state: self.peer_addr_state(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// Existing entry and change are both Attempt, Responded, Failed,
|
||||||
|
// and the change is later, either in time or in connection progress
|
||||||
|
// (this is checked above and returns None early):
|
||||||
|
// - update the fields from the change
|
||||||
|
Some(MetaAddr {
|
||||||
|
addr: self.addr(),
|
||||||
|
// Always update optional fields, unless the update is None.
|
||||||
|
//
|
||||||
|
// We want up-to-date services, even if they have fewer bits
|
||||||
|
services: self.untrusted_services().or(previous.services),
|
||||||
|
// Only NeverAttempted changes can modify the last seen field
|
||||||
|
untrusted_last_seen: previous.untrusted_last_seen,
|
||||||
|
// This is a wall clock time, but we already checked that responses are in order.
|
||||||
|
// Even if the wall clock time has jumped, we want to use the latest time.
|
||||||
|
last_response: self.last_response(local_now).or(previous.last_response),
|
||||||
|
// These are monotonic times, we already checked the responses are in order.
|
||||||
|
last_attempt: self.last_attempt(instant_now).or(previous.last_attempt),
|
||||||
|
last_failure: self.last_failure(instant_now).or(previous.last_failure),
|
||||||
|
// Replace the state with the updated state.
|
||||||
|
last_connection_state: self.peer_addr_state(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
//! Randomised test data generation for MetaAddr.
|
//! Randomised test data generation for MetaAddr.
|
||||||
|
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use proptest::{arbitrary::any, collection::vec, prelude::*};
|
use proptest::{arbitrary::any, collection::vec, prelude::*};
|
||||||
|
|
||||||
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
|
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
|
||||||
|
|
@ -49,12 +51,20 @@ impl MetaAddr {
|
||||||
///
|
///
|
||||||
/// [1]: super::PeerAddrState::NeverAttemptedAlternate
|
/// [1]: super::PeerAddrState::NeverAttemptedAlternate
|
||||||
pub fn alternate_strategy() -> BoxedStrategy<Self> {
|
pub fn alternate_strategy() -> BoxedStrategy<Self> {
|
||||||
(canonical_peer_addr_strategy(), any::<PeerServices>())
|
(
|
||||||
.prop_map(|(socket_addr, untrusted_services)| {
|
canonical_peer_addr_strategy(),
|
||||||
MetaAddr::new_alternate(socket_addr, &untrusted_services)
|
any::<PeerServices>(),
|
||||||
.into_new_meta_addr()
|
any::<Instant>(),
|
||||||
.expect("unexpected invalid alternate change")
|
any::<DateTime32>(),
|
||||||
})
|
)
|
||||||
|
.prop_map(
|
||||||
|
|(socket_addr, untrusted_services, instant_now, local_now)| {
|
||||||
|
// instant_now is not actually used for this variant,
|
||||||
|
// so we could just provide a default value
|
||||||
|
MetaAddr::new_alternate(socket_addr, &untrusted_services)
|
||||||
|
.into_new_meta_addr(instant_now, local_now)
|
||||||
|
},
|
||||||
|
)
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -98,22 +108,29 @@ impl MetaAddrChange {
|
||||||
///
|
///
|
||||||
/// [1]: super::NewAlternate
|
/// [1]: super::NewAlternate
|
||||||
pub fn ready_outbound_strategy() -> BoxedStrategy<Self> {
|
pub fn ready_outbound_strategy() -> BoxedStrategy<Self> {
|
||||||
canonical_peer_addr_strategy()
|
(
|
||||||
.prop_filter_map("failed MetaAddr::is_valid_for_outbound", |addr| {
|
canonical_peer_addr_strategy(),
|
||||||
// Alternate nodes use the current time, so they're always ready
|
any::<Instant>(),
|
||||||
//
|
any::<DateTime32>(),
|
||||||
// TODO: create a "Zebra supported services" constant
|
)
|
||||||
let change = MetaAddr::new_alternate(addr, &PeerServices::NODE_NETWORK);
|
.prop_filter_map(
|
||||||
if change
|
"failed MetaAddr::is_valid_for_outbound",
|
||||||
.into_new_meta_addr()
|
|(addr, instant_now, local_now)| {
|
||||||
.expect("unexpected invalid alternate change")
|
// Alternate nodes use the current time, so they're always ready
|
||||||
.last_known_info_is_valid_for_outbound(Mainnet)
|
//
|
||||||
{
|
// TODO: create a "Zebra supported services" constant
|
||||||
Some(change)
|
|
||||||
} else {
|
let change = MetaAddr::new_alternate(addr, &PeerServices::NODE_NETWORK);
|
||||||
None
|
if change
|
||||||
}
|
.into_new_meta_addr(instant_now, local_now)
|
||||||
})
|
.last_known_info_is_valid_for_outbound(Mainnet)
|
||||||
|
{
|
||||||
|
Some(change)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ use std::{collections::HashMap, env, net::SocketAddr, str::FromStr, sync::Arc, t
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use proptest::{collection::vec, prelude::*};
|
use proptest::{collection::vec, prelude::*};
|
||||||
use tokio::time::Instant;
|
|
||||||
use tower::service_fn;
|
use tower::service_fn;
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
|
|
@ -64,8 +63,12 @@ proptest! {
|
||||||
) {
|
) {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = std::time::Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
if let Some(changed_addr) = change.apply_to_meta_addr(addr) {
|
if let Some(changed_addr) = change.apply_to_meta_addr(addr, instant_now, chrono_now) {
|
||||||
// untrusted last seen times:
|
// untrusted last seen times:
|
||||||
// check that we replace None with Some, but leave Some unchanged
|
// check that we replace None with Some, but leave Some unchanged
|
||||||
if addr.untrusted_last_seen.is_some() {
|
if addr.untrusted_last_seen.is_some() {
|
||||||
|
|
@ -73,7 +76,7 @@ proptest! {
|
||||||
} else {
|
} else {
|
||||||
prop_assert_eq!(
|
prop_assert_eq!(
|
||||||
changed_addr.untrusted_last_seen,
|
changed_addr.untrusted_last_seen,
|
||||||
change.untrusted_last_seen()
|
change.untrusted_last_seen(local_now)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -112,18 +115,22 @@ proptest! {
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
|
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
|
||||||
attempt_count += 1;
|
|
||||||
// Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY
|
|
||||||
prop_assert!(attempt_count <= 1);
|
|
||||||
|
|
||||||
// Simulate an attempt
|
// Simulate an attempt
|
||||||
addr = MetaAddr::new_reconnect(addr.addr)
|
addr = if let Some(addr) = MetaAddr::new_reconnect(addr.addr)
|
||||||
.apply_to_meta_addr(addr)
|
.apply_to_meta_addr(addr, instant_now, chrono_now) {
|
||||||
.expect("unexpected invalid attempt");
|
attempt_count += 1;
|
||||||
|
// Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY
|
||||||
|
prop_assert!(attempt_count <= 1);
|
||||||
|
addr
|
||||||
|
} else {
|
||||||
|
// Stop updating when an attempt comes too soon after a failure.
|
||||||
|
// In production these are prevented by the dialer code.
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If `change` is invalid for the current MetaAddr state, skip it.
|
// If `change` is invalid for the current MetaAddr state, skip it.
|
||||||
if let Some(changed_addr) = change.apply_to_meta_addr(addr) {
|
if let Some(changed_addr) = change.apply_to_meta_addr(addr, instant_now, chrono_now) {
|
||||||
prop_assert_eq!(changed_addr.addr, addr.addr);
|
prop_assert_eq!(changed_addr.addr, addr.addr);
|
||||||
addr = changed_addr;
|
addr = changed_addr;
|
||||||
}
|
}
|
||||||
|
|
@ -155,7 +162,7 @@ proptest! {
|
||||||
);
|
);
|
||||||
let sanitized_addrs = address_book.sanitized(chrono_now);
|
let sanitized_addrs = address_book.sanitized(chrono_now);
|
||||||
|
|
||||||
let expected_local_listener = address_book.local_listener_meta_addr();
|
let expected_local_listener = address_book.local_listener_meta_addr(chrono_now);
|
||||||
let canonical_local_listener = canonical_peer_addr(local_listener);
|
let canonical_local_listener = canonical_peer_addr(local_listener);
|
||||||
let book_sanitized_local_listener = sanitized_addrs
|
let book_sanitized_local_listener = sanitized_addrs
|
||||||
.iter()
|
.iter()
|
||||||
|
|
@ -186,9 +193,12 @@ proptest! {
|
||||||
|
|
||||||
let local_listener = "0.0.0.0:0".parse().expect("unexpected invalid SocketAddr");
|
let local_listener = "0.0.0.0:0".parse().expect("unexpected invalid SocketAddr");
|
||||||
|
|
||||||
|
let instant_now = std::time::Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
// Check direct application
|
// Check direct application
|
||||||
let new_addr = change.apply_to_meta_addr(None);
|
let new_addr = change.apply_to_meta_addr(None, instant_now, chrono_now);
|
||||||
|
|
||||||
prop_assert!(
|
prop_assert!(
|
||||||
new_addr.is_some(),
|
new_addr.is_some(),
|
||||||
|
|
@ -328,7 +338,7 @@ proptest! {
|
||||||
tokio::time::pause();
|
tokio::time::pause();
|
||||||
|
|
||||||
// The earliest time we can have a valid next attempt for this peer
|
// The earliest time we can have a valid next attempt for this peer
|
||||||
let earliest_next_attempt = Instant::now() + MIN_PEER_RECONNECTION_DELAY;
|
let earliest_next_attempt = tokio::time::Instant::now() + MIN_PEER_RECONNECTION_DELAY;
|
||||||
|
|
||||||
// The number of attempts for this peer in the last MIN_PEER_RECONNECTION_DELAY
|
// The number of attempts for this peer in the last MIN_PEER_RECONNECTION_DELAY
|
||||||
let mut attempt_count: usize = 0;
|
let mut attempt_count: usize = 0;
|
||||||
|
|
@ -349,7 +359,7 @@ proptest! {
|
||||||
original addr was in address book: {}\n",
|
original addr was in address book: {}\n",
|
||||||
candidate_addr,
|
candidate_addr,
|
||||||
i,
|
i,
|
||||||
Instant::now(),
|
tokio::time::Instant::now(),
|
||||||
earliest_next_attempt,
|
earliest_next_attempt,
|
||||||
attempt_count,
|
attempt_count,
|
||||||
LIVE_PEER_INTERVALS,
|
LIVE_PEER_INTERVALS,
|
||||||
|
|
@ -365,7 +375,7 @@ proptest! {
|
||||||
address_book.clone().lock().unwrap().update(change);
|
address_book.clone().lock().unwrap().update(change);
|
||||||
|
|
||||||
tokio::time::advance(peer_change_interval).await;
|
tokio::time::advance(peer_change_interval).await;
|
||||||
if Instant::now() >= earliest_next_attempt {
|
if tokio::time::Instant::now() >= earliest_next_attempt {
|
||||||
attempt_count = 0;
|
attempt_count = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -423,20 +433,24 @@ proptest! {
|
||||||
let change = changes.get(change_index);
|
let change = changes.get(change_index);
|
||||||
|
|
||||||
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
|
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
|
||||||
*attempt_counts.entry(addr.addr).or_default() += 1;
|
|
||||||
prop_assert!(
|
|
||||||
*attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1
|
|
||||||
);
|
|
||||||
|
|
||||||
// Simulate an attempt
|
// Simulate an attempt
|
||||||
*addr = MetaAddr::new_reconnect(addr.addr)
|
*addr = if let Some(addr) = MetaAddr::new_reconnect(addr.addr)
|
||||||
.apply_to_meta_addr(*addr)
|
.apply_to_meta_addr(*addr, instant_now, chrono_now) {
|
||||||
.expect("unexpected invalid attempt");
|
*attempt_counts.entry(addr.addr).or_default() += 1;
|
||||||
|
prop_assert!(
|
||||||
|
*attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1
|
||||||
|
);
|
||||||
|
addr
|
||||||
|
} else {
|
||||||
|
// Stop updating when an attempt comes too soon after a failure.
|
||||||
|
// In production these are prevented by the dialer code.
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If `change` is invalid for the current MetaAddr state, skip it.
|
// If `change` is invalid for the current MetaAddr state, skip it.
|
||||||
// If we've run out of changes for this addr, do nothing.
|
// If we've run out of changes for this addr, do nothing.
|
||||||
if let Some(changed_addr) = change.and_then(|change| change.apply_to_meta_addr(*addr))
|
if let Some(changed_addr) = change.and_then(|change| change.apply_to_meta_addr(*addr, instant_now, chrono_now))
|
||||||
{
|
{
|
||||||
prop_assert_eq!(changed_addr.addr, addr.addr);
|
prop_assert_eq!(changed_addr.addr, addr.addr);
|
||||||
*addr = changed_addr;
|
*addr = changed_addr;
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
//! Fixed test cases for MetaAddr and MetaAddrChange.
|
//! Fixed test cases for MetaAddr and MetaAddrChange.
|
||||||
|
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
|
|
@ -7,7 +9,11 @@ use zebra_chain::{
|
||||||
serialization::{DateTime32, Duration32},
|
serialization::{DateTime32, Duration32},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{constants::MAX_PEER_ACTIVE_FOR_GOSSIP, protocol::types::PeerServices, PeerSocketAddr};
|
use crate::{
|
||||||
|
constants::{CONCURRENT_ADDRESS_CHANGE_PERIOD, MAX_PEER_ACTIVE_FOR_GOSSIP},
|
||||||
|
protocol::types::PeerServices,
|
||||||
|
PeerSocketAddr,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{super::MetaAddr, check};
|
use super::{super::MetaAddr, check};
|
||||||
|
|
||||||
|
|
@ -57,12 +63,13 @@ fn sanitize_extremes() {
|
||||||
fn new_local_listener_is_gossipable() {
|
fn new_local_listener_is_gossipable() {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
let peer = MetaAddr::new_local_listener_change(address)
|
let peer =
|
||||||
.into_new_meta_addr()
|
MetaAddr::new_local_listener_change(address).into_new_meta_addr(instant_now, local_now);
|
||||||
.expect("MetaAddrChange can't create a new MetaAddr");
|
|
||||||
|
|
||||||
assert!(peer.is_active_for_gossip(chrono_now));
|
assert!(peer.is_active_for_gossip(chrono_now));
|
||||||
}
|
}
|
||||||
|
|
@ -75,12 +82,13 @@ fn new_local_listener_is_gossipable() {
|
||||||
fn new_alternate_peer_address_is_not_gossipable() {
|
fn new_alternate_peer_address_is_not_gossipable() {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
let peer = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
let peer = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
.expect("MetaAddrChange can't create a new MetaAddr");
|
|
||||||
|
|
||||||
assert!(!peer.is_active_for_gossip(chrono_now));
|
assert!(!peer.is_active_for_gossip(chrono_now));
|
||||||
}
|
}
|
||||||
|
|
@ -153,16 +161,17 @@ fn gossiped_peer_reportedly_seen_long_ago_is_not_gossipable() {
|
||||||
fn recently_responded_peer_is_gossipable() {
|
fn recently_responded_peer_is_gossipable() {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
.expect("MetaAddrChange can't create a new MetaAddr");
|
|
||||||
|
|
||||||
// Create a peer that has responded
|
// Create a peer that has responded
|
||||||
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
.apply_to_meta_addr(peer_seed)
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
.expect("Failed to create MetaAddr for responded peer");
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
assert!(peer.is_active_for_gossip(chrono_now));
|
assert!(peer.is_active_for_gossip(chrono_now));
|
||||||
|
|
@ -173,16 +182,17 @@ fn recently_responded_peer_is_gossipable() {
|
||||||
fn not_so_recently_responded_peer_is_still_gossipable() {
|
fn not_so_recently_responded_peer_is_still_gossipable() {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
.expect("MetaAddrChange can't create a new MetaAddr");
|
|
||||||
|
|
||||||
// Create a peer that has responded
|
// Create a peer that has responded
|
||||||
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
.apply_to_meta_addr(peer_seed)
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
.expect("Failed to create MetaAddr for responded peer");
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
// Tweak the peer's last response time to be within the limits of the reachable duration
|
// Tweak the peer's last response time to be within the limits of the reachable duration
|
||||||
|
|
@ -203,16 +213,17 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
|
||||||
fn responded_long_ago_peer_is_not_gossipable() {
|
fn responded_long_ago_peer_is_not_gossipable() {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
let chrono_now = Utc::now();
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
.expect("MetaAddrChange can't create a new MetaAddr");
|
|
||||||
|
|
||||||
// Create a peer that has responded
|
// Create a peer that has responded
|
||||||
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
.apply_to_meta_addr(peer_seed)
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
.expect("Failed to create MetaAddr for responded peer");
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
// Tweak the peer's last response time to be outside the limits of the reachable duration
|
// Tweak the peer's last response time to be outside the limits of the reachable duration
|
||||||
|
|
@ -227,3 +238,210 @@ fn responded_long_ago_peer_is_not_gossipable() {
|
||||||
|
|
||||||
assert!(!peer.is_active_for_gossip(chrono_now));
|
assert!(!peer.is_active_for_gossip(chrono_now));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test that a change that is delayed for a long time is not applied to the address state.
|
||||||
|
#[test]
|
||||||
|
fn long_delayed_change_is_not_applied() {
|
||||||
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
|
|
||||||
|
// Create a peer that has responded
|
||||||
|
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
|
// Create an earlier change to Failed that has been delayed a long time.
|
||||||
|
// Failed typically comes after Responded, so it will pass the connection progress check.
|
||||||
|
//
|
||||||
|
// This is very unlikely in the May 2023 production code,
|
||||||
|
// but it can happen due to getting the time, then waiting for the address book mutex.
|
||||||
|
|
||||||
|
// Create some change times that are much earlier
|
||||||
|
let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD * 3);
|
||||||
|
let chrono_early = chrono_now
|
||||||
|
- chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD * 3)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_errored(address, PeerServices::NODE_NETWORK);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
outcome, None,
|
||||||
|
"\n\
|
||||||
|
unexpected application of a much earlier change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_early:?} {chrono_early}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that a change that happens a long time after the previous change
|
||||||
|
/// is applied to the address state, even if it is a revert.
|
||||||
|
#[test]
|
||||||
|
fn later_revert_change_is_applied() {
|
||||||
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
|
|
||||||
|
// Create a peer that has responded
|
||||||
|
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
|
// Create an earlier change to AttemptPending that happens a long time later.
|
||||||
|
// AttemptPending typically comes before Responded, so it will fail the connection progress
|
||||||
|
// check, but that failure should be ignored because it is not concurrent.
|
||||||
|
//
|
||||||
|
// This is a typical reconnect in production.
|
||||||
|
|
||||||
|
// Create some change times that are much later
|
||||||
|
let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD * 3);
|
||||||
|
let chrono_late = chrono_now
|
||||||
|
+ chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD * 3)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_reconnect(address);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
outcome.is_some(),
|
||||||
|
"\n\
|
||||||
|
unexpected skipped much later change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_late:?} {chrono_late}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that a concurrent change which reverses the connection state is not applied.
|
||||||
|
#[test]
|
||||||
|
fn concurrent_state_revert_change_is_not_applied() {
|
||||||
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
|
|
||||||
|
// Create a peer that has responded
|
||||||
|
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
|
// Create a concurrent change to AttemptPending.
|
||||||
|
// AttemptPending typically comes before Responded, so it will fail the progress check.
|
||||||
|
//
|
||||||
|
// This is likely to happen in production, it just requires a short delay in the earlier change.
|
||||||
|
|
||||||
|
// Create some change times that are earlier but concurrent
|
||||||
|
let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2);
|
||||||
|
let chrono_early = chrono_now
|
||||||
|
- chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_reconnect(address);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
outcome, None,
|
||||||
|
"\n\
|
||||||
|
unexpected application of an early concurrent change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_early:?} {chrono_early}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Create some change times that are later but concurrent
|
||||||
|
let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2);
|
||||||
|
let chrono_late = chrono_now
|
||||||
|
+ chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_reconnect(address);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
outcome, None,
|
||||||
|
"\n\
|
||||||
|
unexpected application of a late concurrent change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_late:?} {chrono_late}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that a concurrent change which progresses the connection state is applied.
|
||||||
|
#[test]
|
||||||
|
fn concurrent_state_progress_change_is_applied() {
|
||||||
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
let instant_now = Instant::now();
|
||||||
|
let chrono_now = Utc::now();
|
||||||
|
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
|
||||||
|
|
||||||
|
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
|
||||||
|
let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.into_new_meta_addr(instant_now, local_now);
|
||||||
|
|
||||||
|
// Create a peer that has responded
|
||||||
|
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
|
||||||
|
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
|
||||||
|
.expect("Failed to create MetaAddr for responded peer");
|
||||||
|
|
||||||
|
// Create a concurrent change to Failed.
|
||||||
|
// Failed typically comes after Responded, so it will pass the progress check.
|
||||||
|
//
|
||||||
|
// This is a typical update in production.
|
||||||
|
|
||||||
|
// Create some change times that are earlier but concurrent
|
||||||
|
let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2);
|
||||||
|
let chrono_early = chrono_now
|
||||||
|
- chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_errored(address, None);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
outcome.is_some(),
|
||||||
|
"\n\
|
||||||
|
unexpected skipped early concurrent change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_early:?} {chrono_early}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Create some change times that are later but concurrent
|
||||||
|
let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2);
|
||||||
|
let chrono_late = chrono_now
|
||||||
|
+ chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2)
|
||||||
|
.expect("constant is valid");
|
||||||
|
|
||||||
|
let change = MetaAddr::new_errored(address, None);
|
||||||
|
let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
outcome.is_some(),
|
||||||
|
"\n\
|
||||||
|
unexpected skipped late concurrent change to a peer:\n\
|
||||||
|
change: {change:?}\n\
|
||||||
|
times: {instant_late:?} {chrono_late}\n\
|
||||||
|
peer: {peer:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1145,7 +1145,7 @@ async fn self_connections_should_fail() {
|
||||||
.lock()
|
.lock()
|
||||||
.expect("unexpected panic in address book");
|
.expect("unexpected panic in address book");
|
||||||
|
|
||||||
let real_self_listener = unlocked_address_book.local_listener_meta_addr();
|
let real_self_listener = unlocked_address_book.local_listener_meta_addr(Utc::now());
|
||||||
|
|
||||||
// Set a fake listener to get past the check for adding our own address
|
// Set a fake listener to get past the check for adding our own address
|
||||||
unlocked_address_book.set_local_listener("192.168.0.0:1".parse().unwrap());
|
unlocked_address_book.set_local_listener("192.168.0.0:1".parse().unwrap());
|
||||||
|
|
@ -1384,7 +1384,10 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
|
||||||
"Test user agent".to_string(),
|
"Test user agent".to_string(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();
|
let local_listener = address_book
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.local_listener_meta_addr(Utc::now());
|
||||||
|
|
||||||
if listen_addr.port() == 0 {
|
if listen_addr.port() == 0 {
|
||||||
assert_ne!(
|
assert_ne!(
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,10 @@
|
||||||
//! cargo insta test --review --features getblocktemplate-rpcs --delete-unreferenced-snapshots
|
//! cargo insta test --review --features getblocktemplate-rpcs --delete-unreferenced-snapshots
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::{
|
||||||
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||||
|
time::Instant,
|
||||||
|
};
|
||||||
|
|
||||||
use hex::FromHex;
|
use hex::FromHex;
|
||||||
use insta::Settings;
|
use insta::Settings;
|
||||||
|
|
@ -133,8 +136,7 @@ pub async fn test_responses<State, ReadState>(
|
||||||
)
|
)
|
||||||
.into(),
|
.into(),
|
||||||
)
|
)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(Instant::now(), DateTime32::now())]);
|
||||||
.unwrap()]);
|
|
||||||
|
|
||||||
// get an rpc instance with continuous blockchain state
|
// get an rpc instance with continuous blockchain state
|
||||||
let get_block_template_rpc = GetBlockTemplateRpcImpl::new(
|
let get_block_template_rpc = GetBlockTemplateRpcImpl::new(
|
||||||
|
|
|
||||||
|
|
@ -951,8 +951,10 @@ async fn rpc_getpeerinfo() {
|
||||||
)
|
)
|
||||||
.into(),
|
.into(),
|
||||||
)
|
)
|
||||||
.into_new_meta_addr()
|
.into_new_meta_addr(
|
||||||
.unwrap();
|
std::time::Instant::now(),
|
||||||
|
zebra_chain::serialization::DateTime32::now(),
|
||||||
|
);
|
||||||
|
|
||||||
let mock_address_book = MockAddressBookPeers::new(vec![mock_peer_address]);
|
let mock_address_book = MockAddressBookPeers::new(vec![mock_peer_address]);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue