diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index 792d9f28..ef592d22 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -22,7 +22,7 @@ rand = "0.7"
 hyper = "0.13.8"
 futures = "0.3"
 tokio = { version = "0.2.22", features = ["time", "rt-threaded", "stream", "macros", "tracing", "signal"] }
-tower = { version = "0.3", features = ["hedge"] }
+tower = { version = "0.3", features = ["hedge", "limit"] }
 pin-project = "0.4.23"
 
 color-eyre = { version = "0.5.6", features = ["issue-url"] }
diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs
index 0a2bc288..619d47bd 100644
--- a/zebrad/src/commands/start.rs
+++ b/zebrad/src/commands/start.rs
@@ -75,7 +75,7 @@ impl StartCmd {
             .map_err(|_| eyre!("could not send setup data to inbound service"))?;
 
         info!("initializing syncer");
-        let mut syncer = ChainSync::new(config.network.network, peer_set, state, verifier);
+        let mut syncer = ChainSync::new(&config, peer_set, state, verifier);
 
         syncer.sync().await
     }
diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs
index 54bab275..070e4f16 100644
--- a/zebrad/src/components/sync.rs
+++ b/zebrad/src/components/sync.rs
@@ -7,16 +7,19 @@ use futures::{
 };
 use tokio::time::delay_for;
 use tower::{
-    builder::ServiceBuilder, hedge::Hedge, retry::Retry, timeout::Timeout, Service, ServiceExt,
+    builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
+    Service, ServiceExt,
 };
 
 use zebra_chain::{
     block::{self, Block},
-    parameters::{genesis_hash, Network},
+    parameters::genesis_hash,
 };
 use zebra_network as zn;
 use zebra_state as zs;
 
+use crate::config::ZebradConfig;
+
 mod downloads;
 use downloads::{AlwaysHedge, Downloads};
 
@@ -103,8 +106,14 @@ where
     state: ZS,
     prospective_tips: HashSet<CheckedTip>,
     genesis_hash: block::Hash,
-    downloads:
-        Pin<Box<Downloads<Hedge<Retry<zn::RetryLimit, ZN>, AlwaysHedge>, Timeout<ZV>>>>,
+    downloads: Pin<
+        Box<
+            Downloads<
+                Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, ZN>>, AlwaysHedge>,
+                Timeout<ZV>,
+            >,
+        >,
+    >,
 }
 
 /// Polls the network to determine whether further blocks are available and
@@ -127,7 +136,7 @@ where
    /// - peers: the zebra-network peers to contact for downloads
    /// - state: the zebra-state that stores the chain
    /// - verifier: the zebra-consensus verifier that checks the chain
-    pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self {
+    pub fn new(config: &ZebradConfig, peers: ZN, state: ZS, verifier: ZV) -> Self {
         let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
         // The Hedge middleware is the outermost layer, hedging requests
         // between two retry-wrapped networks. The innermost timeout
@@ -138,6 +147,7 @@ where
         // ServiceBuilder::new().hedge(...).retry(...)...
         let block_network = Hedge::new(
             ServiceBuilder::new()
+                .concurrency_limit(config.sync.max_concurrent_block_requests)
                 .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
                 .timeout(BLOCK_DOWNLOAD_TIMEOUT)
                 .service(peers),
@@ -154,7 +164,7 @@ where
                 Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
             )),
             prospective_tips: HashSet::new(),
-            genesis_hash: genesis_hash(chain),
+            genesis_hash: genesis_hash(config.network.network),
         }
     }
 
diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs
index a1b48fd5..c2c2a4e0 100644
--- a/zebrad/src/config.rs
+++ b/zebrad/src/config.rs
@@ -34,6 +34,9 @@ pub struct ZebradConfig {
 
     /// Tracing configuration
     pub tracing: TracingSection,
+
+    /// Sync configuration
+    pub sync: SyncSection,
 }
 
 /// Tracing configuration section.
@@ -122,3 +125,24 @@ impl Default for MetricsSection {
         }
     }
 }
+
+/// Sync configuration section.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields, default)]
+pub struct SyncSection {
+    /// The maximum number of concurrent block requests during sync.
+    ///
+    /// This is set to a high value by default, so that the concurrency limit is
+    /// based only on the number of available peers. However, on a slow network,
+    /// making too many concurrent block requests can overwhelm the connection.
+    /// Lowering this value may help on slow or unreliable networks.
+    pub max_concurrent_block_requests: usize,
+}
+
+impl Default for SyncSection {
+    fn default() -> Self {
+        Self {
+            max_concurrent_block_requests: 1_000,
+        }
+    }
+}