diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index e4e420fe..68aacb2e 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -52,6 +52,8 @@ //! * runs in the background and gossips newly added mempool transactions //! to peers +use std::cmp::max; + use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; use futures::FutureExt; @@ -61,7 +63,7 @@ use tracing_futures::Instrument; use crate::{ components::{ - inbound::InboundSetupData, + inbound::{self, InboundSetupData}, mempool::{self, Mempool}, sync, tokio::{RuntimeRun, TokioComponent}, @@ -87,7 +89,9 @@ impl StartCmd { info!("initializing node state"); let (state_service, latest_chain_tip, chain_tip_change) = zebra_state::init(config.state.clone(), config.network.network); - let state = ServiceBuilder::new().buffer(20).service(state_service); + let state = ServiceBuilder::new() + .buffer(Self::state_buffer_bound()) + .service(state_service); info!("initializing network"); // The service that our node uses to respond to requests by peers. The @@ -96,7 +100,7 @@ impl StartCmd { let (setup_tx, setup_rx) = oneshot::channel(); let inbound = ServiceBuilder::new() .load_shed() - .buffer(20) + .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY) .service(Inbound::new(setup_rx)); let (peer_set, address_book) = @@ -132,7 +136,9 @@ impl StartCmd { chain_tip_change.clone(), ); let mempool = BoxService::new(mempool); - let mempool = ServiceBuilder::new().buffer(20).service(mempool); + let mempool = ServiceBuilder::new() + .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY) + .service(mempool); let setup_data = InboundSetupData { address_book, @@ -253,6 +259,22 @@ impl StartCmd { exit_status } + + /// Returns the bound for the state service buffer, + /// based on the configurations of the services that use the state concurrently. + fn state_buffer_bound() -> usize { + let config = app_config().clone(); + + // TODO: do we also need to account for concurrent use across services? + // we could multiply the maximum by 3/2, or add a fixed constant + max( + config.sync.max_concurrent_block_requests, + max( + inbound::downloads::MAX_INBOUND_CONCURRENCY, + mempool::downloads::MAX_INBOUND_CONCURRENCY, + ), + ) + } } impl Runnable for StartCmd { diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 66a2e5d7..734af935 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -36,7 +36,8 @@ use super::{ sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, }; -mod downloads; +pub(crate) mod downloads; + #[cfg(test)] mod tests; diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index aae0736a..163ff079 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -1,3 +1,5 @@ +//! A download stream that handles gossiped blocks from peers. + use std::{ collections::HashMap, convert::TryFrom, @@ -47,7 +49,7 @@ type BoxError = Box; /// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks /// will be directed to the malicious node that originally gossiped the hash. /// Therefore, this attack can be carried out by a single malicious node. -const MAX_INBOUND_CONCURRENCY: usize = 20; +pub const MAX_INBOUND_CONCURRENCY: usize = 20; /// The action taken in response to a peer's gossiped block hash. pub enum DownloadAction { diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 81f62a3f..57c8c9e4 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -25,18 +25,13 @@ use zebra_chain::{ use zebra_network as zn; use zebra_state as zs; -use super::{DEFAULT_LOOKAHEAD_LIMIT, MAX_TIPS_RESPONSE_HASH_COUNT}; - type BoxError = Box; -/// A divisor used to calculate the extra number of blocks we allow in the +/// A multiplier used to calculate the extra number of blocks we allow in the /// verifier and state pipelines, on top of the lookahead limit. /// /// The extra number of blocks is calculated using -/// `lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR`. -/// -/// For the default lookahead limit, the extra number of blocks is -/// `4 * MAX_TIPS_RESPONSE_HASH_COUNT`. +/// `lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`. /// /// This allows the verifier and state queues to hold a few extra tips responses worth of blocks, /// even if the syncer queue is full. Any unused capacity is shared between both queues. @@ -48,8 +43,7 @@ type BoxError = Box; /// the rest of the capacity is reserved for the other queues. /// There is no reserved capacity for the syncer queue: /// if the other queues stay full, the syncer will eventually time out and reset. -const VERIFICATION_PIPELINE_SCALING_DIVISOR: usize = - DEFAULT_LOOKAHEAD_LIMIT / (4 * MAX_TIPS_RESPONSE_HASH_COUNT); +const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2; #[derive(Copy, Clone, Debug)] pub(super) struct AlwaysHedge; @@ -282,7 +276,7 @@ where // Scale the height limit with the lookahead limit, // so users with low capacity or under DoS can reduce them both. let lookahead = i32::try_from( - lookahead_limit + lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR, + lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER, ) .expect("fits in i32"); (tip_height + lookahead).expect("tip is much lower than Height::MAX")