diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 8c328742..961e761d 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -25,6 +25,7 @@ use crate::{ external::{types::Version, InventoryHash}, internal::{Request, Response}, }, + BoxError, }; #[cfg(any(test, feature = "proptest-impl"))] @@ -58,7 +59,7 @@ pub struct Client { pub(crate) connection_task: JoinHandle<()>, /// A handle to the task responsible for sending periodic heartbeats. - pub(crate) heartbeat_task: JoinHandle<()>, + pub(crate) heartbeat_task: JoinHandle>, } /// A signal sent by the [`Client`] half of a peer connection, @@ -427,7 +428,10 @@ impl Client { .is_ready(); if is_canceled { - return self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited); + return self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ); } match self.heartbeat_task.poll_unpin(cx) { @@ -435,13 +439,41 @@ impl Client { // Heartbeat task is still running. Ok(()) } - Poll::Ready(Ok(())) => { - // Heartbeat task stopped unexpectedly, without panicking. - self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited) + Poll::Ready(Ok(Ok(_))) => { + // Heartbeat task stopped unexpectedly, without panic or error. + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited( + "Heartbeat task stopped unexpectedly".to_string(), + ), + ) + } + Poll::Ready(Ok(Err(error))) => { + // Heartbeat task stopped unexpectedly, with error. + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited(error.to_string()), + ) } Poll::Ready(Err(error)) => { - // Heartbeat task stopped unexpectedly with a panic. - panic!("heartbeat task has panicked: {}", error); + // Heartbeat task was cancelled. + if error.is_cancelled() { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ) + } + // Heartbeat task stopped with panic. + else if error.is_panic() { + panic!("heartbeat task has panicked: {}", error); + } + // Heartbeat task stopped with error. + else { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited(error.to_string()), + ) + } } } } diff --git a/zebra-network/src/peer/client/tests.rs b/zebra-network/src/peer/client/tests.rs index 809386ed..354a02f0 100644 --- a/zebra-network/src/peer/client/tests.rs +++ b/zebra-network/src/peer/client/tests.rs @@ -18,6 +18,7 @@ use crate::{ peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot}, peer_set::InventoryChange, protocol::external::types::Version, + BoxError, }; #[cfg(test)] @@ -282,7 +283,7 @@ where let (connection_task, connection_aborter) = Self::spawn_background_task_or_fallback(self.connection_task); let (heartbeat_task, heartbeat_aborter) = - Self::spawn_background_task_or_fallback(self.heartbeat_task); + Self::spawn_background_task_or_fallback_with_result(self.heartbeat_task); let client = Client { shutdown_tx: Some(shutdown_sender), @@ -332,4 +333,37 @@ where (task_handle, abort_handle) } + + // TODO: In the context of #4734: + // - Delete `spawn_background_task_or_fallback` and `spawn_background_task` + // - Rename `spawn_background_task_or_fallback_with_result` and `spawn_background_task_with_result` to + // `spawn_background_task_or_fallback` and `spawn_background_task` + + // Similar to `spawn_background_task_or_fallback` but returns a `Result`. + fn spawn_background_task_or_fallback_with_result( + task_future: Option, + ) -> (JoinHandle>, AbortHandle) + where + T: Future + Send + 'static, + { + match task_future { + Some(future) => Self::spawn_background_task_with_result(future), + None => Self::spawn_background_task_with_result(tokio::time::sleep( + MAX_PEER_CONNECTION_TIME, + )), + } + } + + // Similar to `spawn_background_task` but returns a `Result`. + fn spawn_background_task_with_result( + task_future: T, + ) -> (JoinHandle>, AbortHandle) + where + T: Future + Send + 'static, + { + let (task, abort_handle) = future::abortable(task_future); + let task_handle = tokio::spawn(task.map(|_result| Ok(()))); + + (task_handle, abort_handle) + } } diff --git a/zebra-network/src/peer/client/tests/vectors.rs b/zebra-network/src/peer/client/tests/vectors.rs index 8dc75f3b..a86087ed 100644 --- a/zebra-network/src/peer/client/tests/vectors.rs +++ b/zebra-network/src/peer/client/tests/vectors.rs @@ -80,7 +80,7 @@ async fn client_service_ready_heartbeat_exit() { let (mut client, mut harness) = ClientTestHarness::build().finish(); - harness.set_error(PeerError::HeartbeatTaskExited); + harness.set_error(PeerError::HeartbeatTaskExited("some error".to_string())); harness.drop_heartbeat_shutdown_receiver(); assert!(client.is_failed().await); diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 4e53cb9a..a5d6913f 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -57,8 +57,8 @@ pub enum PeerError { ClientCancelledHeartbeatTask, /// Zebra's internal heartbeat task exited. - #[error("Internal heartbeat task exited")] - HeartbeatTaskExited, + #[error("Internal heartbeat task exited with message: {0:?}")] + HeartbeatTaskExited(String), /// Sending a message to a remote peer took too long. #[error("Sending Client request timed out")] @@ -130,7 +130,7 @@ impl PeerError { PeerError::ConnectionDropped => "ConnectionDropped".into(), PeerError::ClientDropped => "ClientDropped".into(), PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(), - PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(), + PeerError::HeartbeatTaskExited(_) => "HeartbeatTaskExited".into(), PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(), PeerError::ConnectionSendTimeout => "ConnectionSendTimeout".into(), PeerError::ConnectionReceiveTimeout => "ConnectionReceiveTimeout".into(), diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 2010a89d..43e481aa 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -996,7 +996,8 @@ where server_tx.clone(), address_book_updater.clone(), ) - .instrument(tracing::debug_span!(parent: connection_span, "heartbeat")), + .instrument(tracing::debug_span!(parent: connection_span, "heartbeat")) + .boxed(), ); let client = Client { @@ -1114,7 +1115,7 @@ async fn send_periodic_heartbeats_with_shutdown_handle( shutdown_rx: oneshot::Receiver, server_tx: futures::channel::mpsc::Sender, mut heartbeat_ts_collector: tokio::sync::mpsc::Sender, -) { +) -> Result<(), BoxError> { use futures::future::Either; let heartbeat_run_loop = send_periodic_heartbeats_run_loop( @@ -1136,7 +1137,7 @@ async fn send_periodic_heartbeats_with_shutdown_handle( // slow rate, and shutdown is a oneshot. If both futures // are ready, we want the shutdown to take priority over // sending a useless heartbeat. - let _result = match future::select(shutdown_rx, heartbeat_run_loop).await { + let result = match future::select(shutdown_rx, heartbeat_run_loop).await { Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => { tracing::trace!("shutting down because Client requested shut down"); handle_heartbeat_shutdown( @@ -1164,6 +1165,8 @@ async fn send_periodic_heartbeats_with_shutdown_handle( result } }; + + result } /// Send periodical heartbeats to `server_tx`, and update the peer status through