diff --git a/.github/workflows/continous-integration-docker.yml b/.github/workflows/continous-integration-docker.yml
index cbf8fbfa..88d9a9d4 100644
--- a/.github/workflows/continous-integration-docker.yml
+++ b/.github/workflows/continous-integration-docker.yml
@@ -377,7 +377,8 @@ jobs:
       app_name: zebrad
       test_id: full-sync-to-tip
       test_description: Test a full sync up to the tip
-      test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=600'
+      # The value of FULL_SYNC_MAINNET_TIMEOUT_MINUTES is currently ignored.
+      test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=0'
       # This test runs for longer than 6 hours, so it needs multiple jobs
       is_long_test: true
       needs_zebra_state: false
diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs
index 8cfe7e13..f5421217 100644
--- a/zebra-state/src/lib.rs
+++ b/zebra-state/src/lib.rs
@@ -35,7 +35,9 @@ pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Requ
 pub use response::{ReadResponse, Response};
 pub use service::{
     chain_tip::{ChainTipChange, LatestChainTip, TipAction},
-    init, spawn_init, OutputIndex, OutputLocation, TransactionLocation,
+    init, spawn_init,
+    watch_receiver::WatchReceiver,
+    OutputIndex, OutputLocation, TransactionLocation,
 };

 #[cfg(any(test, feature = "proptest-impl"))]
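The `WatchReceiver` re-exported above is the shared-state primitive the rest of this PR builds on. Below is a minimal sketch of the wrapper pattern, assuming `tokio::sync::watch`; the `cloned_watch_data` method name matches the calls in the hunks that follow, but the real `zebra-state` internals may differ.

```rust
// Minimal sketch of a `WatchReceiver`-style wrapper over `tokio::sync::watch`.
// The name `cloned_watch_data` matches the calls in this diff; everything else
// here is illustrative, not the actual zebra-state implementation.

use tokio::sync::watch;

/// A cloneable, read-only wrapper around a `watch::Receiver`.
#[derive(Clone, Debug)]
pub struct WatchReceiver<T> {
    receiver: watch::Receiver<T>,
}

impl<T: Clone> WatchReceiver<T> {
    pub fn new(receiver: watch::Receiver<T>) -> Self {
        Self { receiver }
    }

    /// Returns a clone of the current watched value.
    ///
    /// Cloning drops the internal read borrow immediately, so callers can't
    /// accidentally block the corresponding `watch::Sender`.
    pub fn cloned_watch_data(&self) -> T {
        self.receiver.borrow().clone()
    }
}
```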
diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs
index c33ea98e..74fa378d 100644
--- a/zebrad/src/components/sync.rs
+++ b/zebrad/src/components/sync.rs
@@ -8,7 +8,7 @@ use color_eyre::eyre::{eyre, Report};
 use futures::stream::{FuturesUnordered, StreamExt};
 use indexmap::IndexSet;
 use serde::{Deserialize, Serialize};
-use tokio::time::sleep;
+use tokio::{sync::watch, time::sleep};
 use tower::{
     builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry,
     timeout::Timeout, Service, ServiceExt,
@@ -83,8 +83,7 @@ pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPO
 /// The default for the user-specified lookahead limit.
 ///
 /// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
-pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
-    zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 2;
+pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

 /// A lower bound on the user-specified concurrency limit.
 ///
@@ -359,6 +358,10 @@ where
     /// The lengths of recent sync responses.
     recent_syncs: RecentSyncLengths,
+
+    /// Receiver that is `true` when the downloader is past the lookahead limit.
+    /// This is based on the downloaded block height and the state tip height.
+    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
 }

 /// Polls the network to determine whether further blocks are available and
@@ -438,6 +441,7 @@ where
         }

         let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
+
         // The Hedge middleware is the outermost layer, hedging requests
         // between two retry-wrapped networks. The innermost timeout
         // layer is relatively unimportant, because slow requests will
@@ -464,27 +468,33 @@ where
         let (sync_status, recent_syncs) = SyncStatus::new();

+        let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
+        let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);
+
+        let downloads = Box::pin(Downloads::new(
+            block_network,
+            verifier,
+            latest_chain_tip.clone(),
+            past_lookahead_limit_sender,
+            max(
+                checkpoint_verify_concurrency_limit,
+                full_verify_concurrency_limit,
+            ),
+            max_checkpoint_height,
+        ));
+
         let new_syncer = Self {
             genesis_hash: genesis_hash(config.network.network),
             max_checkpoint_height,
             checkpoint_verify_concurrency_limit,
             full_verify_concurrency_limit,
             tip_network,
-            downloads: Box::pin(Downloads::new(
-                block_network,
-                verifier,
-                latest_chain_tip.clone(),
-                // TODO: change the download lookahead for full verification?
-                max(
-                    checkpoint_verify_concurrency_limit,
-                    full_verify_concurrency_limit,
-                ),
-                max_checkpoint_height,
-            )),
+            downloads,
             state,
             latest_chain_tip,
             prospective_tips: HashSet::new(),
             recent_syncs,
+            past_lookahead_limit_receiver,
         };

         (new_syncer, sync_status)
@@ -545,7 +555,14 @@ where
         }
         self.update_metrics();

-        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
+        // Pause new downloads while the syncer or downloader are past their lookahead limits.
+        //
+        // To avoid a deadlock or long waits for blocks to expire, we ignore the download
+        // lookahead limit when there are only a small number of blocks waiting.
+        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
+            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
+                && self.past_lookahead_limit_receiver.cloned_watch_data())
+        {
             trace!(
                 tips.len = self.prospective_tips.len(),
                 in_flight = self.downloads.in_flight(),
@@ -957,7 +974,7 @@ where
     }

     /// The configured lookahead limit, based on the currently verified height,
-    /// and the number of hashes we haven't queued yet..
+    /// and the number of hashes we haven't queued yet.
     fn lookahead_limit(&self, new_hashes: usize) -> usize {
         let max_checkpoint_height: usize = self
             .max_checkpoint_height
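The new `while` condition in the syncer loop combines two back-pressure signals: the syncer's own in-flight count, and the downloader's height-based pause flag. Here is a condensed, hypothetical helper capturing the same logic; the function and parameter names are illustrative, not from the PR.

```rust
/// Returns true if the syncer should wait before requesting more blocks.
///
/// `in_flight`, `lookahead_limit`, and `past_lookahead_limit` stand in for
/// the syncer fields used in the diff above.
fn should_pause_downloads(
    in_flight: usize,
    lookahead_limit: usize,
    past_lookahead_limit: bool,
) -> bool {
    // Always pause at the syncer's own lookahead limit.
    let at_syncer_limit = in_flight >= lookahead_limit;

    // Also pause at half the limit, but only while the downloader reports
    // that downloaded blocks are backed up past its height-based limit.
    // Ignoring that signal below half the limit avoids a deadlock when
    // only a small number of blocks are waiting.
    let at_downloader_limit = in_flight >= lookahead_limit / 2 && past_lookahead_limit;

    at_syncer_limit || at_downloader_limit
}
```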
diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs
index 36038308..10c4bda8 100644
--- a/zebrad/src/components/sync/downloads.rs
+++ b/zebrad/src/components/sync/downloads.rs
@@ -4,7 +4,7 @@ use std::{
     collections::HashMap,
     convert::{self, TryFrom},
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, TryLockError},
     task::{Context, Poll},
 };

@@ -15,7 +15,11 @@ use futures::{
 };
 use pin_project::pin_project;
 use thiserror::Error;
-use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
+use tokio::{
+    sync::{oneshot, watch},
+    task::JoinHandle,
+    time::timeout,
+};
 use tower::{hedge, Service, ServiceExt};
 use tracing_futures::Instrument;

@@ -42,14 +46,17 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
 /// to hold a few extra tips responses worth of blocks,
 /// even if the syncer queue is full. Any unused capacity is shared between both queues.
 ///
-/// If this capacity is exceeded, the downloader will start failing download blocks with
-/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset.
+/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
 ///
 /// Since the syncer queue is limited to the `lookahead_limit`,
 /// the rest of the capacity is reserved for the other queues.
 /// There is no reserved capacity for the syncer queue:
 /// if the other queues stay full, the syncer will eventually time out and reset.
-pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 5;
+pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
+
+/// The maximum height difference between Zebra's state tip and a downloaded block.
+/// Blocks higher than this will get dropped and return an error.
+pub const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;

 #[derive(Copy, Clone, Debug)]
 pub(super) struct AlwaysHedge;
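Using these constants, the downloader derives three heights from the current state tip: a pause threshold `lookahead_limit * (1 + VERIFICATION_PIPELINE_SCALING_MULTIPLIER)` blocks above the tip, a reset threshold at half that gap, and a hard drop limit 50,000 blocks above the tip. A worked example of the arithmetic, with assumed values (`tip_height` and `lookahead_limit` are illustrative; the default limit is `MAX_TIPS_RESPONSE_HASH_COUNT * 2`):

```rust
// Worked example of the three thresholds derived in the hunks below.
// The two constants match this diff; the concrete numbers are assumptions.

const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;

fn main() {
    let tip_height: i64 = 1_800_000;
    let lookahead_limit: usize = 1_000;

    // Pause new downloads above: tip + limit * (1 + multiplier).
    let lookahead_pause =
        (lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER) as i64;

    let pause_height = tip_height + lookahead_pause; // 1_803_000
    // Clear the pause flag once blocks are back below half the pause gap.
    let reset_height = tip_height + lookahead_pause / 2; // 1_801_500
    // Drop blocks (and reset the syncer) far above the tip.
    let drop_height = tip_height + VERIFICATION_PIPELINE_DROP_LIMIT as i64; // 1_850_000

    println!("pause > {pause_height}, reset <= {reset_height}, drop > {drop_height}");
}
```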
@@ -89,6 +96,14 @@ pub enum BlockDownloadVerifyError {
         hash: block::Hash,
     },

+    /// A downloaded block was a long way ahead of the state chain tip.
+    /// This error should be very rare during normal operation.
+    ///
+    /// We need to reset the syncer on this error, to allow the verifier and state to catch up,
+    /// or prevent it following a bad chain.
+    ///
+    /// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
+    /// chain, or blocks far ahead of the current state tip.
     #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
     AboveLookaheadHeightLimit {
         height: block::Height,
         hash: block::Hash,
     },
@@ -157,6 +172,7 @@ where
     ZSTip: ChainTip + Clone + Send + 'static,
 {
     // Services
+    //
     /// A service that forwards requests to connected peers, and returns their
     /// responses.
     network: ZN,
@@ -168,13 +184,24 @@ where
     latest_chain_tip: ZSTip,

     // Configuration
+    //
     /// The configured lookahead limit, after applying the minimum limit.
     lookahead_limit: usize,

     /// The largest block height for the checkpoint verifier, based on the current config.
     max_checkpoint_height: Height,

+    // Shared syncer state
+    //
+    /// Sender that is set to `true` when the downloader is past the lookahead limit.
+    /// This is based on the downloaded block height and the state tip height.
+    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,
+
+    /// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
+    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
+
     // Internal downloads state
+    //
     /// A list of pending block download and verify tasks.
     #[pin]
     pending: FuturesUnordered<
@@ -259,15 +286,23 @@ where
         network: ZN,
         verifier: ZV,
         latest_chain_tip: ZSTip,
+        past_lookahead_limit_sender: watch::Sender<bool>,
         lookahead_limit: usize,
         max_checkpoint_height: Height,
     ) -> Self {
+        let past_lookahead_limit_receiver =
+            zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
+
         Self {
             network,
             verifier,
             latest_chain_tip,
             lookahead_limit,
             max_checkpoint_height,
+            past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
+                past_lookahead_limit_sender,
+            )),
+            past_lookahead_limit_receiver,
             pending: FuturesUnordered::new(),
             cancel_handles: HashMap::new(),
         }
     }
@@ -307,9 +342,13 @@ where
         let mut verifier = self.verifier.clone();
         let latest_chain_tip = self.latest_chain_tip.clone();
+
         let lookahead_limit = self.lookahead_limit;
         let max_checkpoint_height = self.max_checkpoint_height;

+        let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
+        let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
+
         let task = tokio::spawn(
             async move {
                 // Download the block.
@@ -346,19 +385,26 @@ where
                 // that will timeout before being verified.
                 let tip_height = latest_chain_tip.best_tip_height();

-                // TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
-                let max_lookahead_height = if let Some(tip_height) = tip_height {
+                let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
                     // Scale the height limit with the lookahead limit,
                     // so users with low capacity or under DoS can reduce them both.
-                    let lookahead = i32::try_from(
+                    let lookahead_pause = i32::try_from(
                         lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
                     )
-                    .expect("fits in i32");
-                    (tip_height + lookahead).expect("tip is much lower than Height::MAX")
+                    .expect("fits in i32");
+
+                    (
+                        (tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
+                        (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
+                        (tip_height + lookahead_pause / 2).expect("tip is much lower than Height::MAX"),
+                    )
                 } else {
+                    let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
                     let genesis_lookahead = u32::try_from(lookahead_limit - 1).expect("fits in u32");
-                    block::Height(genesis_lookahead)
+
+                    (
+                        block::Height(genesis_drop),
+                        block::Height(genesis_lookahead),
+                        block::Height(genesis_lookahead / 2),
+                    )
                 };

                 // Get the finalized tip height, assuming we're using the non-finalized state.
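The next hunk replaces the single height check with a three-way branch over those thresholds. A compact, hypothetical model of that decision logic is sketched below; `HeightAction` and `classify` are illustrative names, and the real code inlines this logic in the download task. Note the deliberate gap between the pause and reset thresholds: that hysteresis stops the flag from flapping when block heights hover near one boundary.

```rust
// Compact model of the branching in the next hunk. All names here are
// assumptions for illustration; only the comparisons mirror the diff.

#[derive(Debug, PartialEq)]
enum HeightAction {
    /// Fail the download and reset the syncer (far above the tip).
    Drop,
    /// Keep the block, but tell the syncer to pause new downloads.
    Pause,
    /// Clear the pause flag: we're well below the pause threshold again.
    Reset,
    /// No lookahead action needed.
    Accept,
}

fn classify(
    block_height: u32,
    drop: u32,
    pause: u32,
    reset: u32,
    currently_paused: bool,
) -> HeightAction {
    if block_height > drop {
        HeightAction::Drop
    } else if block_height > pause {
        HeightAction::Pause
    } else if block_height <= reset && currently_paused {
        HeightAction::Reset
    } else {
        HeightAction::Accept
    }
}
```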
@@ -388,28 +434,59 @@ where
                     return Err(BlockDownloadVerifyError::InvalidHeight { hash });
                 };

-                if block_height > max_lookahead_height {
-                    info!(
-                        ?hash,
-                        ?block_height,
-                        ?tip_height,
-                        ?max_lookahead_height,
-                        lookahead_limit = ?lookahead_limit,
-                        "synced block height too far ahead of the tip: dropped downloaded block",
-                    );
-                    metrics::counter!("sync.max.height.limit.dropped.block.count", 1);
-
-                    // This error should be very rare during normal operation.
-                    //
-                    // We need to reset the syncer on this error,
-                    // to allow the verifier and state to catch up,
-                    // or prevent it following a bad chain.
-                    //
-                    // If we don't reset the syncer on this error,
-                    // it will continue downloading blocks from a bad chain,
-                    // (or blocks far ahead of the current state tip).
+                if block_height > lookahead_drop_height {
                     Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
-                } else if block_height < min_accepted_height {
+                } else if block_height > lookahead_pause_height {
+                    // This log can be very verbose; usually hundreds of blocks are above the limit.
+                    // So we only log at info level for the first above-height block.
+                    if !past_lookahead_limit_receiver.cloned_watch_data() {
+                        info!(
+                            ?hash,
+                            ?block_height,
+                            ?tip_height,
+                            ?lookahead_pause_height,
+                            ?lookahead_reset_height,
+                            lookahead_limit = ?lookahead_limit,
+                            "synced block height too far ahead of the tip: \
+                             waiting for downloaded blocks to commit to the state",
+                        );

+                        // Set the watched value to true, since we're over the limit.
+                        //
+                        // It is ok to block here, because we're going to pause new downloads anyway.
+                        // But if Zebra is shutting down, ignore the send error.
+                        let _ = past_lookahead_limit_sender
+                            .lock()
+                            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
+                            .send(true);
+                    } else {
+                        debug!(
+                            ?hash,
+                            ?block_height,
+                            ?tip_height,
+                            ?lookahead_pause_height,
+                            ?lookahead_reset_height,
+                            lookahead_limit = ?lookahead_limit,
+                            "synced block height too far ahead of the tip: \
+                             waiting for downloaded blocks to commit to the state",
+                        );
+                    }

+                    metrics::counter!("sync.max.height.limit.paused.count", 1);
+                } else if block_height <= lookahead_reset_height
+                    && past_lookahead_limit_receiver.cloned_watch_data()
+                {
+                    // Try to reset the watched value to false, since we're well under the limit.
+                    match past_lookahead_limit_sender.try_lock() {
+                        Ok(watch_sender_guard) => {
+                            // If Zebra is shutting down, ignore the send error.
+                            let _ = watch_sender_guard.send(false);
+                            metrics::counter!("sync.max.height.limit.reset.count", 1);
+                        },
+                        Err(TryLockError::Poisoned(_)) => panic!(
+                            "thread panicked while holding the past_lookahead_limit_sender mutex guard"
+                        ),
+                        // We'll try allowing new downloads when we get the next block.
+                        Err(TryLockError::WouldBlock) => {}
+                    }

+                    metrics::counter!("sync.max.height.limit.reset.attempt.count", 1);
+                }

+                if block_height < min_accepted_height {
                     debug!(
                         ?hash,
                         ?block_height,
@@ -504,8 +581,14 @@ where
         assert!(self.cancel_handles.is_empty());
     }

-    /// Get the number of currently in-flight download tasks.
+    /// Get the number of currently in-flight download and verify tasks.
     pub fn in_flight(&mut self) -> usize {
         self.pending.len()
     }
+
+    /// Returns true if there are no in-flight download and verify tasks.
+    #[allow(dead_code)]
+    pub fn is_empty(&mut self) -> bool {
+        self.pending.is_empty()
+    }
 }
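The pause flag is written through a `std::sync::Mutex` around the `watch::Sender`: setting it uses a blocking `lock()`, which is acceptable because downloads are about to pause anyway, while clearing it uses `try_lock()` so a contended lock never stalls block processing. A standalone sketch of the same pattern, with illustrative function names:

```rust
// Standalone sketch of the `Mutex<watch::Sender<bool>>` write pattern used in
// the downloader above. `set_paused` and `try_clear_paused` are assumed names.

use std::sync::{Arc, Mutex, TryLockError};
use tokio::sync::watch;

fn set_paused(sender: &Arc<Mutex<watch::Sender<bool>>>) {
    // Blocking is acceptable here: the syncer is about to pause new downloads.
    // If the receiver side has shut down, ignore the send error.
    let _ = sender
        .lock()
        .expect("thread panicked while holding the sender mutex")
        .send(true);
}

fn try_clear_paused(sender: &Arc<Mutex<watch::Sender<bool>>>) {
    match sender.try_lock() {
        Ok(guard) => {
            // If the receiver side has shut down, ignore the send error.
            let _ = guard.send(false);
        }
        Err(TryLockError::Poisoned(_)) => {
            panic!("thread panicked while holding the sender mutex")
        }
        // Another task holds the lock: try again on the next verified block.
        Err(TryLockError::WouldBlock) => {}
    }
}
```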
diff --git a/zebrad/src/components/sync/tests/vectors.rs b/zebrad/src/components/sync/tests/vectors.rs
index 50cee64e..fa394147 100644
--- a/zebrad/src/components/sync/tests/vectors.rs
+++ b/zebrad/src/components/sync/tests/vectors.rs
@@ -495,9 +495,9 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
     Ok(())
 }

-/// Test that zebra-network rejects blocks with the wrong hash.
+/// Test that zebra-network rejects blocks that are a long way ahead of the state tip.
 #[tokio::test]
-async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
+async fn sync_block_lookahead_drop() -> Result<(), crate::BoxError> {
     // Get services
     let (
         chain_sync_future,
@@ -526,13 +526,15 @@
         .await
         .respond(zs::Response::Depth(None));

-    // Block 0 is fetched, but the peer returns a much higher block
+    // Block 0 is fetched, but the peer returns a much higher block.
+    // (Mismatching hashes are usually ignored by the network service,
+    // but we use them here to test the syncer lookahead.)
     peer_set
         .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
         .await
         .respond(zn::Response::Blocks(vec![Available(block982k.clone())]));

-    // Block is dropped because it has the wrong hash.
+    // Block is dropped because it is too far ahead of the tip.
     // We expect more requests to the state service, because the syncer keeps on running.
     peer_set.expect_no_requests().await;
     chain_verifier.expect_no_requests().await;
diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs
index e3e365fa..4e91e8ae 100644
--- a/zebrad/tests/acceptance.rs
+++ b/zebrad/tests/acceptance.rs
@@ -169,6 +169,9 @@ use common::{
 /// This limit only applies to some tests.
 pub const MAX_ASYNC_BLOCKING_TIME: Duration = zebra_test::mock_service::DEFAULT_MAX_REQUEST_DELAY;

+/// The test config file prefix for `--feature getblocktemplate-rpcs` configs.
+pub const GET_BLOCK_TEMPLATE_CONFIG_PREFIX: &str = "getblocktemplate-";
+
 #[test]
 fn generate_no_args() -> Result<()> {
     let _init_guard = zebra_test::init();
@@ -702,11 +705,22 @@ fn last_config_is_stored() -> Result<()> {

     Err(eyre!(
         "latest zebrad config is not being tested for compatibility.\n\
-         Run:\n\
-         zebrad generate |\n\
-         sed \"s/cache_dir = '.*'/cache_dir = 'cache_dir'/\" >\n\
-         zebrad/tests/common/configs/.toml\n\
-         and commit the latest config to Zebra's git repository"
+         Run: \n\
+         cargo build {}--bin zebrad && \n\
+         zebrad generate | \n\
+         sed \"s/cache_dir = '.*'/cache_dir = 'cache_dir'/\" > \n\
+         zebrad/tests/common/configs/{}.toml \n\
+         and commit the latest config to Zebra's git repository",
+        if cfg!(feature = "getblocktemplate-rpcs") {
+            "--features=getblocktemplate-rpcs "
+        } else {
+            ""
+        },
+        if cfg!(feature = "getblocktemplate-rpcs") {
+            GET_BLOCK_TEMPLATE_CONFIG_PREFIX
+        } else {
+            ""
+        },
     ))
 }
@@ -799,7 +813,7 @@ fn stored_configs_works() -> Result<()> {
         .file_name()
         .into_string()
         .expect("all files names should be string convertible")
-        .starts_with("getblocktemplate-")
+        .starts_with(GET_BLOCK_TEMPLATE_CONFIG_PREFIX)
     {
         continue;
     }
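The new constant and the `cfg!` checks above keep the config file naming consistent between the error message and the directory scan in `stored_configs_works`. A hypothetical helper showing the same feature-gated prefix selection; `config_file_name` is an assumed name for illustration:

```rust
// Sketch of the feature-gated config naming used in the acceptance tests.
// The constant comes from the diff; the helper function is hypothetical.

const GET_BLOCK_TEMPLATE_CONFIG_PREFIX: &str = "getblocktemplate-";

/// Returns the expected stored config file name for a release tag,
/// taking the `getblocktemplate-rpcs` compile-time feature into account.
fn config_file_name(version: &str) -> String {
    let prefix = if cfg!(feature = "getblocktemplate-rpcs") {
        GET_BLOCK_TEMPLATE_CONFIG_PREFIX
    } else {
        ""
    };

    format!("{prefix}{version}.toml")
}
```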
diff --git a/zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.0.toml b/zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.1.toml
similarity index 100%
rename from zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.0.toml
rename to zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.1.toml
diff --git a/zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.2.toml b/zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.2.toml
new file mode 100644
index 00000000..ba578bd3
--- /dev/null
+++ b/zebrad/tests/common/configs/getblocktemplate-v1.0.0-rc.2.toml
@@ -0,0 +1,72 @@
+# Default configuration for zebrad.
+#
+# This file can be used as a skeleton for custom configs.
+#
+# Unspecified fields use default values. Optional fields are Some(field) if the
+# field is present and None if it is absent.
+#
+# This file is generated as an example using zebrad's current defaults.
+# You should set only the config options you want to keep, and delete the rest.
+# Only a subset of fields are present in the skeleton, since optional values
+# whose default is None are omitted.
+#
+# The config format (including a complete list of sections and fields) is
+# documented here:
+# https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html
+#
+# zebrad attempts to load configs in the following order:
+#
+# 1. The -c flag on the command line, e.g., `zebrad -c myconfig.toml start`;
+# 2. The file `zebrad.toml` in the user's preference directory (platform-dependent);
+# 3. The default config.
+
+[consensus]
+checkpoint_sync = true
+debug_skip_parameter_preload = false
+
+[mempool]
+eviction_memory_time = '1h'
+tx_cost_limit = 80000000
+
+[metrics]
+
+[mining]
+
+[network]
+crawl_new_peer_interval = '1m 1s'
+initial_mainnet_peers = [
+    'dnsseed.z.cash:8233',
+    'dnsseed.str4d.xyz:8233',
+    'mainnet.seeder.zfnd.org:8233',
+    'mainnet.is.yolo.money:8233',
+]
+initial_testnet_peers = [
+    'dnsseed.testnet.z.cash:18233',
+    'testnet.seeder.zfnd.org:18233',
+    'testnet.is.yolo.money:18233',
+]
+listen_addr = '0.0.0.0:8233'
+network = 'Mainnet'
+peerset_initial_target_size = 25
+
+[rpc]
+debug_force_finished_sync = false
+parallel_cpu_threads = 1
+
+[state]
+cache_dir = 'cache_dir'
+delete_old_database = true
+ephemeral = false
+
+[sync]
+checkpoint_verify_concurrency_limit = 1000
+download_concurrency_limit = 50
+full_verify_concurrency_limit = 20
+parallel_cpu_threads = 0
+
+[tracing]
+buffer_limit = 128000
+force_use_color = false
+use_color = true
+use_journald = false
+
diff --git a/zebrad/tests/common/configs/v1.0.0-rc.2.toml b/zebrad/tests/common/configs/v1.0.0-rc.2.toml
new file mode 100644
index 00000000..8622db79
--- /dev/null
+++ b/zebrad/tests/common/configs/v1.0.0-rc.2.toml
@@ -0,0 +1,70 @@
+# Default configuration for zebrad.
+#
+# This file can be used as a skeleton for custom configs.
+#
+# Unspecified fields use default values. Optional fields are Some(field) if the
+# field is present and None if it is absent.
+#
+# This file is generated as an example using zebrad's current defaults.
+# You should set only the config options you want to keep, and delete the rest.
+# Only a subset of fields are present in the skeleton, since optional values
+# whose default is None are omitted.
+#
+# The config format (including a complete list of sections and fields) is
+# documented here:
+# https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html
+#
+# zebrad attempts to load configs in the following order:
+#
+# 1. The -c flag on the command line, e.g., `zebrad -c myconfig.toml start`;
+# 2. The file `zebrad.toml` in the user's preference directory (platform-dependent);
+# 3. The default config.
+
+[consensus]
+checkpoint_sync = true
+debug_skip_parameter_preload = false
+
+[mempool]
+eviction_memory_time = '1h'
+tx_cost_limit = 80000000
+
+[metrics]
+
+[network]
+crawl_new_peer_interval = '1m 1s'
+initial_mainnet_peers = [
+    'dnsseed.z.cash:8233',
+    'dnsseed.str4d.xyz:8233',
+    'mainnet.seeder.zfnd.org:8233',
+    'mainnet.is.yolo.money:8233',
+]
+initial_testnet_peers = [
+    'dnsseed.testnet.z.cash:18233',
+    'testnet.seeder.zfnd.org:18233',
+    'testnet.is.yolo.money:18233',
+]
+listen_addr = '0.0.0.0:8233'
+network = 'Mainnet'
+peerset_initial_target_size = 25
+
+[rpc]
+debug_force_finished_sync = false
+parallel_cpu_threads = 1
+
+[state]
+cache_dir = 'cache_dir'
+delete_old_database = true
+ephemeral = false
+
+[sync]
+checkpoint_verify_concurrency_limit = 1000
+download_concurrency_limit = 50
+full_verify_concurrency_limit = 20
+parallel_cpu_threads = 0
+
+[tracing]
+buffer_limit = 128000
+force_use_color = false
+use_color = true
+use_journald = false
+
diff --git a/zebrad/tests/common/sync.rs b/zebrad/tests/common/sync.rs
index 5583a6e4..735f6574 100644
--- a/zebrad/tests/common/sync.rs
+++ b/zebrad/tests/common/sync.rs
@@ -74,7 +74,7 @@ pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(11 * 60 *

 /// The maximum time to wait for Zebrad to synchronize up to the chain tip starting from the
 /// genesis block.
-pub const FINISH_FULL_SYNC_TIMEOUT: Duration = Duration::from_secs(32 * 60 * 60);
+pub const FINISH_FULL_SYNC_TIMEOUT: Duration = Duration::from_secs(36 * 60 * 60);

 /// The test sync height where we switch to using the default lookahead limit.
 ///