zebrad: use Downloads in sync
Try to use the better cancellation logic to revert to the previous sync algorithm. As designed, the sync algorithm is supposed to proceed by downloading state prospectively and handling errors by flushing the pipeline and starting over. This hasn't worked well, because we didn't previously cancel tasks properly. Now that we can, try to use something in the spirit of the original sync algorithm.
parent b90581a3d7
commit 5f229d1475
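The key enabler is that all in-flight download-and-verify work is now owned by a single Downloads stream, so "flushing the pipeline" becomes one cancel_all() call instead of dropping a FuturesUnordered of detached tasks that keep running in the background. Below is a minimal sketch of that ownership-plus-cancellation pattern; the PendingTasks type and its method names are hypothetical, it assumes a modern tokio where JoinHandle::abort exists, and it is much simpler than the Downloads type this commit actually uses.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

/// Hypothetical single owner for all in-flight download/verify tasks, so
/// "flush the pipeline" is one method call instead of scattered state.
struct PendingTasks<T> {
    in_flight: FuturesUnordered<JoinHandle<T>>,
}

impl<T: Send + 'static> PendingTasks<T> {
    fn new() -> Self {
        Self {
            in_flight: FuturesUnordered::new(),
        }
    }

    /// Spawn a job and keep its handle so it can be cancelled later.
    fn queue(&mut self, job: impl std::future::Future<Output = T> + Send + 'static) {
        self.in_flight.push(tokio::spawn(job));
    }

    /// The number of tasks still running, for a lookahead/backpressure check.
    fn in_flight(&self) -> usize {
        self.in_flight.len()
    }

    /// Flush the pipeline: abort every spawned task, then forget the handles.
    fn cancel_all(&mut self) {
        for handle in self.in_flight.iter() {
            handle.abort();
        }
        self.in_flight = FuturesUnordered::new();
    }

    /// Wait for the next finished task, skipping cancelled or panicked ones.
    async fn next_result(&mut self) -> Option<T> {
        while let Some(joined) = self.in_flight.next().await {
            if let Ok(output) = joined {
                return Some(output);
            }
        }
        None
    }
}

The syncer in this commit drives the real Downloads stream in the same spirit: queue work, poll for results, and call cancel_all() before every restart, so a failed run no longer leaves orphaned downloads competing with the next one.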
@@ -1,13 +1,12 @@
-use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
+use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};

-use color_eyre::eyre::{eyre, Report, WrapErr};
+use color_eyre::eyre::{eyre, Report};
 use futures::{
-    future::{FutureExt, TryFutureExt},
+    future::FutureExt,
     stream::{FuturesUnordered, StreamExt},
 };
-use tokio::{task::JoinHandle, time::delay_for};
+use tokio::time::delay_for;
 use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt};
-use tracing_futures::Instrument;

 use zebra_chain::{
     block::{self, Block},

@@ -68,13 +67,6 @@ const MAX_CHECKPOINT_DOWNLOAD_SECONDS: u64 = 300;
 /// then without a timeout, Zebra would deadlock.
 const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(MAX_CHECKPOINT_DOWNLOAD_SECONDS);

-/// Controls how long we wait to retry ExtendTips after it fails.
-///
-/// This timeout should be long enough to allow some of our peers to clear
-/// their connection state.
-///
-/// (ObtainTips failures use the sync restart timeout.)
-const TIPS_RETRY_TIMEOUT: Duration = Duration::from_secs(60);
 /// Controls how long we wait to restart syncing after finishing a sync run.
 ///
 /// This timeout should be long enough to:

@@ -101,7 +93,6 @@ const TIPS_RETRY_TIMEOUT: Duration = Duration::from_secs(60);
 const SYNC_RESTART_TIMEOUT: Duration = Duration::from_secs(100);

 type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
-type ReportAndHash = (Report, block::Hash);

 /// Helps work around defects in the bitcoin protocol by checking whether
 /// the returned hashes actually extend a chain tip.

@@ -124,13 +115,10 @@ where
     /// Used to perform ObtainTips and ExtendTips requests, with no retry logic
     /// (failover is handled using fanout).
     tip_network: Timeout<ZN>,
-    /// Used to download blocks, with retry logic.
-    block_network: Retry<zn::RetryLimit, Timeout<ZN>>,
     state: ZS,
-    verifier: Timeout<ZV>,
     prospective_tips: HashSet<CheckedTip>,
-    pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<block::Hash, ReportAndHash>>>>>,
     genesis_hash: block::Hash,
+    downloads: Pin<Box<Downloads<Retry<zn::RetryLimit, Timeout<ZN>>, Timeout<ZV>>>>,
 }

 /// Polls the network to determine whether further blocks are available and

@@ -155,18 +143,18 @@ where
     /// - verifier: the zebra-consensus verifier that checks the chain
     pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self {
         let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
-        let block_network = ServiceBuilder::new()
-            .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
-            .timeout(BLOCK_DOWNLOAD_TIMEOUT)
-            .service(peers);
-        let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);
+        let downloads = Downloads::new(
+            ServiceBuilder::new()
+                .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
+                .timeout(BLOCK_DOWNLOAD_TIMEOUT)
+                .service(peers),
+            Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
+        );
         Self {
             tip_network,
-            block_network,
             state,
-            verifier,
+            downloads: Box::pin(downloads),
             prospective_tips: HashSet::new(),
-            pending_blocks: Box::pin(FuturesUnordered::new()),
             genesis_hash: genesis_hash(chain),
         }
     }

@@ -178,61 +166,30 @@ where
         self.request_genesis().await?;

         'sync: loop {
-            // Update metrics for any ready tasks, before wiping state
-            while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
-                match rsp.expect("block download and verify tasks should not panic") {
-                    Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
-                    Err((e, _)) => {
-                        tracing::trace!(?e, "sync error before restarting sync, ignoring")
-                    }
-                }
-            }
-            self.update_metrics();
-
             // Wipe state from prevous iterations.
             self.prospective_tips = HashSet::new();
-            self.pending_blocks = Box::pin(FuturesUnordered::new());
+            self.downloads.cancel_all();
             self.update_metrics();

             tracing::info!("starting sync, obtaining new tips");
             if self.obtain_tips().await.is_err() || self.prospective_tips.is_empty() {
                 self.update_metrics();
                 tracing::warn!("failed to obtain tips, waiting to restart sync");
                 delay_for(SYNC_RESTART_TIMEOUT).await;
                 continue 'sync;
             };
             self.update_metrics();

             while !self.prospective_tips.is_empty() {
                 // Check whether any block tasks are currently ready:
-                while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
-                    match rsp.expect("block download and verify tasks should not panic") {
+                while let Some(Some(rsp)) = self.downloads.next().now_or_never() {
+                    match rsp {
                         Ok(hash) => {
                             tracing::trace!(?hash, "verified and committed block to state");
                         }
-                        Err((e, hash)) => {
-                            // We must restart the sync on every error, unless
-                            // this block has already been verified.
-                            //
-                            // If we ignore other errors, the syncer can:
-                            // - get a long way ahead of the state, and queue
-                            //   up a lot of unverified blocks in memory, or
-                            // - get into an endless error cycle.
-                            //
-                            // In particular, we must restart if the checkpoint
-                            // verifier has verified a block at this height, but
-                            // the hash is different. In that case, we want to
-                            // stop following an ancient side-chain.
-                            if self.state_contains(hash).await? {
-                                tracing::debug!(?e,
-                                    "sync error in ready task, but block is already verified, ignoring");
-                            } else {
-                                tracing::warn!(
-                                    ?e,
-                                    "sync error in ready task, waiting to restart sync"
-                                );
-                                delay_for(SYNC_RESTART_TIMEOUT).await;
-                                continue 'sync;
-                            }
-                        }
+                        Err(e) => {
+                            tracing::warn!(?e, "waiting to restart sync");
+                            delay_for(SYNC_RESTART_TIMEOUT).await;
+                            continue 'sync;
+                        }
                     }
                 }

@@ -242,44 +199,30 @@ where
                 //
                 // Starting to wait is interesting, but logging each wait can be
                 // very verbose.
-                if self.pending_blocks.len() > LOOKAHEAD_LIMIT {
+                if self.downloads.in_flight() > LOOKAHEAD_LIMIT {
                     tracing::info!(
                         tips.len = self.prospective_tips.len(),
-                        pending.len = self.pending_blocks.len(),
-                        pending.limit = LOOKAHEAD_LIMIT,
+                        in_flight = self.downloads.in_flight(),
+                        lookahead_limit = LOOKAHEAD_LIMIT,
                         "waiting for pending blocks",
                     );
                 }
-                while self.pending_blocks.len() > LOOKAHEAD_LIMIT {
+                while self.downloads.in_flight() > LOOKAHEAD_LIMIT {
                     tracing::trace!(
                         tips.len = self.prospective_tips.len(),
-                        pending.len = self.pending_blocks.len(),
-                        pending.limit = LOOKAHEAD_LIMIT,
-                        "continuing to wait for pending blocks",
+                        in_flight = self.downloads.in_flight(),
+                        lookahead_limit = LOOKAHEAD_LIMIT,
+                        "waiting for pending blocks",
                     );
-                    match self
-                        .pending_blocks
-                        .next()
-                        .await
-                        .expect("pending_blocks is nonempty")
-                        .expect("block download and verify tasks should not panic")
-                    {
+
+                    match self.downloads.next().await.expect("downloads is nonempty") {
                         Ok(hash) => {
                             tracing::trace!(?hash, "verified and committed block to state");
                         }
-                        Err((e, hash)) => {
-                            // We must restart the sync on every error, unless
-                            // this block has already been verified.
-                            // See the comment above for details.
-                            if self.state_contains(hash).await? {
-                                tracing::debug!(?e,
-                                    "sync error with pending above lookahead limit, but block is already verified, ignoring");
-                            } else {
-                                tracing::warn!(?e,
-                                    "sync error with pending above lookahead limit, waiting to restart sync");
-                                delay_for(SYNC_RESTART_TIMEOUT).await;
-                                continue 'sync;
-                            }
-                        }
+                        Err(e) => {
+                            tracing::warn!(?e, "waiting to restart sync");
+                            delay_for(SYNC_RESTART_TIMEOUT).await;
+                            continue 'sync;
+                        }
                     }
                     self.update_metrics();

@@ -288,24 +231,12 @@ where
                 // Once we're below the lookahead limit, we can keep extending the tips.
                 tracing::info!(
                     tips.len = self.prospective_tips.len(),
-                    pending.len = self.pending_blocks.len(),
-                    pending.limit = LOOKAHEAD_LIMIT,
+                    in_flight = self.downloads.in_flight(),
+                    lookahead_limit = LOOKAHEAD_LIMIT,
                     "extending tips",
                 );
-                let old_tips = self.prospective_tips.clone();
-                let _ = self.extend_tips().await;
-
-                // If ExtendTips fails, wait, then give it another shot.
-                //
-                // If we don't have many peers, waiting and retrying helps us
-                // ignore unsolicited BlockHashes from peers.
-                if self.prospective_tips.is_empty() {
-                    self.update_metrics();
-                    tracing::info!("no new tips, waiting to retry extend tips");
-                    delay_for(TIPS_RETRY_TIMEOUT).await;
-                    self.prospective_tips = old_tips;
-                    let _ = self.extend_tips().await;
-                }
+
+                let _ = self.extend_tips().await;
                 self.update_metrics();
             }

@@ -567,16 +498,13 @@ where
         //
         // So we just download and verify the genesis block here.
         while !self.state_contains(self.genesis_hash).await? {
-            self.request_blocks(vec![self.genesis_hash]).await?;
-            match self
-                .pending_blocks
-                .next()
+            self.downloads
+                .queue_download(self.genesis_hash)
                 .await
-                .expect("inserted a download and verify request")
-                .expect("block download and verify tasks should not panic")
-            {
+                .map_err(|e| eyre!(e))?;
+            match self.downloads.next().await.expect("downloads is nonempty") {
                 Ok(hash) => tracing::trace!(?hash, "verified and committed block to state"),
-                Err((e, _)) => {
+                Err(e) => {
                     tracing::warn!(?e, "could not download or verify genesis block, retrying")
                 }
             }

@@ -589,65 +517,16 @@ where
     async fn request_blocks(&mut self, hashes: Vec<block::Hash>) -> Result<(), Report> {
         tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
         for hash in hashes.into_iter() {
-            // Avoid re-downloading blocks that have already been verified.
-            // This is particularly important for nodes on slow or unreliable
-            // networks.
+            // If we've queued the download of a hash behind our current chain tip,
+            // we've been given bad responses by our peers. Abort the sync and restart.
             if self.state_contains(hash).await? {
-                tracing::debug!(
-                    ?hash,
-                    "request_blocks: Unexpected duplicate hash: already in state"
-                );
-                continue;
+                return Err(eyre!("queued download of hash behind our chain tip"));
             }
-            // We construct the block requests sequentially, waiting
-            // for the peer set to be ready to process each request. This
-            // ensures that we start block downloads in the order we want them
-            // (though they may resolve out of order), and it means that we
-            // respect backpressure. Otherwise, if we waited for readiness and
-            // did the service call in the spawned tasks, all of the spawned
-            // tasks would race each other waiting for the network to become
-            // ready.
-            let block_req = self
-                .block_network
-                .ready_and()
+
+            self.downloads
+                .queue_download(hash)
                 .await
-                .map_err(|e| eyre!(e))?
-                .call(zn::Request::BlocksByHash(iter::once(hash).collect()));
-
-            tracing::trace!(?hash, "requested block");
-
-            // This span is used to help diagnose sync warnings
-            let span = tracing::warn_span!("block_fetch_verify", %hash);
-            let mut verifier = self.verifier.clone();
-            let task = tokio::spawn(
-                async move {
-                    let block = match block_req.await {
-                        Ok(zn::Response::Blocks(blocks)) => blocks
-                            .into_iter()
-                            .next()
-                            .expect("successful response has the block in it"),
-                        Ok(_) => unreachable!("wrong response to block request"),
-                        // Make sure we can distinguish download and verify timeouts
-                        Err(e) => Err(eyre!(e)).wrap_err("failed to download block")?,
-                    };
-                    metrics::counter!("sync.downloaded.block.count", 1);
-
-                    let result = verifier
-                        .ready_and()
-                        .await
-                        .map_err(|e| eyre!(e))
-                        .wrap_err("verifier service failed to be ready")?
-                        .call(block)
-                        .await
-                        .map_err(|e| eyre!(e))
-                        .wrap_err("failed to verify block")?;
-                    metrics::counter!("sync.verified.block.count", 1);
-                    Result::<block::Hash, Report>::Ok(result)
-                }
-                .instrument(span)
-                .map_err(move |e| (e, hash)),
-            );
-            self.pending_blocks.push(task);
+                .map_err(|e| eyre!(e))?;
         }

         Ok(())

@@ -678,7 +557,10 @@ where
             "sync.prospective_tips.len",
             self.prospective_tips.len() as i64
         );
-        metrics::gauge!("sync.pending_blocks.len", self.pending_blocks.len() as i64);
+        metrics::gauge!(
+            "sync.downloads.in_flight",
+            self.downloads.in_flight() as i64
+        );
     }
 }

@@ -700,10 +582,6 @@ mod test {
             "Sync restart should allow for pending and buffered requests to complete"
         );

-        assert!(
-            TIPS_RETRY_TIMEOUT < BLOCK_VERIFY_TIMEOUT,
-            "Verify timeout should allow for retrying tips"
-        );
         assert!(
             SYNC_RESTART_TIMEOUT < BLOCK_VERIFY_TIMEOUT,
             "Verify timeout should allow for a sync restart"

@@ -22,6 +22,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

+/// Represents a [`Stream`] of download and verification tasks during chain sync.
 #[pin_project]
 #[derive(Debug)]
 pub struct Downloads<ZN, ZV>
 where
     ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,

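For orientation, the loop structure the diff converges on — drain whatever has already finished without blocking, then apply backpressure once too many blocks are in flight — looks roughly like the sketch below. Plain FuturesUnordered stands in for Downloads, the drain_and_wait name and String error type are placeholders, and the LOOKAHEAD_LIMIT value here is made up rather than taken from sync.rs.

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};

// Placeholder value; the real LOOKAHEAD_LIMIT is defined elsewhere in sync.rs.
const LOOKAHEAD_LIMIT: usize = 2_000;

/// Drain finished tasks, then apply backpressure, mirroring the two loops in
/// the diff. Returning `Err` corresponds to "restart the whole sync".
async fn drain_and_wait<F>(pending: &mut FuturesUnordered<F>) -> Result<(), String>
where
    F: std::future::Future<Output = Result<(), String>>,
{
    // Step 1: handle every result that is already ready, without waiting
    // (`now_or_never` polls the `next()` future exactly once).
    while let Some(Some(result)) = pending.next().now_or_never() {
        result?;
    }

    // Step 2: while too many tasks are in flight, block until we drop back
    // under the lookahead limit, bailing out on the first error.
    while pending.len() > LOOKAHEAD_LIMIT {
        pending
            .next()
            .await
            .expect("pending is nonempty while over the limit")?;
    }

    Ok(())
}

A single error propagating out of either loop is what triggers `continue 'sync` in the real code: cancel everything, obtain fresh tips, and start over.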