sync: add a concurrency limit for block downloads
This commit is contained in:
parent
0a405c737d
commit
253bab042e
|
|
@ -22,7 +22,7 @@ rand = "0.7"
|
||||||
hyper = "0.13.8"
|
hyper = "0.13.8"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tokio = { version = "0.2.22", features = ["time", "rt-threaded", "stream", "macros", "tracing", "signal"] }
|
tokio = { version = "0.2.22", features = ["time", "rt-threaded", "stream", "macros", "tracing", "signal"] }
|
||||||
tower = { version = "0.3", features = ["hedge"] }
|
tower = { version = "0.3", features = ["hedge", "limit"] }
|
||||||
pin-project = "0.4.23"
|
pin-project = "0.4.23"
|
||||||
|
|
||||||
color-eyre = { version = "0.5.6", features = ["issue-url"] }
|
color-eyre = { version = "0.5.6", features = ["issue-url"] }
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ impl StartCmd {
|
||||||
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
||||||
|
|
||||||
info!("initializing syncer");
|
info!("initializing syncer");
|
||||||
let mut syncer = ChainSync::new(config.network.network, peer_set, state, verifier);
|
let mut syncer = ChainSync::new(&config, peer_set, state, verifier);
|
||||||
|
|
||||||
syncer.sync().await
|
syncer.sync().await
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,16 +7,19 @@ use futures::{
|
||||||
};
|
};
|
||||||
use tokio::time::delay_for;
|
use tokio::time::delay_for;
|
||||||
use tower::{
|
use tower::{
|
||||||
builder::ServiceBuilder, hedge::Hedge, retry::Retry, timeout::Timeout, Service, ServiceExt,
|
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
|
||||||
|
Service, ServiceExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
block::{self, Block},
|
block::{self, Block},
|
||||||
parameters::{genesis_hash, Network},
|
parameters::genesis_hash,
|
||||||
};
|
};
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
|
use crate::config::ZebradConfig;
|
||||||
|
|
||||||
mod downloads;
|
mod downloads;
|
||||||
use downloads::{AlwaysHedge, Downloads};
|
use downloads::{AlwaysHedge, Downloads};
|
||||||
|
|
||||||
|
|
@ -103,8 +106,14 @@ where
|
||||||
state: ZS,
|
state: ZS,
|
||||||
prospective_tips: HashSet<CheckedTip>,
|
prospective_tips: HashSet<CheckedTip>,
|
||||||
genesis_hash: block::Hash,
|
genesis_hash: block::Hash,
|
||||||
downloads:
|
downloads: Pin<
|
||||||
Pin<Box<Downloads<Hedge<Retry<zn::RetryLimit, Timeout<ZN>>, AlwaysHedge>, Timeout<ZV>>>>,
|
Box<
|
||||||
|
Downloads<
|
||||||
|
Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
|
||||||
|
Timeout<ZV>,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Polls the network to determine whether further blocks are available and
|
/// Polls the network to determine whether further blocks are available and
|
||||||
|
|
@ -127,7 +136,7 @@ where
|
||||||
/// - peers: the zebra-network peers to contact for downloads
|
/// - peers: the zebra-network peers to contact for downloads
|
||||||
/// - state: the zebra-state that stores the chain
|
/// - state: the zebra-state that stores the chain
|
||||||
/// - verifier: the zebra-consensus verifier that checks the chain
|
/// - verifier: the zebra-consensus verifier that checks the chain
|
||||||
pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self {
|
pub fn new(config: &ZebradConfig, peers: ZN, state: ZS, verifier: ZV) -> Self {
|
||||||
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
|
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
|
||||||
// The Hedge middleware is the outermost layer, hedging requests
|
// The Hedge middleware is the outermost layer, hedging requests
|
||||||
// between two retry-wrapped networks. The innermost timeout
|
// between two retry-wrapped networks. The innermost timeout
|
||||||
|
|
@ -138,6 +147,7 @@ where
|
||||||
// ServiceBuilder::new().hedge(...).retry(...)...
|
// ServiceBuilder::new().hedge(...).retry(...)...
|
||||||
let block_network = Hedge::new(
|
let block_network = Hedge::new(
|
||||||
ServiceBuilder::new()
|
ServiceBuilder::new()
|
||||||
|
.concurrency_limit(config.sync.max_concurrent_block_requests)
|
||||||
.retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
|
.retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
|
||||||
.timeout(BLOCK_DOWNLOAD_TIMEOUT)
|
.timeout(BLOCK_DOWNLOAD_TIMEOUT)
|
||||||
.service(peers),
|
.service(peers),
|
||||||
|
|
@ -154,7 +164,7 @@ where
|
||||||
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
|
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
|
||||||
)),
|
)),
|
||||||
prospective_tips: HashSet::new(),
|
prospective_tips: HashSet::new(),
|
||||||
genesis_hash: genesis_hash(chain),
|
genesis_hash: genesis_hash(config.network.network),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,9 @@ pub struct ZebradConfig {
|
||||||
|
|
||||||
/// Tracing configuration
|
/// Tracing configuration
|
||||||
pub tracing: TracingSection,
|
pub tracing: TracingSection,
|
||||||
|
|
||||||
|
/// Sync configuration
|
||||||
|
pub sync: SyncSection,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tracing configuration section.
|
/// Tracing configuration section.
|
||||||
|
|
@ -122,3 +125,24 @@ impl Default for MetricsSection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sync configuration section.
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
#[serde(deny_unknown_fields, default)]
|
||||||
|
pub struct SyncSection {
|
||||||
|
/// The maximum number of concurrent block requests during sync.
|
||||||
|
///
|
||||||
|
/// This is set to a high value by default, so that the concurrency limit is
|
||||||
|
/// based only on the number of available peers. However, on a slow network,
|
||||||
|
/// making too many concurrent block requests can overwhelm the connection.
|
||||||
|
/// Lowering this value may help on slow or unreliable networks.
|
||||||
|
pub max_concurrent_block_requests: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SyncSection {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_concurrent_block_requests: 1_000,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue