From 458c26f1e3e2305b921805a555b7bedd22bc03a6 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 14 May 2021 12:15:39 +1000 Subject: [PATCH] Limit initial candidate set fanout to the number of initial peers If there is a small number of initial peers, and they are slow, the initial candidate set update can appear to hang. To avoid this issue, limit the initial candidate set fanout to the number of initial peers. Once the initial peers have sent us more peer addresses, there is no need to limit the fanouts for future updates. Reported by Niklas Long of Equilibrium. --- zebra-network/src/peer_set/candidate_set.rs | 25 ++++++++++++++++++--- zebra-network/src/peer_set/initialize.rs | 7 ++++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index d296747b..831d4f0f 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -131,7 +131,15 @@ where } } - /// Update the peer set from the network. + /// Update the peer set from the network, using the default fanout limit. + /// + /// See `update_initial` for details. + pub async fn update(&mut self) -> Result<(), BoxError> { + self.update_inner(None).await + } + + /// Update the peer set from the network, limiting the fanout to + /// `fanout_limit`. /// /// - Ask a few live `Responded` peers to send us more peers. /// - Process all completed peer responses, adding new peers in the @@ -139,6 +147,9 @@ where /// /// ## Correctness /// + /// Pass the initial peer set size as `fanout_limit` during initialization, + /// so that Zebra does not send duplicate requests to the same peer. + /// /// The crawler exits when update returns an error, so it must only return /// errors on permanent failures. /// @@ -148,7 +159,15 @@ where /// `report_failed` puts peers into the `Failed` state. /// /// `next` puts peers into the `AttemptPending` state. 
- pub async fn update(&mut self) -> Result<(), BoxError> { + pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> { + self.update_inner(Some(fanout_limit)).await + } + + /// Update the peer set from the network, limiting the fanout to + /// `fanout_limit`. + /// + /// See `update_initial` for details. + async fn update_inner(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> { // Opportunistically crawl the network on every update call to ensure // we're actively fetching peers. Continue independently of whether we // actually receive any peers, but always ask the network for more. @@ -159,7 +178,7 @@ where // called while the peer set is already loaded. let mut responses = FuturesUnordered::new(); trace!("sending GetPeers requests"); - for _ in 0..constants::GET_ADDR_FANOUT { + for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) { // CORRECTNESS // // Use a timeout to avoid deadlocks when there are no connected diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 2ed4d8f0..8841375f 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -137,12 +137,14 @@ where ); // 2. Initial peers, specified in the config. + let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel(); let initial_peers_fut = { let config = config.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; + let _ = initial_peer_count_tx.send(initial_peers.len()); // Connect the tx end to the 3 peer sources: add_initial_peers(initial_peers, outbound_connector, peerset_tx).await } @@ -157,10 +159,11 @@ where // We need to await candidates.update() here, because zcashd only sends one // `addr` message per connection, and if we only have one initial peer we // need to ensure that its `addr` message is used by the crawler. 
- // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> info!("Sending initial request for peers"); - let _ = candidates.update().await; + let _ = candidates + .update_initial(initial_peer_count_rx.await.expect("value sent before drop")) + .await; for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(());