fix(panic): Stop panicking when handling inbound connection handshakes (#6984)

* Remove a redundant outbound connector timeout

* Fix panics in inbound connection handshaker

* Refactor to simplify FuturesUnordered types
This commit is contained in:
teor 2023-06-19 13:39:59 +10:00 committed by GitHub
parent 73ce8fbef0
commit 859353b417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 24 deletions

View File

@ -7,14 +7,13 @@ use std::{
};
use futures::prelude::*;
use tokio::{net::TcpStream, time::timeout};
use tokio::net::TcpStream;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::chain_tip::{ChainTip, NoChainTip};
use crate::{
constants::HANDSHAKE_TIMEOUT,
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
peer_set::ConnectionTracker,
BoxError, PeerSocketAddr, Request, Response,
@ -93,8 +92,12 @@ where
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);
// # Security
//
// `zebra_network::init()` implements a connection timeout on this future.
// Any code outside this future does not have a timeout.
async move {
let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??;
let tcp_stream = TcpStream::connect(*addr).await?;
let client = hs
.oneshot(HandshakeRequest::<TcpStream> {
data_stream: tcp_stream,

View File

@ -876,6 +876,10 @@ where
let relay = self.relay;
let minimum_peer_version = self.minimum_peer_version.clone();
// # Security
//
// `zebra_network::init()` implements a connection timeout on this future.
// Any code outside this future does not have a timeout.
let fut = async move {
debug!(
addr = ?connected_addr,

View File

@ -7,6 +7,7 @@ use std::{
collections::{BTreeMap, HashSet},
convert::Infallible,
net::SocketAddr,
pin::Pin,
sync::Arc,
time::Duration,
};
@ -15,13 +16,14 @@ use futures::{
future::{self, FutureExt},
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
Future, TryFutureExt,
};
use rand::seq::SliceRandom;
use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
time::{sleep, Instant},
task::JoinError,
time::{error::Elapsed, sleep, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tower::{
@ -565,7 +567,8 @@ where
"Inbound Connections",
);
let mut handshakes = FuturesUnordered::new();
let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
FuturesUnordered::new();
// Keeping an unresolved future in the pool means the stream never terminates.
handshakes.push(future::pending().boxed());
@ -575,8 +578,7 @@ where
biased;
next_handshake_res = handshakes.next() => match next_handshake_res {
// The task has already sent the peer change to the peer set.
Some(Ok(_)) => continue,
Some(Err(task_panic)) => panic!("panic in inbound handshake task: {task_panic:?}"),
Some(()) => continue,
None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
},
@ -611,19 +613,37 @@ where
connection_tracker,
peerset_tx.clone(),
)
.await?;
.await?
.map(move |res| match res {
Ok(()) => (),
Err(e @ JoinError { .. }) => {
if e.is_panic() {
panic!("panic during inbound handshaking: {e:?}");
} else {
info!(
"task error during inbound handshaking: {e:?}, is Zebra shutting down?"
)
}
}
});
let handshake_timeout = tokio::time::timeout(
// Only trigger this timeout if the inner handshake timeout fails
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.map(move |res| match res {
Ok(()) => (),
Err(_e @ Elapsed { .. }) => {
info!(
"timeout in spawned accept_inbound_handshake() task: \
inner task should have timeout out already"
);
}
});
// This timeout helps locate inbound peer connection hangs, see #6763 for details.
handshakes.push(Box::pin(
tokio::time::timeout(
// Only trigger this timeout if the inner handshake timeout fails
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.inspect_err(|_elapsed| {
info!("timeout in spawned accept_inbound_handshake() task")
}),
));
handshakes.push(Box::pin(handshake_timeout));
// Rate-limit inbound connection handshakes.
// But sleep longer after a successful connection,
@ -798,7 +818,9 @@ where
let candidates = Arc::new(futures::lock::Mutex::new(candidates));
// This contains both crawl and handshake tasks.
let mut handshakes = FuturesUnordered::new();
let mut handshakes: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
> = FuturesUnordered::new();
// <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream never terminates.
handshakes.push(future::pending().boxed());
@ -905,8 +927,14 @@ where
})
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking: {e:?}");
Err(e @ JoinError {..}) => {
if e.is_panic() {
panic!("panic during outbound handshake: {e:?}");
} else {
info!("task error during outbound handshake: {e:?}, is Zebra shutting down?")
}
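// The task ended without panicking (most likely it was cancelled), so report a finished handshake instead of propagating the join error.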
Ok(HandshakeFinished)
}
})
.in_current_span();
@ -929,8 +957,14 @@ where
})
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during TimerCrawl: {tick:?} {e:?}");
Err(e @ JoinError {..}) => {
if e.is_panic() {
panic!("panic during outbound TimerCrawl: {tick:?} {e:?}");
} else {
info!("task error during outbound TimerCrawl: {e:?}, is Zebra shutting down?")
}
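// The task ended without panicking (most likely it was cancelled), so report a finished timer crawl instead of propagating the join error.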
Ok(TimerCrawlFinished)
}
})
.in_current_span();