diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 181081d2..f7870d13 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -60,7 +60,7 @@ impl StartCmd { let mut syncer = sync::Syncer::new(peer_set, state, verifier); - syncer.run().await + syncer.sync().await } } diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs index 8a770973..447aa543 100644 --- a/zebrad/src/commands/start/sync.rs +++ b/zebrad/src/commands/start/sync.rs @@ -1,8 +1,11 @@ +use std::{collections::HashSet, iter, sync::Arc, time::Duration}; + use color_eyre::eyre::{eyre, Report}; use futures::stream::{FuturesUnordered, StreamExt}; -use std::{collections::HashSet, iter, sync::Arc, time::Duration}; use tokio::time::delay_for; use tower::{retry::Retry, Service, ServiceExt}; +use tracing_futures::Instrument; + use zebra_chain::{ block::{Block, BlockHeaderHash}, types::BlockHeight, @@ -50,9 +53,9 @@ where ZV: Service, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, ZV::Future: Send, { - pub async fn run(&mut self) -> Result<(), Report> { + #[instrument(skip(self))] + pub async fn sync(&mut self) -> Result<(), Report> { loop { - info!("populating prospective tips list"); self.obtain_tips().await?; // ObtainTips Step 6 @@ -69,6 +72,7 @@ where /// Given a block_locator list fan out request for subsequent hashes to /// multiple peers + #[instrument(skip(self))] async fn obtain_tips(&mut self) -> Result<(), Report> { // ObtainTips Step 1 // @@ -78,6 +82,7 @@ where // TODO(jlusby): get the block_locator from the state let block_locator = vec![super::GENESIS]; let mut tip_futs = FuturesUnordered::new(); + tracing::info!(?block_locator, "trying to obtain new chain tips"); // ObtainTips Step 2 // @@ -100,6 +105,8 @@ where if hashes.is_empty() { tracing::debug!("skipping empty response"); continue; + } else { + tracing::debug!(hashes.len = hashes.len(), "processing response"); } // ObtainTips Step 3 @@ -123,21 +130,41 @@ where break; } } + tracing::debug!( + first_unknown, + "found index of first unknown hash in response" + ); if first_unknown == hashes.len() { + // XXX until we fix the TODO above to construct the locator correctly, + // we might hit this case, but it will be unexpected afterwards. tracing::debug!("no new hashes, even though we gave our tip?"); continue; } - let unknown_hashes = &hashes[first_unknown..]; - download_set.extend(unknown_hashes); - // ObtainTips Step 4 - // - // Combine the last elements of each list into a set; this - // is the set of prospective tips. + let unknown_hashes = &hashes[first_unknown..]; let new_tip = *unknown_hashes .last() .expect("already checked first_unknown < hashes.len()"); - let _ = self.prospective_tips.insert(new_tip); + + // ObtainTips Step 4: + // Combine the last elements of each list into a set; this is the + // set of prospective tips. + if !download_set.contains(&new_tip) { + tracing::debug!(?new_tip, "adding new prospective tip"); + self.prospective_tips.insert(new_tip); + } else { + tracing::debug!(?new_tip, "discarding tip already queued for download"); + } + + let prev_download_len = download_set.len(); + download_set.extend(unknown_hashes); + let new_download_len = download_set.len(); + tracing::debug!( + prev_download_len, + new_download_len, + new_hashes = new_download_len - prev_download_len, + "added hashes to download set" + ); } Ok(r) => tracing::error!("unexpected response {:?}", r), Err(e) => tracing::error!("{:?}", e), @@ -154,11 +181,13 @@ where Ok(()) } + #[instrument(skip(self))] async fn extend_tips(&mut self) -> Result<(), Report> { // Extend Tips 1 // // remove all prospective tips and iterate over them individually let tips = std::mem::take(&mut self.prospective_tips); + tracing::debug!(?tips, "extending tip set"); let mut download_set = HashSet::new(); for tip in tips { @@ -166,17 +195,16 @@ where // // Create a FindBlocksByHash request consisting of just the // prospective tip. Send this request to the network F times + let mut tip_futs = FuturesUnordered::new(); for _ in 0..self.fanout { - let res = self - .peer_set - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zn::Request::FindBlocks { + tip_futs.push(self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( + zn::Request::FindBlocks { known_blocks: vec![tip], stop: None, - }) - .await; + }, + )); + } + while let Some(res) = tip_futs.next().await { match res.map_err::(|e| eyre!(e)) { Ok(zn::Response::BlockHeaderHashes(mut hashes)) => { // ExtendTips Step 3 @@ -188,7 +216,7 @@ where // TODO(jlusby): reject both main and test net genesis blocks match hashes.first() { Some(&super::GENESIS) => { - tracing::debug!("skipping response that does not extend the tip"); + tracing::debug!("skipping response, peer could not extend the tip"); continue; } None => { @@ -204,6 +232,7 @@ where // // Combine the last elements of the remaining responses into // a set, and add this set to the set of prospective tips. + tracing::debug!(?new_tip, hashes.len = ?hashes.len()); let _ = self.prospective_tips.insert(new_tip); download_set.extend(hashes); @@ -237,7 +266,9 @@ where } /// Queue downloads for each block that isn't currently known to our node + #[instrument(skip(self, hashes))] async fn request_blocks(&mut self, hashes: Vec) -> Result<(), Report> { + tracing::debug!(hashes.len = hashes.len(), "requesting blocks"); for chunk in hashes.chunks(10usize) { let set = chunk.iter().cloned().collect(); @@ -250,37 +281,47 @@ where let verifier = self.verifier.clone(); - let _ = tokio::spawn(async move { - let result_fut = async move { - let mut handles = FuturesUnordered::new(); - let resp = request.await?; + let _ = tokio::spawn( + async move { + // XXX for some reason the tracing filter + // filter = 'info,[sync]=debug' + // does not pick this up, even though this future is instrumented + // with the current span below. However, fixing it immediately + // isn't critical because this code needs to be changed to propagate + // backpressure to the syncer. + tracing::debug!("test"); + let result_fut = async move { + let mut handles = FuturesUnordered::new(); + let resp = request.await?; - if let zn::Response::Blocks(blocks) = resp { - debug!(count = blocks.len(), "received blocks"); + if let zn::Response::Blocks(blocks) = resp { + debug!(count = blocks.len(), "received blocks"); - for block in blocks { - let mut verifier = verifier.clone(); - let handle = tokio::spawn(async move { - verifier.ready_and().await?.call(block).await - }); - handles.push(handle); + for block in blocks { + let mut verifier = verifier.clone(); + let handle = tokio::spawn(async move { + verifier.ready_and().await?.call(block).await + }); + handles.push(handle); + } + } else { + debug!(?resp, "unexpected response"); } - } else { - debug!(?resp, "unexpected response"); + + while let Some(res) = handles.next().await { + let _hash = res??; + } + + Ok::<_, Error>(()) + }; + + match result_fut.await { + Ok(()) => {} + Err(e) => error!("{:?}", e), } - - while let Some(res) = handles.next().await { - let _hash = res??; - } - - Ok::<_, Error>(()) - }; - - match result_fut.await { - Ok(()) => {} - Err(e) => error!("{:?}", e), } - }); + .instrument(tracing::Span::current()), + ); } Ok(())