diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs index e0d2436c..188476b4 100644 --- a/zebrad/src/commands/start/sync.rs +++ b/zebrad/src/commands/start/sync.rs @@ -1,7 +1,7 @@ use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration}; -use color_eyre::eyre::{eyre, Report}; -use futures::future::FutureExt; +use color_eyre::eyre::{eyre, Report, WrapErr}; +use futures::future::{FutureExt, TryFutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::{task::JoinHandle, time::delay_for}; use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt}; @@ -52,7 +52,7 @@ where state: ZS, verifier: ZV, prospective_tips: HashSet, - pending_blocks: Pin>>>>, + pending_blocks: Pin>>>>, genesis_hash: block::Hash, } @@ -93,6 +93,17 @@ where self.request_genesis().await?; 'sync: loop { + // Update metrics for any ready tasks, before wiping state + while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() { + match rsp.expect("block download and verify tasks should not panic") { + Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"), + Err((e, _)) => { + tracing::trace!(?e, "sync error before restarting sync, ignoring") + } + } + } + self.update_metrics(); + // Wipe state from prevous iterations. self.prospective_tips = HashSet::new(); self.pending_blocks = Box::pin(FuturesUnordered::new()); @@ -108,11 +119,34 @@ where while !self.prospective_tips.is_empty() { // Check whether any block tasks are currently ready: while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() { - match rsp.expect("block download tasks should not panic") { - Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), - Err(e) => { - tracing::info!(?e, "restarting sync"); - continue 'sync; + match rsp.expect("block download and verify tasks should not panic") { + Ok(hash) => { + tracing::trace!(?hash, "verified and committed block to state"); + } + Err((e, hash)) => { + // We must restart the sync on every error, unless + // this block has already been verified. + // + // If we ignore other errors, the syncer can: + // - get a long way ahead of the state, and queue + // up a lot of unverified blocks in memory, or + // - get into an endless error cycle. + // + // In particular, we must restart if the checkpoint + // verifier has verified a block at this height, but + // the hash is different. In that case, we want to + // stop following an ancient side-chain. + if self.state_contains(hash).await? { + tracing::debug!(?e, + "sync error in ready task, but block is already verified, ignoring"); + } else { + tracing::warn!( + ?e, + "sync error in ready task, waiting to restart sync" + ); + delay_for(SYNC_RESTART_TIMEOUT).await; + continue 'sync; + } } } self.update_metrics(); @@ -133,10 +167,22 @@ where .expect("pending_blocks is nonempty") .expect("block download tasks should not panic") { - Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), - Err(e) => { - tracing::info!(?e, "restarting sync"); - continue 'sync; + Ok(hash) => { + tracing::trace!(?hash, "verified and committed block to state"); + } + Err((e, hash)) => { + // We must restart the sync on every error, unless + // this block has already been verified. + // See the comment above for details. + if self.state_contains(hash).await? { + tracing::debug!(?e, + "sync error with pending above lookahead limit, but block is already verified, ignoring"); + } else { + tracing::warn!(?e, + "sync error with pending above lookahead limit, waiting to restart sync"); + delay_for(SYNC_RESTART_TIMEOUT).await; + continue 'sync; + } } } } else { @@ -420,8 +466,10 @@ where .expect("inserted a download and verify request") .expect("block download and verify tasks should not panic") { - Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"), - Err(e) => tracing::warn!(?e, "could not download genesis block, retrying"), + Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"), + Err((e, _)) => { + tracing::warn!(?e, "could not download or verify genesis block, retrying") + } } } @@ -459,7 +507,8 @@ where tracing::trace!(?hash, "requested block"); - let span = tracing::info_span!("block_fetch_verify", ?hash); + // This span is used to help diagnose sync warnings + let span = tracing::warn_span!("block_fetch_verify", ?hash); let mut verifier = self.verifier.clone(); let task = tokio::spawn( async move { @@ -469,15 +518,25 @@ where .next() .expect("successful response has the block in it"), Ok(_) => unreachable!("wrong response to block request"), - Err(e) => return Err(e), + // Make sure we can distinguish download and verify timeouts + Err(e) => Err(eyre!(e)).wrap_err("failed to download block")?, }; - metrics::counter!("sync.downloaded_blocks", 1); + metrics::counter!("sync.downloaded.block.count", 1); - let result = verifier.ready_and().await?.call(block).await; - metrics::counter!("sync.verified_blocks", 1); - result + let result = verifier + .ready_and() + .await + .map_err(|e| eyre!(e)) + .wrap_err("verifier service failed to be ready")? + .call(block) + .await + .map_err(|e| eyre!(e)) + .wrap_err("failed to verify block")?; + metrics::counter!("sync.verified.block.count", 1); + Result::::Ok(result) } - .instrument(span), + .instrument(span) + .map_err(move |e| (e, hash)), ); self.pending_blocks.push(task); } @@ -515,3 +574,4 @@ where } type Error = Box; +type ReportAndHash = (Report, block::Hash);