Stop panicking on shutdown in the syncer and network init (#7104)

This commit is contained in:
teor 2023-07-03 06:08:11 +10:00 committed by GitHub
parent 322cbec817
commit e6c3b87872
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 6 deletions

View File

@ -3,7 +3,7 @@
use std::{ use std::{
fmt, fmt,
net::SocketAddr, net::{Ipv4Addr, SocketAddr},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
str::FromStr, str::FromStr,
}; };
@ -66,6 +66,11 @@ impl DerefMut for PeerSocketAddr {
} }
impl PeerSocketAddr { impl PeerSocketAddr {
/// Returns an unspecified `PeerSocketAddr`, which can't be used for outbound connections.
pub fn unspecified() -> Self {
Self(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0))
}
/// Return the underlying [`SocketAddr`], which allows sensitive peer address information to /// Return the underlying [`SocketAddr`], which allows sensitive peer address information to
/// be printed and logged. /// be printed and logged.
pub fn remove_socket_addr_privacy(&self) -> SocketAddr { pub fn remove_socket_addr_privacy(&self) -> SocketAddr {

View File

@ -213,7 +213,17 @@ where
// Wait for the initial seed peer count // Wait for the initial seed peer count
let mut active_outbound_connections = initial_peers_join let mut active_outbound_connections = initial_peers_join
.await .await
.expect("unexpected panic in spawned initial peers task") .unwrap_or_else(|e @ JoinError { .. }| {
if e.is_panic() {
panic!("panic in initial peer connections task: {e:?}");
} else {
info!(
"task error during initial peer connections: {e:?},\
is Zebra shutting down?"
);
Err(e.into())
}
})
.expect("unexpected error connecting to initial peers"); .expect("unexpected error connecting to initial peers");
let active_initial_peer_count = active_outbound_connections.update_count(); let active_initial_peer_count = active_outbound_connections.update_count();
@ -353,8 +363,18 @@ where
.collect(); .collect();
while let Some(handshake_result) = handshakes.next().await { while let Some(handshake_result) = handshakes.next().await {
let handshake_result = let handshake_result = handshake_result.unwrap_or_else(|e @ JoinError { .. }| {
handshake_result.expect("unexpected panic in initial peer handshake"); if e.is_panic() {
panic!("panic in initial peer connection: {e:?}");
} else {
info!(
"task error during initial peer connection: {e:?},\
is Zebra shutting down?"
);
// Fake the address, it doesn't matter because we're shutting down anyway
Err((PeerSocketAddr::unspecified(), e.into()))
}
});
match handshake_result { match handshake_result {
Ok(change) => { Ok(change) => {
handshake_success_total += 1; handshake_success_total += 1;

View File

@ -8,7 +8,7 @@ use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet; use indexmap::IndexSet;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{sync::watch, time::sleep}; use tokio::{sync::watch, task::JoinError, time::sleep};
use tower::{ use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout, builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt, Service, ServiceExt,
@ -668,7 +668,17 @@ where
let mut download_set = IndexSet::new(); let mut download_set = IndexSet::new();
while let Some(res) = requests.next().await { while let Some(res) = requests.next().await {
match res match res
.expect("panic in spawned obtain tips request") .unwrap_or_else(|e @ JoinError { .. }| {
if e.is_panic() {
panic!("panic in obtain tips task: {e:?}");
} else {
info!(
"task error during obtain tips task: {e:?},\
is Zebra shutting down?"
);
Err(e.into())
}
})
.map_err::<Report, _>(|e| eyre!(e)) .map_err::<Report, _>(|e| eyre!(e))
{ {
Ok(zn::Response::BlockHashes(hashes)) => { Ok(zn::Response::BlockHashes(hashes)) => {