diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs
index 831d4f0f..8ff43c27 100644
--- a/zebra-network/src/peer_set/candidate_set.rs
+++ b/zebra-network/src/peer_set/candidate_set.rs
@@ -1,4 +1,4 @@
-use std::{mem, sync::Arc, time::Duration};
+use std::{cmp::min, mem, sync::Arc, time::Duration};
 
 use futures::stream::{FuturesUnordered, StreamExt};
 use tokio::time::{sleep, sleep_until, timeout, Sleep};
@@ -133,17 +133,17 @@ where
 
     /// Update the peer set from the network, using the default fanout limit.
     ///
-    /// See `update_initial` for details.
+    /// See [`update_initial`] for details.
     pub async fn update(&mut self) -> Result<(), BoxError> {
-        self.update_inner(None).await
+        self.update_timeout(None).await
     }
 
     /// Update the peer set from the network, limiting the fanout to
     /// `fanout_limit`.
     ///
-    /// - Ask a few live `Responded` peers to send us more peers.
+    /// - Ask a few live [`Responded`] peers to send us more peers.
     /// - Process all completed peer responses, adding new peers in the
-    ///   `NeverAttempted` state.
+    ///   [`NeverAttemptedGossiped`] state.
     ///
     /// ## Correctness
     ///
@@ -154,20 +154,48 @@ where
     /// errors on permanent failures.
     ///
     /// The handshaker sets up the peer message receiver so it also sends a
-    /// `Responded` peer address update.
+    /// [`Responded`] peer address update.
     ///
-    /// `report_failed` puts peers into the `Failed` state.
+    /// [`report_failed`] puts peers into the [`Failed`] state.
     ///
-    /// `next` puts peers into the `AttemptPending` state.
+    /// [`next`] puts peers into the [`AttemptPending`] state.
     pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
-        self.update_inner(Some(fanout_limit)).await
+        self.update_timeout(Some(fanout_limit)).await
+    }
+
+    /// Update the peer set from the network, limiting the fanout to
+    /// `fanout_limit`, and imposing a timeout on the entire fanout.
+    ///
+    /// See [`update_initial`] for details.
+    async fn update_timeout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
+        // CORRECTNESS
+        //
+        // Use a timeout to avoid deadlocks when there are no connected
+        // peers, and:
+        // - we're waiting on a handshake to complete so there are peers, or
+        // - another task that handles or adds peers is waiting on this task
+        //   to complete.
+        if let Ok(fanout_result) =
+            timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await
+        {
+            fanout_result?;
+        } else {
+            // update must only return an error for permanent failures
+            info!("timeout waiting for the peer service to become ready");
+        }
+
+        Ok(())
     }
 
     /// Update the peer set from the network, limiting the fanout to
     /// `fanout_limit`.
     ///
-    /// See `update_initial` for details.
-    async fn update_inner(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
+    /// See [`update_initial`] for details.
+    ///
+    /// # Correctness
+    ///
+    /// This function does not have a timeout. Use [`update_timeout`] instead.
+    async fn update_fanout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
         // Opportunistically crawl the network on every update call to ensure
         // we're actively fetching peers. Continue independently of whether we
         // actually receive any peers, but always ask the network for more.
@@ -177,62 +205,25 @@ where
         // existing peers, but we don't make too many because update may be
         // called while the peer set is already loaded.
         let mut responses = FuturesUnordered::new();
-        trace!("sending GetPeers requests");
-        for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) {
-            // CORRECTNESS
-            //
-            // Use a timeout to avoid deadlocks when there are no connected
-            // peers, and:
-            // - we're waiting on a handshake to complete so there are peers, or
-            // - another task that handles or adds peers is waiting on this task
-            //   to complete.
-            let peer_service =
-                match timeout(constants::REQUEST_TIMEOUT, self.peer_service.ready_and()).await {
-                    // update must only return an error for permanent failures
-                    Err(temporary_error) => {
-                        info!(
-                            ?temporary_error,
-                            "timeout waiting for the peer service to become ready"
-                        );
-                        return Ok(());
-                    }
-                    Ok(Err(permanent_error)) => Err(permanent_error)?,
-                    Ok(Ok(peer_service)) => peer_service,
-                };
+        let fanout_limit = fanout_limit
+            .map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
+            .unwrap_or(constants::GET_ADDR_FANOUT);
+        debug!(?fanout_limit, "sending GetPeers requests");
+        // TODO: launch each fanout in its own task (might require tokio 1.6)
+        for _ in 0..fanout_limit {
+            let peer_service = self.peer_service.ready_and().await?;
             responses.push(peer_service.call(Request::Peers));
         }
         while let Some(rsp) = responses.next().await {
             match rsp {
-                Ok(Response::Peers(rsp_addrs)) => {
-                    // Filter new addresses to ensure that gossiped addresses are actually new
-                    let address_book = &self.address_book;
-                    // # Correctness
-                    //
-                    // Briefly hold the address book threaded mutex, each time we
-                    // check an address.
-                    //
-                    // TODO: reduce mutex contention by moving the filtering into
-                    // the address book itself (#1976)
-                    let new_addrs = rsp_addrs
-                        .iter()
-                        .filter(|meta| !address_book.lock().unwrap().contains_addr(&meta.addr))
-                        .collect::<Vec<_>>();
+                Ok(Response::Peers(addrs)) => {
                     trace!(
-                        ?rsp_addrs,
-                        new_addr_count = ?new_addrs.len(),
+                        addr_count = ?addrs.len(),
+                        ?addrs,
                         "got response to GetPeers"
                     );
-
-                    // New addresses are deserialized in the `NeverAttempted` state
-                    //
-                    // # Correctness
-                    //
-                    // Briefly hold the address book threaded mutex, to extend
-                    // the address list.
-                    address_book
-                        .lock()
-                        .unwrap()
-                        .extend(new_addrs.into_iter().cloned());
+                    let addrs = self.validate_addrs(addrs);
+                    self.send_addrs(addrs);
                 }
                 Err(e) => {
                     // since we do a fanout, and new updates are triggered by
@@ -246,6 +237,41 @@ where
         Ok(())
     }
 
+    /// Check new `addrs` before adding them to the address book.
+    ///
+    /// If the data in an address is invalid, this function can:
+    /// - modify the address data, or
+    /// - delete the address.
+    fn validate_addrs(
+        &self,
+        addrs: impl IntoIterator<Item = MetaAddr>,
+    ) -> impl IntoIterator<Item = MetaAddr> {
+        // Note: The address book handles duplicate addresses internally,
+        // so we don't need to de-duplicate addresses here.
+
+        // TODO:
+        // We should eventually implement these checks in this function:
+        // - Zebra should stop believing far-future last_seen times from peers (#1871)
+        // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
+        // - Zebra should count back 3 weeks from the newest peer timestamp sent
+        //   by the other peer, to compensate for clock skew
+        // - Zebra should limit the number of addresses it uses from a single Addrs
+        //   response (#1869)
+
+        addrs
+    }
+
+    /// Add new `addrs` to the address book.
+    fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
+        // # Correctness
+        //
+        // Briefly hold the address book threaded mutex, to extend
+        // the address list.
+        //
+        // Extend handles duplicate addresses internally.
+        self.address_book.lock().unwrap().extend(addrs);
+    }
+
     /// Returns the next candidate for a connection attempt, if any are available.
     ///
     /// Returns peers in this order:
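
Illustrative sketch (not part of this patch): the TODO list in `validate_addrs` includes limiting the number of addresses Zebra uses from a single Addrs response (#1869). Assuming that limit ends up being a simple truncation of the response list, the check could look roughly like the generic helper below; the name `limit_addrs`, its signature, and the `MAX_ADDRS_IN_RESPONSE` constant are hypothetical, not existing Zebra identifiers.

// Hypothetical helper, for illustration only: keep at most `limit`
// addresses from a single peer response, dropping any extras.
fn limit_addrs<T>(addrs: Vec<T>, limit: usize) -> Vec<T> {
    // `take` stops after `limit` items, so oversized responses are truncated.
    addrs.into_iter().take(limit).collect()
}

`validate_addrs` could then call something like `limit_addrs(addrs, MAX_ADDRS_IN_RESPONSE)` before returning the remaining addresses.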