diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index 8f3ff5ae..2ebf29b8 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -412,6 +412,18 @@ impl CheckpointVerifier { .entry(height) .or_insert_with(|| QueuedBlockList::with_capacity(1)); + let hash = block.hash(); + + for qb in qblocks.iter_mut() { + if qb.hash == hash { + let old_tx = std::mem::replace(&mut qb.tx, tx); + let e = "rejected older of duplicate verification requests".into(); + tracing::debug!(?e); + let _ = old_tx.send(Err(e)); + return rx; + } + } + // Memory DoS resistance: limit the queued blocks at each height if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT { let e = "too many queued blocks at this height".into(); @@ -421,7 +433,6 @@ impl CheckpointVerifier { } // Add the block to the list of queued blocks at this height - let hash = block.as_ref().into(); let new_qblock = QueuedBlock { block, hash, tx }; // This is a no-op for the first block in each QueuedBlockList. qblocks.reserve_exact(1); diff --git a/zebra-network/src/policies.rs b/zebra-network/src/policies.rs index 2e2fdf05..98df5db5 100644 --- a/zebra-network/src/policies.rs +++ b/zebra-network/src/policies.rs @@ -18,11 +18,12 @@ impl RetryLimit { } } -impl Policy for RetryLimit { +impl Policy for RetryLimit { type Future = future::Ready; - fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option { - if result.is_err() { + fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option { + if let Err(e) = result { if self.remaining_tries > 0 { + tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying"); Some(future::ready(RetryLimit { remaining_tries: self.remaining_tries - 1, })) diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index a470d25a..105c00d1 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -68,6 +68,9 @@ pub struct Config { /// | Windows | `{FOLDERID_LocalAppData}\zebra` | C:\Users\Alice\AppData\Local\zebra | /// | Other | `std::env::current_dir()/cache` | | pub cache_dir: PathBuf, + + /// The maximum number of bytes to use caching data in memory. + pub memory_cache_bytes: u64, } impl Config { @@ -80,7 +83,10 @@ impl Config { }; let path = self.cache_dir.join(net_dir).join("state"); - sled::Config::default().path(path) + sled::Config::default() + .path(path) + .cache_capacity(self.memory_cache_bytes) + .mode(sled::Mode::LowSpace) } } @@ -89,7 +95,10 @@ impl Default for Config { let cache_dir = dirs::cache_dir() .unwrap_or_else(|| std::env::current_dir().unwrap().join("cache")) .join("zebra"); - Self { cache_dir } + Self { + cache_dir, + memory_cache_bytes: 512 * 1024 * 1024, + } } } diff --git a/zebra-state/tests/basic.rs b/zebra-state/tests/basic.rs index 8d2e3d75..95334f7c 100644 --- a/zebra-state/tests/basic.rs +++ b/zebra-state/tests/basic.rs @@ -123,7 +123,13 @@ async fn check_transcripts(network: Network) -> Result<(), Report> { let storage_guard = TempDir::new("")?; let cache_dir = storage_guard.path().to_owned(); - let service = on_disk::init(Config { cache_dir }, network); + let service = on_disk::init( + Config { + cache_dir, + ..Config::default() + }, + network, + ); let transcript = Transcript::from(transcript_data.iter().cloned()); /// SPANDOC: check the on disk service against the transcript transcript.check(service).await?; diff --git a/zebra-test/src/command.rs b/zebra-test/src/command.rs index f0cffb19..99eea9e2 100644 --- a/zebra-test/src/command.rs +++ b/zebra-test/src/command.rs @@ -20,7 +20,7 @@ pub fn test_cmd(path: &str) -> Result<(Command, impl Drop)> { fs::File::create(dir.path().join("zebrad.toml"))?.write_all( format!( - "[state]\ncache_dir = '{}'", + "[state]\ncache_dir = '{}'\nmemory_cache_bytes = 256000000", cache_dir .into_os_string() .into_string() diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs index 2c56e445..8b5fab03 100644 --- a/zebrad/src/commands/start/sync.rs +++ b/zebrad/src/commands/start/sync.rs @@ -1,10 +1,11 @@ use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration}; use color_eyre::eyre::{eyre, Report}; +use futures::future::FutureExt; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::{task::JoinHandle, time::delay_for}; -use tower::{retry::Retry, Service, ServiceExt}; -use tracing_futures::{Instrument, Instrumented}; +use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt}; +use tracing_futures::Instrument; use zebra_chain::{ block::{Block, BlockHeaderHash}, @@ -20,7 +21,19 @@ const FANOUT: usize = checkpoint::MAX_QUEUED_BLOCKS_PER_HEIGHT; /// Controls how far ahead of the chain tip the syncer tries to download before /// waiting for queued verifications to complete. Set to twice the maximum /// checkpoint distance. -pub const LOOKAHEAD_LIMIT: usize = checkpoint::MAX_CHECKPOINT_HEIGHT_GAP * 2; +const LOOKAHEAD_LIMIT: usize = checkpoint::MAX_CHECKPOINT_HEIGHT_GAP * 2; +/// Controls how long we wait for a block download request to complete. +const BLOCK_TIMEOUT: Duration = Duration::from_secs(6); +/// Controls how long we wait to restart syncing after finishing a sync run. +const SYNC_RESTART_TIMEOUT: Duration = Duration::from_secs(20); + +/// Helps work around defects in the bitcoin protocol by checking whether +/// the returned hashes actually extend a chain tip. +#[derive(Debug, Hash, PartialEq, Eq)] +struct CheckedTip { + tip: BlockHeaderHash, + expected_next: BlockHeaderHash, +} #[derive(Debug)] pub struct Syncer @@ -35,12 +48,11 @@ where /// Used to perform extendtips requests, with no retry logic (failover is handled using fanout). tip_network: ZN, /// Used to download blocks, with retry logic. - block_network: Retry, + block_network: Retry>, state: ZS, verifier: ZV, - prospective_tips: HashSet, - pending_blocks: - Pin>>>>>, + prospective_tips: HashSet, + pending_blocks: Pin>>>>, genesis_hash: BlockHeaderHash, } @@ -59,10 +71,13 @@ where /// - state: the zebra-state that stores the chain /// - verifier: the zebra-consensus verifier that checks the chain pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self { - let retry_peers = Retry::new(RetryLimit::new(10), peers.clone()); + let block_network = ServiceBuilder::new() + .retry(RetryLimit::new(3)) + .timeout(BLOCK_TIMEOUT) + .service(peers.clone()); Self { tip_network: peers, - block_network: retry_peers, + block_network, state, verifier, prospective_tips: HashSet::new(), @@ -77,50 +92,68 @@ where // due to protocol limitations self.request_genesis().await?; - loop { - self.obtain_tips().await?; + 'sync: loop { + // Wipe state from prevous iterations. + self.prospective_tips = HashSet::new(); + self.pending_blocks = Box::pin(FuturesUnordered::new()); + + tracing::info!("starting sync, obtaining new tips"); + if self.obtain_tips().await.is_err() { + tracing::warn!("failed to obtain tips, waiting to restart sync"); + delay_for(SYNC_RESTART_TIMEOUT).await; + continue 'sync; + }; self.update_metrics(); - // ObtainTips Step 6 - // - // If there are any prospective tips, call ExtendTips. - // Continue this step until there are no more prospective tips. while !self.prospective_tips.is_empty() { - tracing::debug!("extending prospective tips"); + // Check whether any block tasks are currently ready: + while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() { + match rsp.expect("block download tasks should not panic") { + Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), + Err(e) => { + tracing::info!(?e, "restarting sync"); + continue 'sync; + } + } + self.update_metrics(); + } - self.extend_tips().await?; - self.update_metrics(); - - // Check whether we need to wait for existing block download tasks to finish - while self.pending_blocks.len() > LOOKAHEAD_LIMIT { + // If we have too many pending tasks, wait for one to finish: + if self.pending_blocks.len() > LOOKAHEAD_LIMIT { + tracing::debug!( + tips.len = self.prospective_tips.len(), + pending.len = self.pending_blocks.len(), + pending.limit = LOOKAHEAD_LIMIT, + "waiting for pending blocks", + ); match self .pending_blocks .next() .await - .expect("already checked there's at least one pending block task") + .expect("pending_blocks is nonempty") .expect("block download tasks should not panic") { Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), - // This is a non-transient error indicating either that - // we've repeatedly missed a block we need or that we've - // repeatedly missed a bad block suggested by a peer - // feeding us bad hashes. - // - // TODO(hdevalence): handle interruptions in the chain - // sync process. this should detect when we've stopped - // making progress (probably using a timeout), then - // continue the loop with a new invocation of - // obtain_tips(), which will restart block downloads. - Err(e) => tracing::error!(?e, "potentially transient error"), - }; + Err(e) => { + tracing::info!(?e, "restarting sync"); + continue 'sync; + } + } + } else { + // Otherwise, we can keep extending the tips. + tracing::info!( + tips.len = self.prospective_tips.len(), + pending.len = self.pending_blocks.len(), + pending.limit = LOOKAHEAD_LIMIT, + "extending tips", + ); + let _ = self.extend_tips().await; } - - // We just added a bunch of failures, update the metrics now, - // because we might be about to reset or delay. self.update_metrics(); } - delay_for(Duration::from_secs(15)).await; + tracing::info!("exhausted tips, waiting to restart sync"); + delay_for(SYNC_RESTART_TIMEOUT).await; } } @@ -128,10 +161,6 @@ where /// multiple peers #[instrument(skip(self))] async fn obtain_tips(&mut self) -> Result<(), Report> { - // ObtainTips Step 1 - // - // Query the current state to construct the sequence of hashes: handled by - // the caller let block_locator = self .state .ready_and() @@ -151,10 +180,6 @@ where tracing::debug!(?block_locator, "trying to obtain new chain tips"); - // ObtainTips Step 2 - // - // Make a FindBlocksByHash request to the network F times, where F is a - // fanout parameter, to get resp1, ..., respF let mut requests = FuturesUnordered::new(); for _ in 0..FANOUT { requests.push( @@ -173,19 +198,6 @@ where while let Some(res) = requests.next().await { match res.map_err::(|e| eyre!(e)) { Ok(zn::Response::BlockHeaderHashes(hashes)) => { - if hashes.is_empty() { - tracing::debug!("skipping empty response"); - continue; - } else { - tracing::debug!(hashes.len = hashes.len(), "processing response"); - } - - // ObtainTips Step 3 - // - // For each response, starting from the beginning of the - // list, prune any block hashes already included in the - // state, stopping at the first unknown hash to get resp1', - // ..., respF'. (These lists may be empty). let mut first_unknown = None; for (i, &hash) in hashes.iter().enumerate() { if !self.state_contains(hash).await? { @@ -194,41 +206,37 @@ where } } - // Hashes will be empty if we know about all the blocks in the response. - if first_unknown.is_none() { - tracing::debug!("ObtainTips: all hashes are known"); + tracing::debug!(hashes.len = ?hashes.len(), ?first_unknown); + + let unknown_hashes = if let Some(index) = first_unknown { + &hashes[index..] + } else { continue; - } - let first_unknown = first_unknown.expect("already checked for None"); - tracing::debug!( - first_unknown, - "found index of first unknown hash in response" - ); + }; - let unknown_hashes = &hashes[first_unknown..]; - let new_tip = *unknown_hashes - .last() - .expect("already checked that unknown hashes isn't empty"); + tracing::trace!(?unknown_hashes); - // ObtainTips Step 4: - // Combine the last elements of each list into a set; this is the - // set of prospective tips. - if !download_set.contains(&new_tip) { - tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "adding new prospective tip"); + let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() { + CheckedTip { + tip: end[0], + expected_next: end[1], + } + } else { + tracing::debug!("discarding response that extends only one block"); + continue; + }; + + if !download_set.contains(&new_tip.expected_next) { + tracing::debug!(?new_tip, "adding new prospective tip"); self.prospective_tips.insert(new_tip); } else { tracing::debug!(?new_tip, "discarding tip already queued for download"); } - // ObtainTips Step 5.1 - // - // Combine all elements of each list into a set let prev_download_len = download_set.len(); download_set.extend(unknown_hashes); let new_download_len = download_set.len(); tracing::debug!( - prev_download_len, - new_download_len, new_hashes = new_download_len - prev_download_len, "added hashes to download set" ); @@ -239,11 +247,8 @@ where } } - tracing::debug!(?self.prospective_tips, "ObtainTips: downloading blocks for tips"); + tracing::debug!(?self.prospective_tips); - // ObtainTips Step 5.2 - // - // queue download and verification of those blocks. self.request_blocks(download_set.into_iter().collect()) .await?; @@ -252,18 +257,11 @@ where #[instrument(skip(self))] async fn extend_tips(&mut self) -> Result<(), Report> { - // Extend Tips 1 - // - // remove all prospective tips and iterate over them individually let tips = std::mem::take(&mut self.prospective_tips); - tracing::debug!(?tips, "extending tip set"); let mut download_set = HashSet::new(); for tip in tips { - // ExtendTips Step 2 - // - // Create a FindBlocksByHash request consisting of just the - // prospective tip. Send this request to the network F times + tracing::debug!(?tip, "extending tip"); let mut responses = FuturesUnordered::new(); for _ in 0..FANOUT { responses.push( @@ -272,78 +270,54 @@ where .await .map_err(|e| eyre!(e))? .call(zn::Request::FindBlocks { - known_blocks: vec![tip], + known_blocks: vec![tip.tip], stop: None, }), ); } while let Some(res) = responses.next().await { match res.map_err::(|e| eyre!(e)) { - Ok(zn::Response::BlockHeaderHashes(mut hashes)) => { - // ExtendTips Step 3 - // - // For each response, check whether the first hash in the - // response is the genesis block; if so, discard the response. - // It indicates that the remote peer does not have any blocks - // following the prospective tip. - match (hashes.first(), hashes.len()) { - (None, _) => { - tracing::debug!("ExtendTips: skipping empty response"); - continue; - } - (_, 1) => { - tracing::debug!("ExtendTips: skipping length-1 response, in case it's an unsolicited inv message"); - continue; - } - (Some(hash), _) if (hash == &self.genesis_hash) => { - tracing::debug!( - "ExtendTips: skipping response, peer could not extend the tip" - ); - continue; - } - (Some(&hash), _) => { - // Check for hashes we've already seen. - // This happens a lot near the end of the chain. - // This check reduces the number of duplicate - // blocks, but it is not required for - // correctness. - if self.state_contains(hash).await? { - tracing::debug!( - ?hash, - "ExtendTips: skipping response, peer returned a duplicate hash: already in state" - ); - continue; - } - } - } + Ok(zn::Response::BlockHeaderHashes(hashes)) => { + tracing::debug!(first = ?hashes.first(), len = ?hashes.len()); - let new_tip = hashes.pop().expect("expected: hashes must have len > 0"); + let unknown_hashes = match hashes.split_first() { + None => continue, + Some((expected_hash, rest)) if expected_hash == &tip.expected_next => { + rest + } + Some((other_hash, _rest)) => { + tracing::debug!(?other_hash, ?tip.expected_next, "discarding response with unexpected next hash"); + continue; + } + }; - // Check for tips we've already seen - // TODO: remove this check once the sync service is more reliable - if self.state_contains(new_tip).await? { - tracing::debug!( - ?new_tip, - "ExtendTips: Unexpected duplicate tip from peer: already in state" - ); + tracing::trace!(?unknown_hashes); + + let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() { + CheckedTip { + tip: end[0], + expected_next: end[1], + } + } else { + tracing::debug!("discarding response that extends only one block"); continue; - } + }; - // ExtendTips Step 4 - // - // Combine the last elements of the remaining responses into - // a set, and add this set to the set of prospective tips. - tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "ExtendTips: extending to new tip"); - let _ = self.prospective_tips.insert(new_tip); + tracing::trace!(?hashes); + + if !download_set.contains(&new_tip.expected_next) { + tracing::debug!(?new_tip, "adding new prospective tip"); + self.prospective_tips.insert(new_tip); + } else { + tracing::debug!(?new_tip, "discarding tip already queued for download"); + } let prev_download_len = download_set.len(); - download_set.extend(hashes); + download_set.extend(unknown_hashes); let new_download_len = download_set.len(); tracing::debug!( - prev_download_len, - new_download_len, new_hashes = new_download_len - prev_download_len, - "ExtendTips: added hashes to download set" + "added hashes to download set" ); } Ok(_) => unreachable!("network returned wrong response"), @@ -353,25 +327,8 @@ where } } - // ExtendTips Step ?? - // - // Remove tips that are already included behind one of the other - // returned tips - self.prospective_tips - .retain(|tip| !download_set.contains(tip)); - tracing::debug!(?self.prospective_tips, "ExtendTips: downloading blocks for tips"); - - // ExtendTips Step 5 - // - // Combine all elements of the remaining responses into a - // set, and queue download and verification of those blocks - self.request_blocks( - download_set - .into_iter() - .chain(self.prospective_tips.iter().cloned()) - .collect(), - ) - .await?; + self.request_blocks(download_set.into_iter().collect()) + .await?; Ok(()) } @@ -387,8 +344,18 @@ where // - the genesis hash is used as a placeholder for "no matches". // // So we just queue the genesis block here. - if !self.state_contains(self.genesis_hash).await? { + while !self.state_contains(self.genesis_hash).await? { self.request_blocks(vec![self.genesis_hash]).await?; + match self + .pending_blocks + .next() + .await + .expect("inserted a download request") + .expect("block download tasks should not panic") + { + Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), + Err(e) => tracing::warn!(?e, "could not download genesis block, retrying"), + } } Ok(()) @@ -420,24 +387,29 @@ where .await .map_err(|e| eyre!(e))? .call(zn::Request::BlocksByHash(iter::once(hash).collect())); + + tracing::debug!(?hash, "requested block"); + let span = tracing::info_span!("block_fetch_verify", ?hash); let mut verifier = self.verifier.clone(); - let task = tokio::spawn(async move { - let block = match block_req.await { - Ok(zn::Response::Blocks(blocks)) => blocks - .into_iter() - .next() - .expect("successful response has the block in it"), - Ok(_) => unreachable!("wrong response to block request"), - Err(e) => return Err(e), - }; - metrics::counter!("sync.downloaded_blocks", 1); + let task = tokio::spawn( + async move { + let block = match block_req.await { + Ok(zn::Response::Blocks(blocks)) => blocks + .into_iter() + .next() + .expect("successful response has the block in it"), + Ok(_) => unreachable!("wrong response to block request"), + Err(e) => return Err(e), + }; + metrics::counter!("sync.downloaded_blocks", 1); - let result = verifier.ready_and().await?.call(block).await; - metrics::counter!("sync.verified_blocks", 1); - result - }) - .instrument(span); + let result = verifier.ready_and().await?.call(block).await; + metrics::counter!("sync.verified_blocks", 1); + result + } + .instrument(span), + ); self.pending_blocks.push(task); } @@ -467,18 +439,12 @@ where } } - /// Update metrics gauges, and create a trace containing metrics. fn update_metrics(&self) { metrics::gauge!( "sync.prospective_tips.len", self.prospective_tips.len() as i64 ); metrics::gauge!("sync.pending_blocks.len", self.pending_blocks.len() as i64); - tracing::info!( - tips.len = self.prospective_tips.len(), - pending.len = self.pending_blocks.len(), - pending.limit = LOOKAHEAD_LIMIT, - ); } }