sync: add backpressure to syncer

Closes #617.
Closes #698.

The remaining work on the syncer is alluded to in a new comment:

1. Correctly constructing a block locator object
2. Detecting when we've stopped making progress syncing and restarting obtain_tips.
This commit is contained in:
Henry de Valence 2020-07-21 13:50:38 -07:00
parent b8b1239ac4
commit 1047d2f690
2 changed files with 181 additions and 159 deletions

112
Cargo.lock generated
View File

@ -34,9 +34,9 @@ source = "git+https://github.com/yaahc/abscissa.git?rev=41d342a9344e38442b2211b0
dependencies = [
"darling",
"ident_case",
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
"synstructure",
]
@ -392,12 +392,12 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
dependencies = [
"cfg-if",
"crossbeam-utils",
"maybe-uninit",
]
[[package]]
@ -448,6 +448,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "ctor"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39858aa5bac06462d4dd4b9164848eb81ffc4aa5c479746393598fd193afa227"
dependencies = [
"quote 1.0.7",
"syn 1.0.35",
]
[[package]]
name = "curve25519-dalek"
version = "2.1.0"
@ -480,10 +490,10 @@ checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
dependencies = [
"fnv",
"ident_case",
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"strsim 0.9.3",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -494,7 +504,7 @@ checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
dependencies = [
"darling_core",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -521,9 +531,9 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc2ab4d5a16117f9029e9a6b5e4e79f4c67f6519bc134210d4d4a04ba31f41b"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -665,9 +675,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
dependencies = [
"proc-macro-hack",
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -774,9 +784,9 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90454ce4de40b7ca6a8968b5ef367bdab48413962588d0d2b1638d60090c35d7"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -977,9 +987,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.72"
version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9"
[[package]]
name = "linked-hash-map"
@ -1007,9 +1017,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.8"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
"cfg-if",
]
@ -1344,9 +1354,9 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -1374,9 +1384,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc175e9777c3116627248584e8f8b3e2987405cabe1c0adf7d1dd28f09dc7880"
dependencies = [
"proc-macro-error-attr",
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
"version_check",
]
@ -1386,9 +1396,9 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cc9795ca17eb581285ec44936da7fc2335a3f34f2ddd13118b6f4d515435c50"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
"syn-mid",
"version_check",
]
@ -1416,9 +1426,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.18"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa"
checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12"
dependencies = [
"unicode-xid 0.2.1",
]
@ -1456,10 +1466,12 @@ dependencies = [
[[package]]
name = "quanta"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f7a1905379198075914bc93d32a5465c40474f90a078bb13439cb00c547bcc"
checksum = "21484fda3d8ad7affee37755c77a5d0da527543f0af0c7f731c14e2215645d39"
dependencies = [
"atomic-shim",
"ctor",
"libc",
"winapi 0.3.9",
]
@ -1485,7 +1497,7 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
]
[[package]]
@ -1753,9 +1765,9 @@ version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -1890,9 +1902,9 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5254766110c377a921c002ca0775d4e384ba69af951fc4329d9dd77af2c25763"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -1926,9 +1938,9 @@ checksum = "510413f9de616762a4fbeab62509bf15c729603b72d7cd71280fbca431b1c118"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -1950,11 +1962,11 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.34"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b"
checksum = "fb7f4c519df8c117855e19dd8cc851e89eb746fe7a73f0157e0d95fdec5369b0"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"unicode-xid 0.2.1",
]
@ -1965,9 +1977,9 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -1976,9 +1988,9 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
"unicode-xid 0.2.1",
]
@ -2039,9 +2051,9 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -2093,9 +2105,9 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -2314,9 +2326,9 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0693bf8d6f2bf22c690fc61a9d21ac69efdbb894a17ed596b9af0f01e64b84b"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
]
[[package]]
@ -2722,8 +2734,8 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de251eec69fc7c1bc3923403d18ececb929380e016afe103da75f396704f8ca2"
dependencies = [
"proc-macro2 1.0.18",
"proc-macro2 1.0.19",
"quote 1.0.7",
"syn 1.0.34",
"syn 1.0.35",
"synstructure",
]

View File

@ -1,10 +1,10 @@
use std::{collections::HashSet, iter, sync::Arc, time::Duration};
use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::delay_for;
use tokio::{task::JoinHandle, time::delay_for};
use tower::{retry::Retry, Service, ServiceExt};
use tracing_futures::Instrument;
use tracing_futures::{Instrument, Instrumented};
use zebra_chain::{
block::{Block, BlockHeaderHash},
@ -14,37 +14,32 @@ use zebra_consensus::checkpoint;
use zebra_network::{self as zn, RetryLimit};
use zebra_state::{self as zs};
// XXX in the future, we may not be able to access the checkpoint module.
/// Number of `FindBlocks` requests sent to the network for each block
/// locator (ObtainTips) and for each prospective tip (ExtendTips).
///
/// Errors from individual requests are ignored because the same request is
/// made this many times.
const FANOUT: usize = checkpoint::MAX_QUEUED_BLOCKS_PER_HEIGHT;
/// Controls how far ahead of the chain tip the syncer tries to download before
/// waiting for queued verifications to complete. Set to twice the maximum
/// checkpoint distance.
///
/// After extending tips, the sync loop waits on pending block tasks while
/// more than this many of them are outstanding.
const LOOKAHEAD_LIMIT: usize = 2 * 2_000;
#[derive(Debug)]
pub struct Syncer<ZN, ZS, ZV>
where
ZN: Service<zn::Request>,
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
ZS::Future: Send,
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
ZV::Future: Send,
{
pub peer_set: ZN,
pub state: ZS,
pub verifier: ZV,
pub retry_peer_set: Retry<RetryLimit, ZN>,
pub prospective_tips: HashSet<BlockHeaderHash>,
pub block_requests: FuturesUnordered<ZN::Future>,
pub fanout: NumReq,
}
impl<ZN, ZS, ZC> Syncer<ZN, ZS, ZC>
where
ZN: Service<zn::Request> + Clone,
{
pub fn new(peer_set: ZN, state: ZS, verifier: ZC) -> Self {
let retry_peer_set = Retry::new(RetryLimit::new(3), peer_set.clone());
Self {
peer_set,
state,
verifier,
retry_peer_set,
block_requests: FuturesUnordered::new(),
// Limit the fanout to the number of chains that the
// CheckpointVerifier can handle
fanout: checkpoint::MAX_QUEUED_BLOCKS_PER_HEIGHT,
prospective_tips: HashSet::new(),
}
}
/// Used to perform ObtainTips and ExtendTips requests, with no retry logic (failover is handled using fanout).
tip_network: ZN,
/// Used to download blocks, with retry logic.
block_network: Retry<RetryLimit, ZN>,
state: ZS,
verifier: ZV,
prospective_tips: HashSet<BlockHeaderHash>,
pending_blocks:
Pin<Box<FuturesUnordered<Instrumented<JoinHandle<Result<BlockHeaderHash, Error>>>>>>,
}
impl<ZN, ZS, ZV> Syncer<ZN, ZS, ZV>
@ -56,6 +51,18 @@ where
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
ZV::Future: Send,
{
/// Creates a syncer from a network service, a state service, and a block
/// verifier.
///
/// The network service is stored twice: as-is in `tip_network`, and as a
/// clone wrapped in a `Retry` layer with a `RetryLimit::new(3)` policy in
/// `block_network`. The new syncer starts with an empty set of prospective
/// tips and no pending block tasks.
pub fn new(network: ZN, state: ZS, verifier: ZV) -> Self {
    Self {
        // Listed before `tip_network` so that `network` is cloned before it
        // is moved into the struct.
        block_network: Retry::new(RetryLimit::new(3), network.clone()),
        tip_network: network,
        prospective_tips: HashSet::new(),
        pending_blocks: Box::pin(FuturesUnordered::new()),
        state,
        verifier,
    }
}
#[instrument(skip(self))]
pub async fn sync(&mut self) -> Result<(), Report> {
loop {
@ -67,6 +74,35 @@ where
while !self.prospective_tips.is_empty() {
info!("extending prospective tips");
self.extend_tips().await?;
// Check whether we need to wait for existing block download tasks to finish
while self.pending_blocks.len() > LOOKAHEAD_LIMIT {
match self
.pending_blocks
.next()
.await
.expect("already checked there's at least one pending block task")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
// This is a non-transient error indicating either that
// we've repeatedly missed a block we need or that we've
// repeatedly missed a bad block suggested by a peer
// feeding us bad hashes.
//
// NOTE(review): the log message on this arm says
// "potentially transient", which contradicts the
// description above; confirm which one is intended.
//
// TODO(hdevalence): handle interruptions in the chain
// sync process. This should detect when we've stopped
// making progress (probably using a timeout), then
// continue the loop with a new invocation of
// obtain_tips(), which will restart block downloads.
// This requires correctly constructing a block locator
// (TODO below) and ensuring that the verifier handles
// multiple requests for verification of the same block
// hash by handling both requests or by discarding the
// earlier request in favor of the later one.
Err(e) => tracing::error!(?e, "potentially transient error"),
};
}
}
delay_for(Duration::from_secs(15)).await;
@ -84,25 +120,28 @@ where
//
// TODO(jlusby): get the block_locator from the state
let block_locator = vec![super::GENESIS];
let mut tip_futs = FuturesUnordered::new();
tracing::info!(?block_locator, "trying to obtain new chain tips");
// ObtainTips Step 2
//
// Make a FindBlocks request to the network F times, where F is a
// fanout parameter, to get resp1, ..., respF
for _ in 0..self.fanout {
let req = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
stop: None,
},
let mut requests = FuturesUnordered::new();
for _ in 0..FANOUT {
requests.push(
self.tip_network
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
stop: None,
}),
);
tip_futs.push(req);
}
let mut download_set = HashSet::new();
while let Some(res) = tip_futs.next().await {
while let Some(res) = requests.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
if hashes.is_empty() {
@ -169,8 +208,9 @@ where
"added hashes to download set"
);
}
Ok(r) => tracing::info!("unexpected response {:?}", r),
Err(e) => tracing::info!("{:?}", e),
Ok(_) => unreachable!("network returned wrong response"),
// We ignore this error because we made multiple fanout requests.
Err(e) => tracing::debug!(?e),
}
}
@ -198,16 +238,20 @@ where
//
// Create a FindBlocks request consisting of just the
// prospective tip. Send this request to the network F times
let mut tip_futs = FuturesUnordered::new();
for _ in 0..self.fanout {
tip_futs.push(self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: vec![tip],
stop: None,
},
));
let mut responses = FuturesUnordered::new();
for _ in 0..FANOUT {
responses.push(
self.tip_network
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: vec![tip],
stop: None,
}),
);
}
while let Some(res) = tip_futs.next().await {
while let Some(res) = responses.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
// ExtendTips Step 3
@ -240,8 +284,9 @@ where
download_set.extend(hashes);
}
Ok(r) => tracing::info!("unexpected response {:?}", r),
Err(e) => tracing::info!("{:?}", e),
Ok(_) => unreachable!("network returned wrong response"),
// We ignore this error because we made multiple fanout requests.
Err(e) => tracing::debug!("{:?}", e),
}
}
}
@ -269,65 +314,31 @@ where
}
/// Queue downloads for each block that isn't currently known to our node
#[instrument(skip(self, hashes))]
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
for chunk in hashes.chunks(10usize) {
let set = chunk.iter().cloned().collect();
let request = self
.retry_peer_set
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::BlocksByHash(set));
let verifier = self.verifier.clone();
let _ = tokio::spawn(
async move {
// XXX for some reason the tracing filter
// filter = 'info,[sync]=debug'
// does not pick this up, even though this future is instrumented
// with the current span below. However, fixing it immediately
// isn't critical because this code needs to be changed to propagate
// backpressure to the syncer.
tracing::debug!("test");
let result_fut = async move {
let mut handles = FuturesUnordered::new();
let resp = request.await?;
if let zn::Response::Blocks(blocks) = resp {
debug!(count = blocks.len(), "received blocks");
for block in blocks {
let mut verifier = verifier.clone();
let handle = tokio::spawn(async move {
verifier.ready_and().await?.call(block).await
});
handles.push(handle);
}
} else {
debug!(?resp, "unexpected response");
}
while let Some(res) = handles.next().await {
let _hash = res??;
}
Ok::<_, Error>(())
for hash in hashes.into_iter() {
let mut retry_peer_set = self.block_network.clone();
let mut verifier = self.verifier.clone();
let span = tracing::info_span!("block_fetch_verify", hash = ?hash);
self.pending_blocks.push(
tokio::spawn(async move {
let block = match retry_peer_set
.ready_and()
.await?
.call(zn::Request::BlocksByHash(iter::once(hash).collect()))
.await
{
Ok(zn::Response::Blocks(blocks)) => blocks
.into_iter()
.next()
.expect("successful response has the block in it"),
Ok(_) => unreachable!("wrong response to block request"),
Err(e) => return Err(e),
};
match result_fut.await {
Ok(()) => {}
// Block validation errors are unexpected, they could
// be a bug in our code.
//
// TODO: log request errors at info level
Err(e) => warn!("{:?}", e),
}
}
.instrument(tracing::Span::current()),
verifier.ready_and().await?.call(block).await
})
.instrument(span),
);
}
@ -345,4 +356,3 @@ pub fn block_locator_heights(tip_height: BlockHeight) -> impl Iterator<Item = Bl
}
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type NumReq = usize;