From 806dd0f24ca368bfeafd8265c43c2a867fc2ba4a Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 15 Sep 2022 01:00:25 +1000 Subject: [PATCH] feat(net): return peer metadata from `connect_isolated` functions (#4870) * Move version into a ConnectionInfo struct * Add negotiated version to ConnectionInfo Part of this change was generated using: ``` fastmod --fixed-strings ".version(" ".remote_version(" zebra-network ``` * Add the peer address to ConnectionInfo, add ConnectionInfo to Connection * Return a Client instance from connect_isolated_* functions This allows library users to access client ConnectionInfo. * Add and improve debug formatting * Add peer services and user agent to ConnectionInfo * Export the Client type, and fix up a zebrad test * Export types used by the public API * Split VersionMessage into its own struct * Use VersionMessage in ConnectionInfo * Add a public API test for ConnectionInfo * Wrap ConnectionInfo in an Arc * Fix some doc links --- zebra-network/src/isolated.rs | 16 +- zebra-network/src/isolated/tests/vectors.rs | 5 +- zebra-network/src/isolated/tor.rs | 11 +- zebra-network/src/lib.rs | 17 +- zebra-network/src/peer.rs | 2 +- zebra-network/src/peer/client.rs | 24 ++- zebra-network/src/peer/client/tests.rs | 64 +++++-- zebra-network/src/peer/connection.rs | 30 +++- zebra-network/src/peer/connection/tests.rs | 43 ++++- zebra-network/src/peer/handshake.rs | 164 ++++++++++++------ zebra-network/src/peer/load_tracked_client.rs | 23 ++- .../src/peer/minimum_peer_version.rs | 13 ++ zebra-network/src/peer_set/set.rs | 6 +- zebra-network/src/peer_set/set/tests/prop.rs | 4 +- zebra-network/src/protocol/external.rs | 4 +- zebra-network/src/protocol/external/codec.rs | 19 +- .../src/protocol/external/message.rs | 141 +++++++++------ zebra-network/tests/acceptance.rs | 45 +++++ .../components/inbound/tests/real_peer_set.rs | 5 +- 19 files changed, 450 insertions(+), 186 deletions(-) create mode 100644 zebra-network/tests/acceptance.rs diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index 42240281..41ecdc23 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -4,15 +4,12 @@ use std::{future::Future, net::SocketAddr}; use futures::future::TryFutureExt; use tokio::io::{AsyncRead, AsyncWrite}; -use tower::{ - util::{BoxService, Oneshot}, - Service, ServiceExt, -}; +use tower::{util::Oneshot, Service}; use zebra_chain::{chain_tip::NoChainTip, parameters::Network}; use crate::{ - peer::{self, ConnectedAddr, HandshakeRequest}, + peer::{self, Client, ConnectedAddr, HandshakeRequest}, peer_set::ActiveConnectionCounter, BoxError, Config, Request, Response, }; @@ -51,7 +48,7 @@ pub fn connect_isolated( network: Network, data_stream: PeerTransport, user_agent: String, -) -> impl Future, BoxError>> +) -> impl Future> where PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -79,7 +76,7 @@ pub fn connect_isolated_with_inbound( data_stream: PeerTransport, user_agent: String, inbound_service: InboundService, -) -> impl Future, BoxError>> +) -> impl Future> where PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, InboundService: @@ -111,7 +108,6 @@ where connection_tracker, }, ) - .map_ok(|client| BoxService::new(client.map_err(Into::into))) } /// Creates a direct TCP Zcash peer connection to `addr`. @@ -129,7 +125,7 @@ pub fn connect_isolated_tcp_direct( network: Network, addr: SocketAddr, user_agent: String, -) -> impl Future, BoxError>> { +) -> impl Future> { let nil_inbound_service = tower::service_fn(|_req| async move { Ok::(Response::Nil) }); @@ -150,7 +146,7 @@ pub fn connect_isolated_tcp_direct_with_inbound( addr: SocketAddr, user_agent: String, inbound_service: InboundService, -) -> impl Future, BoxError>> +) -> impl Future> where InboundService: Service + Clone + Send + 'static, diff --git a/zebra-network/src/isolated/tests/vectors.rs b/zebra-network/src/isolated/tests/vectors.rs index 4d8ee446..a182ebc1 100644 --- a/zebra-network/src/isolated/tests/vectors.rs +++ b/zebra-network/src/isolated/tests/vectors.rs @@ -9,6 +9,7 @@ use crate::{ constants::CURRENT_NETWORK_PROTOCOL_VERSION, protocol::external::{AddrInVersion, Codec, Message}, types::PeerServices, + VersionMessage, }; use super::super::*; @@ -127,7 +128,7 @@ async fn check_version_message( PeerTransport: AsyncRead + Unpin, { // We don't need to send any bytes to get a version message. - if let Message::Version { + if let Message::Version(VersionMessage { version, services, timestamp, @@ -137,7 +138,7 @@ async fn check_version_message( user_agent, start_height, relay, - } = inbound_stream + }) = inbound_stream .next() .await .expect("stream item") diff --git a/zebra-network/src/isolated/tor.rs b/zebra-network/src/isolated/tor.rs index b51113a0..d3344a1a 100644 --- a/zebra-network/src/isolated/tor.rs +++ b/zebra-network/src/isolated/tor.rs @@ -4,11 +4,14 @@ use std::sync::{Arc, Mutex}; use arti_client::{DataStream, TorAddr, TorClient, TorClientConfig}; use tor_rtcompat::tokio::TokioRuntimeHandle; -use tower::{util::BoxService, Service}; +use tower::Service; use zebra_chain::parameters::Network; -use crate::{connect_isolated, connect_isolated_with_inbound, BoxError, Request, Response}; +use crate::{ + connect_isolated, connect_isolated_with_inbound, peer::Client as ZebraClient, BoxError, + Request, Response, +}; #[cfg(test)] mod tests; @@ -44,7 +47,7 @@ pub async fn connect_isolated_tor( network: Network, hostname: String, user_agent: String, -) -> Result, BoxError> { +) -> Result { let tor_stream = new_tor_stream(hostname).await?; // Calling connect_isolated_tor_with_inbound causes lifetime issues. @@ -68,7 +71,7 @@ pub async fn connect_isolated_tor_with_inbound( hostname: String, user_agent: String, inbound_service: InboundService, -) -> Result, BoxError> +) -> Result where InboundService: Service + Clone + Send + 'static, diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index a8309fa4..ee6dcf72 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -168,15 +168,24 @@ pub use crate::{ config::Config, isolated::{connect_isolated, connect_isolated_tcp_direct}, meta_addr::PeerAddrState, - peer::{HandshakeError, PeerError, SharedPeerError}, + peer::{Client, ConnectedAddr, ConnectionInfo, HandshakeError, PeerError, SharedPeerError}, peer_set::init, policies::RetryLimit, - protocol::internal::{InventoryResponse, Request, Response}, + protocol::{ + external::{Version, VersionMessage}, + internal::{InventoryResponse, Request, Response}, + }, }; -/// Types used in the definition of [`Request`] and [`Response`] messages. +/// Types used in the definition of [`Request`], [`Response`], and [`VersionMessage`]. pub mod types { - pub use crate::{meta_addr::MetaAddr, protocol::types::PeerServices}; + pub use crate::{ + meta_addr::MetaAddr, + protocol::{ + external::{AddrInVersion, Nonce}, + types::PeerServices, + }, + }; #[cfg(any(test, feature = "proptest-impl"))] pub use crate::protocol::external::InventoryHash; diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index d22497c0..52291d64 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -25,7 +25,7 @@ pub use client::Client; pub use connection::Connection; pub use connector::{Connector, OutboundConnectorRequest}; pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError}; -pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest}; +pub use handshake::{ConnectedAddr, ConnectionInfo, Handshake, HandshakeRequest}; pub use load_tracked_client::LoadTrackedClient; pub use minimum_peer_version::MinimumPeerVersion; pub use priority::{AttributePreference, PeerPreference}; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 961e761d..8b2a6198 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -6,6 +6,7 @@ use std::{ iter, net::SocketAddr, pin::Pin, + sync::Arc, task::{Context, Poll}, }; @@ -19,10 +20,13 @@ use tokio::{sync::broadcast, task::JoinHandle}; use tower::Service; use crate::{ - peer::error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError}, + peer::{ + error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError}, + ConnectionInfo, + }, peer_set::InventoryChange, protocol::{ - external::{types::Version, InventoryHash}, + external::InventoryHash, internal::{Request, Response}, }, BoxError, @@ -33,6 +37,9 @@ pub mod tests; /// The "client" duplex half of a peer connection. pub struct Client { + /// The metadata for the connected peer `service`. + pub connection_info: Arc, + /// Used to shut down the corresponding heartbeat. /// This is always Some except when we take it on drop. pub(crate) shutdown_tx: Option>, @@ -44,17 +51,11 @@ pub struct Client { /// so that the peer set can route retries to other clients. pub(crate) inv_collector: broadcast::Sender, - /// The peer address for registering missing inventory. - pub(crate) transient_addr: Option, - /// A slot for an error shared between the Connection and the Client that uses it. /// /// `None` unless the connection or client have errored. pub(crate) error_slot: ErrorSlot, - /// The peer connection's protocol version. - pub(crate) version: Version, - /// A handle to the task responsible for connecting to the peer. pub(crate) connection_task: JoinHandle<()>, @@ -84,6 +85,8 @@ pub(crate) struct ClientRequest { pub inv_collector: Option>, /// The peer address for registering missing inventory. + /// + /// TODO: replace this with `ConnectedAddr`? pub transient_addr: Option, /// The tracing context for the request, so that work the connection task does @@ -170,7 +173,10 @@ impl std::fmt::Debug for Client { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // skip the channels, they don't tell us anything useful f.debug_struct("Client") + .field("connection_info", &self.connection_info) .field("error_slot", &self.error_slot) + .field("connection_task", &self.connection_task) + .field("heartbeat_task", &self.heartbeat_task) .finish() } } @@ -594,7 +600,7 @@ impl Service for Client { request, tx, inv_collector: Some(self.inv_collector.clone()), - transient_addr: self.transient_addr, + transient_addr: self.connection_info.connected_addr.get_transient_addr(), span, }) { Err(e) => { diff --git a/zebra-network/src/peer/client/tests.rs b/zebra-network/src/peer/client/tests.rs index 354a02f0..4b3d0eef 100644 --- a/zebra-network/src/peer/client/tests.rs +++ b/zebra-network/src/peer/client/tests.rs @@ -3,8 +3,13 @@ #![cfg_attr(feature = "proptest-impl", allow(dead_code))] -use std::time::Duration; +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::Arc, + time::Duration, +}; +use chrono::Utc; use futures::{ channel::{mpsc, oneshot}, future::{self, AbortHandle, Future, FutureExt}, @@ -14,11 +19,20 @@ use tokio::{ task::JoinHandle, }; +use zebra_chain::block::Height; + use crate::{ - peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot}, + constants, + peer::{ + error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ConnectionInfo, + ErrorSlot, + }, peer_set::InventoryChange, - protocol::external::types::Version, - BoxError, + protocol::{ + external::{types::Version, AddrInVersion}, + types::{Nonce, PeerServices}, + }, + BoxError, VersionMessage, }; #[cfg(test)] @@ -34,7 +48,7 @@ pub struct ClientTestHarness { #[allow(dead_code)] inv_receiver: Option>, error_slot: ErrorSlot, - version: Version, + remote_version: Version, connection_aborter: AbortHandle, heartbeat_aborter: AbortHandle, } @@ -50,9 +64,9 @@ impl ClientTestHarness { } } - /// Gets the peer protocol version associated to the [`Client`]. - pub fn version(&self) -> Version { - self.version + /// Gets the remote peer protocol version reported by the [`Client`]. + pub fn remote_version(&self) -> Version { + self.remote_version } /// Returns true if the [`Client`] instance still wants connection heartbeats to be sent. @@ -278,20 +292,46 @@ where let (inv_sender, inv_receiver) = broadcast::channel(5); let error_slot = ErrorSlot::default(); - let version = self.version.unwrap_or(Version(0)); + let remote_version = self.version.unwrap_or(Version(0)); let (connection_task, connection_aborter) = Self::spawn_background_task_or_fallback(self.connection_task); let (heartbeat_task, heartbeat_aborter) = Self::spawn_background_task_or_fallback_with_result(self.heartbeat_task); + let negotiated_version = + std::cmp::min(remote_version, constants::CURRENT_NETWORK_PROTOCOL_VERSION); + + let remote = VersionMessage { + version: remote_version, + services: PeerServices::default(), + timestamp: Utc::now(), + address_recv: AddrInVersion::new( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1), + PeerServices::default(), + ), + address_from: AddrInVersion::new( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, 2), + PeerServices::default(), + ), + nonce: Nonce::default(), + user_agent: "client test harness".to_string(), + start_height: Height(0), + relay: true, + }; + + let connection_info = Arc::new(ConnectionInfo { + connected_addr: crate::peer::ConnectedAddr::Isolated, + remote, + negotiated_version, + }); + let client = Client { + connection_info, shutdown_tx: Some(shutdown_sender), server_tx: client_request_sender, inv_collector: inv_sender, - transient_addr: None, error_slot: error_slot.clone(), - version, connection_task, heartbeat_task, }; @@ -301,7 +341,7 @@ where shutdown_receiver: Some(shutdown_receiver), inv_receiver: Some(inv_receiver), error_slot, - version, + remote_version, connection_aborter, heartbeat_aborter, }; diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 87cdc31d..e72ac1fa 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -29,7 +29,7 @@ use crate::{ meta_addr::MetaAddr, peer::{ connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver, - ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError, + ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError, SharedPeerError, }, peer_set::ConnectionTracker, @@ -448,6 +448,12 @@ impl From for InboundMessage { /// The channels, services, and associated state for a peer connection. pub struct Connection { + /// The metadata for the connected peer `service`. + /// + /// This field is used for debugging. + #[allow(dead_code)] + pub connection_info: Arc, + /// The state of this connection's current request or response. pub(super) state: State, @@ -505,6 +511,21 @@ pub struct Connection { pub(super) last_metrics_state: Option>, } +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // skip the channels, they don't tell us anything useful + f.debug_struct(std::any::type_name::>()) + .field("connection_info", &self.connection_info) + .field("state", &self.state) + .field("request_timer", &self.request_timer) + .field("cached_addrs", &self.cached_addrs.len()) + .field("error_slot", &self.error_slot) + .field("metrics_label", &self.metrics_label) + .field("last_metrics_state", &self.last_metrics_state) + .finish() + } +} + impl Connection { /// Return a new connection from its channels, services, and shared state. pub(crate) fn new( @@ -513,9 +534,12 @@ impl Connection { error_slot: ErrorSlot, peer_tx: Tx, connection_tracker: ConnectionTracker, - connected_addr: ConnectedAddr, + connection_info: Arc, ) -> Self { + let metrics_label = connection_info.connected_addr.get_transient_addr_label(); + Connection { + connection_info, state: State::AwaitingRequest, request_timer: None, cached_addrs: Vec::new(), @@ -524,7 +548,7 @@ impl Connection { error_slot, peer_tx: peer_tx.into(), connection_tracker, - metrics_label: connected_addr.get_transient_addr_label(), + metrics_label, last_metrics_state: None, } } diff --git a/zebra-network/src/peer/connection/tests.rs b/zebra-network/src/peer/connection/tests.rs index 76d12817..b2239150 100644 --- a/zebra-network/src/peer/connection/tests.rs +++ b/zebra-network/src/peer/connection/tests.rs @@ -1,17 +1,26 @@ //! Tests for peer connections -use std::io; +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, +}; +use chrono::Utc; use futures::{channel::mpsc, sink::SinkMapErr, SinkExt}; -use zebra_chain::serialization::SerializationError; +use zebra_chain::{block::Height, serialization::SerializationError}; use zebra_test::mock_service::MockService; use crate::{ - peer::{ClientRequest, ConnectedAddr, Connection, ErrorSlot}, + constants::CURRENT_NETWORK_PROTOCOL_VERSION, + peer::{ClientRequest, ConnectedAddr, Connection, ConnectionInfo, ErrorSlot}, peer_set::ActiveConnectionCounter, - protocol::external::Message, - Request, Response, + protocol::{ + external::{AddrInVersion, Message}, + types::{Nonce, PeerServices}, + }, + Request, Response, VersionMessage, }; mod prop; @@ -45,13 +54,35 @@ fn new_test_connection() -> ( }; let peer_tx = peer_tx.sink_map_err(error_converter); + let fake_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 4).into(); + let fake_version = CURRENT_NETWORK_PROTOCOL_VERSION; + let fake_services = PeerServices::default(); + + let remote = VersionMessage { + version: fake_version, + services: fake_services, + timestamp: Utc::now(), + address_recv: AddrInVersion::new(fake_addr, fake_services), + address_from: AddrInVersion::new(fake_addr, fake_services), + nonce: Nonce::default(), + user_agent: "connection test".to_string(), + start_height: Height(0), + relay: true, + }; + + let connection_info = ConnectionInfo { + connected_addr: ConnectedAddr::Isolated, + remote, + negotiated_version: fake_version, + }; + let connection = Connection::new( mock_inbound_service.clone(), client_rx, shared_error_slot.clone(), peer_tx, ActiveConnectionCounter::new_counter().track_connection(), - ConnectedAddr::Isolated, + Arc::new(connection_info), ); ( diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index dcbc166b..487397c6 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -45,7 +45,7 @@ use crate::{ internal::{Request, Response}, }, types::MetaAddr, - BoxError, Config, + BoxError, Config, VersionMessage, }; /// A [`Service`] that handshakes with a remote peer and constructs a @@ -76,6 +76,25 @@ where parent_span: Span, } +impl fmt::Debug for Handshake +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // skip the channels, they don't tell us anything useful + f.debug_struct(std::any::type_name::>()) + .field("config", &self.config) + .field("user_agent", &self.user_agent) + .field("our_services", &self.our_services) + .field("relay", &self.relay) + .field("minimum_peer_version", &self.minimum_peer_version) + .field("parent_span", &self.parent_span) + .finish() + } +} + impl Clone for Handshake where S: Service + Clone + Send + 'static, @@ -98,6 +117,26 @@ where } } +/// The metadata for a peer connection. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ConnectionInfo { + /// The connected peer address, if known. + /// This address might not be valid for outbound connections. + /// + /// Peers can be connected via a transient inbound or proxy address, + /// which will appear as the connected address to the OS and Zebra. + pub connected_addr: ConnectedAddr, + + /// The network protocol [`VersionMessage`](crate::VersionMessage) sent by the remote peer. + pub remote: VersionMessage, + + /// The network protocol version negotiated with the remote peer. + /// + /// Derived from `remote.version` and the + /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION). + pub negotiated_version: Version, +} + /// The peer address that we are handshaking with. /// /// Typically, we can rely on outbound addresses, but inbound addresses don't @@ -108,7 +147,10 @@ pub enum ConnectedAddr { /// /// In an honest network, a Zcash peer is listening on this exact address /// and port. - OutboundDirect { addr: SocketAddr }, + OutboundDirect { + /// The connected outbound remote address and port. + addr: SocketAddr, + }, /// The address we received from the OS, when a remote peer directly /// connected to our Zcash listener port. @@ -117,7 +159,10 @@ pub enum ConnectedAddr { /// if its outbound address is the same as its listener address. But the port /// is an ephemeral outbound TCP port, not a listener port. InboundDirect { + /// The connected inbound remote address. maybe_ip: IpAddr, + + /// The connected inbound transient remote port. transient_port: u16, }, @@ -127,7 +172,10 @@ pub enum ConnectedAddr { /// outbound address and port can be used as an identifier for the duration /// of this connection. OutboundProxy { + /// The remote address and port of the proxy. proxy_addr: SocketAddr, + + /// The local address and transient port we used to connect to the proxy. transient_local_addr: SocketAddr, }, @@ -136,7 +184,10 @@ pub enum ConnectedAddr { /// /// The proxy's ephemeral outbound address can be used as an identifier for /// the duration of this connection. - InboundProxy { transient_addr: SocketAddr }, + InboundProxy { + /// The local address and transient port we used to connect to the proxy. + transient_addr: SocketAddr, + }, /// An isolated connection, where we deliberately don't have any connection metadata. Isolated, @@ -208,6 +259,8 @@ impl ConnectedAddr { /// This address must not depend on the canonical address from the `Version` /// message. Otherwise, malicious peers could interfere with other peers /// `AddressBook` state. + /// + /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes) pub fn get_address_book_addr(&self) -> Option { match self { OutboundDirect { addr } => Some(*addr), @@ -512,6 +565,8 @@ where /// /// We split `Handshake` into its components before calling this function, /// to avoid infectious `Sync` bounds on the returned future. +/// +/// Returns the [`VersionMessage`](crate::VersionMessage) sent by the remote peer. #[allow(clippy::too_many_arguments)] pub async fn negotiate_version( peer_conn: &mut Framed, @@ -522,7 +577,7 @@ pub async fn negotiate_version( our_services: PeerServices, relay: bool, mut minimum_peer_version: MinimumPeerVersion, -) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> +) -> Result where PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -570,7 +625,7 @@ where } }; - let our_version = Message::Version { + let our_version = VersionMessage { version: constants::CURRENT_NETWORK_PROTOCOL_VERSION, services: our_services, timestamp, @@ -581,7 +636,8 @@ where user_agent: user_agent.clone(), start_height: minimum_peer_version.chain_tip_height(), relay, - }; + } + .into(); debug!(?our_version, "sending initial version message"); peer_conn.send(our_version).await?; @@ -592,11 +648,11 @@ where .ok_or(HandshakeError::ConnectionClosed)??; // Wait for next message if the one we got is not Version - loop { + let remote: VersionMessage = loop { match remote_msg { - Message::Version { .. } => { - debug!(?remote_msg, "got version message from remote peer"); - break; + Message::Version(version_message) => { + debug!(?version_message, "got version message from remote peer"); + break version_message; } _ => { remote_msg = peer_conn @@ -606,34 +662,18 @@ where debug!(?remote_msg, "ignoring non-version message from remote peer"); } } + }; + + let remote_address_services = remote.address_from.untrusted_services(); + if remote_address_services != remote.services { + info!( + ?remote.services, + ?remote_address_services, + ?remote.user_agent, + "peer with inconsistent version services and version address services", + ); } - // If we got a Version message, destructure its fields into the local scope. - let (remote_nonce, remote_services, remote_version, remote_canonical_addr, user_agent) = - if let Message::Version { - version, - services, - address_from, - nonce, - user_agent, - .. - } = remote_msg - { - let canonical_addr = address_from.addr(); - let address_services = address_from.untrusted_services(); - if address_services != services { - info!( - ?services, - ?address_services, - "peer with inconsistent version services and version address services" - ); - } - - (nonce, services, version, canonical_addr, user_agent) - } else { - Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))? - }; - // Check for nonce reuse, indicating self-connection // // # Correctness @@ -643,7 +683,7 @@ where // released. let nonce_reuse = { let mut locked_nonces = nonces.lock().await; - let nonce_reuse = locked_nonces.contains(&remote_nonce); + let nonce_reuse = locked_nonces.contains(&remote.nonce); // Regardless of whether we observed nonce reuse, clean up the nonce set. locked_nonces.remove(&local_nonce); nonce_reuse @@ -655,12 +695,13 @@ where // SECURITY: Reject connections to peers on old versions, because they might not know about all // network upgrades and could lead to chain forks or slower block propagation. let min_version = minimum_peer_version.current(); - if remote_version < min_version { + if remote.version < min_version { debug!( remote_ip = ?their_addr, - ?remote_version, + ?remote.version, ?min_version, - "disconnecting from peer with obsolete network protocol version" + ?remote.user_agent, + "disconnecting from peer with obsolete network protocol version", ); // the value is the number of rejected handshakes, by peer IP and protocol version @@ -668,29 +709,30 @@ where "zcash.net.peers.obsolete", 1, "remote_ip" => their_addr.to_string(), - "remote_version" => remote_version.to_string(), + "remote_version" => remote.version.to_string(), "min_version" => min_version.to_string(), - "user_agent" => user_agent, + "user_agent" => remote.user_agent.clone(), ); // the value is the remote version of the most recent rejected handshake from each peer metrics::gauge!( "zcash.net.peers.version.obsolete", - remote_version.0 as f64, + remote.version.0 as f64, "remote_ip" => their_addr.to_string(), ); // Disconnect if peer is using an obsolete version. - Err(HandshakeError::ObsoleteVersion(remote_version))?; + Err(HandshakeError::ObsoleteVersion(remote.version))?; } else { - let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote_version); + let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version); debug!( remote_ip = ?their_addr, - ?remote_version, + ?remote.version, ?negotiated_version, ?min_version, - "negotiated network protocol version with peer" + ?remote.user_agent, + "negotiated network protocol version with peer", ); // the value is the number of connected handshakes, by peer IP and protocol version @@ -698,16 +740,16 @@ where "zcash.net.peers.connected", 1, "remote_ip" => their_addr.to_string(), - "remote_version" => remote_version.to_string(), + "remote_version" => remote.version.to_string(), "negotiated_version" => negotiated_version.to_string(), "min_version" => min_version.to_string(), - "user_agent" => user_agent, + "user_agent" => remote.user_agent.clone(), ); // the value is the remote version of the most recent connected handshake from each peer metrics::gauge!( "zcash.net.peers.version.connected", - remote_version.0 as f64, + remote.version.0 as f64, "remote_ip" => their_addr.to_string(), ); } @@ -736,7 +778,7 @@ where } } - Ok((remote_version, remote_services, remote_canonical_addr)) + Ok(remote) } /// A handshake request. @@ -813,8 +855,7 @@ where .finish(), ); - // Wrap the entire initial connection setup in a timeout. - let (remote_version, remote_services, remote_canonical_addr) = negotiate_version( + let remote = negotiate_version( &mut peer_conn, &connected_addr, config, @@ -826,6 +867,9 @@ where ) .await?; + let remote_canonical_addr = remote.address_from.addr(); + let remote_services = remote.services; + // If we've learned potential peer addresses from an inbound // connection or handshake, add those addresses to our address book. // @@ -853,7 +897,14 @@ where // Set the connection's version to the minimum of the received version or our own. let negotiated_version = - std::cmp::min(remote_version, constants::CURRENT_NETWORK_PROTOCOL_VERSION); + std::cmp::min(remote.version, constants::CURRENT_NETWORK_PROTOCOL_VERSION); + + // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data. + let connection_info = Arc::new(ConnectionInfo { + connected_addr, + remote, + negotiated_version, + }); // Reconfigure the codec to use the negotiated version. // @@ -970,7 +1021,7 @@ where error_slot.clone(), peer_tx, connection_tracker, - connected_addr, + connection_info.clone(), ); let connection_task = tokio::spawn( @@ -993,12 +1044,11 @@ where ); let client = Client { + connection_info, shutdown_tx: Some(shutdown_tx), server_tx, inv_collector, - transient_addr: connected_addr.get_transient_addr(), error_slot, - version: remote_version, connection_task, heartbeat_task, }; diff --git a/zebra-network/src/peer/load_tracked_client.rs b/zebra-network/src/peer/load_tracked_client.rs index e931a662..49d14307 100644 --- a/zebra-network/src/peer/load_tracked_client.rs +++ b/zebra-network/src/peer/load_tracked_client.rs @@ -1,7 +1,10 @@ //! A peer connection service wrapper type to handle load tracking and provide access to the //! reported protocol version. -use std::task::{Context, Poll}; +use std::{ + sync::Arc, + task::{Context, Poll}, +}; use tower::{ load::{Load, PeakEwma}, @@ -10,7 +13,7 @@ use tower::{ use crate::{ constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT}, - peer::Client, + peer::{Client, ConnectionInfo}, protocol::external::types::Version, }; @@ -18,14 +21,17 @@ use crate::{ /// /// It also keeps track of the peer's reported protocol version. pub struct LoadTrackedClient { + /// A service representing a connected peer, wrapped in a load tracker. service: PeakEwma, - version: Version, + + /// The metadata for the connected peer `service`. + connection_info: Arc, } /// Create a new [`LoadTrackedClient`] wrapping the provided `client` service. impl From for LoadTrackedClient { fn from(client: Client) -> Self { - let version = client.version; + let connection_info = client.connection_info.clone(); let service = PeakEwma::new( client, @@ -34,14 +40,17 @@ impl From for LoadTrackedClient { tower::load::CompleteOnResponse::default(), ); - LoadTrackedClient { service, version } + LoadTrackedClient { + service, + connection_info, + } } } impl LoadTrackedClient { /// Retrieve the peer's reported protocol version. - pub fn version(&self) -> Version { - self.version + pub fn remote_version(&self) -> Version { + self.connection_info.remote.version } } diff --git a/zebra-network/src/peer/minimum_peer_version.rs b/zebra-network/src/peer/minimum_peer_version.rs index 7eb6d6d7..97846a64 100644 --- a/zebra-network/src/peer/minimum_peer_version.rs +++ b/zebra-network/src/peer/minimum_peer_version.rs @@ -1,5 +1,7 @@ //! Watches for chain tip height updates to determine the minimum supported peer protocol version. +use std::fmt; + use zebra_chain::{block::Height, chain_tip::ChainTip, parameters::Network}; use crate::protocol::external::types::Version; @@ -16,6 +18,17 @@ pub struct MinimumPeerVersion { has_changed: bool, } +impl fmt::Debug for MinimumPeerVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // skip the chain tip to avoid locking issues + f.debug_struct(std::any::type_name::>()) + .field("network", &self.network) + .field("current_minimum", &self.current_minimum) + .field("has_changed", &self.has_changed) + .finish() + } +} + impl MinimumPeerVersion where C: ChainTip, diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 37989daa..67ff8ad5 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -439,7 +439,7 @@ where let cancel = self.cancel_handles.remove(&key); assert!(cancel.is_some(), "missing cancel handle"); - if svc.version() >= self.minimum_peer_version.current() { + if svc.remote_version() >= self.minimum_peer_version.current() { self.ready_services.insert(key, svc); } } @@ -509,7 +509,7 @@ where let preselected_p2c_peer = &mut self.preselected_p2c_peer; self.ready_services.retain(|address, peer| { - if peer.version() >= minimum_version { + if peer.remote_version() >= minimum_version { true } else { if *preselected_p2c_peer == Some(*address) { @@ -562,7 +562,7 @@ where /// If the service is for a connection to an outdated peer, the request is cancelled and the /// service is dropped. fn push_unready(&mut self, key: D::Key, svc: D::Service) { - let peer_version = svc.version(); + let peer_version = svc.remote_version(); let (tx, rx) = oneshot::channel(); self.unready_services.push(UnreadyService { diff --git a/zebra-network/src/peer_set/set/tests/prop.rs b/zebra-network/src/peer_set/set/tests/prop.rs index 48555b30..efd7a896 100644 --- a/zebra-network/src/peer_set/set/tests/prop.rs +++ b/zebra-network/src/peer_set/set/tests/prop.rs @@ -299,7 +299,7 @@ where let poll_result = peer_set.ready().now_or_never(); let all_peers_are_outdated = harnesses .iter() - .all(|harness| harness.version() < minimum_version); + .all(|harness| harness.remote_version() < minimum_version); if all_peers_are_outdated { prop_assert!(matches!(poll_result, None)); @@ -309,7 +309,7 @@ where let mut number_of_connected_peers = 0; for harness in harnesses { - let is_outdated = harness.version() < minimum_version; + let is_outdated = harness.remote_version() < minimum_version; let is_connected = harness.wants_connection_heartbeats(); prop_assert!( diff --git a/zebra-network/src/protocol/external.rs b/zebra-network/src/protocol/external.rs index d0481822..abecdc05 100644 --- a/zebra-network/src/protocol/external.rs +++ b/zebra-network/src/protocol/external.rs @@ -19,5 +19,7 @@ mod tests; pub use addr::{canonical_socket_addr, AddrInVersion}; pub use codec::Codec; pub use inv::InventoryHash; -pub use message::Message; +pub use message::{Message, VersionMessage}; +pub use types::{Nonce, Version}; + pub use zebra_chain::serialization::MAX_PROTOCOL_MESSAGE_LEN; diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index 1c6de9b8..6dea5ea9 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -26,7 +26,7 @@ use crate::constants; use super::{ addr::{AddrInVersion, AddrV1, AddrV2}, - message::{Message, RejectReason}, + message::{Message, RejectReason, VersionMessage}, types::*, }; @@ -195,7 +195,7 @@ impl Codec { /// contain a checksum of the message body. fn write_body(&self, msg: &Message, mut writer: W) -> Result<(), Error> { match msg { - Message::Version { + Message::Version(VersionMessage { version, services, timestamp, @@ -205,7 +205,7 @@ impl Codec { user_agent, start_height, relay, - } => { + }) => { writer.write_u32::(version.0)?; writer.write_u64::(services.bits())?; // # Security @@ -465,7 +465,7 @@ impl Decoder for Codec { impl Codec { fn read_version(&self, mut reader: R) -> Result { - Ok(Message::Version { + Ok(VersionMessage { version: Version(reader.read_u32::()?), // Use from_bits_truncate to discard unknown service bits. services: PeerServices::from_bits_truncate(reader.read_u64::()?), @@ -485,7 +485,8 @@ impl Codec { 1 => true, _ => return Err(Error::Parse("non-bool value supplied in relay field")), }, - }) + } + .into()) } fn read_verack(&self, mut _reader: R) -> Result { @@ -723,9 +724,7 @@ impl Codec { } } -// TODO: -// - move these unit tests to a separate file -// - add exterior integration tests + proptest +// TODO: move these tests to their own module #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -740,7 +739,8 @@ mod tests { static ref VERSION_TEST_VECTOR: Message = { let services = PeerServices::NODE_NETWORK; let timestamp = Utc.timestamp(1_568_000_000, 0); - Message::Version { + + VersionMessage { version: crate::constants::CURRENT_NETWORK_PROTOCOL_VERSION, services, timestamp, @@ -757,6 +757,7 @@ mod tests { start_height: block::Height(540_000), relay: true, } + .into() }; } diff --git a/zebra-network/src/protocol/external/message.rs b/zebra-network/src/protocol/external/message.rs index a4681cfd..2ea54698 100644 --- a/zebra-network/src/protocol/external/message.rs +++ b/zebra-network/src/protocol/external/message.rs @@ -9,7 +9,7 @@ use zebra_chain::{ transaction::UnminedTx, }; -use crate::meta_addr::MetaAddr; +use crate::{meta_addr::MetaAddr, BoxError}; use super::{addr::AddrInVersion, inv::InventoryHash, types::*}; @@ -45,54 +45,7 @@ pub enum Message { /// is distinct from a simple version number. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#version) - Version { - /// The network version number supported by the sender. - version: Version, - - /// The network services advertised by the sender. - services: PeerServices, - - /// The time when the version message was sent. - /// - /// This is a 64-bit field. Zebra rejects out-of-range times as invalid. - #[cfg_attr( - any(test, feature = "proptest-impl"), - proptest(strategy = "datetime_full()") - )] - timestamp: DateTime, - - /// The network address of the node receiving this message, and its - /// advertised network services. - /// - /// Q: how does the handshake know the remote peer's services already? - address_recv: AddrInVersion, - - /// The network address of the node sending this message, and its - /// advertised network services. - address_from: AddrInVersion, - - /// Node random nonce, randomly generated every time a version - /// packet is sent. This nonce is used to detect connections - /// to self. - nonce: Nonce, - - /// The Zcash user agent advertised by the sender. - user_agent: String, - - /// The last block received by the emitting node. - start_height: block::Height, - - /// Whether the remote peer should announce relayed - /// transactions or not, see [BIP 0037]. - /// - /// Zebra does not implement the bloom filters in [BIP 0037]. - /// Instead, it only relays: - /// - newly verified best chain block hashes and mempool transaction IDs, - /// - after it reaches the chain tip. - /// - /// [BIP 0037]: https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki - relay: bool, - }, + Version(VersionMessage), /// A `verack` message. /// @@ -340,6 +293,69 @@ pub enum Message { FilterClear, } +/// A `version` message. +/// +/// Note that although this is called `version` in Bitcoin, its role is really +/// analogous to a `ClientHello` message in TLS, used to begin a handshake, and +/// is distinct from a simple version number. +/// +/// This struct provides a type that is guaranteed to be a `version` message, +/// and allows [`Message::Version`](Message) fields to be accessed directly. +/// +/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#version) +#[derive(Clone, Eq, PartialEq, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +pub struct VersionMessage { + /// The network version number supported by the sender. + pub version: Version, + + /// The network services advertised by the sender. + pub services: PeerServices, + + /// The time when the version message was sent. + /// + /// This is a 64-bit field. Zebra rejects out-of-range times as invalid. + /// + /// TODO: replace with a custom DateTime64 type (#2171) + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + pub timestamp: DateTime, + + /// The network address of the node receiving this message, and its + /// advertised network services. + /// + /// Q: how does the handshake know the remote peer's services already? + pub address_recv: AddrInVersion, + + /// The network address of the node sending this message, and its + /// advertised network services. + pub address_from: AddrInVersion, + + /// Node random nonce, randomly generated every time a version + /// packet is sent. This nonce is used to detect connections + /// to self. + pub nonce: Nonce, + + /// The Zcash user agent advertised by the sender. + pub user_agent: String, + + /// The last block received by the emitting node. + pub start_height: block::Height, + + /// Whether the remote peer should announce relayed + /// transactions or not, see [BIP 0037]. + /// + /// Zebra does not implement the bloom filters in [BIP 0037]. + /// Instead, it only relays: + /// - newly verified best chain block hashes and mempool transaction IDs, + /// - after it reaches the chain tip. + /// + /// [BIP 0037]: https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki + pub relay: bool, +} + /// The maximum size of the rejection message. /// /// This is equivalent to `COMMAND_SIZE` in zcashd. @@ -350,6 +366,27 @@ const MAX_REJECT_MESSAGE_LENGTH: usize = 12; /// This is equivalent to `MAX_REJECT_MESSAGE_LENGTH` in zcashd. const MAX_REJECT_REASON_LENGTH: usize = 111; +impl From for Message { + fn from(version_message: VersionMessage) -> Self { + Message::Version(version_message) + } +} + +impl TryFrom for VersionMessage { + type Error = BoxError; + + fn try_from(message: Message) -> Result { + match message { + Message::Version(version_message) => Ok(version_message), + _ => Err(format!( + "{} message is not a version message: {message:?}", + message.command() + ) + .into()), + } + } +} + // TODO: add tests for Error conversion and Reject message serialization (#4633) // (Zebra does not currently send reject messages, and it ignores received reject messages.) impl From for Message @@ -408,13 +445,13 @@ pub enum RejectReason { impl fmt::Display for Message { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str(&match self { - Message::Version { + Message::Version(VersionMessage { version, address_recv, address_from, user_agent, .. - } => format!( + }) => format!( "version {{ network: {}, recv: {},_from: {}, user_agent: {:?} }}", version, address_recv.addr(), @@ -481,7 +518,7 @@ impl Message { /// Returns the Zcash protocol message command as a string. pub fn command(&self) -> &'static str { match self { - Message::Version { .. } => "version", + Message::Version(_) => "version", Message::Verack => "verack", Message::Ping(_) => "ping", Message::Pong(_) => "pong", diff --git a/zebra-network/tests/acceptance.rs b/zebra-network/tests/acceptance.rs new file mode 100644 index 00000000..c301b900 --- /dev/null +++ b/zebra-network/tests/acceptance.rs @@ -0,0 +1,45 @@ +//! Acceptance tests for zebra-network APIs. + +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, +}; + +use chrono::Utc; + +use zebra_chain::block::Height; +use zebra_network::{ + types::{AddrInVersion, Nonce, PeerServices}, + ConnectedAddr, ConnectionInfo, Version, VersionMessage, +}; + +/// Test that the types used in [`ConnectionInfo`] are public, +/// by compiling code that explicitly uses those types. +#[test] +fn connection_info_types_are_public() { + let fake_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3).into(); + let fake_version = Version(3); + let fake_services = PeerServices::default(); + + // Each struct field must have its type explicitly listed here + let connected_addr: ConnectedAddr = ConnectedAddr::OutboundDirect { addr: fake_addr }; + let negotiated_version: Version = fake_version; + + let remote = VersionMessage { + version: fake_version, + services: fake_services, + timestamp: Utc::now(), + address_recv: AddrInVersion::new(fake_addr, fake_services), + address_from: AddrInVersion::new(fake_addr, fake_services), + nonce: Nonce::default(), + user_agent: "public API compile test".to_string(), + start_height: Height(0), + relay: true, + }; + + let _connection_info = Arc::new(ConnectionInfo { + connected_addr, + remote, + negotiated_version, + }); +} diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index 151663f1..3e3cb6be 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -607,10 +607,7 @@ async fn setup( ) -> ( // real services // connected peer which responds with isolated_peer_response - Buffer< - BoxService, - zebra_network::Request, - >, + Buffer, // inbound service BoxCloneService, // outbound peer set (only has the connected peer)