diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index ca2050ff..937fc146 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -24,7 +24,7 @@ use futures::{FutureExt, TryFutureExt}; use thiserror::Error; use tokio::task::{spawn_blocking, JoinHandle}; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; -use tracing::instrument; +use tracing::{instrument, Span}; use zebra_chain::{ block::{self, Block}, @@ -215,13 +215,16 @@ where // The parameter download thread must be launched before initializing any verifiers. // Otherwise, the download might happen on the startup thread. + let span = Span::current(); let groth16_download_handle = spawn_blocking(move || { - if !debug_skip_parameter_preload { - // The lazy static initializer does the download, if needed, - // and the file hash checks. - lazy_static::initialize(&crate::groth16::GROTH16_PARAMETERS); - tracing::info!("Groth16 pre-download and check task finished"); - } + span.in_scope(|| { + if !debug_skip_parameter_preload { + // The lazy static initializer does the download, if needed, + // and the file hash checks. + lazy_static::initialize(&crate::groth16::GROTH16_PARAMETERS); + tracing::info!("Groth16 pre-download and check task finished"); + } + }) }); // transaction verification diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs index 49afe2db..04f4e613 100644 --- a/zebra-network/src/address_book_updater.rs +++ b/zebra-network/src/address_book_updater.rs @@ -7,6 +7,7 @@ use tokio::{ sync::{mpsc, watch}, task::JoinHandle, }; +use tracing::Span; use crate::{ address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config, @@ -54,25 +55,28 @@ impl AddressBookUpdater { let address_book = Arc::new(std::sync::Mutex::new(address_book)); let worker_address_book = address_book.clone(); + let span = Span::current(); let worker = move || { - info!("starting the address book updater"); + span.in_scope(|| { + info!("starting the address book updater"); - while let Some(event) = worker_rx.blocking_recv() { - trace!(?event, "got address book change"); + while let Some(event) = worker_rx.blocking_recv() { + trace!(?event, "got address book change"); - // # Correctness - // - // Briefly hold the address book threaded mutex, to update the - // state for a single address. - worker_address_book - .lock() - .expect("mutex should be unpoisoned") - .update(event); - } + // # Correctness + // + // Briefly hold the address book threaded mutex, to update the + // state for a single address. + worker_address_book + .lock() + .expect("mutex should be unpoisoned") + .update(event); + } - let error = Err(AllAddressBookUpdaterSendersClosed.into()); - info!(?error, "stopping address book updater"); - error + let error = Err(AllAddressBookUpdaterSendersClosed.into()); + info!(?error, "stopping address book updater"); + error + }) }; // Correctness: spawn address book accesses on a blocking thread, diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 397e87ba..60a8793c 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -682,7 +682,7 @@ impl Service for StateService { .checked_sub(new_len) .expect("prune does not add any utxo requests"); if prune_count > 0 { - tracing::info!( + tracing::debug!( ?old_len, ?new_len, ?prune_count, diff --git a/zebra-state/src/util.rs b/zebra-state/src/util.rs index bd721ed5..6ee848ac 100644 --- a/zebra-state/src/util.rs +++ b/zebra-state/src/util.rs @@ -18,7 +18,7 @@ pub fn block_locator_heights(tip_height: block::Height) -> Vec { .map(block::Height); let locators = locators.collect(); - tracing::info!( + tracing::debug!( ?tip_height, ?min_locator_height, ?locators, diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 6666f03e..e4e420fe 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -57,6 +57,7 @@ use color_eyre::eyre::{eyre, Report}; use futures::FutureExt; use tokio::{pin, select, sync::oneshot}; use tower::{builder::ServiceBuilder, util::BoxService}; +use tracing_futures::Instrument; use crate::{ components::{ @@ -145,13 +146,16 @@ impl StartCmd { .send(setup_data) .map_err(|_| eyre!("could not send setup data to inbound service"))?; - let syncer_task_handle = tokio::spawn(syncer.sync()); + let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); - let mut block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( - sync_status.clone(), - chain_tip_change.clone(), - peer_set.clone(), - )); + let mut block_gossip_task_handle = tokio::spawn( + sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change.clone(), + peer_set.clone(), + ) + .in_current_span(), + ); let mempool_crawler_task_handle = mempool::Crawler::spawn( &config.mempool, @@ -163,10 +167,10 @@ impl StartCmd { let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool); - let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id( - mempool_transaction_receiver, - peer_set, - )); + let tx_gossip_task_handle = tokio::spawn( + mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set) + .in_current_span(), + ); info!("spawned initial Zebra tasks"); diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 5178c064..aae0736a 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -275,45 +275,49 @@ where }) .unwrap_or(block::Height(0)); - if let Some(block_height) = block.coinbase_height() { - if block_height > max_lookahead_height { - debug!( - ?hash, - ?block_height, - ?tip_height, - ?max_lookahead_height, - lookahead_limit = ?MAX_INBOUND_CONCURRENCY, - "gossiped block height too far ahead of the tip: dropped downloaded block" - ); - metrics::counter!("gossip.max.height.limit.dropped.block.count", 1); - - Err("gossiped block height too far ahead")?; - } else if block_height < min_accepted_height { - debug!( - ?hash, - ?block_height, - ?tip_height, - ?min_accepted_height, - behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT, - "gossiped block height behind the finalized tip: dropped downloaded block" - ); - metrics::counter!("gossip.min.height.limit.dropped.block.count", 1); - - Err("gossiped block height behind the finalized tip")?; - } - } else { + let block_height = block.coinbase_height().ok_or_else(|| { debug!( ?hash, "gossiped block with no height: dropped downloaded block" ); metrics::counter!("gossip.no.height.dropped.block.count", 1); - Err("gossiped block with no height")?; + BoxError::from("gossiped block with no height") + })?; + + if block_height > max_lookahead_height { + debug!( + ?hash, + ?block_height, + ?tip_height, + ?max_lookahead_height, + lookahead_limit = ?MAX_INBOUND_CONCURRENCY, + "gossiped block height too far ahead of the tip: dropped downloaded block" + ); + metrics::counter!("gossip.max.height.limit.dropped.block.count", 1); + + Err("gossiped block height too far ahead")?; + } else if block_height < min_accepted_height { + debug!( + ?hash, + ?block_height, + ?tip_height, + ?min_accepted_height, + behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT, + "gossiped block height behind the finalized tip: dropped downloaded block" + ); + metrics::counter!("gossip.min.height.limit.dropped.block.count", 1); + + Err("gossiped block height behind the finalized tip")?; } - verifier.oneshot(block).await + verifier + .oneshot(block) + .await + .map(|hash| (hash, block_height)) } - .map_ok(|hash| { + .map_ok(|(hash, height)| { + info!(?height, "downloaded and verified gossiped block"); metrics::counter!("gossip.verified.block.count", 1); hash }) diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index ced4878c..c7b03370 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -52,6 +52,7 @@ use std::{collections::HashSet, time::Duration}; use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt}; use tokio::{sync::watch, task::JoinHandle, time::sleep}; use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; +use tracing_futures::Instrument; use zebra_chain::{block::Height, transaction::UnminedTxId}; use zebra_network as zn; @@ -129,7 +130,7 @@ where debug_enable_at_height: config.debug_enable_at_height.map(Height), }; - tokio::spawn(crawler.run()) + tokio::spawn(crawler.run().in_current_span()) } /// Waits until the mempool crawler is enabled by a debug config option. diff --git a/zebrad/src/components/mempool/queue_checker.rs b/zebrad/src/components/mempool/queue_checker.rs index b02a2563..247a86d0 100644 --- a/zebrad/src/components/mempool/queue_checker.rs +++ b/zebrad/src/components/mempool/queue_checker.rs @@ -15,6 +15,7 @@ use std::time::Duration; use tokio::{task::JoinHandle, time::sleep}; use tower::{BoxError, Service, ServiceExt}; +use tracing_futures::Instrument; use crate::components::mempool; @@ -45,7 +46,7 @@ where pub fn spawn(mempool: Mempool) -> JoinHandle> { let queue_checker = QueueChecker { mempool }; - tokio::spawn(queue_checker.run()) + tokio::spawn(queue_checker.run().in_current_span()) } /// Periodically check if the mempool has newly verified transactions. diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 7208776b..cfbad120 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -236,6 +236,9 @@ where /// The cached block chain state. state: ZS, + /// Allows efficient access to the best tip of the blockchain. + latest_chain_tip: ZSTip, + // Internal sync state /// The tips that the syncer is currently following. prospective_tips: HashSet, @@ -331,10 +334,11 @@ where downloads: Box::pin(Downloads::new( block_network, verifier, - latest_chain_tip, + latest_chain_tip.clone(), config.sync.lookahead_limit, )), state, + latest_chain_tip, prospective_tips: HashSet::new(), recent_syncs, }; @@ -354,7 +358,11 @@ where 'sync: loop { if started_once { - info!(timeout = ?SYNC_RESTART_DELAY, "waiting to restart sync"); + info!( + timeout = ?SYNC_RESTART_DELAY, + state_tip = ?self.latest_chain_tip.best_tip_height(), + "waiting to restart sync" + ); self.prospective_tips = HashSet::new(); self.downloads.cancel_all(); self.update_metrics(); @@ -363,7 +371,10 @@ where started_once = true; } - info!("starting sync, obtaining new tips"); + info!( + state_tip = ?self.latest_chain_tip.best_tip_height(), + "starting sync, obtaining new tips" + ); if let Err(e) = self.obtain_tips().await { warn!(?e, "error obtaining tips"); continue 'sync; @@ -403,6 +414,7 @@ where tips.len = self.prospective_tips.len(), in_flight = self.downloads.in_flight(), lookahead_limit = self.lookahead_limit, + state_tip = ?self.latest_chain_tip.best_tip_height(), "waiting for pending blocks", ); @@ -425,6 +437,7 @@ where tips.len = self.prospective_tips.len(), in_flight = self.downloads.in_flight(), lookahead_limit = self.lookahead_limit, + state_tip = ?self.latest_chain_tip.best_tip_height(), "extending tips", );