diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 06116b66..fe00e6dc 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -16,7 +16,7 @@ mod tests; /// The number of times Zebra will retry each initial peer's DNS resolution, /// before checking if any other initial peers have returned addresses. -const MAX_SINGLE_PEER_RETRIES: usize = 2; +const MAX_SINGLE_PEER_RETRIES: usize = 1; /// Configuration for networking code. #[derive(Clone, Debug, Serialize)] diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index fc8f7b5a..8392df41 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -105,7 +105,13 @@ pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); /// /// Zebra resists distributed denial of service attacks by making sure that requests for more /// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart. -pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(10); +pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(30); + +/// The combined timeout for all the requests in [`CandidateSet::update()`][Self::update]. +/// +/// `zcashd` doesn't respond to most `getaddr` requests, +/// so this timeout needs to be short. +pub const PEER_GET_ADDR_TIMEOUT: Duration = Duration::from_secs(8); /// The number of GetAddr requests sent when crawling for new peers. /// @@ -116,9 +122,6 @@ pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(10); /// /// Zebra regularly crawls for new peers, initiating a new crawl every /// [`crawl_new_peer_interval`](crate::config::Config.crawl_new_peer_interval). -/// -/// TODO: limit the number of addresses that Zebra uses from a single peer -/// response (#1869) pub const GET_ADDR_FANOUT: usize = 3; /// The maximum number of addresses allowed in an `addr` or `addrv2` message. diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 561c2626..7a995106 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -197,7 +197,7 @@ where // - another task that handles or adds peers is waiting on this task // to complete. if let Ok(fanout_result) = timeout( - constants::MIN_PEER_GET_ADDR_INTERVAL, + constants::PEER_GET_ADDR_TIMEOUT, self.update_fanout(fanout_limit), ) .await @@ -205,7 +205,7 @@ where fanout_result?; } else { // update must only return an error for permanent failures - info!("timeout waiting for the peer service to become ready"); + info!("timeout waiting for peer service readiness or peer responses"); } self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL; diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index e8191727..801b3ebe 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -57,7 +57,7 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, - future::TryFutureExt, + future::{FutureExt, TryFutureExt}, prelude::*, stream::FuturesUnordered, }; @@ -129,7 +129,7 @@ where /// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`. /// If that peer is removed from `ready_services`, we must set the preselected peer to `None`. /// - /// This is handled by [`PeerSet::take_ready_service`] and [`PeerSet::route_all`]. + /// This is handled by [`PeerSet::take_ready_service`]. preselected_p2c_peer: Option, /// Stores gossiped inventory hashes from connected peers. @@ -459,16 +459,16 @@ where /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. fn preselect_p2c_peer(&self) -> Option { - self.select_p2c_peer_from_list(self.ready_services.keys().copied().collect()) + self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect()) } /// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service. - fn select_p2c_peer_from_list(&self, ready_service_list: HashSet) -> Option { + fn select_p2c_peer_from_list(&self, ready_service_list: &HashSet) -> Option { match ready_service_list.len() { 0 => None, 1 => Some( - ready_service_list - .into_iter() + *ready_service_list + .iter() .next() .expect("just checked there is one service"), ), @@ -512,6 +512,18 @@ where } } + /// Randomly chooses `max_peers` ready services, ignoring service load. + /// + /// The chosen peers are unique, but their order is not fully random. + fn select_random_ready_peers(&self, max_peers: usize) -> Vec { + use rand::seq::IteratorRandom; + + self.ready_services + .keys() + .copied() + .choose_multiple(&mut rand::thread_rng(), max_peers) + } + /// Accesses a ready endpoint by `key` and returns its current load. /// /// Returns `None` if the service is not in the ready service list. @@ -559,7 +571,7 @@ where // peers would be able to influence our choice by switching addresses. // But we need the choice to be random, // so that a peer can't provide all our inventory responses. - let peer = self.select_p2c_peer_from_list(inventory_peer_list); + let peer = self.select_p2c_peer_from_list(&inventory_peer_list); match peer.and_then(|key| self.take_ready_service(&key)) { Some(mut svc) => { @@ -576,15 +588,36 @@ where } } - /// Routes a request to all ready peers, ignoring return values. - fn route_all(&mut self, req: Request) -> >::Future { - // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services, - // blocking us from passing &mut self to push_unready. - let ready_services = std::mem::take(&mut self.ready_services); - self.preselected_p2c_peer = None; // All services are now unready. + /// Routes the same request to up to `max_peers` ready peers, ignoring return values. + /// + /// `max_peers` must be at least one, and at most the number of ready peers. + fn route_multiple( + &mut self, + req: Request, + max_peers: usize, + ) -> >::Future { + assert!( + max_peers > 0, + "requests must be routed to at least one peer" + ); + assert!( + max_peers <= self.ready_services.len(), + "requests can only be routed to ready peers" + ); + + // # Security + // + // We choose peers randomly, ignoring load. + // This avoids favouring malicious peers, because peers can influence their own load. + // + // The order of peers isn't completely random, + // but peer request order is not security-sensitive. let futs = FuturesUnordered::new(); - for (key, mut svc) in ready_services { + for key in self.select_random_ready_peers(max_peers) { + let mut svc = self + .take_ready_service(&key) + .expect("selected peers are ready"); futs.push(svc.call(req.clone()).map_err(|_| ())); self.push_unready(key, svc); } @@ -594,13 +627,22 @@ where tracing::debug!( ok.len = results.iter().filter(|r| r.is_ok()).count(), err.len = results.iter().filter(|r| r.is_err()).count(), - "sent peer request broadcast" + "sent peer request to multiple peers" ); Ok(Response::Nil) } .boxed() } + /// Broadcasts the same request to lots of ready peers, ignoring return values. + fn route_broadcast(&mut self, req: Request) -> >::Future { + // Round up, so that if we have one ready peer, it gets the request + let half_ready_peers = (self.ready_services.len() + 1) / 2; + + // Broadcasts ignore the response + self.route_multiple(req, half_ready_peers) + } + /// Logs the peer set size. fn log_peer_set_size(&mut self) { let ready_services_len = self.ready_services.len(); @@ -775,9 +817,9 @@ where self.route_inv(req, hash) } - // Broadcast advertisements to all peers - Request::AdvertiseTransactionIds(_) => self.route_all(req), - Request::AdvertiseBlock(_) => self.route_all(req), + // Broadcast advertisements to lots of peers + Request::AdvertiseTransactionIds(_) => self.route_broadcast(req), + Request::AdvertiseBlock(_) => self.route_broadcast(req), // Choose a random less-loaded peer for all other requests _ => self.route_p2c(req), diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 36b7d53e..4c7aa874 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -164,9 +164,9 @@ impl StartCmd { peer_set, )); - info!("started initial Zebra tasks"); + info!("spawned initial Zebra tasks"); - // TODO: spawn the syncer task, after making the PeerSet sync and send + // TODO: spawn the syncer task, after making the PeerSet marker::Sync and marker::Send // turn these tasks into a FuturesUnordered? // ongoing futures & tasks diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index ce5ad83e..3534f96b 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -306,12 +306,14 @@ impl Mempool { /// Update the mempool state (enabled / disabled) depending on how close to /// the tip is the synchronization, including side effects to state changes. - fn update_state(&mut self) { + /// + /// Returns `true` if the state changed. + fn update_state(&mut self) -> bool { let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug(); if self.is_enabled() == is_close_to_tip { // the active state is up to date - return; + return false; } // Update enabled / disabled state @@ -340,6 +342,8 @@ impl Mempool { // cancelling its download tasks. self.active_state = ActiveState::Disabled } + + true } /// Return whether the mempool is enabled or not. @@ -369,17 +373,22 @@ impl Service for Mempool { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.update_state(); + let is_state_changed = self.update_state(); // When the mempool is disabled we still return that the service is ready. - // Otherwise, callers could block waiting for the mempool to be enabled, - // which may not be the desired behavior. + // Otherwise, callers could block waiting for the mempool to be enabled. if !self.is_enabled() { return Poll::Ready(Ok(())); } let tip_action = self.chain_tip_change.last_tip_change(); + // If the mempool was just freshly enabled, + // skip resetting and removing mined transactions for this tip. + if is_state_changed { + return Poll::Ready(Ok(())); + } + // Clear the mempool and cancel downloads if there has been a chain tip reset. if matches!(tip_action, Some(TipAction::Reset { .. })) { info!( diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index cc8109ab..20d93eac 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -66,7 +66,7 @@ use crate::components::{ mod tests; /// The number of peers to request transactions from per crawl event. -const FANOUT: usize = 4; +const FANOUT: usize = 3; /// The delay between crawl events. const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index a5a85224..fc8852f7 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -42,7 +42,7 @@ pub use recent_sync_lengths::RecentSyncLengths; pub use status::SyncStatus; /// Controls the number of peers used for each ObtainTips and ExtendTips request. -const FANOUT: usize = 4; +const FANOUT: usize = 3; /// Controls how many times we will retry each block download. ///