From c4118dcc2c490495ceabe2366952f3a086d54006 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 18 Nov 2021 22:34:51 +1000 Subject: [PATCH] Check for panics in the address book updater task (#3064) * Check for panics in the address book updater task * Fix the return type and tests Co-authored-by: Alfredo Garcia --- zebra-network/src/address_book_updater.rs | 39 +++++++++++++------ zebra-network/src/peer_set/initialize.rs | 17 +++++--- .../src/peer_set/initialize/tests/vectors.rs | 24 ++++++++++-- 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs index d49ca57f..b09d19b2 100644 --- a/zebra-network/src/address_book_updater.rs +++ b/zebra-network/src/address_book_updater.rs @@ -3,33 +3,47 @@ use std::{net::SocketAddr, sync::Arc}; use futures::{channel::mpsc, prelude::*}; +use thiserror::Error; +use tokio::task::JoinHandle; -use crate::{meta_addr::MetaAddrChange, AddressBook}; +use crate::{meta_addr::MetaAddrChange, AddressBook, BoxError, Config}; /// The `AddressBookUpdater` hooks into incoming message streams for each peer /// and lets the owner of the sender handle update the address book. For /// example, it can be used to record per-connection last-seen timestamps, or /// add new initial peers to the address book. -pub struct AddressBookUpdater {} +#[derive(Debug, Eq, PartialEq)] +pub struct AddressBookUpdater; + +#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)] +#[error("all address book updater senders are closed")] +pub struct AllAddressBookUpdaterSendersClosed; impl AddressBookUpdater { /// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`] - /// configured with a `local_listener`. + /// configured with Zebra's actual `local_listener` address. /// - /// Returns handles for the transmission channel for timestamp events, and - /// the address book. + /// Returns handles for: + /// - the transmission channel for address book update events, + /// - the address book, and + /// - the address book updater task. pub fn spawn( + config: &Config, local_listener: SocketAddr, ) -> ( Arc>, mpsc::Sender, + JoinHandle>, ) { use tracing::Level; - const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100; - let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE); + + // Create an mpsc channel for peerset address book updates, + // based on the maximum number of inbound and outbound peers. + let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); + let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( local_listener, - span!(Level::TRACE, "timestamp collector"), + span!(Level::TRACE, "address book updater"), ))); let worker_address_book = address_book.clone(); @@ -44,9 +58,12 @@ impl AddressBookUpdater { .expect("mutex should be unpoisoned") .update(event); } - }; - tokio::spawn(worker.boxed()); - (address_book, worker_tx) + Err(AllAddressBookUpdaterSendersClosed.into()) + }; + + let address_book_updater_task_handle = tokio::spawn(worker.boxed()); + + (address_book, worker_tx, address_book_updater_task_handle) } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 89f27eee..c626c129 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -95,7 +95,8 @@ where let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; - let (address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr); + let (address_book, address_book_updater, address_book_updater_guard) = + AddressBookUpdater::spawn(&config, listen_addr); // Create a broadcast channel for peer inventory advertisements. // If it reaches capacity, this channel drops older inventory advertisements. @@ -191,9 +192,13 @@ where .expect("unexpected error connecting to initial peers"); let active_initial_peer_count = active_outbound_connections.update_count(); - // We need to await candidates.update() here, because zcashd only sends one - // `addr` message per connection, and if we only have one initial peer we - // need to ensure that its `addr` message is used by the crawler. + // We need to await candidates.update() here, + // because zcashd rate-limits `addr`/`addrv2` messages per connection, + // and if we only have one initial peer, + // we need to ensure that its `Response::Addr` is used by the crawler. + // + // TODO: cache the most recent `Response::Addr` returned by each peer. + // If the request times out, return the cached response to the caller. info!( ?active_initial_peer_count, @@ -221,7 +226,9 @@ where ); let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current())); - handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); + handle_tx + .send(vec![listen_guard, crawl_guard, address_book_updater_guard]) + .unwrap(); (peer_set, address_book) } diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 0e41f6fd..0d09074a 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1142,7 +1142,7 @@ async fn add_initial_peers_is_rate_limited() { let before = Instant::now(); - let (initial_peers_task_handle, peerset_rx) = + let (initial_peers_task_handle, peerset_rx, address_book_updater_task_handle) = spawn_add_initial_peers(PEER_COUNT, outbound_connector).await; let connections = peerset_rx.take(PEER_COUNT).collect::>().await; @@ -1162,6 +1162,19 @@ async fn add_initial_peers_is_rate_limited() { "unexpected error or panic in add_initial_peers task: {:?}", initial_peers_result, ); + + // Check for panics or errors in the address book updater task. + let updater_result = address_book_updater_task_handle.now_or_never(); + assert!( + matches!(updater_result, None) + || matches!(updater_result, Some(Err(ref join_error)) if join_error.is_cancelled()) + // The task method only returns one kind of error. + // We can't check for error equality due to type erasure, + // and we can't downcast due to ownership. + || matches!(updater_result, Some(Ok(Err(ref _all_senders_closed)))), + "unexpected error or panic in address book updater task: {:?}", + updater_result, + ); } /// Test that [`init`] does not deadlock in `add_initial_peers`, @@ -1501,13 +1514,15 @@ where /// Connects to IP addresses in the IPv4 localhost range. /// Does not open a local listener port. /// -/// Returns the task [`JoinHandle`], and the peer set receiver. +/// Returns the initial peers task [`JoinHandle`], the peer set receiver, +/// and the address book updater task join handle. async fn spawn_add_initial_peers( peer_count: usize, outbound_connector: C, ) -> ( JoinHandle>, mpsc::Receiver, + JoinHandle>, ) where C: Service< @@ -1542,10 +1557,11 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); - let (_address_book, address_book_updater) = AddressBookUpdater::spawn(unused_v4); + let (_address_book, address_book_updater, address_book_updater_guard) = + AddressBookUpdater::spawn(&config, unused_v4); let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater); let add_task_handle = tokio::spawn(add_fut); - (add_task_handle, peerset_rx) + (add_task_handle, peerset_rx, address_book_updater_guard) }