From ac4ed57751c8d5fd69be7c35815c05b4e3e3f894 Mon Sep 17 00:00:00 2001
From: teor
Date: Thu, 13 Jan 2022 05:15:07 +1000
Subject: [PATCH] Cancel heartbeats that are waiting for a peer, rather than hanging Zebra (#3325)

* If the crawler is delayed, delay future crawl intervals by the same amount

* Cancel heartbeats that are waiting for network requests or responses
---
 zebra-network/src/peer/error.rs          |   5 +
 zebra-network/src/peer/handshake.rs      | 128 ++++++++++++++++-------
 zebra-network/src/peer_set/initialize.rs |   9 +-
 3 files changed, 99 insertions(+), 43 deletions(-)

diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs
index 68b9a801..fdcf6bf1 100644
--- a/zebra-network/src/peer/error.rs
+++ b/zebra-network/src/peer/error.rs
@@ -41,6 +41,10 @@ pub enum PeerError {
     #[error("Internal peer connection task exited")]
     ConnectionTaskExited,
 
+    /// Zebra's [`Client`] cancelled its heartbeat task.
+    #[error("Internal client cancelled its heartbeat task")]
+    ClientCancelledHeartbeatTask,
+
     /// Zebra's internal heartbeat task exited.
     #[error("Internal heartbeat task exited")]
     HeartbeatTaskExited,
@@ -75,6 +79,7 @@ impl PeerError {
             PeerError::ConnectionClosed => "ConnectionClosed".into(),
             PeerError::ConnectionDropped => "ConnectionDropped".into(),
             PeerError::ClientDropped => "ClientDropped".into(),
+            PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(),
             PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(),
             PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(),
             PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(),
diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs
index 0f8e9d20..43d76159 100644
--- a/zebra-network/src/peer/handshake.rs
+++ b/zebra-network/src/peer/handshake.rs
@@ -10,7 +10,7 @@ use std::{
 };
 
 use chrono::{TimeZone, Utc};
-use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt};
+use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
 use tokio::{
     net::TcpStream,
     sync::broadcast,
@@ -732,7 +732,7 @@ where
 
     // CORRECTNESS
     //
-    // As a defence-in-depth against hangs, every send or next on stream
+    // As a defence-in-depth against hangs, every send() or next() on peer_conn
     // should be wrapped in a timeout.
     let mut peer_conn = Framed::new(
         tcp_stream,
@@ -933,7 +933,7 @@ where
     );
 
     let heartbeat_task = tokio::spawn(
-        send_periodic_heartbeats(
+        send_periodic_heartbeats_with_shutdown_handle(
            connected_addr,
             remote_services,
             shutdown_rx,
@@ -975,15 +975,74 @@ where
 /// heartbeat_ts_collector.
 ///
 /// Returning from this function terminates the connection's heartbeat task.
-async fn send_periodic_heartbeats(
+async fn send_periodic_heartbeats_with_shutdown_handle(
     connected_addr: ConnectedAddr,
     remote_services: PeerServices,
-    mut shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
-    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
+    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
+    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
     mut heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
 ) {
     use futures::future::Either;
 
+    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
+        connected_addr,
+        remote_services,
+        server_tx,
+        heartbeat_ts_collector.clone(),
+    );
+
+    pin_mut!(shutdown_rx);
+    pin_mut!(heartbeat_run_loop);
+
+    // CORRECTNESS
+    //
+    // Currently, select prefers the first future if multiple
+    // futures are ready.
+    //
+    // Starvation is impossible here, because interval has a
+    // slow rate, and shutdown is a oneshot. If both futures
+    // are ready, we want the shutdown to take priority over
+    // sending a useless heartbeat.
+    let _result = match future::select(shutdown_rx, heartbeat_run_loop).await {
+        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
+            tracing::trace!("shutting down because Client requested shut down");
+            handle_heartbeat_shutdown(
+                PeerError::ClientCancelledHeartbeatTask,
+                &mut heartbeat_ts_collector,
+                &connected_addr,
+                &remote_services,
+            )
+            .await
+        }
+        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
+            tracing::trace!("shutting down because Client was dropped");
+            handle_heartbeat_shutdown(
+                PeerError::ClientDropped,
+                &mut heartbeat_ts_collector,
+                &connected_addr,
+                &remote_services,
+            )
+            .await
+        }
+        Either::Right((result, _unused_shutdown)) => {
+            tracing::trace!("shutting down due to heartbeat failure");
+            // heartbeat_timeout() already sent an error on the timestamp collector channel
+
+            result
+        }
+    };
+}
+
+/// Send periodic heartbeats to `server_tx`, and update the peer status through
+/// `heartbeat_ts_collector`.
+///
+/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
+async fn send_periodic_heartbeats_run_loop(
+    connected_addr: ConnectedAddr,
+    remote_services: PeerServices,
+    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
+    mut heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
+) -> Result<(), BoxError> {
     // Don't send the first heartbeat immediately - we've just completed the handshake!
     let mut interval = tokio::time::interval_at(
         Instant::now() + constants::HEARTBEAT_INTERVAL,
@@ -995,49 +1054,20 @@
     let mut interval_stream = IntervalStream::new(interval);
 
-    loop {
-        let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
-
-        // CORRECTNESS
-        //
-        // Currently, select prefers the first future if multiple
-        // futures are ready.
-        //
-        // Starvation is impossible here, because interval has a
-        // slow rate, and shutdown is a oneshot. If both futures
-        // are ready, we want the shutdown to take priority over
-        // sending a useless heartbeat.
-        if matches!(
-            future::select(shutdown_rx_ref, interval_stream.next()).await,
-            Either::Left(_)
-        ) {
-            tracing::trace!("shutting down due to Client shut down");
-            if let Some(book_addr) = connected_addr.get_address_book_addr() {
-                // awaiting a local task won't hang
-                let _ = heartbeat_ts_collector
-                    .send(MetaAddr::new_shutdown(&book_addr, remote_services))
-                    .await;
-            }
-            return;
-        }
-
+    while let Some(_instant) = interval_stream.next().await {
         // We've reached another heartbeat interval without
         // shutting down, so do a heartbeat request.
-        //
-        // TODO: await heartbeat and shutdown (#3254)
         let heartbeat = send_one_heartbeat(&mut server_tx);
-        if heartbeat_timeout(
+        heartbeat_timeout(
             heartbeat,
             &mut heartbeat_ts_collector,
             &connected_addr,
             &remote_services,
         )
-        .await
-        .is_err()
-        {
-            return;
-        }
+        .await?;
     }
+
+    unreachable!("unexpected IntervalStream termination")
 }
 
 /// Send one heartbeat using `server_tx`.
@@ -1145,3 +1175,21 @@ where
         }
     }
 }
+
+/// Mark `connected_addr` as shut down using `address_book_updater`.
+async fn handle_heartbeat_shutdown(
+    peer_error: PeerError,
+    address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
+    connected_addr: &ConnectedAddr,
+    remote_services: &PeerServices,
+) -> Result<(), BoxError> {
+    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
+
+    if let Some(book_addr) = connected_addr.get_address_book_addr() {
+        let _ = address_book_updater
+            .send(MetaAddr::new_shutdown(&book_addr, *remote_services))
+            .await;
+    }
+
+    Err(peer_error.into())
+}
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index 82d42cbd..74e8a5a2 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -667,9 +667,12 @@ where
     // prevents us from adding items to the stream and checking its length.
     handshakes.push(future::pending().boxed());
 
-    let mut crawl_timer =
-        IntervalStream::new(tokio::time::interval(config.crawl_new_peer_interval))
-            .map(|tick| TimerCrawl { tick });
+    let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
+    // If the crawl is delayed, also delay all future crawls.
+    // (Shorter intervals just add load, without any benefit.)
+    crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+    let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
 
     loop {
         metrics::gauge!(
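
The core change above races the heartbeat work against the Client's shutdown channel, instead of only checking for shutdown between heartbeats. The following is a minimal, self-contained sketch of that pattern, not Zebra code: `run_loop` and `task_with_shutdown_handle` are hypothetical stand-ins, and it assumes the `tokio` and `futures` crates. Because `future::select` polls its first argument first, the shutdown side wins when both futures are ready, which is what the CORRECTNESS comment in the patch relies on.

use futures::channel::oneshot;
use futures::future::{self, Either};
use futures::pin_mut;
use std::time::Duration;

// Hypothetical stand-in for send_periodic_heartbeats_run_loop():
// a loop that never completes on its own.
async fn run_loop() -> Result<(), &'static str> {
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
}

async fn task_with_shutdown_handle(shutdown_rx: oneshot::Receiver<()>) {
    let work = run_loop();
    pin_mut!(shutdown_rx);
    pin_mut!(work);

    match future::select(shutdown_rx, work).await {
        // The owner asked us to stop, or dropped its end of the channel.
        Either::Left((_cancel_or_dropped, _unused_work)) => {}
        // The run loop exited with an error (unreachable in this sketch).
        Either::Right((_result, _unused_shutdown)) => {}
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    let task = tokio::spawn(task_with_shutdown_handle(shutdown_rx));

    // Cancelling via the channel ends the task promptly,
    // even though run_loop() would otherwise run forever.
    let _ = shutdown_tx.send(());
    task.await.expect("task panicked");
}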
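The run-loop half of the split propagates heartbeat failures with `?`, so the first error ends the loop and the whole task, replacing the old `if ... .is_err() { return; }` shape. A minimal sketch under the same assumptions, where `send_one_heartbeat` is a hypothetical stand-in that fails after a few beats rather than Zebra's real request plumbing:

use futures::StreamExt;
use std::time::Duration;
use tokio_stream::wrappers::IntervalStream;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Hypothetical stand-in for send_one_heartbeat() + heartbeat_timeout():
// succeeds a few times, then fails.
async fn send_one_heartbeat(beats_left: &mut u32) -> Result<(), BoxError> {
    if *beats_left == 0 {
        return Err("peer stopped responding".into());
    }
    *beats_left -= 1;
    Ok(())
}

async fn heartbeat_run_loop(mut beats_left: u32) -> Result<(), BoxError> {
    let interval = tokio::time::interval(Duration::from_millis(10));
    let mut interval_stream = IntervalStream::new(interval);

    while let Some(_instant) = interval_stream.next().await {
        // Any heartbeat error ends the loop, and the task that drives it.
        send_one_heartbeat(&mut beats_left).await?;
    }

    // IntervalStream never returns None.
    unreachable!("unexpected IntervalStream termination")
}

#[tokio::main]
async fn main() {
    let result = heartbeat_run_loop(3).await;
    println!("run loop exited: {:?}", result.err());
}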
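The initialize.rs change relies on tokio's `MissedTickBehavior::Delay`: when a crawl overruns its interval, later ticks are pushed back by the same amount instead of firing in a catch-up burst, so a slow crawl never triggers a burst of extra crawls. A small sketch of that setting in isolation, not Zebra code:

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let mut crawl_timer = interval(Duration::from_secs(2));

    // The default is Burst, which fires missed ticks back-to-back;
    // Delay pushes the whole schedule back instead.
    crawl_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

    for i in 0..3 {
        crawl_timer.tick().await;
        println!("crawl {} started", i);

        // Simulate a crawl that overruns the interval: with Delay, the next
        // tick fires 2 seconds after this sleep ends, not immediately.
        tokio::time::sleep(Duration::from_secs(3)).await;
    }
}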