Security: Fix CandidateSet timeout and fanout

* Refactor: Split CandidateSet::update into separate functions
* Security: Apply a timeout to the entire CandidateSet::update
* Security: Stop using very large fanout limits during initialization

Previously, Zebra used the number of resolved peer addresses.
So it was possible for all peers to fail, and for Zebra to hang on the
first update.

And Zebra could send a fanout for each initial peer, regardless
of whether their connection was successful.

Also:
- wait for at least one successful peer before trying an update
- warn if there are no successful initial peers
This commit is contained in:
teor 2021-05-20 18:15:46 +10:00
parent 093075d69a
commit c7ea1395e7
1 changed files with 86 additions and 60 deletions

View File

@ -1,4 +1,4 @@
use std::{mem, sync::Arc, time::Duration};
use std::{cmp::min, mem, sync::Arc, time::Duration};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, sleep_until, timeout, Sleep};
@ -133,17 +133,17 @@ where
/// Update the peer set from the network, using the default fanout limit.
///
/// See `update_initial` for details.
/// See [`update_initial`] for details.
pub async fn update(&mut self) -> Result<(), BoxError> {
self.update_inner(None).await
self.update_timeout(None).await
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`.
///
/// - Ask a few live `Responded` peers to send us more peers.
/// - Ask a few live [`Responded`] peers to send us more peers.
/// - Process all completed peer responses, adding new peers in the
/// `NeverAttempted` state.
/// [`NeverAttemptedGossiped`] state.
///
/// ## Correctness
///
@ -154,20 +154,48 @@ where
/// errors on permanent failures.
///
/// The handshaker sets up the peer message receiver so it also sends a
/// `Responded` peer address update.
/// [`Responded`] peer address update.
///
/// `report_failed` puts peers into the `Failed` state.
/// [`report_failed`] puts peers into the [`Failed`] state.
///
/// `next` puts peers into the `AttemptPending` state.
/// [`next`] puts peers into the [`AttemptPending`] state.
pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
self.update_inner(Some(fanout_limit)).await
self.update_timeout(Some(fanout_limit)).await
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`, and imposing a timeout on the entire fanout.
///
/// See [`update_initial`] for details.
async fn update_timeout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
// CORRECTNESS
//
// Use a timeout to avoid deadlocks when there are no connected
// peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task
// to complete.
if let Ok(fanout_result) =
timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await
{
fanout_result?;
} else {
// update must only return an error for permanent failures
info!("timeout waiting for the peer service to become ready");
}
Ok(())
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`.
///
/// See `update_initial` for details.
async fn update_inner(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
/// See [`update_initial`] for details.
///
/// # Correctness
///
/// This function does not have a timeout. Use [`update_timeout`] instead.
async fn update_fanout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
// Opportunistically crawl the network on every update call to ensure
// we're actively fetching peers. Continue independently of whether we
// actually receive any peers, but always ask the network for more.
@ -177,62 +205,25 @@ where
// existing peers, but we don't make too many because update may be
// called while the peer set is already loaded.
let mut responses = FuturesUnordered::new();
trace!("sending GetPeers requests");
for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) {
// CORRECTNESS
//
// Use a timeout to avoid deadlocks when there are no connected
// peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task
// to complete.
let peer_service =
match timeout(constants::REQUEST_TIMEOUT, self.peer_service.ready_and()).await {
// update must only return an error for permanent failures
Err(temporary_error) => {
info!(
?temporary_error,
"timeout waiting for the peer service to become ready"
);
return Ok(());
}
Ok(Err(permanent_error)) => Err(permanent_error)?,
Ok(Ok(peer_service)) => peer_service,
};
let fanout_limit = fanout_limit
.map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
.unwrap_or(constants::GET_ADDR_FANOUT);
debug!(?fanout_limit, "sending GetPeers requests");
// TODO: launch each fanout in its own task (might require tokio 1.6)
for _ in 0..fanout_limit {
let peer_service = self.peer_service.ready_and().await?;
responses.push(peer_service.call(Request::Peers));
}
while let Some(rsp) = responses.next().await {
match rsp {
Ok(Response::Peers(rsp_addrs)) => {
// Filter new addresses to ensure that gossiped addresses are actually new
let address_book = &self.address_book;
// # Correctness
//
// Briefly hold the address book threaded mutex, each time we
// check an address.
//
// TODO: reduce mutex contention by moving the filtering into
// the address book itself (#1976)
let new_addrs = rsp_addrs
.iter()
.filter(|meta| !address_book.lock().unwrap().contains_addr(&meta.addr))
.collect::<Vec<_>>();
Ok(Response::Peers(addrs)) => {
trace!(
?rsp_addrs,
new_addr_count = ?new_addrs.len(),
addr_count = ?addrs.len(),
?addrs,
"got response to GetPeers"
);
// New addresses are deserialized in the `NeverAttempted` state
//
// # Correctness
//
// Briefly hold the address book threaded mutex, to extend
// the address list.
address_book
.lock()
.unwrap()
.extend(new_addrs.into_iter().cloned());
let addrs = self.validate_addrs(addrs);
self.send_addrs(addrs);
}
Err(e) => {
// since we do a fanout, and new updates are triggered by
@ -246,6 +237,41 @@ where
Ok(())
}
/// Check new `addrs` before adding them to the address book.
///
/// If the data in an address is invalid, this function can:
/// - modify the address data, or
/// - delete the address.
fn validate_addrs(
&self,
addrs: impl IntoIterator<Item = MetaAddr>,
) -> impl IntoIterator<Item = MetaAddr> {
// Note: The address book handles duplicate addresses internally,
// so we don't need to de-duplicate addresses here.
// TODO:
// We should eventually implement these checks in this function:
// - Zebra should stop believing far-future last_seen times from peers (#1871)
// - Zebra should ignore peers that are older than 3 weeks (part of #1865)
// - Zebra should count back 3 weeks from the newest peer timestamp sent
// by the other peer, to compensate for clock skew
// - Zebra should limit the number of addresses it uses from a single Addrs
// response (#1869)
addrs
}
/// Add new `addrs` to the address book.
fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
// # Correctness
//
// Briefly hold the address book threaded mutex, to extend
// the address list.
//
// Extend handles duplicate addresses internally.
self.address_book.lock().unwrap().extend(addrs);
}
/// Returns the next candidate for a connection attempt, if any are available.
///
/// Returns peers in this order: