diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 06e062e2..cac20258 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -22,8 +22,6 @@ use crate::config::ZebradConfig; use crate::{components::tokio::TokioComponent, prelude::*}; use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::Report; -use futures::stream::FuturesUnordered; -use std::collections::HashSet; use tower::{buffer::Buffer, service_fn}; use zebra_chain::block::BlockHeaderHash; @@ -60,14 +58,7 @@ impl StartCmd { let (peer_set, _address_book) = zebra_network::init(config, node).await; let verifier = zebra_consensus::verify::init(state.clone()); - let mut syncer = sync::Syncer { - peer_set, - state, - verifier, - block_requests: FuturesUnordered::new(), - fanout: 4, - prospective_tips: HashSet::new(), - }; + let mut syncer = sync::Syncer::new(peer_set, state, verifier); syncer.run().await } diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs index 4b61f19b..8a770973 100644 --- a/zebrad/src/commands/start/sync.rs +++ b/zebrad/src/commands/start/sync.rs @@ -2,14 +2,13 @@ use color_eyre::eyre::{eyre, Report}; use futures::stream::{FuturesUnordered, StreamExt}; use std::{collections::HashSet, iter, sync::Arc, time::Duration}; use tokio::time::delay_for; -use tower::{Service, ServiceExt}; +use tower::{retry::Retry, Service, ServiceExt}; use zebra_chain::{ block::{Block, BlockHeaderHash}, types::BlockHeight, }; - -use zebra_network as zn; -use zebra_state as zs; +use zebra_network::{self as zn, RetryLimit}; +use zebra_state::{self as zs}; pub struct Syncer where @@ -18,11 +17,30 @@ where pub peer_set: ZN, pub state: ZS, pub verifier: ZV, + pub retry_peer_set: Retry, pub prospective_tips: HashSet, pub block_requests: FuturesUnordered, pub fanout: NumReq, } +impl Syncer +where + ZN: Service + Clone, +{ + pub fn new(peer_set: ZN, state: ZS, verifier: ZC) -> Self { + let retry_peer_set = Retry::new(RetryLimit::new(3), peer_set.clone()); + Self { + peer_set, + state, + verifier, + retry_peer_set, + block_requests: FuturesUnordered::new(), + fanout: 4, + prospective_tips: HashSet::new(), + } + } +} + impl Syncer where ZN: Service + Send + Clone + 'static, @@ -224,7 +242,7 @@ where let set = chunk.iter().cloned().collect(); let request = self - .peer_set + .retry_peer_set .ready_and() .await .map_err(|e| eyre!(e))?