diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 78662a71..067a50ba 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -320,11 +320,6 @@ impl Config { Ok(Ok(ip_addrs)) => { let ip_addrs: Vec = ip_addrs.map(canonical_peer_addr).collect(); - // if we're logging at debug level, - // the full list of IP addresses will be shown in the log message - let debug_span = debug_span!("", remote_ip_addrs = ?ip_addrs); - let _span_guard = debug_span.enter(); - // This log is needed for user debugging, but it's annoying during tests. #[cfg(not(test))] info!(seed = ?host, remote_ip_count = ?ip_addrs.len(), "resolved seed peer IP addresses"); diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index e137fb7b..0f9d2fac 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -83,7 +83,11 @@ pub const PEERSET_BUFFER_SIZE: usize = 3; /// and receiving a response from a remote peer. pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); -/// The timeout for handshakes when connecting to new peers. +/// The timeout for connections and handshakes when connecting to new peers. +/// +/// Outbound TCP connections must complete within this timeout, +/// then the handshake messages get an additional `HANDSHAKE_TIMEOUT` to complete. +/// (Inbound TCP accepts can't have a timeout, because they are handled by the OS.) /// /// This timeout should remain small, because it helps stop slow peers getting /// into the peer set. This is particularly important for network-constrained diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index dd2342c7..e7047ea7 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -7,13 +7,14 @@ use std::{ }; use futures::prelude::*; -use tokio::net::TcpStream; +use tokio::{net::TcpStream, time::timeout}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use crate::{ + constants::HANDSHAKE_TIMEOUT, peer::{Client, ConnectedAddr, Handshake, HandshakeRequest}, peer_set::ConnectionTracker, BoxError, PeerSocketAddr, Request, Response, @@ -93,7 +94,7 @@ where let connector_span = info_span!("connector", peer = ?connected_addr); async move { - let tcp_stream = TcpStream::connect(*addr).await?; + let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??; let client = hs .oneshot(HandshakeRequest:: { data_stream: tcp_stream, diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 98b32648..2d01437a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -34,7 +34,7 @@ use zebra_chain::chain_tip::ChainTip; use crate::{ address_book_updater::AddressBookUpdater, - constants, + constants::{self, HANDSHAKE_TIMEOUT}, meta_addr::{MetaAddr, MetaAddrChange}, peer::{ self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion, @@ -100,9 +100,9 @@ pub async fn init( Arc>, ) where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + Sync + 'static, S::Future: Send + 'static, - C: ChainTip + Clone + Send + 'static, + C: ChainTip + Clone + Send + Sync + 'static, { // If we want Zebra to operate with no network, // we should implement a `zebrad` command that doesn't use `zebra-network`. @@ -551,7 +551,7 @@ async fn accept_inbound_connections( config: Config, listener: TcpListener, min_inbound_peer_connection_interval: Duration, - mut handshaker: S, + handshaker: S, peerset_tx: futures::channel::mpsc::Sender, ) -> Result<(), BoxError> where @@ -579,6 +579,7 @@ where None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, + // This future must wait until new connections are available: it can't have a timeout. inbound_result = listener.accept() => inbound_result, }; @@ -602,51 +603,26 @@ where "handshaking on an open inbound peer connection" ); - let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); - let accept_span = info_span!("listen_accept", peer = ?connected_addr); - let _guard = accept_span.enter(); - - debug!("got incoming connection"); - - // # Correctness - // - // Holding the drop guard returned by Span::enter across .await points will - // result in incorrect traces if it yields. - // - // This await is okay because the handshaker's `poll_ready` method always returns Ready. - handshaker.ready().await?; - // TODO: distinguish between proxied listeners and direct listeners - let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); - - // Construct a handshake future but do not drive it yet.... - let handshake = handshaker.call(HandshakeRequest { - data_stream: tcp_stream, - connected_addr, + let handshake_task = accept_inbound_handshake( + addr, + handshaker.clone(), + tcp_stream, connection_tracker, - }); - // ... instead, spawn a new task to handle this connection - { - let mut peerset_tx = peerset_tx.clone(); + peerset_tx.clone(), + ) + .await?; - let handshake_task = tokio::spawn( - async move { - let handshake_result = handshake.await; - - if let Ok(client) = handshake_result { - // The connection limit makes sure this send doesn't block - let _ = peerset_tx.send((addr, client)).await; - } else { - debug!(?handshake_result, "error handshaking with inbound peer"); - } - } - .instrument(handshaker_span), - ); - - handshakes.push(Box::pin(handshake_task)); - } - - // We need to drop the guard before yielding. - std::mem::drop(_guard); + // This timeout helps locate inbound peer connection hangs, see #6763 for details. + handshakes.push(Box::pin( + tokio::time::timeout( + // Only trigger this timeout if the inner handshake timeout fails + HANDSHAKE_TIMEOUT + Duration::from_millis(500), + handshake_task, + ) + .inspect_err(|_elapsed| { + info!("timeout in spawned accept_inbound_handshake() task") + }), + )); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -676,6 +652,63 @@ where } } +/// Set up a new inbound connection as a Zcash peer. +/// +/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends +/// the [`peer::Client`] result over `peerset_tx`. +#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))] +async fn accept_inbound_handshake( + addr: PeerSocketAddr, + mut handshaker: S, + tcp_stream: TcpStream, + connection_tracker: ConnectionTracker, + peerset_tx: futures::channel::mpsc::Sender, +) -> Result, BoxError> +where + S: Service, Response = peer::Client, Error = BoxError> + + Clone, + S::Future: Send + 'static, +{ + let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); + + debug!("got incoming connection"); + + // # Correctness + // + // Holding the drop guard returned by Span::enter across .await points will + // result in incorrect traces if it yields. + // + // This await is okay because the handshaker's `poll_ready` method always returns Ready. + handshaker.ready().await?; + // TODO: distinguish between proxied listeners and direct listeners + let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); + + // Construct a handshake future but do not drive it yet.... + let handshake = handshaker.call(HandshakeRequest { + data_stream: tcp_stream, + connected_addr, + connection_tracker, + }); + // ... instead, spawn a new task to handle this connection + let mut peerset_tx = peerset_tx.clone(); + + let handshake_task = tokio::spawn( + async move { + let handshake_result = handshake.await; + + if let Ok(client) = handshake_result { + // The connection limit makes sure this send doesn't block + let _ = peerset_tx.send((addr, client)).await; + } else { + debug!(?handshake_result, "error handshaking with inbound peer"); + } + } + .instrument(handshaker_span), + ); + + Ok(handshake_task) +} + /// An action that the peer crawler can take. enum CrawlerAction { /// Drop the demand signal because there are too many pending handshakes. diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index f949506c..a0abe128 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1453,7 +1453,7 @@ async fn init_with_peer_limit( default_config: impl Into>, ) -> Arc> where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + Sync + 'static, S::Future: Send + 'static, { // This test might fail on machines with no configured IPv4 addresses @@ -1610,6 +1610,7 @@ where S: Service, Response = peer::Client, Error = BoxError> + Clone + Send + + Sync + 'static, S::Future: Send + 'static, { diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index 6e734aae..ac773145 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -6,10 +6,7 @@ use futures::FutureExt; use indexmap::IndexSet; use tokio::{sync::oneshot, task::JoinHandle}; use tower::{ - buffer::Buffer, - builder::ServiceBuilder, - util::{BoxCloneService, BoxService}, - ServiceExt, + buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt, }; use zebra_chain::{ @@ -600,7 +597,12 @@ async fn setup( // connected peer which responds with isolated_peer_response Buffer, // inbound service - BoxCloneService, + LoadShed< + Buffer< + BoxService, + zebra_network::Request, + >, + >, // outbound peer set (only has the connected peer) Buffer< BoxService, @@ -626,11 +628,11 @@ async fn setup( // Inbound let (setup_tx, setup_rx) = oneshot::channel(); let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx); + // TODO: add a timeout just above the service, if needed let inbound_service = ServiceBuilder::new() - .boxed_clone() .load_shed() .buffer(10) - .service(inbound_service); + .service(BoxService::new(inbound_service)); // State // UTXO verification doesn't matter for these tests.