diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index dac9314a..070e0769 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -1,13 +1,12 @@ -use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; -use color_eyre::eyre::{eyre, Report, WrapErr}; +use color_eyre::eyre::{eyre, Report}; use futures::{ - future::{FutureExt, TryFutureExt}, + future::FutureExt, stream::{FuturesUnordered, StreamExt}, }; -use tokio::{task::JoinHandle, time::delay_for}; +use tokio::time::delay_for; use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt}; -use tracing_futures::Instrument; use zebra_chain::{ block::{self, Block}, @@ -68,13 +67,6 @@ const MAX_CHECKPOINT_DOWNLOAD_SECONDS: u64 = 300; /// then without a timeout, Zebra would deadlock. const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(MAX_CHECKPOINT_DOWNLOAD_SECONDS); -/// Controls how long we wait to retry ExtendTips after it fails. -/// -/// This timeout should be long enough to allow some of our peers to clear -/// their connection state. -/// -/// (ObtainTips failures use the sync restart timeout.) -const TIPS_RETRY_TIMEOUT: Duration = Duration::from_secs(60); /// Controls how long we wait to restart syncing after finishing a sync run. /// /// This timeout should be long enough to: @@ -101,7 +93,6 @@ const TIPS_RETRY_TIMEOUT: Duration = Duration::from_secs(60); const SYNC_RESTART_TIMEOUT: Duration = Duration::from_secs(100); type BoxError = Box; -type ReportAndHash = (Report, block::Hash); /// Helps work around defects in the bitcoin protocol by checking whether /// the returned hashes actually extend a chain tip. @@ -124,13 +115,10 @@ where /// Used to perform ObtainTips and ExtendTips requests, with no retry logic /// (failover is handled using fanout). tip_network: Timeout, - /// Used to download blocks, with retry logic. - block_network: Retry>, state: ZS, - verifier: Timeout, prospective_tips: HashSet, - pending_blocks: Pin>>>>, genesis_hash: block::Hash, + downloads: Pin>, Timeout>>>, } /// Polls the network to determine whether further blocks are available and @@ -155,18 +143,18 @@ where /// - verifier: the zebra-consensus verifier that checks the chain pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self { let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT); - let block_network = ServiceBuilder::new() - .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT)) - .timeout(BLOCK_DOWNLOAD_TIMEOUT) - .service(peers); - let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT); + let downloads = Downloads::new( + ServiceBuilder::new() + .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT)) + .timeout(BLOCK_DOWNLOAD_TIMEOUT) + .service(peers), + Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), + ); Self { tip_network, - block_network, state, - verifier, + downloads: Box::pin(downloads), prospective_tips: HashSet::new(), - pending_blocks: Box::pin(FuturesUnordered::new()), genesis_hash: genesis_hash(chain), } } @@ -178,61 +166,30 @@ where self.request_genesis().await?; 'sync: loop { - // Update metrics for any ready tasks, before wiping state - while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() { - match rsp.expect("block download and verify tasks should not panic") { - Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"), - Err((e, _)) => { - tracing::trace!(?e, "sync error before restarting sync, ignoring") - } - } - } - self.update_metrics(); - // Wipe state from prevous iterations. self.prospective_tips = HashSet::new(); - self.pending_blocks = Box::pin(FuturesUnordered::new()); + self.downloads.cancel_all(); self.update_metrics(); tracing::info!("starting sync, obtaining new tips"); if self.obtain_tips().await.is_err() || self.prospective_tips.is_empty() { - self.update_metrics(); tracing::warn!("failed to obtain tips, waiting to restart sync"); delay_for(SYNC_RESTART_TIMEOUT).await; continue 'sync; }; + self.update_metrics(); while !self.prospective_tips.is_empty() { // Check whether any block tasks are currently ready: - while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() { - match rsp.expect("block download and verify tasks should not panic") { + while let Some(Some(rsp)) = self.downloads.next().now_or_never() { + match rsp { Ok(hash) => { tracing::trace!(?hash, "verified and committed block to state"); } - Err((e, hash)) => { - // We must restart the sync on every error, unless - // this block has already been verified. - // - // If we ignore other errors, the syncer can: - // - get a long way ahead of the state, and queue - // up a lot of unverified blocks in memory, or - // - get into an endless error cycle. - // - // In particular, we must restart if the checkpoint - // verifier has verified a block at this height, but - // the hash is different. In that case, we want to - // stop following an ancient side-chain. - if self.state_contains(hash).await? { - tracing::debug!(?e, - "sync error in ready task, but block is already verified, ignoring"); - } else { - tracing::warn!( - ?e, - "sync error in ready task, waiting to restart sync" - ); - delay_for(SYNC_RESTART_TIMEOUT).await; - continue 'sync; - } + Err(e) => { + tracing::warn!(?e, "waiting to restart sync"); + delay_for(SYNC_RESTART_TIMEOUT).await; + continue 'sync; } } } @@ -242,44 +199,30 @@ where // // Starting to wait is interesting, but logging each wait can be // very verbose. - if self.pending_blocks.len() > LOOKAHEAD_LIMIT { + if self.downloads.in_flight() > LOOKAHEAD_LIMIT { tracing::info!( tips.len = self.prospective_tips.len(), - pending.len = self.pending_blocks.len(), - pending.limit = LOOKAHEAD_LIMIT, + in_flight = self.downloads.in_flight(), + lookahead_limit = LOOKAHEAD_LIMIT, "waiting for pending blocks", ); } - while self.pending_blocks.len() > LOOKAHEAD_LIMIT { + while self.downloads.in_flight() > LOOKAHEAD_LIMIT { tracing::trace!( tips.len = self.prospective_tips.len(), - pending.len = self.pending_blocks.len(), - pending.limit = LOOKAHEAD_LIMIT, - "continuing to wait for pending blocks", + in_flight = self.downloads.in_flight(), + lookahead_limit = LOOKAHEAD_LIMIT, + "waiting for pending blocks", ); - match self - .pending_blocks - .next() - .await - .expect("pending_blocks is nonempty") - .expect("block download and verify tasks should not panic") - { + + match self.downloads.next().await.expect("downloads is nonempty") { Ok(hash) => { tracing::trace!(?hash, "verified and committed block to state"); } - Err((e, hash)) => { - // We must restart the sync on every error, unless - // this block has already been verified. - // See the comment above for details. - if self.state_contains(hash).await? { - tracing::debug!(?e, - "sync error with pending above lookahead limit, but block is already verified, ignoring"); - } else { - tracing::warn!(?e, - "sync error with pending above lookahead limit, waiting to restart sync"); - delay_for(SYNC_RESTART_TIMEOUT).await; - continue 'sync; - } + Err(e) => { + tracing::warn!(?e, "waiting to restart sync"); + delay_for(SYNC_RESTART_TIMEOUT).await; + continue 'sync; } } self.update_metrics(); @@ -288,24 +231,12 @@ where // Once we're below the lookahead limit, we can keep extending the tips. tracing::info!( tips.len = self.prospective_tips.len(), - pending.len = self.pending_blocks.len(), - pending.limit = LOOKAHEAD_LIMIT, + in_flight = self.downloads.in_flight(), + lookahead_limit = LOOKAHEAD_LIMIT, "extending tips", ); - let old_tips = self.prospective_tips.clone(); - let _ = self.extend_tips().await; - // If ExtendTips fails, wait, then give it another shot. - // - // If we don't have many peers, waiting and retrying helps us - // ignore unsolicited BlockHashes from peers. - if self.prospective_tips.is_empty() { - self.update_metrics(); - tracing::info!("no new tips, waiting to retry extend tips"); - delay_for(TIPS_RETRY_TIMEOUT).await; - self.prospective_tips = old_tips; - let _ = self.extend_tips().await; - } + let _ = self.extend_tips().await; self.update_metrics(); } @@ -567,16 +498,13 @@ where // // So we just download and verify the genesis block here. while !self.state_contains(self.genesis_hash).await? { - self.request_blocks(vec![self.genesis_hash]).await?; - match self - .pending_blocks - .next() + self.downloads + .queue_download(self.genesis_hash) .await - .expect("inserted a download and verify request") - .expect("block download and verify tasks should not panic") - { + .map_err(|e| eyre!(e))?; + match self.downloads.next().await.expect("downloads is nonempty") { Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"), - Err((e, _)) => { + Err(e) => { tracing::warn!(?e, "could not download or verify genesis block, retrying") } } @@ -589,65 +517,16 @@ where async fn request_blocks(&mut self, hashes: Vec) -> Result<(), Report> { tracing::debug!(hashes.len = hashes.len(), "requesting blocks"); for hash in hashes.into_iter() { - // Avoid re-downloading blocks that have already been verified. - // This is particularly important for nodes on slow or unreliable - // networks. + // If we've queued the download of a hash behind our current chain tip, + // we've been given bad responses by our peers. Abort the sync and restart. if self.state_contains(hash).await? { - tracing::debug!( - ?hash, - "request_blocks: Unexpected duplicate hash: already in state" - ); - continue; + return Err(eyre!("queued download of hash behind our chain tip")); } - // We construct the block requests sequentially, waiting - // for the peer set to be ready to process each request. This - // ensures that we start block downloads in the order we want them - // (though they may resolve out of order), and it means that we - // respect backpressure. Otherwise, if we waited for readiness and - // did the service call in the spawned tasks, all of the spawned - // tasks would race each other waiting for the network to become - // ready. - let block_req = self - .block_network - .ready_and() + + self.downloads + .queue_download(hash) .await - .map_err(|e| eyre!(e))? - .call(zn::Request::BlocksByHash(iter::once(hash).collect())); - - tracing::trace!(?hash, "requested block"); - - // This span is used to help diagnose sync warnings - let span = tracing::warn_span!("block_fetch_verify", %hash); - let mut verifier = self.verifier.clone(); - let task = tokio::spawn( - async move { - let block = match block_req.await { - Ok(zn::Response::Blocks(blocks)) => blocks - .into_iter() - .next() - .expect("successful response has the block in it"), - Ok(_) => unreachable!("wrong response to block request"), - // Make sure we can distinguish download and verify timeouts - Err(e) => Err(eyre!(e)).wrap_err("failed to download block")?, - }; - metrics::counter!("sync.downloaded.block.count", 1); - - let result = verifier - .ready_and() - .await - .map_err(|e| eyre!(e)) - .wrap_err("verifier service failed to be ready")? - .call(block) - .await - .map_err(|e| eyre!(e)) - .wrap_err("failed to verify block")?; - metrics::counter!("sync.verified.block.count", 1); - Result::::Ok(result) - } - .instrument(span) - .map_err(move |e| (e, hash)), - ); - self.pending_blocks.push(task); + .map_err(|e| eyre!(e))?; } Ok(()) @@ -678,7 +557,10 @@ where "sync.prospective_tips.len", self.prospective_tips.len() as i64 ); - metrics::gauge!("sync.pending_blocks.len", self.pending_blocks.len() as i64); + metrics::gauge!( + "sync.downloads.in_flight", + self.downloads.in_flight() as i64 + ); } } @@ -700,10 +582,6 @@ mod test { "Sync restart should allow for pending and buffered requests to complete" ); - assert!( - TIPS_RETRY_TIMEOUT < BLOCK_VERIFY_TIMEOUT, - "Verify timeout should allow for retrying tips" - ); assert!( SYNC_RESTART_TIMEOUT < BLOCK_VERIFY_TIMEOUT, "Verify timeout should allow for a sync restart" diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 653c497d..7e4d2d08 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -22,6 +22,7 @@ type BoxError = Box; /// Represents a [`Stream`] of download and verification tasks during chain sync. #[pin_project] +#[derive(Debug)] pub struct Downloads where ZN: Service + Send + 'static,