From 8af4e572c985cd2e37844b77e61ee825f7a4f650 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 25 May 2023 09:53:53 +1000 Subject: [PATCH] fix(network): Ignore out of order Address Book changes, unless they are concurrent (#6717) * Ignore out of order Address Book changes, and restructure the function * Handle concurrent changes using the connection state machine order * Handle out of order changes correctly * Pass times through the call stack so they are consistent in tests * Add time arguments to tests * Fix tests that were broken by the address order checks * fastmod wall_ local_ zebra* * cargo fmt --all * Fix a bug in the concurrent change check * Test all the new apply and skip checks for address changes * Document more edge cases and increase the concurrency time slightly * Simplify enum ordering matches * Fix comment typos Co-authored-by: Arya --------- Co-authored-by: Arya --- zebra-network/src/address_book.rs | 13 +- zebra-network/src/constants.rs | 18 + zebra-network/src/meta_addr.rs | 354 +++++++++++++----- zebra-network/src/meta_addr/arbitrary.rs | 61 +-- zebra-network/src/meta_addr/tests/prop.rs | 64 ++-- zebra-network/src/meta_addr/tests/vectors.rs | 248 +++++++++++- .../src/peer_set/initialize/tests/vectors.rs | 7 +- .../tests/snapshot/get_block_template_rpcs.rs | 8 +- zebra-rpc/src/methods/tests/vectors.rs | 6 +- 9 files changed, 613 insertions(+), 166 deletions(-) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 02209280..fc0bda70 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -14,7 +14,7 @@ use ordered_map::OrderedMap; use tokio::sync::watch; use tracing::Span; -use zebra_chain::parameters::Network; +use zebra_chain::{parameters::Network, serialization::DateTime32}; use crate::{ constants, @@ -228,10 +228,11 @@ impl AddressBook { /// Get the local listener address. /// /// This address contains minimal state, but it is not sanitized. - pub fn local_listener_meta_addr(&self) -> MetaAddr { + pub fn local_listener_meta_addr(&self, now: chrono::DateTime) -> MetaAddr { + let now: DateTime32 = now.try_into().expect("will succeed until 2038"); + MetaAddr::new_local_listener_change(self.local_listener) - .into_new_meta_addr() - .expect("unexpected invalid new local listener addr") + .local_listener_into_new_meta_addr(now) } /// Get the local listener [`SocketAddr`]. @@ -249,7 +250,7 @@ impl AddressBook { // Unconditionally add our local listener address to the advertised peers, // to replace any self-connection failures. The address book and change // constructors make sure that the SocketAddr is canonical. - let local_listener = self.local_listener_meta_addr(); + let local_listener = self.local_listener_meta_addr(now); peers.insert(local_listener.addr, local_listener); // Then sanitize and shuffle @@ -313,7 +314,7 @@ impl AddressBook { let instant_now = Instant::now(); let chrono_now = Utc::now(); - let updated = change.apply_to_meta_addr(previous); + let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now); trace!( ?change, diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 67e73874..c6dfc0a3 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -90,6 +90,24 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); /// nodes, and on testnet. pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3); +/// The maximum time difference for two address book changes to be considered concurrent. +/// +/// This prevents simultaneous or nearby important changes or connection progress +/// being overridden by less important changes. +/// +/// This timeout should be less than: +/// - the [peer reconnection delay](MIN_PEER_RECONNECTION_DELAY), and +/// - the [peer keepalive/heartbeat interval](HEARTBEAT_INTERVAL). +/// +/// But more than: +/// - the amount of time between connection events and address book updates, +/// even under heavy load (in tests, we have observed delays up to 500ms), +/// - the delay between an outbound connection failing, +/// and the [CandidateSet](crate::peer_set::CandidateSet) registering the failure, and +/// - the delay between the application closing a connection, +/// and any remaining positive changes from the peer. +pub const CONCURRENT_ADDRESS_CHANGE_PERIOD: Duration = Duration::from_secs(5); + /// We expect to receive a message from a live peer at least once in this time duration. /// /// This is the sum of: diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 3d9657fc..6fbd4e7e 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -1,7 +1,7 @@ //! An address-with-metadata type used in Bitcoin networking. use std::{ - cmp::{Ord, Ordering}, + cmp::{max, Ord, Ordering}, time::Instant, }; @@ -79,6 +79,38 @@ impl PeerAddrState { AttemptPending | Responded | Failed => false, } } + + /// Returns the typical connection state machine order of `self` and `other`. + /// Partially ordered states are sorted in connection attempt order. + /// + /// See [`MetaAddrChange::apply_to_meta_addr()`] for more details. + fn connection_state_order(&self, other: &Self) -> Ordering { + use Ordering::*; + match (self, other) { + _ if self == other => Equal, + // Peers start in one of the "never attempted" states, + // then typically progress towards a "responded" or "failed" state. + // + // # Security + // + // Prefer gossiped addresses to alternate addresses, + // so that peers can't replace the addresses of other peers. + // (This is currently checked explicitly by the address update code, + // but we respect the same order here as a precaution.) + (NeverAttemptedAlternate, _) => Less, + (_, NeverAttemptedAlternate) => Greater, + (NeverAttemptedGossiped, _) => Less, + (_, NeverAttemptedGossiped) => Greater, + (AttemptPending, _) => Less, + (_, AttemptPending) => Greater, + (Responded, _) => Less, + (_, Responded) => Greater, + // These patterns are redundant, but Rust doesn't assume that `==` is reflexive, + // so the first is still required (but unreachable). + (Failed, _) => Less, + //(_, Failed) => Greater, + } + } } // non-test code should explicitly specify the peer address state @@ -100,11 +132,7 @@ impl Ord for PeerAddrState { fn cmp(&self, other: &Self) -> Ordering { use Ordering::*; match (self, other) { - (Responded, Responded) - | (Failed, Failed) - | (NeverAttemptedGossiped, NeverAttemptedGossiped) - | (NeverAttemptedAlternate, NeverAttemptedAlternate) - | (AttemptPending, AttemptPending) => Equal, + _ if self == other => Equal, // We reconnect to `Responded` peers that have stopped sending messages, // then `NeverAttempted` peers, then `Failed` peers (Responded, _) => Less, @@ -115,7 +143,10 @@ impl Ord for PeerAddrState { (_, NeverAttemptedAlternate) => Greater, (Failed, _) => Less, (_, Failed) => Greater, - // AttemptPending is covered by the other cases + // These patterns are redundant, but Rust doesn't assume that `==` is reflexive, + // so the first is still required (but unreachable). + (AttemptPending, _) => Less, + //(_, AttemptPending) => Greater, } } } @@ -195,6 +226,9 @@ pub struct MetaAddr { #[derive(Copy, Clone, Debug, Eq, PartialEq)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum MetaAddrChange { + // TODO: + // - split the common `addr` field into an outer struct + // /// Creates a `MetaAddr` for an initial peer. NewInitial { #[cfg_attr( @@ -694,7 +728,7 @@ impl MetaAddrChange { } /// Return the untrusted last seen time for this change, if available. - pub fn untrusted_last_seen(&self) -> Option { + pub fn untrusted_last_seen(&self, now: DateTime32) -> Option { match self { NewInitial { .. } => None, NewGossiped { @@ -703,15 +737,34 @@ impl MetaAddrChange { } => Some(*untrusted_last_seen), NewAlternate { .. } => None, // We know that our local listener is available - NewLocal { .. } => Some(DateTime32::now()), + NewLocal { .. } => Some(now), UpdateAttempt { .. } => None, UpdateResponded { .. } => None, UpdateFailed { .. } => None, } } + // # Concurrency + // + // We assign a time to each change when it is applied to the address book by either the + // address book updater or candidate set tasks. This is the time that the change was received + // from the updater channel, rather than the time that the message was read from the peer + // connection. + // + // Since the connection tasks run concurrently in an unspecified order, and the address book + // updater runs in a separate thread, these times are almost always very similar. If Zebra's + // address book is under load, we should use lower rate-limits for new inbound or outbound + // connections, disconnections, peer gossip crawls, or peer `UpdateResponded` updates. + // + // TODO: + // - move the time API calls from `impl MetaAddrChange` `last_*()` methods: + // - if they impact performance, call them once in the address book updater task, + // then apply them to all the waiting changes + // - otherwise, move them to the `impl MetaAddrChange` `new_*()` methods, + // so they are called in the connection tasks + // /// Return the last attempt for this change, if available. - pub fn last_attempt(&self) -> Option { + pub fn last_attempt(&self, now: Instant) -> Option { match self { NewInitial { .. } => None, NewGossiped { .. } => None, @@ -720,14 +773,14 @@ impl MetaAddrChange { // Attempt changes are applied before we start the handshake to the // peer address. So the attempt time is a lower bound for the actual // handshake time. - UpdateAttempt { .. } => Some(Instant::now()), + UpdateAttempt { .. } => Some(now), UpdateResponded { .. } => None, UpdateFailed { .. } => None, } } /// Return the last response for this change, if available. - pub fn last_response(&self) -> Option { + pub fn last_response(&self, now: DateTime32) -> Option { match self { NewInitial { .. } => None, NewGossiped { .. } => None, @@ -739,13 +792,13 @@ impl MetaAddrChange { // - we might send outdated last seen times to our peers, and // - the peer will appear to be live for longer, delaying future // reconnection attempts. - UpdateResponded { .. } => Some(DateTime32::now()), + UpdateResponded { .. } => Some(now), UpdateFailed { .. } => None, } } /// Return the last failure for this change, if available. - pub fn last_failure(&self) -> Option { + pub fn last_failure(&self, now: Instant) -> Option { match self { NewInitial { .. } => None, NewGossiped { .. } => None, @@ -758,7 +811,7 @@ impl MetaAddrChange { // states for longer, and // - the peer will appear to be used for longer, delaying future // reconnection attempts. - UpdateFailed { .. } => Some(Instant::now()), + UpdateFailed { .. } => Some(now), } } @@ -776,93 +829,212 @@ impl MetaAddrChange { } } - /// If this change can create a new `MetaAddr`, return that address. - pub fn into_new_meta_addr(self) -> Option { - Some(MetaAddr { + /// Returns the corresponding `MetaAddr` for this change. + pub fn into_new_meta_addr(self, instant_now: Instant, local_now: DateTime32) -> MetaAddr { + MetaAddr { addr: self.addr(), services: self.untrusted_services(), - untrusted_last_seen: self.untrusted_last_seen(), - last_response: self.last_response(), - last_attempt: self.last_attempt(), - last_failure: self.last_failure(), + untrusted_last_seen: self.untrusted_last_seen(local_now), + last_response: self.last_response(local_now), + last_attempt: self.last_attempt(instant_now), + last_failure: self.last_failure(instant_now), last_connection_state: self.peer_addr_state(), - }) + } + } + + /// Returns the corresponding [`MetaAddr`] for a local listener change. + /// + /// This method exists so we don't have to provide an unused [`Instant`] to get a local + /// listener `MetaAddr`. + /// + /// # Panics + /// + /// If this change is not a [`MetaAddrChange::NewLocal`]. + pub fn local_listener_into_new_meta_addr(self, local_now: DateTime32) -> MetaAddr { + assert!(matches!(self, MetaAddrChange::NewLocal { .. })); + + MetaAddr { + addr: self.addr(), + services: self.untrusted_services(), + untrusted_last_seen: self.untrusted_last_seen(local_now), + last_response: self.last_response(local_now), + last_attempt: None, + last_failure: None, + last_connection_state: self.peer_addr_state(), + } } /// Apply this change to a previous `MetaAddr` from the address book, /// producing a new or updated `MetaAddr`. /// /// If the change isn't valid for the `previous` address, returns `None`. - pub fn apply_to_meta_addr(&self, previous: impl Into>) -> Option { - if let Some(previous) = previous.into() { - assert_eq!(previous.addr, self.addr(), "unexpected addr mismatch"); + #[allow(clippy::unwrap_in_result)] + pub fn apply_to_meta_addr( + &self, + previous: impl Into>, + instant_now: Instant, + chrono_now: chrono::DateTime, + ) -> Option { + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); - let previous_has_been_attempted = !previous.last_connection_state.is_never_attempted(); - let change_to_never_attempted = self - .into_new_meta_addr() - .map(|meta_addr| meta_addr.last_connection_state.is_never_attempted()) - .unwrap_or(false); - - if change_to_never_attempted { - if previous_has_been_attempted { - // Existing entry has been attempted, change is NeverAttempted - // - ignore the change - // - // # Security - // - // Ignore NeverAttempted changes once we have made an attempt, - // so malicious peers can't keep changing our peer connection order. - None - } else { - // Existing entry and change are both NeverAttempted - // - preserve original values of all fields - // - but replace None with Some - // - // # Security - // - // Preserve the original field values for NeverAttempted peers, - // so malicious peers can't keep changing our peer connection order. - Some(MetaAddr { - addr: self.addr(), - services: previous.services.or_else(|| self.untrusted_services()), - untrusted_last_seen: previous - .untrusted_last_seen - .or_else(|| self.untrusted_last_seen()), - // The peer has not been attempted, so these fields must be None - last_response: None, - last_attempt: None, - last_failure: None, - last_connection_state: self.peer_addr_state(), - }) - } - } else { - // Existing entry and change are both Attempt, Responded, Failed - // - ignore changes to earlier times - // - update the services from the change - // - // # Security - // - // Ignore changes to earlier times. This enforces the peer - // connection timeout, even if changes are applied out of order. - Some(MetaAddr { - addr: self.addr(), - // We want up-to-date services, even if they have fewer bits, - // or they are applied out of order. - services: self.untrusted_services().or(previous.services), - // Only NeverAttempted changes can modify the last seen field - untrusted_last_seen: previous.untrusted_last_seen, - // Since Some(time) is always greater than None, `max` prefers: - // - the latest time if both are Some - // - Some(time) if the other is None - last_response: self.last_response().max(previous.last_response), - last_attempt: self.last_attempt().max(previous.last_attempt), - last_failure: self.last_failure().max(previous.last_failure), - last_connection_state: self.peer_addr_state(), - }) - } - } else { + let Some(previous) = previous.into() else { // no previous: create a new entry - self.into_new_meta_addr() + return Some(self.into_new_meta_addr(instant_now, local_now)); + }; + + assert_eq!(previous.addr, self.addr(), "unexpected addr mismatch"); + + let instant_previous = max(previous.last_attempt, previous.last_failure); + let local_previous = previous.last_response; + + // Is this change potentially concurrent with the previous change? + // + // Since we're using saturating arithmetic, one of each pair of less than comparisons + // will always be true, because subtraction saturates to zero. + let change_is_concurrent = instant_previous + .map(|instant_previous| { + instant_previous.saturating_duration_since(instant_now) + < constants::CONCURRENT_ADDRESS_CHANGE_PERIOD + && instant_now.saturating_duration_since(instant_previous) + < constants::CONCURRENT_ADDRESS_CHANGE_PERIOD + }) + .unwrap_or_default() + || local_previous + .map(|local_previous| { + local_previous.saturating_duration_since(local_now).to_std() + < constants::CONCURRENT_ADDRESS_CHANGE_PERIOD + && local_now.saturating_duration_since(local_previous).to_std() + < constants::CONCURRENT_ADDRESS_CHANGE_PERIOD + }) + .unwrap_or_default(); + let change_is_out_of_order = instant_previous + .map(|instant_previous| instant_previous > instant_now) + .unwrap_or_default() + || local_previous + .map(|local_previous| local_previous > local_now) + .unwrap_or_default(); + + // Is this change typically from a connection state that has more progress? + let connection_has_more_progress = self + .peer_addr_state() + .connection_state_order(&previous.last_connection_state) + == Ordering::Greater; + + let previous_has_been_attempted = !previous.last_connection_state.is_never_attempted(); + let change_to_never_attempted = self.peer_addr_state().is_never_attempted(); + + // Invalid changes + + if change_to_never_attempted && previous_has_been_attempted { + // Existing entry has been attempted, change is NeverAttempted + // - ignore the change + // + // # Security + // + // Ignore NeverAttempted changes once we have made an attempt, + // so malicious peers can't keep changing our peer connection order. + return None; + } + + if change_is_out_of_order && !change_is_concurrent { + // Change is significantly out of order: ignore it. + // + // # Security + // + // Ignore changes that arrive out of order, if they are far enough apart. + // This enforces the peer connection retry interval. + return None; + } + + if change_is_concurrent && !connection_has_more_progress { + // Change is close together in time, and it would revert the connection to an earlier + // state. + // + // # Security + // + // If the changes might have been concurrent, ignore connection states with less + // progress. + // + // ## Sources of Concurrency + // + // If two changes happen close together, the async scheduler can run their change + // send and apply code in any order. This includes the code that records the time of + // the change. So even if a failure happens after a response message, the failure time + // can be recorded before the response time code is run. + // + // Some machines and OSes have limited time resolution, so we can't guarantee that + // two messages on the same connection will always have different times. There are + // also known bugs impacting monotonic times which make them go backwards or stay + // equal. For wall clock times, clock skew is an expected event, particularly with + // network time server updates. + // + // Also, the application can fail a connection independently and simultaneously + // (or slightly before) a positive update from that peer connection. We want the + // application change to take priority in the address book, because the connection + // state machine also prioritises failures over any other peer messages. + // + // ## Resolution + // + // In these cases, we want to apply the failure, then ignore any nearby changes that + // reset the address book entry to a more appealing state. This prevents peers from + // sending updates right before failing a connection, in order to make themselves more + // likely to get a reconnection. + // + // The connection state machine order is used so that state transitions which are + // typically close together are preserved. These transitions are: + // - NeverAttempted*->AttemptPending->(Responded|Failed) + // - Responded->Failed + // + // State transitions like (Responded|Failed)->AttemptPending only happen after the + // reconnection timeout, so they will never be considered concurrent. + return None; + } + + // Valid changes + + if change_to_never_attempted && !previous_has_been_attempted { + // Existing entry and change are both NeverAttempted + // - preserve original values of all fields + // - but replace None with Some + // + // # Security + // + // Preserve the original field values for NeverAttempted peers, + // so malicious peers can't keep changing our peer connection order. + Some(MetaAddr { + addr: self.addr(), + services: previous.services.or_else(|| self.untrusted_services()), + untrusted_last_seen: previous + .untrusted_last_seen + .or_else(|| self.untrusted_last_seen(local_now)), + // The peer has not been attempted, so these fields must be None + last_response: None, + last_attempt: None, + last_failure: None, + last_connection_state: self.peer_addr_state(), + }) + } else { + // Existing entry and change are both Attempt, Responded, Failed, + // and the change is later, either in time or in connection progress + // (this is checked above and returns None early): + // - update the fields from the change + Some(MetaAddr { + addr: self.addr(), + // Always update optional fields, unless the update is None. + // + // We want up-to-date services, even if they have fewer bits + services: self.untrusted_services().or(previous.services), + // Only NeverAttempted changes can modify the last seen field + untrusted_last_seen: previous.untrusted_last_seen, + // This is a wall clock time, but we already checked that responses are in order. + // Even if the wall clock time has jumped, we want to use the latest time. + last_response: self.last_response(local_now).or(previous.last_response), + // These are monotonic times, we already checked the responses are in order. + last_attempt: self.last_attempt(instant_now).or(previous.last_attempt), + last_failure: self.last_failure(instant_now).or(previous.last_failure), + // Replace the state with the updated state. + last_connection_state: self.peer_addr_state(), + }) } } } diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index 955607e7..1b96440e 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -1,5 +1,7 @@ //! Randomised test data generation for MetaAddr. +use std::time::Instant; + use proptest::{arbitrary::any, collection::vec, prelude::*}; use zebra_chain::{parameters::Network::*, serialization::DateTime32}; @@ -49,12 +51,20 @@ impl MetaAddr { /// /// [1]: super::PeerAddrState::NeverAttemptedAlternate pub fn alternate_strategy() -> BoxedStrategy { - (canonical_peer_addr_strategy(), any::()) - .prop_map(|(socket_addr, untrusted_services)| { - MetaAddr::new_alternate(socket_addr, &untrusted_services) - .into_new_meta_addr() - .expect("unexpected invalid alternate change") - }) + ( + canonical_peer_addr_strategy(), + any::(), + any::(), + any::(), + ) + .prop_map( + |(socket_addr, untrusted_services, instant_now, local_now)| { + // instant_now is not actually used for this variant, + // so we could just provide a default value + MetaAddr::new_alternate(socket_addr, &untrusted_services) + .into_new_meta_addr(instant_now, local_now) + }, + ) .boxed() } } @@ -98,22 +108,29 @@ impl MetaAddrChange { /// /// [1]: super::NewAlternate pub fn ready_outbound_strategy() -> BoxedStrategy { - canonical_peer_addr_strategy() - .prop_filter_map("failed MetaAddr::is_valid_for_outbound", |addr| { - // Alternate nodes use the current time, so they're always ready - // - // TODO: create a "Zebra supported services" constant - let change = MetaAddr::new_alternate(addr, &PeerServices::NODE_NETWORK); - if change - .into_new_meta_addr() - .expect("unexpected invalid alternate change") - .last_known_info_is_valid_for_outbound(Mainnet) - { - Some(change) - } else { - None - } - }) + ( + canonical_peer_addr_strategy(), + any::(), + any::(), + ) + .prop_filter_map( + "failed MetaAddr::is_valid_for_outbound", + |(addr, instant_now, local_now)| { + // Alternate nodes use the current time, so they're always ready + // + // TODO: create a "Zebra supported services" constant + + let change = MetaAddr::new_alternate(addr, &PeerServices::NODE_NETWORK); + if change + .into_new_meta_addr(instant_now, local_now) + .last_known_info_is_valid_for_outbound(Mainnet) + { + Some(change) + } else { + None + } + }, + ) .boxed() } } diff --git a/zebra-network/src/meta_addr/tests/prop.rs b/zebra-network/src/meta_addr/tests/prop.rs index 0b5f968a..19f66718 100644 --- a/zebra-network/src/meta_addr/tests/prop.rs +++ b/zebra-network/src/meta_addr/tests/prop.rs @@ -4,7 +4,6 @@ use std::{collections::HashMap, env, net::SocketAddr, str::FromStr, sync::Arc, t use chrono::Utc; use proptest::{collection::vec, prelude::*}; -use tokio::time::Instant; use tower::service_fn; use tracing::Span; @@ -64,8 +63,12 @@ proptest! { ) { let _init_guard = zebra_test::init(); + let instant_now = std::time::Instant::now(); + let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); + for change in changes { - if let Some(changed_addr) = change.apply_to_meta_addr(addr) { + if let Some(changed_addr) = change.apply_to_meta_addr(addr, instant_now, chrono_now) { // untrusted last seen times: // check that we replace None with Some, but leave Some unchanged if addr.untrusted_last_seen.is_some() { @@ -73,7 +76,7 @@ proptest! { } else { prop_assert_eq!( changed_addr.untrusted_last_seen, - change.untrusted_last_seen() + change.untrusted_last_seen(local_now) ); } @@ -112,18 +115,22 @@ proptest! { for change in changes { while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) { - attempt_count += 1; - // Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY - prop_assert!(attempt_count <= 1); - // Simulate an attempt - addr = MetaAddr::new_reconnect(addr.addr) - .apply_to_meta_addr(addr) - .expect("unexpected invalid attempt"); + addr = if let Some(addr) = MetaAddr::new_reconnect(addr.addr) + .apply_to_meta_addr(addr, instant_now, chrono_now) { + attempt_count += 1; + // Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY + prop_assert!(attempt_count <= 1); + addr + } else { + // Stop updating when an attempt comes too soon after a failure. + // In production these are prevented by the dialer code. + break; + } } // If `change` is invalid for the current MetaAddr state, skip it. - if let Some(changed_addr) = change.apply_to_meta_addr(addr) { + if let Some(changed_addr) = change.apply_to_meta_addr(addr, instant_now, chrono_now) { prop_assert_eq!(changed_addr.addr, addr.addr); addr = changed_addr; } @@ -155,7 +162,7 @@ proptest! { ); let sanitized_addrs = address_book.sanitized(chrono_now); - let expected_local_listener = address_book.local_listener_meta_addr(); + let expected_local_listener = address_book.local_listener_meta_addr(chrono_now); let canonical_local_listener = canonical_peer_addr(local_listener); let book_sanitized_local_listener = sanitized_addrs .iter() @@ -186,9 +193,12 @@ proptest! { let local_listener = "0.0.0.0:0".parse().expect("unexpected invalid SocketAddr"); + let instant_now = std::time::Instant::now(); + let chrono_now = Utc::now(); + for change in changes { // Check direct application - let new_addr = change.apply_to_meta_addr(None); + let new_addr = change.apply_to_meta_addr(None, instant_now, chrono_now); prop_assert!( new_addr.is_some(), @@ -328,7 +338,7 @@ proptest! { tokio::time::pause(); // The earliest time we can have a valid next attempt for this peer - let earliest_next_attempt = Instant::now() + MIN_PEER_RECONNECTION_DELAY; + let earliest_next_attempt = tokio::time::Instant::now() + MIN_PEER_RECONNECTION_DELAY; // The number of attempts for this peer in the last MIN_PEER_RECONNECTION_DELAY let mut attempt_count: usize = 0; @@ -349,7 +359,7 @@ proptest! { original addr was in address book: {}\n", candidate_addr, i, - Instant::now(), + tokio::time::Instant::now(), earliest_next_attempt, attempt_count, LIVE_PEER_INTERVALS, @@ -365,7 +375,7 @@ proptest! { address_book.clone().lock().unwrap().update(change); tokio::time::advance(peer_change_interval).await; - if Instant::now() >= earliest_next_attempt { + if tokio::time::Instant::now() >= earliest_next_attempt { attempt_count = 0; } } @@ -423,20 +433,24 @@ proptest! { let change = changes.get(change_index); while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) { - *attempt_counts.entry(addr.addr).or_default() += 1; - prop_assert!( - *attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1 - ); - // Simulate an attempt - *addr = MetaAddr::new_reconnect(addr.addr) - .apply_to_meta_addr(*addr) - .expect("unexpected invalid attempt"); + *addr = if let Some(addr) = MetaAddr::new_reconnect(addr.addr) + .apply_to_meta_addr(*addr, instant_now, chrono_now) { + *attempt_counts.entry(addr.addr).or_default() += 1; + prop_assert!( + *attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1 + ); + addr + } else { + // Stop updating when an attempt comes too soon after a failure. + // In production these are prevented by the dialer code. + break; + } } // If `change` is invalid for the current MetaAddr state, skip it. // If we've run out of changes for this addr, do nothing. - if let Some(changed_addr) = change.and_then(|change| change.apply_to_meta_addr(*addr)) + if let Some(changed_addr) = change.and_then(|change| change.apply_to_meta_addr(*addr, instant_now, chrono_now)) { prop_assert_eq!(changed_addr.addr, addr.addr); *addr = changed_addr; diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs index 187f7077..5b341901 100644 --- a/zebra-network/src/meta_addr/tests/vectors.rs +++ b/zebra-network/src/meta_addr/tests/vectors.rs @@ -1,5 +1,7 @@ //! Fixed test cases for MetaAddr and MetaAddrChange. +use std::time::Instant; + use chrono::Utc; use zebra_chain::{ @@ -7,7 +9,11 @@ use zebra_chain::{ serialization::{DateTime32, Duration32}, }; -use crate::{constants::MAX_PEER_ACTIVE_FOR_GOSSIP, protocol::types::PeerServices, PeerSocketAddr}; +use crate::{ + constants::{CONCURRENT_ADDRESS_CHANGE_PERIOD, MAX_PEER_ACTIVE_FOR_GOSSIP}, + protocol::types::PeerServices, + PeerSocketAddr, +}; use super::{super::MetaAddr, check}; @@ -57,12 +63,13 @@ fn sanitize_extremes() { fn new_local_listener_is_gossipable() { let _init_guard = zebra_test::init(); + let instant_now = Instant::now(); let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); - let peer = MetaAddr::new_local_listener_change(address) - .into_new_meta_addr() - .expect("MetaAddrChange can't create a new MetaAddr"); + let peer = + MetaAddr::new_local_listener_change(address).into_new_meta_addr(instant_now, local_now); assert!(peer.is_active_for_gossip(chrono_now)); } @@ -75,12 +82,13 @@ fn new_local_listener_is_gossipable() { fn new_alternate_peer_address_is_not_gossipable() { let _init_guard = zebra_test::init(); + let instant_now = Instant::now(); let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); let peer = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) - .into_new_meta_addr() - .expect("MetaAddrChange can't create a new MetaAddr"); + .into_new_meta_addr(instant_now, local_now); assert!(!peer.is_active_for_gossip(chrono_now)); } @@ -153,16 +161,17 @@ fn gossiped_peer_reportedly_seen_long_ago_is_not_gossipable() { fn recently_responded_peer_is_gossipable() { let _init_guard = zebra_test::init(); + let instant_now = Instant::now(); let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) - .into_new_meta_addr() - .expect("MetaAddrChange can't create a new MetaAddr"); + .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) - .apply_to_meta_addr(peer_seed) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); assert!(peer.is_active_for_gossip(chrono_now)); @@ -173,16 +182,17 @@ fn recently_responded_peer_is_gossipable() { fn not_so_recently_responded_peer_is_still_gossipable() { let _init_guard = zebra_test::init(); + let instant_now = Instant::now(); let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) - .into_new_meta_addr() - .expect("MetaAddrChange can't create a new MetaAddr"); + .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) - .apply_to_meta_addr(peer_seed) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); // Tweak the peer's last response time to be within the limits of the reachable duration @@ -203,16 +213,17 @@ fn not_so_recently_responded_peer_is_still_gossipable() { fn responded_long_ago_peer_is_not_gossipable() { let _init_guard = zebra_test::init(); + let instant_now = Instant::now(); let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) - .into_new_meta_addr() - .expect("MetaAddrChange can't create a new MetaAddr"); + .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) - .apply_to_meta_addr(peer_seed) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); // Tweak the peer's last response time to be outside the limits of the reachable duration @@ -227,3 +238,210 @@ fn responded_long_ago_peer_is_not_gossipable() { assert!(!peer.is_active_for_gossip(chrono_now)); } + +/// Test that a change that is delayed for a long time is not applied to the address state. +#[test] +fn long_delayed_change_is_not_applied() { + let _init_guard = zebra_test::init(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); + + let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); + let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) + .into_new_meta_addr(instant_now, local_now); + + // Create a peer that has responded + let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) + .expect("Failed to create MetaAddr for responded peer"); + + // Create an earlier change to Failed that has been delayed a long time. + // Failed typically comes after Responded, so it will pass the connection progress check. + // + // This is very unlikely in the May 2023 production code, + // but it can happen due to getting the time, then waiting for the address book mutex. + + // Create some change times that are much earlier + let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD * 3); + let chrono_early = chrono_now + - chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD * 3) + .expect("constant is valid"); + + let change = MetaAddr::new_errored(address, PeerServices::NODE_NETWORK); + let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early); + + assert_eq!( + outcome, None, + "\n\ + unexpected application of a much earlier change to a peer:\n\ + change: {change:?}\n\ + times: {instant_early:?} {chrono_early}\n\ + peer: {peer:?}" + ); +} + +/// Test that a change that happens a long time after the previous change +/// is applied to the address state, even if it is a revert. +#[test] +fn later_revert_change_is_applied() { + let _init_guard = zebra_test::init(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); + + let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); + let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) + .into_new_meta_addr(instant_now, local_now); + + // Create a peer that has responded + let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) + .expect("Failed to create MetaAddr for responded peer"); + + // Create an earlier change to AttemptPending that happens a long time later. + // AttemptPending typically comes before Responded, so it will fail the connection progress + // check, but that failure should be ignored because it is not concurrent. + // + // This is a typical reconnect in production. + + // Create some change times that are much later + let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD * 3); + let chrono_late = chrono_now + + chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD * 3) + .expect("constant is valid"); + + let change = MetaAddr::new_reconnect(address); + let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late); + + assert!( + outcome.is_some(), + "\n\ + unexpected skipped much later change to a peer:\n\ + change: {change:?}\n\ + times: {instant_late:?} {chrono_late}\n\ + peer: {peer:?}" + ); +} + +/// Test that a concurrent change which reverses the connection state is not applied. +#[test] +fn concurrent_state_revert_change_is_not_applied() { + let _init_guard = zebra_test::init(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); + + let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); + let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) + .into_new_meta_addr(instant_now, local_now); + + // Create a peer that has responded + let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) + .expect("Failed to create MetaAddr for responded peer"); + + // Create a concurrent change to AttemptPending. + // AttemptPending typically comes before Responded, so it will fail the progress check. + // + // This is likely to happen in production, it just requires a short delay in the earlier change. + + // Create some change times that are earlier but concurrent + let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2); + let chrono_early = chrono_now + - chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2) + .expect("constant is valid"); + + let change = MetaAddr::new_reconnect(address); + let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early); + + assert_eq!( + outcome, None, + "\n\ + unexpected application of an early concurrent change to a peer:\n\ + change: {change:?}\n\ + times: {instant_early:?} {chrono_early}\n\ + peer: {peer:?}" + ); + + // Create some change times that are later but concurrent + let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2); + let chrono_late = chrono_now + + chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2) + .expect("constant is valid"); + + let change = MetaAddr::new_reconnect(address); + let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late); + + assert_eq!( + outcome, None, + "\n\ + unexpected application of a late concurrent change to a peer:\n\ + change: {change:?}\n\ + times: {instant_late:?} {chrono_late}\n\ + peer: {peer:?}" + ); +} + +/// Test that a concurrent change which progresses the connection state is applied. +#[test] +fn concurrent_state_progress_change_is_applied() { + let _init_guard = zebra_test::init(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038"); + + let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000)); + let peer_seed = MetaAddr::new_alternate(address, &PeerServices::NODE_NETWORK) + .into_new_meta_addr(instant_now, local_now); + + // Create a peer that has responded + let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + .apply_to_meta_addr(peer_seed, instant_now, chrono_now) + .expect("Failed to create MetaAddr for responded peer"); + + // Create a concurrent change to Failed. + // Failed typically comes after Responded, so it will pass the progress check. + // + // This is a typical update in production. + + // Create some change times that are earlier but concurrent + let instant_early = instant_now - (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2); + let chrono_early = chrono_now + - chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2) + .expect("constant is valid"); + + let change = MetaAddr::new_errored(address, None); + let outcome = change.apply_to_meta_addr(peer, instant_early, chrono_early); + + assert!( + outcome.is_some(), + "\n\ + unexpected skipped early concurrent change to a peer:\n\ + change: {change:?}\n\ + times: {instant_early:?} {chrono_early}\n\ + peer: {peer:?}" + ); + + // Create some change times that are later but concurrent + let instant_late = instant_now + (CONCURRENT_ADDRESS_CHANGE_PERIOD / 2); + let chrono_late = chrono_now + + chrono::Duration::from_std(CONCURRENT_ADDRESS_CHANGE_PERIOD / 2) + .expect("constant is valid"); + + let change = MetaAddr::new_errored(address, None); + let outcome = change.apply_to_meta_addr(peer, instant_late, chrono_late); + + assert!( + outcome.is_some(), + "\n\ + unexpected skipped late concurrent change to a peer:\n\ + change: {change:?}\n\ + times: {instant_late:?} {chrono_late}\n\ + peer: {peer:?}" + ); +} diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 4ef69cb5..76110e8e 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1145,7 +1145,7 @@ async fn self_connections_should_fail() { .lock() .expect("unexpected panic in address book"); - let real_self_listener = unlocked_address_book.local_listener_meta_addr(); + let real_self_listener = unlocked_address_book.local_listener_meta_addr(Utc::now()); // Set a fake listener to get past the check for adding our own address unlocked_address_book.set_local_listener("192.168.0.0:1".parse().unwrap()); @@ -1384,7 +1384,10 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) { "Test user agent".to_string(), ) .await; - let local_listener = address_book.lock().unwrap().local_listener_meta_addr(); + let local_listener = address_book + .lock() + .unwrap() + .local_listener_meta_addr(Utc::now()); if listen_addr.port() == 0 { assert_ne!( diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index 5b8155f2..c827d5f0 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -5,7 +5,10 @@ //! cargo insta test --review --features getblocktemplate-rpcs --delete-unreferenced-snapshots //! ``` -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Instant, +}; use hex::FromHex; use insta::Settings; @@ -133,8 +136,7 @@ pub async fn test_responses( ) .into(), ) - .into_new_meta_addr() - .unwrap()]); + .into_new_meta_addr(Instant::now(), DateTime32::now())]); // get an rpc instance with continuous blockchain state let get_block_template_rpc = GetBlockTemplateRpcImpl::new( diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 02ccd9bc..ccc018b5 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -951,8 +951,10 @@ async fn rpc_getpeerinfo() { ) .into(), ) - .into_new_meta_addr() - .unwrap(); + .into_new_meta_addr( + std::time::Instant::now(), + zebra_chain::serialization::DateTime32::now(), + ); let mock_address_book = MockAddressBookPeers::new(vec![mock_peer_address]);