diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index d296747b..831d4f0f 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -131,7 +131,15 @@ where } } - /// Update the peer set from the network. + /// Update the peer set from the network, using the default fanout limit. + /// + /// See `update_initial` for details. + pub async fn update(&mut self) -> Result<(), BoxError> { + self.update_inner(None).await + } + + /// Update the peer set from the network, limiting the fanout to + /// `fanout_limit`. /// /// - Ask a few live `Responded` peers to send us more peers. /// - Process all completed peer responses, adding new peers in the @@ -139,6 +147,9 @@ where /// /// ## Correctness /// + /// Pass the initial peer set size as `fanout_limit` during initialization, + /// so that Zebra does not send duplicate requests to the same peer. + /// /// The crawler exits when update returns an error, so it must only return /// errors on permanent failures. /// @@ -148,7 +159,15 @@ where /// `report_failed` puts peers into the `Failed` state. /// /// `next` puts peers into the `AttemptPending` state. - pub async fn update(&mut self) -> Result<(), BoxError> { + pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> { + self.update_inner(Some(fanout_limit)).await + } + + /// Update the peer set from the network, limiting the fanout to + /// `fanout_limit`. + /// + /// See `update_initial` for details. + async fn update_inner(&mut self, fanout_limit: Option) -> Result<(), BoxError> { // Opportunistically crawl the network on every update call to ensure // we're actively fetching peers. Continue independently of whether we // actually receive any peers, but always ask the network for more. @@ -159,7 +178,7 @@ where // called while the peer set is already loaded. let mut responses = FuturesUnordered::new(); trace!("sending GetPeers requests"); - for _ in 0..constants::GET_ADDR_FANOUT { + for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) { // CORRECTNESS // // Use a timeout to avoid deadlocks when there are no connected diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 2ed4d8f0..8841375f 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -137,12 +137,14 @@ where ); // 2. Initial peers, specified in the config. + let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel(); let initial_peers_fut = { let config = config.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; + let _ = initial_peer_count_tx.send(initial_peers.len()); // Connect the tx end to the 3 peer sources: add_initial_peers(initial_peers, outbound_connector, peerset_tx).await } @@ -157,10 +159,11 @@ where // We need to await candidates.update() here, because zcashd only sends one // `addr` message per connection, and if we only have one initial peer we // need to ensure that its `addr` message is used by the crawler. - // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> info!("Sending initial request for peers"); - let _ = candidates.update().await; + let _ = candidates + .update_initial(initial_peer_count_rx.await.expect("value sent before drop")) + .await; for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(());