Ignore sync errors when the block is already verified (#980)
* Ignore sync errors when the block is already verified

  If we get an error for a block that is already in our state, we don't need to restart the sync. It was probably a duplicate download.

  Also: Process any ready tasks before reset, so the logs and metrics are up to date. (But ignore the errors, because we're about to reset.)

  Improve sync logging and metrics during the download and verify task.

* Remove duplicate hashes in logs

  Co-authored-by: Jane Lusby <jlusby42@gmail.com>

* Log the sync hash span at warn level

  Co-authored-by: Jane Lusby <jlusby42@gmail.com>
This commit is contained in:
parent
437549d8e9
commit
48497d4857
|
|
@ -1,7 +1,7 @@
|
||||||
use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
|
use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use color_eyre::eyre::{eyre, Report};
|
use color_eyre::eyre::{eyre, Report, WrapErr};
|
||||||
use futures::future::FutureExt;
|
use futures::future::{FutureExt, TryFutureExt};
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use tokio::{task::JoinHandle, time::delay_for};
|
use tokio::{task::JoinHandle, time::delay_for};
|
||||||
use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt};
|
use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt};
|
||||||
|
|
@ -52,7 +52,7 @@ where
|
||||||
state: ZS,
|
state: ZS,
|
||||||
verifier: ZV,
|
verifier: ZV,
|
||||||
prospective_tips: HashSet<CheckedTip>,
|
prospective_tips: HashSet<CheckedTip>,
|
||||||
pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<block::Hash, Error>>>>>,
|
pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<block::Hash, ReportAndHash>>>>>,
|
||||||
genesis_hash: block::Hash,
|
genesis_hash: block::Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -93,6 +93,17 @@ where
|
||||||
self.request_genesis().await?;
|
self.request_genesis().await?;
|
||||||
|
|
||||||
'sync: loop {
|
'sync: loop {
|
||||||
|
// Update metrics for any ready tasks, before wiping state
|
||||||
|
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
|
||||||
|
match rsp.expect("block download and verify tasks should not panic") {
|
||||||
|
Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
|
||||||
|
Err((e, _)) => {
|
||||||
|
tracing::trace!(?e, "sync error before restarting sync, ignoring")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.update_metrics();
|
||||||
|
|
||||||
// Wipe state from previous iterations.
|
// Wipe state from previous iterations.
|
||||||
self.prospective_tips = HashSet::new();
|
self.prospective_tips = HashSet::new();
|
||||||
self.pending_blocks = Box::pin(FuturesUnordered::new());
|
self.pending_blocks = Box::pin(FuturesUnordered::new());
|
||||||
|
|
@ -108,11 +119,34 @@ where
|
||||||
while !self.prospective_tips.is_empty() {
|
while !self.prospective_tips.is_empty() {
|
||||||
// Check whether any block tasks are currently ready:
|
// Check whether any block tasks are currently ready:
|
||||||
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
|
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
|
||||||
match rsp.expect("block download tasks should not panic") {
|
match rsp.expect("block download and verify tasks should not panic") {
|
||||||
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
|
Ok(hash) => {
|
||||||
Err(e) => {
|
tracing::trace!(?hash, "verified and committed block to state");
|
||||||
tracing::info!(?e, "restarting sync");
|
}
|
||||||
continue 'sync;
|
Err((e, hash)) => {
|
||||||
|
// We must restart the sync on every error, unless
|
||||||
|
// this block has already been verified.
|
||||||
|
//
|
||||||
|
// If we ignore other errors, the syncer can:
|
||||||
|
// - get a long way ahead of the state, and queue
|
||||||
|
// up a lot of unverified blocks in memory, or
|
||||||
|
// - get into an endless error cycle.
|
||||||
|
//
|
||||||
|
// In particular, we must restart if the checkpoint
|
||||||
|
// verifier has verified a block at this height, but
|
||||||
|
// the hash is different. In that case, we want to
|
||||||
|
// stop following an ancient side-chain.
|
||||||
|
if self.state_contains(hash).await? {
|
||||||
|
tracing::debug!(?e,
|
||||||
|
"sync error in ready task, but block is already verified, ignoring");
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
?e,
|
||||||
|
"sync error in ready task, waiting to restart sync"
|
||||||
|
);
|
||||||
|
delay_for(SYNC_RESTART_TIMEOUT).await;
|
||||||
|
continue 'sync;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.update_metrics();
|
self.update_metrics();
|
||||||
|
|
@ -133,10 +167,22 @@ where
|
||||||
.expect("pending_blocks is nonempty")
|
.expect("pending_blocks is nonempty")
|
||||||
.expect("block download tasks should not panic")
|
.expect("block download tasks should not panic")
|
||||||
{
|
{
|
||||||
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
|
Ok(hash) => {
|
||||||
Err(e) => {
|
tracing::trace!(?hash, "verified and committed block to state");
|
||||||
tracing::info!(?e, "restarting sync");
|
}
|
||||||
continue 'sync;
|
Err((e, hash)) => {
|
||||||
|
// We must restart the sync on every error, unless
|
||||||
|
// this block has already been verified.
|
||||||
|
// See the comment above for details.
|
||||||
|
if self.state_contains(hash).await? {
|
||||||
|
tracing::debug!(?e,
|
||||||
|
"sync error with pending above lookahead limit, but block is already verified, ignoring");
|
||||||
|
} else {
|
||||||
|
tracing::warn!(?e,
|
||||||
|
"sync error with pending above lookahead limit, waiting to restart sync");
|
||||||
|
delay_for(SYNC_RESTART_TIMEOUT).await;
|
||||||
|
continue 'sync;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -420,8 +466,10 @@ where
|
||||||
.expect("inserted a download and verify request")
|
.expect("inserted a download and verify request")
|
||||||
.expect("block download and verify tasks should not panic")
|
.expect("block download and verify tasks should not panic")
|
||||||
{
|
{
|
||||||
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
|
Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
|
||||||
Err(e) => tracing::warn!(?e, "could not download genesis block, retrying"),
|
Err((e, _)) => {
|
||||||
|
tracing::warn!(?e, "could not download or verify genesis block, retrying")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -459,7 +507,8 @@ where
|
||||||
|
|
||||||
tracing::trace!(?hash, "requested block");
|
tracing::trace!(?hash, "requested block");
|
||||||
|
|
||||||
let span = tracing::info_span!("block_fetch_verify", ?hash);
|
// This span is used to help diagnose sync warnings
|
||||||
|
let span = tracing::warn_span!("block_fetch_verify", ?hash);
|
||||||
let mut verifier = self.verifier.clone();
|
let mut verifier = self.verifier.clone();
|
||||||
let task = tokio::spawn(
|
let task = tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
|
|
@ -469,15 +518,25 @@ where
|
||||||
.next()
|
.next()
|
||||||
.expect("successful response has the block in it"),
|
.expect("successful response has the block in it"),
|
||||||
Ok(_) => unreachable!("wrong response to block request"),
|
Ok(_) => unreachable!("wrong response to block request"),
|
||||||
Err(e) => return Err(e),
|
// Make sure we can distinguish download and verify timeouts
|
||||||
|
Err(e) => Err(eyre!(e)).wrap_err("failed to download block")?,
|
||||||
};
|
};
|
||||||
metrics::counter!("sync.downloaded_blocks", 1);
|
metrics::counter!("sync.downloaded.block.count", 1);
|
||||||
|
|
||||||
let result = verifier.ready_and().await?.call(block).await;
|
let result = verifier
|
||||||
metrics::counter!("sync.verified_blocks", 1);
|
.ready_and()
|
||||||
result
|
.await
|
||||||
|
.map_err(|e| eyre!(e))
|
||||||
|
.wrap_err("verifier service failed to be ready")?
|
||||||
|
.call(block)
|
||||||
|
.await
|
||||||
|
.map_err(|e| eyre!(e))
|
||||||
|
.wrap_err("failed to verify block")?;
|
||||||
|
metrics::counter!("sync.verified.block.count", 1);
|
||||||
|
Result::<block::Hash, Report>::Ok(result)
|
||||||
}
|
}
|
||||||
.instrument(span),
|
.instrument(span)
|
||||||
|
.map_err(move |e| (e, hash)),
|
||||||
);
|
);
|
||||||
self.pending_blocks.push(task);
|
self.pending_blocks.push(task);
|
||||||
}
|
}
|
||||||
|
|
@ -515,3 +574,4 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
type ReportAndHash = (Report, block::Hash);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue