propagate errors out of zebra_network::init (#435)

Prior to this change, the service returned by `zebra_network::init` would spawn background tasks that could silently fail, causing unexpected errors in the zebra_network service.

This change modifies the `PeerSet` that backs `zebra_network::init` to store all of the `JoinHandle`s for each background task it depends on. The `PeerSet` then checks this set of futures to see if any of them have exited with an error or a panic, and if they have it returns the error as part of `poll_ready`.
This commit is contained in:
Jane Lusby 2020-06-09 12:24:28 -07:00 committed by GitHub
parent d61cf27d74
commit 431f194c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 104 additions and 48 deletions

View File

@ -31,7 +31,7 @@
#![deny(missing_docs)] #![deny(missing_docs)]
// Tracing causes false positives on this lint: // Tracing causes false positives on this lint:
// https://github.com/tokio-rs/tracing/issues/553 // https://github.com/tokio-rs/tracing/issues/553
#![allow(clippy::cognitive_complexity)] #![allow(clippy::cognitive_complexity, clippy::try_err)]
#[macro_use] #[macro_use]
extern crate pin_project; extern crate pin_project;

View File

@ -72,45 +72,38 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100); let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling. // Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100); let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
// Connect the rx end to a PeerSet, wrapping new peers in load instruments. // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
let peer_set = Buffer::new( let peer_set = PeerSet::new(
PeerSet::new( PeakEwmaDiscover::new(
PeakEwmaDiscover::new( ServiceStream::new(
ServiceStream::new( // ServiceStream interprets an error as stream termination,
// ServiceStream interprets an error as stream termination, // so discard any errored connections...
// so discard any errored connections... peerset_rx.filter(|result| future::ready(result.is_ok())),
peerset_rx.filter(|result| future::ready(result.is_ok())),
),
config.ewma_default_rtt,
config.ewma_decay_time,
NoInstrument,
), ),
demand_tx.clone(), config.ewma_default_rtt,
config.ewma_decay_time,
NoInstrument,
), ),
config.peerset_request_buffer_size, demand_tx.clone(),
handle_rx,
); );
let peer_set = Buffer::new(peer_set, config.peerset_request_buffer_size);
// Connect the tx end to the 3 peer sources: // Connect the tx end to the 3 peer sources:
// 1. Initial peers, specified in the config. // 1. Initial peers, specified in the config.
tokio::spawn(add_initial_peers( let add_guard = tokio::spawn(add_initial_peers(
config.initial_peers(), config.initial_peers(),
connector.clone(), connector.clone(),
peerset_tx.clone(), peerset_tx.clone(),
)); ));
// 2. Incoming peer connections, via a listener. // 2. Incoming peer connections, via a listener.
tokio::spawn( let listen_guard = tokio::spawn(listen(config.listen_addr, listener, peerset_tx.clone()));
listen(config.listen_addr, listener, peerset_tx.clone()).map(|result| {
if let Err(e) = result {
error!(%e);
}
}),
);
// 3. Outgoing peers we connect to in response to load. // 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
// We need to await candidates.update() here, because Zcashd only sends one // We need to await candidates.update() here, because Zcashd only sends one
@ -125,21 +118,18 @@ where
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(());
} }
tokio::spawn( let crawl_guard = tokio::spawn(crawl_and_dial(
crawl_and_dial( config.new_peer_interval,
config.new_peer_interval, demand_tx,
demand_tx, demand_rx,
demand_rx, candidates,
candidates, connector,
connector, peerset_tx,
peerset_tx, ));
)
.map(|result| { handle_tx
if let Err(e) = result { .send(vec![add_guard, listen_guard, crawl_guard])
error!(%e); .unwrap();
}
}),
);
(peer_set, address_book) (peer_set, address_book)
} }
@ -151,7 +141,8 @@ async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>, initial_peers: std::collections::HashSet<SocketAddr>,
connector: S, connector: S,
mut tx: mpsc::Sender<PeerChange>, mut tx: mpsc::Sender<PeerChange>,
) where ) -> Result<(), BoxedStdError>
where
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxedStdError> S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxedStdError>
+ Clone, + Clone,
S::Future: Send + 'static, S::Future: Send + 'static,
@ -160,9 +151,12 @@ async fn add_initial_peers<S>(
use tower::util::CallAllUnordered; use tower::util::CallAllUnordered;
let addr_stream = futures::stream::iter(initial_peers.into_iter()); let addr_stream = futures::stream::iter(initial_peers.into_iter());
let mut handshakes = CallAllUnordered::new(connector, addr_stream); let mut handshakes = CallAllUnordered::new(connector, addr_stream);
while let Some(handshake_result) = handshakes.next().await { while let Some(handshake_result) = handshakes.next().await {
let _ = tx.send(handshake_result).await; tx.send(handshake_result).await?;
} }
Ok(())
} }
/// Bind to `addr`, listen for peers using `handshaker`, then send the /// Bind to `addr`, listen for peers using `handshaker`, then send the

View File

@ -14,6 +14,8 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use indexmap::IndexMap; use indexmap::IndexMap;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
Service, Service,
@ -77,6 +79,15 @@ where
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>, unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
next_idx: Option<usize>, next_idx: Option<usize>,
demand_signal: mpsc::Sender<()>, demand_signal: mpsc::Sender<()>,
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
///
/// The join handles passed into the PeerSet are used to populate the `guards` member
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>,
/// Unordered set of handles to background tasks associated with the `PeerSet`
///
/// These guards are checked for errors as part of `poll_ready` which lets
/// the `PeerSet` propagate errors from background tasks back to the user
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxedStdError>>>,
} }
impl<D> PeerSet<D> impl<D> PeerSet<D>
@ -90,7 +101,11 @@ where
<D::Service as Load>::Metric: Debug, <D::Service as Load>::Metric: Debug,
{ {
/// Construct a peerset which uses `discover` internally. /// Construct a peerset which uses `discover` internally.
pub fn new(discover: D, demand_signal: mpsc::Sender<()>) -> Self { pub fn new(
discover: D,
demand_signal: mpsc::Sender<()>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>,
) -> Self {
Self { Self {
discover, discover,
ready_services: IndexMap::new(), ready_services: IndexMap::new(),
@ -98,6 +113,8 @@ where
unready_services: FuturesUnordered::new(), unready_services: FuturesUnordered::new(),
next_idx: None, next_idx: None,
demand_signal, demand_signal,
guards: futures::stream::FuturesUnordered::new(),
handle_rx,
} }
} }
@ -152,6 +169,30 @@ where
}); });
} }
fn check_for_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxedStdError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Ok(handles) => {
for handle in handles {
self.guards.push(handle);
}
}
Err(TryRecvError::Closed) => unreachable!(
"try_recv will never be called if the futures have already been received"
),
Err(TryRecvError::Empty) => return Ok(()),
}
}
match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => {}
Poll::Ready(Some(res)) => res??,
Poll::Ready(None) => Err("all background tasks have exited")?,
}
Ok(())
}
fn poll_unready(&mut self, cx: &mut Context<'_>) { fn poll_unready(&mut self, cx: &mut Context<'_>) {
loop { loop {
match Pin::new(&mut self.unready_services).poll_next(cx) { match Pin::new(&mut self.unready_services).poll_next(cx) {
@ -223,6 +264,7 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.check_for_background_errors(cx)?;
// Process peer discovery updates. // Process peer discovery updates.
let _ = self.poll_discover(cx)?; let _ = self.poll_discover(cx)?;

View File

@ -14,7 +14,7 @@ serde = { version = "1", features = ["serde_derive"] }
toml = "0.5" toml = "0.5"
thiserror = "1" thiserror = "1"
tokio = { version = "0.2", features = ["time", "rt-threaded", "stream"] } tokio = { version = "0.2", features = ["time", "rt-threaded", "stream", "macros"] }
futures = "0.3" futures = "0.3"
tracing = "0.1" tracing = "0.1"

View File

@ -33,10 +33,17 @@ impl Runnable for ConnectCmd {
.rt .rt
.take(); .take();
rt.expect("runtime should not already be taken") let result = rt
.block_on(self.connect()) .expect("runtime should not already be taken")
// Surface any error that occurred executing the future. .block_on(self.connect());
.unwrap();
match result {
Ok(()) => {}
Err(e) => {
eprintln!("Error: {:?}", e);
std::process::exit(1);
}
}
} }
} }

View File

@ -138,7 +138,7 @@ impl SeedCmd {
let config = app_config().network.clone(); let config = app_config().network.clone();
let (mut peer_set, address_book) = zebra_network::init(config, buffered_svc.clone()).await; let (mut peer_set, address_book) = zebra_network::init(config, buffered_svc).await;
let _ = addressbook_tx.send(address_book); let _ = addressbook_tx.send(address_book);

View File

@ -36,7 +36,7 @@ impl Runnable for StartCmd {
let default_config = ZebradConfig::default(); let default_config = ZebradConfig::default();
println!("Default config: {:?}", default_config); println!("Default config: {:?}", default_config);
println!("Toml:\n{}", toml::to_string(&default_config).unwrap()); println!("Toml:\n{}", toml::Value::try_from(&default_config).unwrap());
info!("Starting placeholder loop"); info!("Starting placeholder loop");

View File

@ -53,3 +53,16 @@ impl Default for MetricsSection {
} }
} }
} }
#[cfg(test)]
mod test {
#[test]
fn test_toml_ser() -> color_eyre::Result<()> {
let default_config = super::ZebradConfig::default();
println!("Default config: {:?}", default_config);
println!("Toml:\n{}", toml::Value::try_from(&default_config)?);
Ok(())
}
}