sync: touch up tracing output.

This commit is contained in:
Henry de Valence 2020-07-08 13:33:39 -07:00
parent d627889104
commit ff4e722cd7
2 changed files with 87 additions and 46 deletions

View File

@ -60,7 +60,7 @@ impl StartCmd {
let mut syncer = sync::Syncer::new(peer_set, state, verifier); let mut syncer = sync::Syncer::new(peer_set, state, verifier);
syncer.run().await syncer.sync().await
} }
} }

View File

@ -1,8 +1,11 @@
use std::{collections::HashSet, iter, sync::Arc, time::Duration};
use color_eyre::eyre::{eyre, Report}; use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use std::{collections::HashSet, iter, sync::Arc, time::Duration};
use tokio::time::delay_for; use tokio::time::delay_for;
use tower::{retry::Retry, Service, ServiceExt}; use tower::{retry::Retry, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{ use zebra_chain::{
block::{Block, BlockHeaderHash}, block::{Block, BlockHeaderHash},
types::BlockHeight, types::BlockHeight,
@ -50,9 +53,9 @@ where
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
ZV::Future: Send, ZV::Future: Send,
{ {
pub async fn run(&mut self) -> Result<(), Report> { #[instrument(skip(self))]
pub async fn sync(&mut self) -> Result<(), Report> {
loop { loop {
info!("populating prospective tips list");
self.obtain_tips().await?; self.obtain_tips().await?;
// ObtainTips Step 6 // ObtainTips Step 6
@ -69,6 +72,7 @@ where
/// Given a block_locator list fan out request for subsequent hashes to /// Given a block_locator list fan out request for subsequent hashes to
/// multiple peers /// multiple peers
#[instrument(skip(self))]
async fn obtain_tips(&mut self) -> Result<(), Report> { async fn obtain_tips(&mut self) -> Result<(), Report> {
// ObtainTips Step 1 // ObtainTips Step 1
// //
@ -78,6 +82,7 @@ where
// TODO(jlusby): get the block_locator from the state // TODO(jlusby): get the block_locator from the state
let block_locator = vec![super::GENESIS]; let block_locator = vec![super::GENESIS];
let mut tip_futs = FuturesUnordered::new(); let mut tip_futs = FuturesUnordered::new();
tracing::info!(?block_locator, "trying to obtain new chain tips");
// ObtainTips Step 2 // ObtainTips Step 2
// //
@ -100,6 +105,8 @@ where
if hashes.is_empty() { if hashes.is_empty() {
tracing::debug!("skipping empty response"); tracing::debug!("skipping empty response");
continue; continue;
} else {
tracing::debug!(hashes.len = hashes.len(), "processing response");
} }
// ObtainTips Step 3 // ObtainTips Step 3
@ -123,21 +130,41 @@ where
break; break;
} }
} }
tracing::debug!(
first_unknown,
"found index of first unknown hash in response"
);
if first_unknown == hashes.len() { if first_unknown == hashes.len() {
// XXX until we fix the TODO above to construct the locator correctly,
// we might hit this case, but it will be unexpected afterwards.
tracing::debug!("no new hashes, even though we gave our tip?"); tracing::debug!("no new hashes, even though we gave our tip?");
continue; continue;
} }
let unknown_hashes = &hashes[first_unknown..];
download_set.extend(unknown_hashes);
// ObtainTips Step 4 let unknown_hashes = &hashes[first_unknown..];
//
// Combine the last elements of each list into a set; this
// is the set of prospective tips.
let new_tip = *unknown_hashes let new_tip = *unknown_hashes
.last() .last()
.expect("already checked first_unknown < hashes.len()"); .expect("already checked first_unknown < hashes.len()");
let _ = self.prospective_tips.insert(new_tip);
// ObtainTips Step 4:
// Combine the last elements of each list into a set; this is the
// set of prospective tips.
if !download_set.contains(&new_tip) {
tracing::debug!(?new_tip, "adding new prospective tip");
self.prospective_tips.insert(new_tip);
} else {
tracing::debug!(?new_tip, "discarding tip already queued for download");
}
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
tracing::debug!(
prev_download_len,
new_download_len,
new_hashes = new_download_len - prev_download_len,
"added hashes to download set"
);
} }
Ok(r) => tracing::error!("unexpected response {:?}", r), Ok(r) => tracing::error!("unexpected response {:?}", r),
Err(e) => tracing::error!("{:?}", e), Err(e) => tracing::error!("{:?}", e),
@ -154,11 +181,13 @@ where
Ok(()) Ok(())
} }
#[instrument(skip(self))]
async fn extend_tips(&mut self) -> Result<(), Report> { async fn extend_tips(&mut self) -> Result<(), Report> {
// Extend Tips 1 // Extend Tips 1
// //
// remove all prospective tips and iterate over them individually // remove all prospective tips and iterate over them individually
let tips = std::mem::take(&mut self.prospective_tips); let tips = std::mem::take(&mut self.prospective_tips);
tracing::debug!(?tips, "extending tip set");
let mut download_set = HashSet::new(); let mut download_set = HashSet::new();
for tip in tips { for tip in tips {
@ -166,17 +195,16 @@ where
// //
// Create a FindBlocksByHash request consisting of just the // Create a FindBlocksByHash request consisting of just the
// prospective tip. Send this request to the network F times // prospective tip. Send this request to the network F times
let mut tip_futs = FuturesUnordered::new();
for _ in 0..self.fanout { for _ in 0..self.fanout {
let res = self tip_futs.push(self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
.peer_set zn::Request::FindBlocks {
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: vec![tip], known_blocks: vec![tip],
stop: None, stop: None,
}) },
.await; ));
}
while let Some(res) = tip_futs.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) { match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(mut hashes)) => { Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
// ExtendTips Step 3 // ExtendTips Step 3
@ -188,7 +216,7 @@ where
// TODO(jlusby): reject both main and test net genesis blocks // TODO(jlusby): reject both main and test net genesis blocks
match hashes.first() { match hashes.first() {
Some(&super::GENESIS) => { Some(&super::GENESIS) => {
tracing::debug!("skipping response that does not extend the tip"); tracing::debug!("skipping response, peer could not extend the tip");
continue; continue;
} }
None => { None => {
@ -204,6 +232,7 @@ where
// //
// Combine the last elements of the remaining responses into // Combine the last elements of the remaining responses into
// a set, and add this set to the set of prospective tips. // a set, and add this set to the set of prospective tips.
tracing::debug!(?new_tip, hashes.len = ?hashes.len());
let _ = self.prospective_tips.insert(new_tip); let _ = self.prospective_tips.insert(new_tip);
download_set.extend(hashes); download_set.extend(hashes);
@ -237,7 +266,9 @@ where
} }
/// Queue downloads for each block that isn't currently known to our node /// Queue downloads for each block that isn't currently known to our node
#[instrument(skip(self, hashes))]
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> { async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
for chunk in hashes.chunks(10usize) { for chunk in hashes.chunks(10usize) {
let set = chunk.iter().cloned().collect(); let set = chunk.iter().cloned().collect();
@ -250,37 +281,47 @@ where
let verifier = self.verifier.clone(); let verifier = self.verifier.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(
let result_fut = async move { async move {
let mut handles = FuturesUnordered::new(); // XXX for some reason the tracing filter
let resp = request.await?; // filter = 'info,[sync]=debug'
// does not pick this up, even though this future is instrumented
// with the current span below. However, fixing it immediately
// isn't critical because this code needs to be changed to propagate
// backpressure to the syncer.
tracing::debug!("test");
let result_fut = async move {
let mut handles = FuturesUnordered::new();
let resp = request.await?;
if let zn::Response::Blocks(blocks) = resp { if let zn::Response::Blocks(blocks) = resp {
debug!(count = blocks.len(), "received blocks"); debug!(count = blocks.len(), "received blocks");
for block in blocks { for block in blocks {
let mut verifier = verifier.clone(); let mut verifier = verifier.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
verifier.ready_and().await?.call(block).await verifier.ready_and().await?.call(block).await
}); });
handles.push(handle); handles.push(handle);
}
} else {
debug!(?resp, "unexpected response");
} }
} else {
debug!(?resp, "unexpected response"); while let Some(res) = handles.next().await {
let _hash = res??;
}
Ok::<_, Error>(())
};
match result_fut.await {
Ok(()) => {}
Err(e) => error!("{:?}", e),
} }
while let Some(res) = handles.next().await {
let _hash = res??;
}
Ok::<_, Error>(())
};
match result_fut.await {
Ok(()) => {}
Err(e) => error!("{:?}", e),
} }
}); .instrument(tracing::Span::current()),
);
} }
Ok(()) Ok(())