diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 275a14ab..5abaf517 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -200,7 +200,10 @@ pub const EWMA_DEFAULT_RTT: Duration = Duration::from_secs(REQUEST_TIMEOUT.as_se /// /// This should be much larger than the `SYNC_RESTART_TIMEOUT`, so we choose /// better peers when we restart the sync. -pub const EWMA_DECAY_TIME: Duration = Duration::from_secs(200); +pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND; + +/// The number of nanoseconds in one second. +const NANOS_PER_SECOND: f64 = 1_000_000_000.0; lazy_static! { /// The minimum network protocol version accepted by this crate for each network, @@ -279,7 +282,10 @@ mod tests { assert!(EWMA_DEFAULT_RTT > REQUEST_TIMEOUT, "The default EWMA RTT should be higher than the request timeout, so new peers are required to prove they are fast, before we prefer them to other peers."); - assert!(EWMA_DECAY_TIME > REQUEST_TIMEOUT, + let request_timeout_nanos = REQUEST_TIMEOUT.as_secs_f64() + + f64::from(REQUEST_TIMEOUT.subsec_nanos()) * NANOS_PER_SECOND; + + assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos, "The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA."); } diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 11cbcf13..f3a0dffc 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -10,11 +10,22 @@ mod connector; mod error; /// Performs peer handshakes. mod handshake; +/// Tracks the load on a `Client` service. +mod load_tracked_client; +/// Watches for chain tip height updates to determine the minimum support peer protocol version. +mod minimum_peer_version; -use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; +#[cfg(not(test))] +use client::ClientRequest; +#[cfg(test)] +pub(crate) use client::ClientRequest; + +use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; pub use client::Client; pub use connection::Connection; pub use connector::{Connector, OutboundConnectorRequest}; pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError}; pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest}; +pub use load_tracked_client::LoadTrackedClient; +pub use minimum_peer_version::MinimumPeerVersion; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index ca63d377..ae266bf6 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -11,7 +11,10 @@ use futures::{ }; use tower::Service; -use crate::protocol::internal::{Request, Response}; +use crate::protocol::{ + external::types::Version, + internal::{Request, Response}, +}; use super::{ErrorSlot, PeerError, SharedPeerError}; @@ -28,6 +31,9 @@ pub struct Client { /// /// `None` unless the connection or client have errored. pub(crate) error_slot: ErrorSlot, + + /// The peer connection's protocol version. + pub(crate) version: Version, } /// A message from the `peer::Client` to the `peer::Server`. diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 8f44c152..9fd08b44 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -7,7 +7,7 @@ use std::{ use futures::prelude::*; use tokio::net::TcpStream; -use tower::{discover::Change, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; @@ -57,7 +57,7 @@ where S::Future: Send, C: ChainTip + Clone + Send + 'static, { - type Response = Change; + type Response = (SocketAddr, Client); type Error = BoxError; type Future = Pin> + Send + 'static>>; @@ -86,7 +86,7 @@ where connection_tracker, }) .await?; - Ok(Change::Insert(addr, client)) + Ok((addr, client)) } .instrument(connector_span) .boxed() diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 783c8c96..a318e0fa 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -30,7 +30,9 @@ use zebra_chain::{ use crate::{ constants, meta_addr::MetaAddrChange, - peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError}, + peer::{ + Client, ClientRequest, Connection, ErrorSlot, HandshakeError, MinimumPeerVersion, PeerError, + }, peer_set::ConnectionTracker, protocol::{ external::{types::*, AddrInVersion, Codec, InventoryHash, Message}, @@ -59,7 +61,7 @@ pub struct Handshake { our_services: PeerServices, relay: bool, parent_span: Span, - latest_chain_tip: C, + minimum_peer_version: MinimumPeerVersion, } /// The peer address that we are handshaking with. @@ -420,6 +422,8 @@ where let user_agent = self.user_agent.unwrap_or_else(|| "".to_string()); let our_services = self.our_services.unwrap_or_else(PeerServices::empty); let relay = self.relay.unwrap_or(false); + let network = config.network; + let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, network); Ok(Handshake { config, @@ -431,7 +435,7 @@ where our_services, relay, parent_span: Span::current(), - latest_chain_tip: self.latest_chain_tip, + minimum_peer_version, }) } } @@ -473,7 +477,7 @@ pub async fn negotiate_version( user_agent: String, our_services: PeerServices, relay: bool, - latest_chain_tip: impl ChainTip, + mut minimum_peer_version: MinimumPeerVersion, ) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { // Create a random nonce for this connection let local_nonce = Nonce::default(); @@ -589,8 +593,7 @@ pub async fn negotiate_version( // SECURITY: Reject connections to peers on old versions, because they might not know about all // network upgrades and could lead to chain forks or slower block propagation. - let height = latest_chain_tip.best_tip_height(); - let min_version = Version::min_remote_for_height(config.network, height); + let min_version = minimum_peer_version.current(); if remote_version < min_version { debug!( remote_ip = ?their_addr, @@ -716,7 +719,7 @@ where let user_agent = self.user_agent.clone(); let our_services = self.our_services; let relay = self.relay; - let latest_chain_tip = self.latest_chain_tip.clone(); + let minimum_peer_version = self.minimum_peer_version.clone(); let fut = async move { debug!( @@ -747,7 +750,7 @@ where user_agent, our_services, relay, - latest_chain_tip, + minimum_peer_version, ), ) .await??; @@ -792,6 +795,7 @@ where shutdown_tx: Some(shutdown_tx), server_tx: server_tx.clone(), error_slot: slot.clone(), + version: remote_version, }; let (peer_tx, peer_rx) = peer_conn.split(); diff --git a/zebra-network/src/peer/load_tracked_client.rs b/zebra-network/src/peer/load_tracked_client.rs new file mode 100644 index 00000000..e931a662 --- /dev/null +++ b/zebra-network/src/peer/load_tracked_client.rs @@ -0,0 +1,71 @@ +//! A peer connection service wrapper type to handle load tracking and provide access to the +//! reported protocol version. + +use std::task::{Context, Poll}; + +use tower::{ + load::{Load, PeakEwma}, + Service, +}; + +use crate::{ + constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT}, + peer::Client, + protocol::external::types::Version, +}; + +/// A client service wrapper that keeps track of its load. +/// +/// It also keeps track of the peer's reported protocol version. +pub struct LoadTrackedClient { + service: PeakEwma, + version: Version, +} + +/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service. +impl From for LoadTrackedClient { + fn from(client: Client) -> Self { + let version = client.version; + + let service = PeakEwma::new( + client, + EWMA_DEFAULT_RTT, + EWMA_DECAY_TIME_NANOS, + tower::load::CompleteOnResponse::default(), + ); + + LoadTrackedClient { service, version } + } +} + +impl LoadTrackedClient { + /// Retrieve the peer's reported protocol version. + pub fn version(&self) -> Version { + self.version + } +} + +impl Service for LoadTrackedClient +where + Client: Service, +{ + type Response = >::Response; + type Error = >::Error; + type Future = as Service>::Future; + + fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll> { + self.service.poll_ready(context) + } + + fn call(&mut self, request: Request) -> Self::Future { + self.service.call(request) + } +} + +impl Load for LoadTrackedClient { + type Metric = as Load>::Metric; + + fn load(&self) -> Self::Metric { + self.service.load() + } +} diff --git a/zebra-network/src/peer/minimum_peer_version.rs b/zebra-network/src/peer/minimum_peer_version.rs new file mode 100644 index 00000000..5ee2a449 --- /dev/null +++ b/zebra-network/src/peer/minimum_peer_version.rs @@ -0,0 +1,83 @@ +use zebra_chain::{chain_tip::ChainTip, parameters::Network}; + +use crate::protocol::external::types::Version; + +#[cfg(any(test, feature = "proptest-impl"))] +mod tests; + +/// A helper type to monitor the chain tip in order to determine the minimum peer protocol version +/// that is currently supported. +pub struct MinimumPeerVersion { + network: Network, + chain_tip: C, + current_minimum: Version, + has_changed: bool, +} + +impl MinimumPeerVersion +where + C: ChainTip, +{ + /// Create a new [`MinimumPeerVersion`] to track the minimum supported peer protocol version + /// for the current `chain_tip` on the `network`. + pub fn new(chain_tip: C, network: Network) -> Self { + MinimumPeerVersion { + network, + chain_tip, + current_minimum: Version::min_remote_for_height(network, None), + has_changed: true, + } + } + + /// Check if the minimum supported peer version has changed since the last time this was + /// called. + /// + /// The first call returns the current minimum version, and subsequent calls return [`None`] + /// until the minimum version changes. When that happens, the next call returns the new minimum + /// version, and subsequent calls return [`None`] again until the minimum version changes once + /// more. + pub fn changed(&mut self) -> Option { + self.update(); + + if self.has_changed { + self.has_changed = false; + Some(self.current_minimum) + } else { + None + } + } + + /// Retrieve the current minimum supported peer protocol version. + pub fn current(&mut self) -> Version { + self.update(); + self.current_minimum + } + + /// Check the current chain tip height to determine the minimum peer version, and detect if it + /// has changed. + fn update(&mut self) { + let height = self.chain_tip.best_tip_height(); + let new_minimum = Version::min_remote_for_height(self.network, height); + + if self.current_minimum != new_minimum { + self.current_minimum = new_minimum; + self.has_changed = true; + } + } +} + +/// A custom [`Clone`] implementation to ensure that the first call to +/// [`MinimumPeerVersion::changed`] after the clone will always return the current version. +impl Clone for MinimumPeerVersion +where + C: Clone, +{ + fn clone(&self) -> Self { + MinimumPeerVersion { + network: self.network, + chain_tip: self.chain_tip.clone(), + current_minimum: self.current_minimum, + has_changed: true, + } + } +} diff --git a/zebra-network/src/peer/minimum_peer_version/tests.rs b/zebra-network/src/peer/minimum_peer_version/tests.rs new file mode 100644 index 00000000..54fa687a --- /dev/null +++ b/zebra-network/src/peer/minimum_peer_version/tests.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use tokio::sync::watch; + +use zebra_chain::{block, chain_tip::ChainTip, parameters::Network, transaction}; + +use super::MinimumPeerVersion; + +#[cfg(test)] +mod prop; + +/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally. +#[derive(Clone)] +pub struct MockChainTip { + best_tip_height: watch::Receiver>, +} + +impl MockChainTip { + /// Create a new [`MockChainTip`]. + /// + /// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip + /// height. + /// + /// Initially, the best tip height is [`None`]. + pub fn new() -> (Self, watch::Sender>) { + let (sender, receiver) = watch::channel(None); + + let mock_chain_tip = MockChainTip { + best_tip_height: receiver, + }; + + (mock_chain_tip, sender) + } +} + +impl ChainTip for MockChainTip { + fn best_tip_height(&self) -> Option { + *self.best_tip_height.borrow() + } + + fn best_tip_hash(&self) -> Option { + unreachable!("Method not used in `MinimumPeerVersion` tests"); + } + + fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> { + unreachable!("Method not used in `MinimumPeerVersion` tests"); + } +} + +impl MinimumPeerVersion { + pub fn with_mock_chain_tip(network: Network) -> (Self, watch::Sender>) { + let (chain_tip, best_tip_height) = MockChainTip::new(); + let minimum_peer_version = MinimumPeerVersion::new(chain_tip, network); + + (minimum_peer_version, best_tip_height) + } +} diff --git a/zebra-network/src/peer/minimum_peer_version/tests/prop.rs b/zebra-network/src/peer/minimum_peer_version/tests/prop.rs new file mode 100644 index 00000000..f3884d2c --- /dev/null +++ b/zebra-network/src/peer/minimum_peer_version/tests/prop.rs @@ -0,0 +1,82 @@ +use proptest::prelude::*; + +use zebra_chain::{block, parameters::Network}; + +use crate::{peer::MinimumPeerVersion, protocol::external::types::Version}; + +proptest! { + /// Test if the calculated minimum peer version is correct. + #[test] + fn minimum_peer_version_is_correct( + network in any::(), + block_height in any::>(), + ) { + let (mut minimum_peer_version, best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(network); + + best_tip_height + .send(block_height) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + let expected_minimum_version = Version::min_remote_for_height(network, block_height); + + prop_assert_eq!(minimum_peer_version.current(), expected_minimum_version); + } + + /// Test if the calculated minimum peer version changes with the tip height. + #[test] + fn minimum_peer_version_is_updated_with_chain_tip( + network in any::(), + block_heights in any::>>(), + ) { + let (mut minimum_peer_version, best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(network); + + for block_height in block_heights { + best_tip_height + .send(block_height) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + let expected_minimum_version = Version::min_remote_for_height(network, block_height); + + prop_assert_eq!(minimum_peer_version.current(), expected_minimum_version); + } + } + + /// Test if the minimum peer version changes are correctly tracked. + #[test] + fn minimum_peer_version_reports_changes_correctly( + network in any::(), + block_height_updates in any::>>>(), + ) { + let (mut minimum_peer_version, best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(network); + + let mut current_minimum_version = Version::min_remote_for_height(network, None); + let mut expected_minimum_version = Some(current_minimum_version); + + prop_assert_eq!(minimum_peer_version.changed(), expected_minimum_version); + + for update in block_height_updates { + if let Some(new_block_height) = update { + best_tip_height + .send(new_block_height) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + let new_minimum_version = Version::min_remote_for_height(network, new_block_height); + + expected_minimum_version = if new_minimum_version != current_minimum_version { + Some(new_minimum_version) + } else { + None + }; + + current_minimum_version = new_minimum_version; + } else { + expected_minimum_version = None; + } + + prop_assert_eq!(minimum_peer_version.changed(), expected_minimum_version); + } + } +} diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index c626c129..575e25f2 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -9,7 +9,7 @@ use futures::{ channel::mpsc, future::{self, FutureExt}, sink::SinkExt, - stream::{FuturesUnordered, StreamExt}, + stream::{FuturesUnordered, StreamExt, TryStreamExt}, TryFutureExt, }; use rand::seq::SliceRandom; @@ -20,8 +20,7 @@ use tokio::{ }; use tokio_stream::wrappers::IntervalStream; use tower::{ - buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, - util::BoxService, Service, ServiceExt, + buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, }; use tracing::Span; use tracing_futures::Instrument; @@ -32,7 +31,7 @@ use crate::{ address_book_updater::AddressBookUpdater, constants, meta_addr::{MetaAddr, MetaAddrChange}, - peer::{self, HandshakeRequest, OutboundConnectorRequest}, + peer::{self, HandshakeRequest, MinimumPeerVersion, OutboundConnectorRequest}, peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet}, AddressBook, BoxError, Config, Request, Response, }; @@ -43,7 +42,7 @@ mod tests; /// The result of an outbound peer connection attempt or inbound connection handshake. /// /// This result comes from the [`Handshaker`]. -type PeerChange = Result, BoxError>; +type DiscoveredPeer = Result<(SocketAddr, peer::Client), BoxError>; /// Initialize a peer set, using a network `config`, `inbound_service`, /// and `latest_chain_tip`. @@ -122,7 +121,7 @@ where .with_address_book_updater(address_book_updater.clone()) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) - .with_latest_chain_tip(latest_chain_tip) + .with_latest_chain_tip(latest_chain_tip.clone()) .want_transactions(true) .finish() .expect("configured all required parameters"); @@ -135,7 +134,14 @@ where // Create an mpsc channel for peer changes, // based on the maximum number of inbound and outbound peers. let (peerset_tx, peerset_rx) = - mpsc::channel::(config.peerset_total_connection_limit()); + mpsc::channel::(config.peerset_total_connection_limit()); + + let discovered_peers = peerset_rx + // Discover interprets an error as stream termination, + // so discard any errored connections... + .filter(|result| future::ready(result.is_ok())) + .map_ok(|(address, client)| Change::Insert(address, client.into())); + // Create an mpsc channel for peerset demand signaling, // based on the maximum number of outbound peers. let (mut demand_tx, demand_rx) = @@ -147,18 +153,12 @@ where // Connect the rx end to a PeerSet, wrapping new peers in load instruments. let peer_set = PeerSet::new( &config, - PeakEwmaDiscover::new( - // Discover interprets an error as stream termination, - // so discard any errored connections... - peerset_rx.filter(|result| future::ready(result.is_ok())), - constants::EWMA_DEFAULT_RTT, - constants::EWMA_DECAY_TIME, - tower::load::CompleteOnResponse::default(), - ), + discovered_peers, demand_tx.clone(), handle_rx, inv_receiver, address_book.clone(), + MinimumPeerVersion::new(latest_chain_tip, config.network), ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); @@ -241,15 +241,12 @@ where async fn add_initial_peers( config: Config, outbound_connector: S, - mut peerset_tx: mpsc::Sender, + mut peerset_tx: mpsc::Sender, address_book_updater: mpsc::Sender, ) -> Result where - S: Service< - OutboundConnectorRequest, - Response = Change, - Error = BoxError, - > + Clone, + S: Service + + Clone, S::Future: Send + 'static, { let initial_peers = limit_initial_peers(&config, address_book_updater).await; @@ -470,7 +467,7 @@ async fn accept_inbound_connections( config: Config, listener: TcpListener, mut handshaker: S, - peerset_tx: mpsc::Sender, + peerset_tx: mpsc::Sender, ) -> Result<(), BoxError> where S: Service + Clone, @@ -519,7 +516,7 @@ where tokio::spawn( async move { if let Ok(client) = handshake.await { - let _ = peerset_tx.send(Ok(Change::Insert(addr, client))).await; + let _ = peerset_tx.send(Ok((addr, client))).await; } } .instrument(handshaker_span), @@ -560,7 +557,8 @@ enum CrawlerAction { TimerCrawl { tick: Instant }, /// Handle a successfully connected handshake `peer_set_change`. HandshakeConnected { - peer_set_change: Change, + address: SocketAddr, + client: peer::Client, }, /// Handle a handshake failure to `failed_addr`. HandshakeFailed { failed_addr: MetaAddr }, @@ -598,15 +596,12 @@ async fn crawl_and_dial( mut demand_rx: mpsc::Receiver, mut candidates: CandidateSet, outbound_connector: C, - mut peerset_tx: mpsc::Sender, + mut peerset_tx: mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where - C: Service< - OutboundConnectorRequest, - Response = Change, - Error = BoxError, - > + Clone + C: Service + + Clone + Send + 'static, C::Future: Send + 'static, @@ -717,15 +712,11 @@ where // Try to connect to a new peer. let _ = demand_tx.try_send(MorePeers); } - HandshakeConnected { peer_set_change } => { - if let Change::Insert(ref addr, _) = peer_set_change { - debug!(candidate.addr = ?addr, "successfully dialed new peer"); - } else { - unreachable!("unexpected handshake result: all changes should be Insert"); - } - // successes are handled by an independent task, so they - // shouldn't hang - peerset_tx.send(Ok(peer_set_change)).await?; + HandshakeConnected { address, client } => { + debug!(candidate.addr = ?address, "successfully dialed new peer"); + // successes are handled by an independent task, except for `candidates.update` in + // this task, which has a timeout, so they shouldn't hang + peerset_tx.send(Ok((address, client))).await?; } HandshakeFailed { failed_addr } => { // The connection was never opened, or it failed the handshake and was dropped. @@ -758,11 +749,8 @@ async fn dial( outbound_connection_tracker: ConnectionTracker, ) -> CrawlerAction where - C: Service< - OutboundConnectorRequest, - Response = Change, - Error = BoxError, - > + Clone + C: Service + + Clone + Send + 'static, C::Future: Send + 'static, @@ -794,11 +782,11 @@ where .await } -impl From, (MetaAddr, BoxError)>> for CrawlerAction { - fn from(dial_result: Result, (MetaAddr, BoxError)>) -> Self { +impl From> for CrawlerAction { + fn from(dial_result: Result<(SocketAddr, peer::Client), (MetaAddr, BoxError)>) -> Self { use CrawlerAction::*; match dial_result { - Ok(peer_set_change) => HandshakeConnected { peer_set_change }, + Ok((address, client)) => HandshakeConnected { address, client }, Err((candidate, e)) => { debug!(?candidate.addr, ?e, "failed to connect to candidate"); HandshakeFailed { diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 8af74298..faa27090 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -26,7 +26,7 @@ use futures::{ FutureExt, StreamExt, }; use tokio::{net::TcpStream, task::JoinHandle}; -use tower::{discover::Change, service_fn, Service}; +use tower::{service_fn, Service}; use tracing::Span; use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32}; @@ -40,12 +40,12 @@ use crate::{ peer_set::{ initialize::{ accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener, - PeerChange, + DiscoveredPeer, }, set::MorePeers, ActiveConnectionCounter, CandidateSet, }, - protocol::types::PeerServices, + protocol::{external::types::Version, types::PeerServices}, AddressBook, BoxError, Config, Request, Response, }; @@ -359,6 +359,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Fake the connection closing. @@ -367,7 +368,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { // Give the crawler time to get the message. tokio::task::yield_now().await; - Ok(Change::Insert(addr, fake_client)) + Ok((addr, fake_client)) }); let (config, mut peerset_rx) = @@ -380,7 +381,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { // A peer handshake succeeded. Ok(Some(peer_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_result, @@ -431,6 +432,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Make the connection staying open. @@ -438,7 +440,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { .unbounded_send(connection_tracker) .expect("unexpected error sending to unbounded channel"); - Ok(Change::Insert(addr, fake_client)) + Ok((addr, fake_client)) } }); @@ -452,7 +454,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { // A peer handshake succeeded. Ok(Some(peer_change_result)) => { assert!( - matches!(peer_change_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_change_result, @@ -550,6 +552,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Fake the connection closing. @@ -558,7 +561,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { // Give the crawler time to get the message. tokio::task::yield_now().await; - Ok(Change::Insert(addr, fake_client)) + Ok((addr, fake_client)) }); // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit @@ -573,7 +576,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { // A peer handshake succeeded. Ok(Some(peer_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_result, @@ -624,6 +627,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Make the connection staying open. @@ -631,7 +635,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { .unbounded_send(connection_tracker) .expect("unexpected error sending to unbounded channel"); - Ok(Change::Insert(addr, fake_client)) + Ok((addr, fake_client)) } }); @@ -646,7 +650,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { // A peer handshake succeeded. Ok(Some(peer_change_result)) => { assert!( - matches!(peer_change_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_change_result, @@ -775,6 +779,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Actually close the connection. @@ -797,7 +802,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { // A peer handshake succeeded. Ok(Some(peer_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_result, @@ -851,6 +856,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Make the connection staying open. @@ -872,7 +878,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { // A peer handshake succeeded. Ok(Some(peer_change_result)) => { assert!( - matches!(peer_change_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_change_result, @@ -979,6 +985,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Actually close the connection. @@ -1001,7 +1008,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { // A peer handshake succeeded. Ok(Some(peer_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_result, @@ -1055,6 +1062,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { shutdown_tx: Some(shutdown_tx), server_tx, error_slot, + version: Version(1), }; // Make the connection staying open. @@ -1076,7 +1084,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { // A peer handshake succeeded. Ok(Some(peer_change_result)) => { assert!( - matches!(peer_change_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok((_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", peer_change_result, @@ -1308,13 +1316,10 @@ where async fn spawn_crawler_with_peer_limit( peerset_initial_target_size: impl Into>, outbound_connector: C, -) -> (Config, mpsc::Receiver) +) -> (Config, mpsc::Receiver) where - C: Service< - OutboundConnectorRequest, - Response = Change, - Error = BoxError, - > + Clone + C: Service + + Clone + Send + 'static, C::Future: Send + 'static, @@ -1358,7 +1363,7 @@ where let address_book = Arc::new(std::sync::Mutex::new(address_book)); // Make the channels large enough to hold all the peers. - let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_peers); + let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_peers); let (mut demand_tx, demand_rx) = mpsc::channel::(over_limit_peers); let candidates = CandidateSet::new(address_book.clone(), nil_peer_set); @@ -1421,7 +1426,7 @@ where async fn spawn_inbound_listener_with_peer_limit( peerset_initial_target_size: impl Into>, listen_handshaker: S, -) -> (Config, mpsc::Receiver) +) -> (Config, mpsc::Receiver) where S: Service + Clone @@ -1446,7 +1451,7 @@ where // Make enough inbound connections to go over the limit, even if the limit is zero. // Make the channels large enough to hold all the connections. let over_limit_connections = config.peerset_inbound_connection_limit() * 2 + 1; - let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_connections); + let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_connections); // Start listening for connections. let listen_fut = accept_inbound_connections( @@ -1522,15 +1527,12 @@ async fn spawn_add_initial_peers( outbound_connector: C, ) -> ( JoinHandle>, - mpsc::Receiver, + mpsc::Receiver, JoinHandle>, ) where - C: Service< - OutboundConnectorRequest, - Response = Change, - Error = BoxError, - > + Clone + C: Service + + Clone + Send + 'static, C::Future: Send + 'static, @@ -1556,7 +1558,7 @@ where ..Config::default() }; - let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); + let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); let (_address_book, address_book_updater, address_book_updater_guard) = AddressBookUpdater::spawn(&config, unused_v4); diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index b54d46d7..39e57615 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -72,7 +72,10 @@ use tower::{ Service, }; +use zebra_chain::chain_tip::ChainTip; + use crate::{ + peer::{LoadTrackedClient, MinimumPeerVersion}, peer_set::{ unready_service::{Error as UnreadyError, UnreadyService}, InventoryRegistry, @@ -84,6 +87,9 @@ use crate::{ AddressBook, BoxError, Config, }; +#[cfg(test)] +mod tests; + /// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra. /// /// In response to this signal, the crawler tries to open more peer connections. @@ -106,14 +112,11 @@ pub struct CancelClientWork; /// connections have an ephemeral local or proxy port.) /// /// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. -pub struct PeerSet +pub struct PeerSet where - D: Discover + Unpin, - D::Service: Service + Load, + D: Discover + Unpin, D::Error: Into, - >::Error: Into + 'static, - >::Future: Send + 'static, - ::Metric: Debug, + C: ChainTip, { /// Provides new and deleted peer [`Change`]s to the peer set, /// via the [`Discover`] trait implementation. @@ -130,7 +133,8 @@ where /// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`. /// If that peer is removed from `ready_services`, we must set the preselected peer to `None`. /// - /// This is handled by [`PeerSet::take_ready_service`]. + /// This is handled by [`PeerSet::take_ready_service`] and + /// [`PeerSet::disconnect_from_outdated_peers`]. preselected_p2c_peer: Option, /// Stores gossiped inventory hashes from connected peers. @@ -172,30 +176,30 @@ where /// The peer set panics if this size is exceeded. /// If that happens, our connection limit code has a bug. peerset_total_connection_limit: usize, + + /// An endpoint to see the minimum peer protocol version in real time. + /// + /// The minimum version depends on the block height, and [`MinimumPeerVersion`] listens for + /// height changes and determines the correct minimum version. + minimum_peer_version: MinimumPeerVersion, } -impl Drop for PeerSet +impl Drop for PeerSet where - D: Discover + Unpin, - D::Service: Service + Load, + D: Discover + Unpin, D::Error: Into, - >::Error: Into + 'static, - >::Future: Send + 'static, - ::Metric: Debug, + C: ChainTip, { fn drop(&mut self) { self.shut_down_tasks_and_channels() } } -impl PeerSet +impl PeerSet where - D: Discover + Unpin, - D::Service: Service + Load, + D: Discover + Unpin, D::Error: Into, - >::Error: Into + 'static, - >::Future: Send + 'static, - ::Metric: Debug, + C: ChainTip, { /// Construct a peerset which uses `discover` to manage peer connections. /// @@ -216,6 +220,7 @@ where handle_rx: tokio::sync::oneshot::Receiver>>>, inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, address_book: Arc>, + minimum_peer_version: MinimumPeerVersion, ) -> Self { Self { // Ready peers @@ -237,6 +242,9 @@ where last_peer_log: None, address_book, peerset_total_connection_limit: config.peerset_total_connection_limit(), + + // Real-time parameters + minimum_peer_version, } } @@ -348,7 +356,8 @@ where /// Check busy peer services for request completion or errors. /// - /// Move newly ready services to the ready list, and drop failed services. + /// Move newly ready services to the ready list if they are for peers with supported protocol + /// versions, otherwise they are dropped. Also drop failed services. fn poll_unready(&mut self, cx: &mut Context<'_>) { loop { match Pin::new(&mut self.unready_services).poll_next(cx) { @@ -360,7 +369,10 @@ where trace!(?key, "service became ready"); let cancel = self.cancel_handles.remove(&key); assert!(cancel.is_some(), "missing cancel handle"); - self.ready_services.insert(key, svc); + + if svc.version() >= self.minimum_peer_version.current() { + self.ready_services.insert(key, svc); + } } // Unready -> Canceled @@ -376,8 +388,7 @@ where } // Unready -> Errored - Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => { - let error = e.into(); + Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { debug!(%error, "service failed while unready, dropping service"); let cancel = self.cancel_handles.remove(&key); @@ -411,6 +422,26 @@ where } } + /// Checks if the minimum peer version has changed, and disconnects from outdated peers. + fn disconnect_from_outdated_peers(&mut self) { + if let Some(minimum_version) = self.minimum_peer_version.changed() { + // TODO: Remove when the code base migrates to Rust 2021 edition (#2709). + let preselected_p2c_peer = &mut self.preselected_p2c_peer; + + self.ready_services.retain(|address, peer| { + if peer.version() >= minimum_version { + true + } else { + if *preselected_p2c_peer == Some(*address) { + *preselected_p2c_peer = None; + } + + false + } + }); + } + } + /// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed. fn take_ready_service(&mut self, key: &D::Key) -> Option { if let Some(svc) = self.ready_services.remove(key) { @@ -445,17 +476,29 @@ where } } - /// Adds a busy service to the unready list, + /// Adds a busy service to the unready list if it's for a peer with a supported version, /// and adds a cancel handle for the service's current request. + /// + /// If the service is for a connection to an outdated peer, the request is cancelled and the + /// service is dropped. fn push_unready(&mut self, key: D::Key, svc: D::Service) { + let peer_version = svc.version(); let (tx, rx) = oneshot::channel(); - self.cancel_handles.insert(key, tx); + self.unready_services.push(UnreadyService { key: Some(key), service: Some(svc), cancel: rx, _req: PhantomData, }); + + if peer_version >= self.minimum_peer_version.current() { + self.cancel_handles.insert(key, tx); + } else { + // Cancel any request made to the service because it is using an outdated protocol + // version. + let _ = tx.send(CancelClientWork); + } } /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. @@ -727,14 +770,11 @@ where } } -impl Service for PeerSet +impl Service for PeerSet where - D: Discover + Unpin, - D::Service: Service + Load, + D: Discover + Unpin, D::Error: Into, - >::Error: Into + 'static, - >::Future: Send + 'static, - ::Metric: Debug, + C: ChainTip, { type Response = Response; type Error = BoxError; @@ -746,6 +786,7 @@ where // Update peer statuses let _ = self.poll_discover(cx)?; + self.disconnect_from_outdated_peers(); self.inventory_registry.poll_inventory(cx)?; self.poll_unready(cx); @@ -772,8 +813,7 @@ where trace!("preselected service is no longer ready, moving to unready list"); self.push_unready(key, service); } - Poll::Ready(Err(e)) => { - let error = e.into(); + Poll::Ready(Err(error)) => { trace!(%error, "preselected service failed, dropping it"); std::mem::drop(service); } diff --git a/zebra-network/src/peer_set/set/tests.rs b/zebra-network/src/peer_set/set/tests.rs new file mode 100644 index 00000000..c030c2c9 --- /dev/null +++ b/zebra-network/src/peer_set/set/tests.rs @@ -0,0 +1,382 @@ +use std::{net::SocketAddr, sync::Arc}; + +use futures::{ + channel::{mpsc, oneshot}, + stream, Stream, StreamExt, +}; +use proptest::{collection::vec, prelude::*}; +use proptest_derive::Arbitrary; +use tokio::{sync::broadcast, task::JoinHandle}; +use tower::{ + discover::{Change, Discover}, + BoxError, +}; +use tracing::Span; + +use zebra_chain::{ + block, + chain_tip::ChainTip, + parameters::{Network, NetworkUpgrade}, +}; + +use super::MorePeers; +use crate::{ + peer::{Client, ClientRequest, ErrorSlot, LoadTrackedClient, MinimumPeerVersion}, + peer_set::PeerSet, + protocol::external::{types::Version, InventoryHash}, + AddressBook, Config, +}; + +#[cfg(test)] +mod prop; + +/// The maximum number of arbitrary peers to generate in [`PeerVersions`]. +/// +/// This affects the maximum number of peer connections added to the [`PeerSet`] during the tests. +const MAX_PEERS: usize = 20; + +/// A handle to a mocked [`Client`] instance. +struct MockedClientHandle { + _request_receiver: mpsc::Receiver, + shutdown_receiver: oneshot::Receiver<()>, + version: Version, +} + +impl MockedClientHandle { + /// Create a new mocked [`Client`] instance, returning it together with a handle to track it. + pub fn new(version: Version) -> (Self, LoadTrackedClient) { + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (request_sender, _request_receiver) = mpsc::channel(1); + + let client = Client { + shutdown_tx: Some(shutdown_sender), + server_tx: request_sender, + error_slot: ErrorSlot::default(), + version, + }; + + let handle = MockedClientHandle { + _request_receiver, + shutdown_receiver, + version, + }; + + (handle, client.into()) + } + + /// Gets the peer protocol version associated to the [`Client`]. + pub fn version(&self) -> Version { + self.version + } + + /// Checks if the [`Client`] instance has not been dropped, which would have disconnected from + /// the peer. + pub fn is_connected(&mut self) -> bool { + match self.shutdown_receiver.try_recv() { + Ok(None) => true, + Ok(Some(())) | Err(oneshot::Canceled) => false, + } + } +} + +/// A helper type to generate arbitrary peer versions which can then become mock peer services. +#[derive(Arbitrary, Debug)] +struct PeerVersions { + #[proptest(strategy = "vec(any::(), 1..MAX_PEERS)")] + peer_versions: Vec, +} + +impl PeerVersions { + /// Convert the arbitrary peer versions into mock peer services. + /// + /// Each peer versions results in a mock peer service, which is returned as a tuple. The first + /// element is the [`LeadTrackedClient`], which is the actual service for the peer connection. + /// The second element is a [`MockedClientHandle`], which contains the open endpoints of the + /// mock channels used by the peer service. + pub fn mock_peers(&self) -> (Vec, Vec) { + let mut clients = Vec::with_capacity(self.peer_versions.len()); + let mut handles = Vec::with_capacity(self.peer_versions.len()); + + for peer_version in &self.peer_versions { + let (handle, client) = MockedClientHandle::new(*peer_version); + + clients.push(client); + handles.push(handle); + } + + (clients, handles) + } + + /// Convert the arbitrary peer versions into mock peer services available through a + /// [`Discover`] compatible stream. + /// + /// A tuple is returned, where the first item is a stream with the mock peers available through + /// a [`Discover`] interface, and the second is a list of handles to the mocked services. + /// + /// The returned stream never finishes, so it is ready to be passed to the [`PeerSet`] + /// constructor. + /// + /// See [`Self::mock_peers`] for details on how the peers are mocked and on what the handles + /// contain. + pub fn mock_peer_discovery( + &self, + ) -> ( + impl Stream, BoxError>>, + Vec, + ) { + let (clients, handles) = self.mock_peers(); + let fake_ports = 1_u16..; + + let discovered_peers_iterator = fake_ports.zip(clients).map(|(port, client)| { + let peer_address = SocketAddr::new([127, 0, 0, 1].into(), port); + + Ok(Change::Insert(peer_address, client)) + }); + + let discovered_peers = stream::iter(discovered_peers_iterator).chain(stream::pending()); + + (discovered_peers, handles) + } +} + +/// A helper builder type for creating test [`PeerSet`] instances. +/// +/// This helps to reduce repeated boilerplate code. Fields that are not set are configured to use +/// dummy fallbacks. +#[derive(Default)] +struct PeerSetBuilder { + config: Option, + discover: Option, + demand_signal: Option>, + handle_rx: Option>>>>, + inv_stream: Option>, + address_book: Option>>, + minimum_peer_version: Option>, +} + +impl PeerSetBuilder<(), ()> { + /// Create a new [`PeerSetBuilder`] instance. + pub fn new() -> Self { + PeerSetBuilder::default() + } +} + +impl PeerSetBuilder { + /// Use the provided `discover` parameter when constructing the [`PeerSet`] instance. + pub fn with_discover(self, discover: NewD) -> PeerSetBuilder { + PeerSetBuilder { + discover: Some(discover), + config: self.config, + demand_signal: self.demand_signal, + handle_rx: self.handle_rx, + inv_stream: self.inv_stream, + address_book: self.address_book, + minimum_peer_version: self.minimum_peer_version, + } + } + + /// Use the provided [`MinimumPeerVersion`] instance when constructing the [`PeerSet`] instance. + pub fn with_minimum_peer_version( + self, + minimum_peer_version: MinimumPeerVersion, + ) -> PeerSetBuilder { + PeerSetBuilder { + minimum_peer_version: Some(minimum_peer_version), + config: self.config, + discover: self.discover, + demand_signal: self.demand_signal, + handle_rx: self.handle_rx, + inv_stream: self.inv_stream, + address_book: self.address_book, + } + } +} + +impl PeerSetBuilder +where + D: Discover + Unpin, + D::Error: Into, + C: ChainTip, +{ + /// Finish building the [`PeerSet`] instance. + /// + /// Returns a tuple with the [`PeerSet`] instance and a [`PeerSetGuard`] to keep track of some + /// endpoints of channels created for the [`PeerSet`]. + pub fn build(self) -> (PeerSet, PeerSetGuard) { + let mut guard = PeerSetGuard::new(); + + let config = self.config.unwrap_or_default(); + let discover = self.discover.expect("`discover` must be set"); + let minimum_peer_version = self + .minimum_peer_version + .expect("`minimum_peer_version` must be set"); + + let demand_signal = self + .demand_signal + .unwrap_or_else(|| guard.create_demand_sender()); + let handle_rx = self + .handle_rx + .unwrap_or_else(|| guard.create_background_tasks_receiver()); + let inv_stream = self + .inv_stream + .unwrap_or_else(|| guard.create_inventory_receiver()); + + let address_book = guard.prepare_address_book(self.address_book); + + let peer_set = PeerSet::new( + &config, + discover, + demand_signal, + handle_rx, + inv_stream, + address_book, + minimum_peer_version, + ); + + (peer_set, guard) + } +} + +/// A helper type to keep track of some dummy endpoints sent to a test [`PeerSet`] instance. +#[derive(Default)] +pub struct PeerSetGuard { + background_tasks_sender: + Option>>>>, + demand_receiver: Option>, + inventory_sender: Option>, + address_book: Option>>, +} + +impl PeerSetGuard { + /// Create a new empty [`PeerSetGuard`] instance. + pub fn new() -> Self { + PeerSetGuard::default() + } + + /// Create a dummy channel for the background tasks sent to the [`PeerSet`]. + /// + /// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be + /// passed to the [`PeerSet`] constructor. + pub fn create_background_tasks_receiver( + &mut self, + ) -> tokio::sync::oneshot::Receiver>>> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + self.background_tasks_sender = Some(sender); + + receiver + } + + /// Create a dummy channel for the [`PeerSet`] to send demand signals for more peers. + /// + /// The receiver is stored inside the [`PeerSetGuard`], while the sender is returned to be + /// passed to the [`PeerSet`] constructor. + pub fn create_demand_sender(&mut self) -> mpsc::Sender { + let (sender, receiver) = mpsc::channel(1); + + self.demand_receiver = Some(receiver); + + sender + } + + /// Create a dummy channel for the inventory hashes sent to the [`PeerSet`]. + /// + /// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be + /// passed to the [`PeerSet`] constructor. + pub fn create_inventory_receiver( + &mut self, + ) -> broadcast::Receiver<(InventoryHash, SocketAddr)> { + let (sender, receiver) = broadcast::channel(1); + + self.inventory_sender = Some(sender); + + receiver + } + + /// Prepare an [`AddressBook`] instance to send to the [`PeerSet`]. + /// + /// If the `maybe_address_book` parameter contains an [`AddressBook`] instance, it is stored + /// inside the [`PeerSetGuard`] to keep track of it. Otherwise, a new instance is created with + /// the [`Self::fallback_address_book`] method. + /// + /// A reference to the [`AddressBook`] instance tracked by the [`PeerSetGuard`] is returned to + /// be passed to the [`PeerSet`] constructor. + pub fn prepare_address_book( + &mut self, + maybe_address_book: Option>>, + ) -> Arc> { + let address_book = maybe_address_book.unwrap_or_else(Self::fallback_address_book); + + self.address_book = Some(address_book.clone()); + + address_book + } + + /// Create an empty [`AddressBook`] instance using a dummy local listener address. + fn fallback_address_book() -> Arc> { + let local_listener = "127.0.0.1:1000" + .parse() + .expect("Invalid local listener address"); + let address_book = AddressBook::new(local_listener, Span::none()); + + Arc::new(std::sync::Mutex::new(address_book)) + } +} + +/// A pair of block height values, where one is before and the other is at or after an arbitrary +/// network upgrade's activation height. +#[derive(Clone, Copy, Debug)] +pub struct BlockHeightPairAcrossNetworkUpgrades { + /// The network for which the block height values represent heights before and after an + /// upgrade. + pub network: Network, + + /// The block height before the network upgrade activation. + pub before_upgrade: block::Height, + + /// The block height at or after the network upgrade activation. + pub after_upgrade: block::Height, +} + +impl Arbitrary for BlockHeightPairAcrossNetworkUpgrades { + type Parameters = (); + + fn arbitrary_with((): Self::Parameters) -> Self::Strategy { + any::<(Network, NetworkUpgrade)>() + // Filter out genesis upgrade because there is no block height before genesis. + .prop_filter("no block height before genesis", |(_, upgrade)| { + !matches!(upgrade, NetworkUpgrade::Genesis) + }) + // Filter out network upgrades without activation heights. + .prop_filter_map( + "missing activation height for network upgrade", + |(network, upgrade)| { + upgrade + .activation_height(network) + .map(|height| (network, height)) + }, + ) + // Obtain random heights before and after (or at) the network upgrade activation. + .prop_flat_map(|(network, activation_height)| { + let before_upgrade_strategy = 0..activation_height.0; + let after_upgrade_strategy = activation_height.0..; + + ( + Just(network), + before_upgrade_strategy, + after_upgrade_strategy, + ) + }) + // Collect the arbitrary values to build the final type. + .prop_map(|(network, before_upgrade_height, after_upgrade_height)| { + BlockHeightPairAcrossNetworkUpgrades { + network, + before_upgrade: block::Height(before_upgrade_height), + after_upgrade: block::Height(after_upgrade_height), + } + }) + .boxed() + } + + type Strategy = BoxedStrategy; +} diff --git a/zebra-network/src/peer_set/set/tests/prop.rs b/zebra-network/src/peer_set/set/tests/prop.rs new file mode 100644 index 00000000..3ca138c1 --- /dev/null +++ b/zebra-network/src/peer_set/set/tests/prop.rs @@ -0,0 +1,136 @@ +use std::net::SocketAddr; + +use futures::FutureExt; +use proptest::prelude::*; +use tower::{discover::Discover, BoxError, ServiceExt}; + +use zebra_chain::{block, chain_tip::ChainTip, parameters::Network}; + +use super::{ + BlockHeightPairAcrossNetworkUpgrades, MockedClientHandle, PeerSetBuilder, PeerVersions, +}; +use crate::{ + peer::{LoadTrackedClient, MinimumPeerVersion}, + peer_set::PeerSet, + protocol::external::types::Version, +}; + +proptest! { + /// Check if discovered outdated peers are immediately dropped by the [`PeerSet`]. + #[test] + fn only_non_outdated_peers_are_accepted( + network in any::(), + block_height in any::(), + peer_versions in any::(), + ) { + let runtime = zebra_test::init_async(); + + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (mut minimum_peer_version, best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(network); + + best_tip_height + .send(Some(block_height)) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + let current_minimum_version = minimum_peer_version.current(); + + runtime.block_on(async move { + let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version) + .build(); + + check_if_only_up_to_date_peers_are_live( + &mut peer_set, + &mut handles, + current_minimum_version, + )?; + + Ok::<_, TestCaseError>(()) + })?; + } + + /// Check if peers that become outdated after a network upgrade are dropped by the [`PeerSet`]. + #[test] + fn outdated_peers_are_dropped_on_network_upgrade( + block_heights in any::(), + peer_versions in any::(), + ) { + let runtime = zebra_test::init_async(); + + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (mut minimum_peer_version, best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(block_heights.network); + + best_tip_height + .send(Some(block_heights.before_upgrade)) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + runtime.block_on(async move { + let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + check_if_only_up_to_date_peers_are_live( + &mut peer_set, + &mut handles, + minimum_peer_version.current(), + )?; + + best_tip_height + .send(Some(block_heights.after_upgrade)) + .expect("receiving endpoint lives as long as `minimum_peer_version`"); + + check_if_only_up_to_date_peers_are_live( + &mut peer_set, + &mut handles, + minimum_peer_version.current(), + )?; + + Ok::<_, TestCaseError>(()) + })?; + } +} + +/// Check if only peers with up-to-date protocol versions are live. +/// +/// This will poll the `peer_set` to allow it to drop outdated peers, and then check the peer +/// `handles` to assert that only up-to-date peers are kept by the `peer_set`. +fn check_if_only_up_to_date_peers_are_live( + peer_set: &mut PeerSet, + handles: &mut Vec, + minimum_version: Version, +) -> Result<(), TestCaseError> +where + D: Discover + Unpin, + D::Error: Into, + C: ChainTip, +{ + // Force `poll_discover` to be called to process all discovered peers. + let poll_result = peer_set.ready().now_or_never(); + let all_peers_are_outdated = handles + .iter() + .all(|handle| handle.version() < minimum_version); + + if all_peers_are_outdated { + prop_assert!(matches!(poll_result, None)); + } else { + prop_assert!(matches!(poll_result, Some(Ok(_)))); + } + + for handle in handles { + let is_outdated = handle.version() < minimum_version; + let is_connected = handle.is_connected(); + + prop_assert!( + is_connected != is_outdated, + "is_connected: {}, is_outdated: {}", + is_connected, + is_outdated, + ); + } + + Ok(()) +} diff --git a/zebra-network/src/protocol/external/arbitrary.rs b/zebra-network/src/protocol/external/arbitrary.rs index 614da606..229d2f0a 100644 --- a/zebra-network/src/protocol/external/arbitrary.rs +++ b/zebra-network/src/protocol/external/arbitrary.rs @@ -9,7 +9,7 @@ use zebra_chain::{block, transaction}; use super::{ addr::{canonical_socket_addr, ipv6_mapped_socket_addr}, - types::PeerServices, + types::{PeerServices, Version}, InventoryHash, Message, }; @@ -112,6 +112,18 @@ impl Message { } } +impl Arbitrary for Version { + type Parameters = (); + + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + prop_oneof![170_002_u32..=170_015, 0_u32..] + .prop_map(Version) + .boxed() + } + + type Strategy = BoxedStrategy; +} + /// Returns a random canonical Zebra `SocketAddr`. /// /// See [`canonical_ip_addr`] for details. diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs index c27ca9ad..f404835e 100644 --- a/zebra-network/src/protocol/external/types.rs +++ b/zebra-network/src/protocol/external/types.rs @@ -38,7 +38,6 @@ impl From for Magic { /// A protocol version number. #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] -#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct Version(pub u32); impl fmt::Display for Version {