Fix sync algorithm. (#887)

* checkpoint: reject older of duplicate verification requests.

If we get a duplicate block verification request, we should drop the older one
in favor of the newer one, because the older request is likely to have been
canceled.  Previously, this code would accept up to four duplicate verification
requests, then fail all subsequent ones.

* sync: add a timeout layer to block requests.

Note that if this timeout is too short, we'll bring down the peer set in a
retry storm.

* sync: restart syncing on error

Restart the syncing process when an error occurs, rather than ignoring it.
Restarting means we discard all tips and start over with a new block locator,
so we get another chance to "unstick" ourselves.

* sync: additional debug info

* sync: handle lookahead limit correctly.

Instead of extracting all the completed task results, the previous code pulled
results out until there were fewer tasks than the lookahead limit, then
stopped.  This meant that completed tasks could be left until the limit was
exceeded again.  Instead, extract all completed results, and use the number of
pending tasks to decide whether to extend the tip or wait for blocks to finish.

* network: add debug instrumentation to retry policy

* sync: instrument the spawned task

* sync: streamline ObtainTips/ExtendTips logic & tracing

This change does three things:

1.  It aligns the implementation of ObtainTips and ExtendTips so that they use
the same deduplication method.  This means that when debugging we only have one
deduplication algorithm to focus on.

2.  It streamlines the tracing output to not include information already
included in spans. Both obtain_tips and extend_tips have their own spans
attached to the events, so it's not necessary to add Scope: prefixes in
messages.

3.  It changes the messages to be focused on reporting the actual
events rather than the interpretation of the events (e.g., "got genesis hash in
response" rather than "peer could not extend tip").  The motivation for this
change is that when debugging, the interpretation of events is already known to
be incorrect, in the sense that the mental model of the code (no bug) does not
match its behavior (has bug), so presenting minimally-interpreted events forces
interpretation relative to the actual code.

* sync: hack to work around zcashd behavior

* sync: localize debug statement in extend_tips

* sync: change algorithm to define tips as pairs of hashes.

This is different enough from the existing description that its comments no
longer apply, so I removed them.  A further chunk of work is to change the sync
RFC to document this algorithm.

* sync: reduce block timeout

* state: add resource limits for sled

Closes #888

* sync: add a restart timeout constant

* sync: de-pub constants
This commit is contained in:
Henry de Valence 2020-08-12 16:48:01 -07:00 committed by GitHub
parent 0aea09c1ac
commit a79ce97957
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 193 additions and 200 deletions

View File

@ -412,6 +412,18 @@ impl CheckpointVerifier {
.entry(height)
.or_insert_with(|| QueuedBlockList::with_capacity(1));
let hash = block.hash();
for qb in qblocks.iter_mut() {
if qb.hash == hash {
let old_tx = std::mem::replace(&mut qb.tx, tx);
let e = "rejected older of duplicate verification requests".into();
tracing::debug!(?e);
let _ = old_tx.send(Err(e));
return rx;
}
}
// Memory DoS resistance: limit the queued blocks at each height
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
let e = "too many queued blocks at this height".into();
@ -421,7 +433,6 @@ impl CheckpointVerifier {
}
// Add the block to the list of queued blocks at this height
let hash = block.as_ref().into();
let new_qblock = QueuedBlock { block, hash, tx };
// This is a no-op for the first block in each QueuedBlockList.
qblocks.reserve_exact(1);

View File

@ -18,11 +18,12 @@ impl RetryLimit {
}
}
impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryLimit {
impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E> for RetryLimit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if let Err(e) = result {
if self.remaining_tries > 0 {
tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying");
Some(future::ready(RetryLimit {
remaining_tries: self.remaining_tries - 1,
}))

View File

@ -68,6 +68,9 @@ pub struct Config {
/// | Windows | `{FOLDERID_LocalAppData}\zebra` | C:\Users\Alice\AppData\Local\zebra |
/// | Other | `std::env::current_dir()/cache` | |
pub cache_dir: PathBuf,
/// The maximum number of bytes to use caching data in memory.
pub memory_cache_bytes: u64,
}
impl Config {
@ -80,7 +83,10 @@ impl Config {
};
let path = self.cache_dir.join(net_dir).join("state");
sled::Config::default().path(path)
sled::Config::default()
.path(path)
.cache_capacity(self.memory_cache_bytes)
.mode(sled::Mode::LowSpace)
}
}
@ -89,7 +95,10 @@ impl Default for Config {
let cache_dir = dirs::cache_dir()
.unwrap_or_else(|| std::env::current_dir().unwrap().join("cache"))
.join("zebra");
Self { cache_dir }
Self {
cache_dir,
memory_cache_bytes: 512 * 1024 * 1024,
}
}
}

View File

@ -123,7 +123,13 @@ async fn check_transcripts(network: Network) -> Result<(), Report> {
let storage_guard = TempDir::new("")?;
let cache_dir = storage_guard.path().to_owned();
let service = on_disk::init(Config { cache_dir }, network);
let service = on_disk::init(
Config {
cache_dir,
..Config::default()
},
network,
);
let transcript = Transcript::from(transcript_data.iter().cloned());
/// SPANDOC: check the on disk service against the transcript
transcript.check(service).await?;

View File

@ -20,7 +20,7 @@ pub fn test_cmd(path: &str) -> Result<(Command, impl Drop)> {
fs::File::create(dir.path().join("zebrad.toml"))?.write_all(
format!(
"[state]\ncache_dir = '{}'",
"[state]\ncache_dir = '{}'\nmemory_cache_bytes = 256000000",
cache_dir
.into_os_string()
.into_string()

View File

@ -1,10 +1,11 @@
use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
use color_eyre::eyre::{eyre, Report};
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::delay_for};
use tower::{retry::Retry, Service, ServiceExt};
use tracing_futures::{Instrument, Instrumented};
use tower::{builder::ServiceBuilder, retry::Retry, timeout::Timeout, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{
block::{Block, BlockHeaderHash},
@ -20,7 +21,19 @@ const FANOUT: usize = checkpoint::MAX_QUEUED_BLOCKS_PER_HEIGHT;
/// Controls how far ahead of the chain tip the syncer tries to download before
/// waiting for queued verifications to complete. Set to twice the maximum
/// checkpoint distance.
pub const LOOKAHEAD_LIMIT: usize = checkpoint::MAX_CHECKPOINT_HEIGHT_GAP * 2;
const LOOKAHEAD_LIMIT: usize = checkpoint::MAX_CHECKPOINT_HEIGHT_GAP * 2;
/// Controls how long we wait for a block download request to complete.
const BLOCK_TIMEOUT: Duration = Duration::from_secs(6);
/// Controls how long we wait to restart syncing after finishing a sync run.
const SYNC_RESTART_TIMEOUT: Duration = Duration::from_secs(20);
/// Helps work around defects in the bitcoin protocol by checking whether
/// the returned hashes actually extend a chain tip.
///
/// A prospective tip is recorded as the final two hashes of a peer's list of
/// unknown hashes: the second-to-last becomes `tip` and the last becomes
/// `expected_next`. Responses with fewer than two unknown hashes produce no
/// `CheckedTip`.
#[derive(Debug, Hash, PartialEq, Eq)]
struct CheckedTip {
/// The hash sent as the only `known_blocks` entry of the `FindBlocks`
/// request when this tip is extended.
tip: BlockHeaderHash,
/// The hash that must appear first in a response extending `tip`;
/// responses that begin with any other hash are discarded.
expected_next: BlockHeaderHash,
}
#[derive(Debug)]
pub struct Syncer<ZN, ZS, ZV>
@ -35,12 +48,11 @@ where
/// Used to perform extendtips requests, with no retry logic (failover is handled using fanout).
tip_network: ZN,
/// Used to download blocks, with retry logic.
block_network: Retry<RetryLimit, ZN>,
block_network: Retry<RetryLimit, Timeout<ZN>>,
state: ZS,
verifier: ZV,
prospective_tips: HashSet<BlockHeaderHash>,
pending_blocks:
Pin<Box<FuturesUnordered<Instrumented<JoinHandle<Result<BlockHeaderHash, Error>>>>>>,
prospective_tips: HashSet<CheckedTip>,
pending_blocks: Pin<Box<FuturesUnordered<JoinHandle<Result<BlockHeaderHash, Error>>>>>,
genesis_hash: BlockHeaderHash,
}
@ -59,10 +71,13 @@ where
/// - state: the zebra-state that stores the chain
/// - verifier: the zebra-consensus verifier that checks the chain
pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self {
let retry_peers = Retry::new(RetryLimit::new(10), peers.clone());
let block_network = ServiceBuilder::new()
.retry(RetryLimit::new(3))
.timeout(BLOCK_TIMEOUT)
.service(peers.clone());
Self {
tip_network: peers,
block_network: retry_peers,
block_network,
state,
verifier,
prospective_tips: HashSet::new(),
@ -77,50 +92,68 @@ where
// due to protocol limitations
self.request_genesis().await?;
loop {
self.obtain_tips().await?;
'sync: loop {
// Wipe state from previous iterations.
self.prospective_tips = HashSet::new();
self.pending_blocks = Box::pin(FuturesUnordered::new());
tracing::info!("starting sync, obtaining new tips");
if self.obtain_tips().await.is_err() {
tracing::warn!("failed to obtain tips, waiting to restart sync");
delay_for(SYNC_RESTART_TIMEOUT).await;
continue 'sync;
};
self.update_metrics();
// ObtainTips Step 6
//
// If there are any prospective tips, call ExtendTips.
// Continue this step until there are no more prospective tips.
while !self.prospective_tips.is_empty() {
tracing::debug!("extending prospective tips");
// Check whether any block tasks are currently ready:
while let Some(Some(rsp)) = self.pending_blocks.next().now_or_never() {
match rsp.expect("block download tasks should not panic") {
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
Err(e) => {
tracing::info!(?e, "restarting sync");
continue 'sync;
}
}
self.update_metrics();
}
self.extend_tips().await?;
self.update_metrics();
// Check whether we need to wait for existing block download tasks to finish
while self.pending_blocks.len() > LOOKAHEAD_LIMIT {
// If we have too many pending tasks, wait for one to finish:
if self.pending_blocks.len() > LOOKAHEAD_LIMIT {
tracing::debug!(
tips.len = self.prospective_tips.len(),
pending.len = self.pending_blocks.len(),
pending.limit = LOOKAHEAD_LIMIT,
"waiting for pending blocks",
);
match self
.pending_blocks
.next()
.await
.expect("already checked there's at least one pending block task")
.expect("pending_blocks is nonempty")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
// This is a non-transient error indicating either that
// we've repeatedly missed a block we need or that we've
// repeatedly missed a bad block suggested by a peer
// feeding us bad hashes.
//
// TODO(hdevalence): handle interruptions in the chain
// sync process. this should detect when we've stopped
// making progress (probably using a timeout), then
// continue the loop with a new invocation of
// obtain_tips(), which will restart block downloads.
Err(e) => tracing::error!(?e, "potentially transient error"),
};
Err(e) => {
tracing::info!(?e, "restarting sync");
continue 'sync;
}
}
} else {
// Otherwise, we can keep extending the tips.
tracing::info!(
tips.len = self.prospective_tips.len(),
pending.len = self.pending_blocks.len(),
pending.limit = LOOKAHEAD_LIMIT,
"extending tips",
);
let _ = self.extend_tips().await;
}
// We just added a bunch of failures, update the metrics now,
// because we might be about to reset or delay.
self.update_metrics();
}
delay_for(Duration::from_secs(15)).await;
tracing::info!("exhausted tips, waiting to restart sync");
delay_for(SYNC_RESTART_TIMEOUT).await;
}
}
@ -128,10 +161,6 @@ where
/// multiple peers
#[instrument(skip(self))]
async fn obtain_tips(&mut self) -> Result<(), Report> {
// ObtainTips Step 1
//
// Query the current state to construct the sequence of hashes: handled by
// the caller
let block_locator = self
.state
.ready_and()
@ -151,10 +180,6 @@ where
tracing::debug!(?block_locator, "trying to obtain new chain tips");
// ObtainTips Step 2
//
// Make a FindBlocksByHash request to the network F times, where F is a
// fanout parameter, to get resp1, ..., respF
let mut requests = FuturesUnordered::new();
for _ in 0..FANOUT {
requests.push(
@ -173,19 +198,6 @@ where
while let Some(res) = requests.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
if hashes.is_empty() {
tracing::debug!("skipping empty response");
continue;
} else {
tracing::debug!(hashes.len = hashes.len(), "processing response");
}
// ObtainTips Step 3
//
// For each response, starting from the beginning of the
// list, prune any block hashes already included in the
// state, stopping at the first unknown hash to get resp1',
// ..., respF'. (These lists may be empty).
let mut first_unknown = None;
for (i, &hash) in hashes.iter().enumerate() {
if !self.state_contains(hash).await? {
@ -194,41 +206,37 @@ where
}
}
// Hashes will be empty if we know about all the blocks in the response.
if first_unknown.is_none() {
tracing::debug!("ObtainTips: all hashes are known");
tracing::debug!(hashes.len = ?hashes.len(), ?first_unknown);
let unknown_hashes = if let Some(index) = first_unknown {
&hashes[index..]
} else {
continue;
}
let first_unknown = first_unknown.expect("already checked for None");
tracing::debug!(
first_unknown,
"found index of first unknown hash in response"
);
};
let unknown_hashes = &hashes[first_unknown..];
let new_tip = *unknown_hashes
.last()
.expect("already checked that unknown hashes isn't empty");
tracing::trace!(?unknown_hashes);
// ObtainTips Step 4:
// Combine the last elements of each list into a set; this is the
// set of prospective tips.
if !download_set.contains(&new_tip) {
tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "adding new prospective tip");
let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
CheckedTip {
tip: end[0],
expected_next: end[1],
}
} else {
tracing::debug!("discarding response that extends only one block");
continue;
};
if !download_set.contains(&new_tip.expected_next) {
tracing::debug!(?new_tip, "adding new prospective tip");
self.prospective_tips.insert(new_tip);
} else {
tracing::debug!(?new_tip, "discarding tip already queued for download");
}
// ObtainTips Step 5.1
//
// Combine all elements of each list into a set
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
tracing::debug!(
prev_download_len,
new_download_len,
new_hashes = new_download_len - prev_download_len,
"added hashes to download set"
);
@ -239,11 +247,8 @@ where
}
}
tracing::debug!(?self.prospective_tips, "ObtainTips: downloading blocks for tips");
tracing::debug!(?self.prospective_tips);
// ObtainTips Step 5.2
//
// queue download and verification of those blocks.
self.request_blocks(download_set.into_iter().collect())
.await?;
@ -252,18 +257,11 @@ where
#[instrument(skip(self))]
async fn extend_tips(&mut self) -> Result<(), Report> {
// Extend Tips 1
//
// remove all prospective tips and iterate over them individually
let tips = std::mem::take(&mut self.prospective_tips);
tracing::debug!(?tips, "extending tip set");
let mut download_set = HashSet::new();
for tip in tips {
// ExtendTips Step 2
//
// Create a FindBlocksByHash request consisting of just the
// prospective tip. Send this request to the network F times
tracing::debug!(?tip, "extending tip");
let mut responses = FuturesUnordered::new();
for _ in 0..FANOUT {
responses.push(
@ -272,78 +270,54 @@ where
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: vec![tip],
known_blocks: vec![tip.tip],
stop: None,
}),
);
}
while let Some(res) = responses.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
// ExtendTips Step 3
//
// For each response, check whether the first hash in the
// response is the genesis block; if so, discard the response.
// It indicates that the remote peer does not have any blocks
// following the prospective tip.
match (hashes.first(), hashes.len()) {
(None, _) => {
tracing::debug!("ExtendTips: skipping empty response");
continue;
}
(_, 1) => {
tracing::debug!("ExtendTips: skipping length-1 response, in case it's an unsolicited inv message");
continue;
}
(Some(hash), _) if (hash == &self.genesis_hash) => {
tracing::debug!(
"ExtendTips: skipping response, peer could not extend the tip"
);
continue;
}
(Some(&hash), _) => {
// Check for hashes we've already seen.
// This happens a lot near the end of the chain.
// This check reduces the number of duplicate
// blocks, but it is not required for
// correctness.
if self.state_contains(hash).await? {
tracing::debug!(
?hash,
"ExtendTips: skipping response, peer returned a duplicate hash: already in state"
);
continue;
}
}
}
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
tracing::debug!(first = ?hashes.first(), len = ?hashes.len());
let new_tip = hashes.pop().expect("expected: hashes must have len > 0");
let unknown_hashes = match hashes.split_first() {
None => continue,
Some((expected_hash, rest)) if expected_hash == &tip.expected_next => {
rest
}
Some((other_hash, _rest)) => {
tracing::debug!(?other_hash, ?tip.expected_next, "discarding response with unexpected next hash");
continue;
}
};
// Check for tips we've already seen
// TODO: remove this check once the sync service is more reliable
if self.state_contains(new_tip).await? {
tracing::debug!(
?new_tip,
"ExtendTips: Unexpected duplicate tip from peer: already in state"
);
tracing::trace!(?unknown_hashes);
let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
CheckedTip {
tip: end[0],
expected_next: end[1],
}
} else {
tracing::debug!("discarding response that extends only one block");
continue;
}
};
// ExtendTips Step 4
//
// Combine the last elements of the remaining responses into
// a set, and add this set to the set of prospective tips.
tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "ExtendTips: extending to new tip");
let _ = self.prospective_tips.insert(new_tip);
tracing::trace!(?hashes);
if !download_set.contains(&new_tip.expected_next) {
tracing::debug!(?new_tip, "adding new prospective tip");
self.prospective_tips.insert(new_tip);
} else {
tracing::debug!(?new_tip, "discarding tip already queued for download");
}
let prev_download_len = download_set.len();
download_set.extend(hashes);
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
tracing::debug!(
prev_download_len,
new_download_len,
new_hashes = new_download_len - prev_download_len,
"ExtendTips: added hashes to download set"
"added hashes to download set"
);
}
Ok(_) => unreachable!("network returned wrong response"),
@ -353,25 +327,8 @@ where
}
}
// ExtendTips Step ??
//
// Remove tips that are already included behind one of the other
// returned tips
self.prospective_tips
.retain(|tip| !download_set.contains(tip));
tracing::debug!(?self.prospective_tips, "ExtendTips: downloading blocks for tips");
// ExtendTips Step 5
//
// Combine all elements of the remaining responses into a
// set, and queue download and verification of those blocks
self.request_blocks(
download_set
.into_iter()
.chain(self.prospective_tips.iter().cloned())
.collect(),
)
.await?;
self.request_blocks(download_set.into_iter().collect())
.await?;
Ok(())
}
@ -387,8 +344,18 @@ where
// - the genesis hash is used as a placeholder for "no matches".
//
// So we just queue the genesis block here.
if !self.state_contains(self.genesis_hash).await? {
while !self.state_contains(self.genesis_hash).await? {
self.request_blocks(vec![self.genesis_hash]).await?;
match self
.pending_blocks
.next()
.await
.expect("inserted a download request")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
Err(e) => tracing::warn!(?e, "could not download genesis block, retrying"),
}
}
Ok(())
@ -420,24 +387,29 @@ where
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::BlocksByHash(iter::once(hash).collect()));
tracing::debug!(?hash, "requested block");
let span = tracing::info_span!("block_fetch_verify", ?hash);
let mut verifier = self.verifier.clone();
let task = tokio::spawn(async move {
let block = match block_req.await {
Ok(zn::Response::Blocks(blocks)) => blocks
.into_iter()
.next()
.expect("successful response has the block in it"),
Ok(_) => unreachable!("wrong response to block request"),
Err(e) => return Err(e),
};
metrics::counter!("sync.downloaded_blocks", 1);
let task = tokio::spawn(
async move {
let block = match block_req.await {
Ok(zn::Response::Blocks(blocks)) => blocks
.into_iter()
.next()
.expect("successful response has the block in it"),
Ok(_) => unreachable!("wrong response to block request"),
Err(e) => return Err(e),
};
metrics::counter!("sync.downloaded_blocks", 1);
let result = verifier.ready_and().await?.call(block).await;
metrics::counter!("sync.verified_blocks", 1);
result
})
.instrument(span);
let result = verifier.ready_and().await?.call(block).await;
metrics::counter!("sync.verified_blocks", 1);
result
}
.instrument(span),
);
self.pending_blocks.push(task);
}
@ -467,18 +439,12 @@ where
}
}
/// Update metrics gauges, and create a trace containing metrics.
fn update_metrics(&self) {
metrics::gauge!(
"sync.prospective_tips.len",
self.prospective_tips.len() as i64
);
metrics::gauge!("sync.pending_blocks.len", self.pending_blocks.len() as i64);
tracing::info!(
tips.len = self.prospective_tips.len(),
pending.len = self.pending_blocks.len(),
pending.limit = LOOKAHEAD_LIMIT,
);
}
}