Check for panics in the address book updater task (#3064)

* Check for panics in the address book updater task

* Fix the return type and tests

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-11-18 22:34:51 +10:00 committed by GitHub
parent 7218b4ffa8
commit c4118dcc2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 20 deletions

View File

@ -3,33 +3,47 @@
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use futures::{channel::mpsc, prelude::*}; use futures::{channel::mpsc, prelude::*};
use thiserror::Error;
use tokio::task::JoinHandle;
use crate::{meta_addr::MetaAddrChange, AddressBook}; use crate::{meta_addr::MetaAddrChange, AddressBook, BoxError, Config};
/// The `AddressBookUpdater` hooks into incoming message streams for each peer /// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For /// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or /// example, it can be used to record per-connection last-seen timestamps, or
/// add new initial peers to the address book. /// add new initial peers to the address book.
pub struct AddressBookUpdater {} #[derive(Debug, Eq, PartialEq)]
pub struct AddressBookUpdater;
#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
#[error("all address book updater senders are closed")]
pub struct AllAddressBookUpdaterSendersClosed;
impl AddressBookUpdater { impl AddressBookUpdater {
/// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`] /// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
/// configured with a `local_listener`. /// configured with Zebra's actual `local_listener` address.
/// ///
/// Returns handles for the transmission channel for timestamp events, and /// Returns handles for:
/// the address book. /// - the transmission channel for address book update events,
/// - the address book, and
/// - the address book updater task.
pub fn spawn( pub fn spawn(
config: &Config,
local_listener: SocketAddr, local_listener: SocketAddr,
) -> ( ) -> (
Arc<std::sync::Mutex<AddressBook>>, Arc<std::sync::Mutex<AddressBook>>,
mpsc::Sender<MetaAddrChange>, mpsc::Sender<MetaAddrChange>,
JoinHandle<Result<(), BoxError>>,
) { ) {
use tracing::Level; use tracing::Level;
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE); // Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
local_listener, local_listener,
span!(Level::TRACE, "timestamp collector"), span!(Level::TRACE, "address book updater"),
))); )));
let worker_address_book = address_book.clone(); let worker_address_book = address_book.clone();
@ -44,9 +58,12 @@ impl AddressBookUpdater {
.expect("mutex should be unpoisoned") .expect("mutex should be unpoisoned")
.update(event); .update(event);
} }
};
tokio::spawn(worker.boxed());
(address_book, worker_tx) Err(AllAddressBookUpdaterSendersClosed.into())
};
let address_book_updater_task_handle = tokio::spawn(worker.boxed());
(address_book, worker_tx, address_book_updater_task_handle)
} }
} }

View File

@ -95,7 +95,8 @@ where
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr); let (address_book, address_book_updater, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, listen_addr);
// Create a broadcast channel for peer inventory advertisements. // Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements. // If it reaches capacity, this channel drops older inventory advertisements.
@ -191,9 +192,13 @@ where
.expect("unexpected error connecting to initial peers"); .expect("unexpected error connecting to initial peers");
let active_initial_peer_count = active_outbound_connections.update_count(); let active_initial_peer_count = active_outbound_connections.update_count();
// We need to await candidates.update() here, because zcashd only sends one // We need to await candidates.update() here,
// `addr` message per connection, and if we only have one initial peer we // because zcashd rate-limits `addr`/`addrv2` messages per connection,
// need to ensure that its `addr` message is used by the crawler. // and if we only have one initial peer,
// we need to ensure that its `Response::Addr` is used by the crawler.
//
// TODO: cache the most recent `Response::Addr` returned by each peer.
// If the request times out, return the cached response to the caller.
info!( info!(
?active_initial_peer_count, ?active_initial_peer_count,
@ -221,7 +226,9 @@ where
); );
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current())); let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); handle_tx
.send(vec![listen_guard, crawl_guard, address_book_updater_guard])
.unwrap();
(peer_set, address_book) (peer_set, address_book)
} }

View File

@ -1142,7 +1142,7 @@ async fn add_initial_peers_is_rate_limited() {
let before = Instant::now(); let before = Instant::now();
let (initial_peers_task_handle, peerset_rx) = let (initial_peers_task_handle, peerset_rx, address_book_updater_task_handle) =
spawn_add_initial_peers(PEER_COUNT, outbound_connector).await; spawn_add_initial_peers(PEER_COUNT, outbound_connector).await;
let connections = peerset_rx.take(PEER_COUNT).collect::<Vec<_>>().await; let connections = peerset_rx.take(PEER_COUNT).collect::<Vec<_>>().await;
@ -1162,6 +1162,19 @@ async fn add_initial_peers_is_rate_limited() {
"unexpected error or panic in add_initial_peers task: {:?}", "unexpected error or panic in add_initial_peers task: {:?}",
initial_peers_result, initial_peers_result,
); );
// Check for panics or errors in the address book updater task.
let updater_result = address_book_updater_task_handle.now_or_never();
assert!(
matches!(updater_result, None)
|| matches!(updater_result, Some(Err(ref join_error)) if join_error.is_cancelled())
// The task method only returns one kind of error.
// We can't check for error equality due to type erasure,
// and we can't downcast due to ownership.
|| matches!(updater_result, Some(Ok(Err(ref _all_senders_closed)))),
"unexpected error or panic in address book updater task: {:?}",
updater_result,
);
} }
/// Test that [`init`] does not deadlock in `add_initial_peers`, /// Test that [`init`] does not deadlock in `add_initial_peers`,
@ -1501,13 +1514,15 @@ where
/// Connects to IP addresses in the IPv4 localhost range. /// Connects to IP addresses in the IPv4 localhost range.
/// Does not open a local listener port. /// Does not open a local listener port.
/// ///
/// Returns the task [`JoinHandle`], and the peer set receiver. /// Returns the initial peers task [`JoinHandle`], the peer set receiver,
/// and the address book updater task join handle.
async fn spawn_add_initial_peers<C>( async fn spawn_add_initial_peers<C>(
peer_count: usize, peer_count: usize,
outbound_connector: C, outbound_connector: C,
) -> ( ) -> (
JoinHandle<Result<ActiveConnectionCounter, BoxError>>, JoinHandle<Result<ActiveConnectionCounter, BoxError>>,
mpsc::Receiver<PeerChange>, mpsc::Receiver<PeerChange>,
JoinHandle<Result<(), BoxError>>,
) )
where where
C: Service< C: Service<
@ -1542,10 +1557,11 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(peer_count + 1); let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(peer_count + 1);
let (_address_book, address_book_updater) = AddressBookUpdater::spawn(unused_v4); let (_address_book, address_book_updater, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, unused_v4);
let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater); let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater);
let add_task_handle = tokio::spawn(add_fut); let add_task_handle = tokio::spawn(add_fut);
(add_task_handle, peerset_rx) (add_task_handle, peerset_rx, address_book_updater_guard)
} }