From e9c9ea91bcd073d115b2c65d51ade48aaf695866 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 1 Aug 2022 07:05:52 +1000 Subject: [PATCH] fix(net): Fix handshake timing and error handling (#4772) * Actually wait between initial peer connections * Add a missing span to initial handshake tasks * Forward handshake panics to the calling task * Clarify a handshake comment * Wrap the entire handshake in a timeout, not just some messages * Actually delay spawning initial connections, so we don't flood the network Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- zebra-network/src/peer/handshake.rs | 58 +++++++++++++++--------- zebra-network/src/peer_set/initialize.rs | 29 ++++++------ 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 63e6f3ba..ae7df8ae 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -6,6 +6,7 @@ use std::{ fmt, future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, + panic, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -17,7 +18,7 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, sync::broadcast, task::JoinError, - time::{timeout, Instant}, + time::{error, timeout, Instant}, }; use tokio_stream::wrappers::IntervalStream; use tokio_util::codec::Framed; @@ -807,10 +808,6 @@ where "negotiating protocol version with remote peer" ); - // CORRECTNESS - // - // As a defence-in-depth against hangs, every send() or next() on peer_conn - // should be wrapped in a timeout. let mut peer_conn = Framed::new( data_stream, Codec::builder() @@ -820,20 +817,17 @@ where ); // Wrap the entire initial connection setup in a timeout. - let (remote_version, remote_services, remote_canonical_addr) = timeout( - constants::HANDSHAKE_TIMEOUT, - negotiate_version( - &mut peer_conn, - &connected_addr, - config, - nonces, - user_agent, - our_services, - relay, - minimum_peer_version, - ), + let (remote_version, remote_services, remote_canonical_addr) = negotiate_version( + &mut peer_conn, + &connected_addr, + config, + nonces, + user_agent, + our_services, + relay, + minimum_peer_version, ) - .await??; + .await?; // If we've learned potential peer addresses from an inbound // connection or handshake, add those addresses to our address book. @@ -874,8 +868,9 @@ where debug!("constructing client, spawning server"); - // These channels should not be cloned more than they are - // in this block, see constants.rs for more. + // These channels communicate between the inbound and outbound halves of the connection, + // and between the different connection tasks. We create separate tasks and channels + // for each new connection. let (server_tx, server_rx) = futures::channel::mpsc::channel(0); let (shutdown_tx, shutdown_rx) = oneshot::channel(); let error_slot = ErrorSlot::default(); @@ -1014,9 +1009,28 @@ where Ok(client) }; - // Spawn a new task to drive this handshake. + // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout. + let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut); + + // Spawn a new task to drive this handshake, forwarding panics to the calling task. tokio::spawn(fut.instrument(negotiator_span)) - .map(|x: Result, JoinError>| Ok(x??)) + .map( + |join_result: Result< + Result, error::Elapsed>, + JoinError, + >| { + match join_result { + Ok(Ok(Ok(connection_client))) => Ok(connection_client), + Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()), + Ok(Err(timeout_error)) => Err(timeout_error.into()), + Err(join_error) => match join_error.try_into_panic() { + // Forward panics to the calling task + Ok(panic_reason) => panic::resume_unwind(panic_reason), + Err(join_error) => Err(join_error.into()), + }, + } + }, + ) .boxed() } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 4de951bc..f7d69840 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -289,21 +289,24 @@ where addr, connection_tracker, }; + let outbound_connector = outbound_connector.clone(); - // Construct a connector future but do not drive it yet ... - let outbound_connector_future = outbound_connector - .clone() - .oneshot(req) - .map_err(move |e| (addr, e)); + // Spawn a new task to make the outbound connection. + tokio::spawn( + async move { + // Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`, + // sleeping for an interval according to its index in the list. + sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await; - // ... instead, spawn a new task to handle this connector - tokio::spawn(async move { - let task = outbound_connector_future.await; - // Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`, - // sleeping for an interval according to its index in the list. - sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await; - task - }) + // As soon as we create the connector future, + // the handshake starts running as a spawned task. + outbound_connector + .oneshot(req) + .map_err(move |e| (addr, e)) + .await + } + .in_current_span(), + ) }) .collect();