From 83b4e6f97516da555aad9756dfe0f7c16e4e2943 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 15 Jun 2022 16:43:20 +1000 Subject: [PATCH] feat(diagnostics): Add tokio-console support to zebrad (#4519) * Always activate tokio/tracing feature And always build tests with all tokio features. * Refactor tracing-subscriber init to simplify it * Add the tokio-console feature and dependencies * Add optional tokio-console support, and log the installed tracing layers at info level Uses a tracing Registry for tokio-console, and a fmt::Subscriber otherwise. * Add some TODOs based on tracing-subscriber features * Fix up some spans * Add a TODO for fixing a log filter bug in tokio-console mode --- .cargo/config.toml | 2 + Cargo.lock | 41 +++++- tower-batch/Cargo.toml | 4 +- tower-fallback/Cargo.toml | 3 +- zebra-chain/Cargo.toml | 6 +- zebra-consensus/Cargo.toml | 3 +- zebra-network/Cargo.toml | 2 +- zebra-network/src/peer_set/initialize.rs | 33 +++-- zebra-rpc/Cargo.toml | 3 +- zebra-state/Cargo.toml | 5 +- zebra-test/Cargo.toml | 2 +- zebrad/Cargo.toml | 19 ++- zebrad/src/components/tracing/component.rs | 144 ++++++++++++++++----- 13 files changed, 211 insertions(+), 56 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 4e644a32..6d9e7e56 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,4 +1,6 @@ +# Zebra cargo configuration +# Flags that apply to all Zebra crates and configurations [target.'cfg(all())'] rustflags = [ # Zebra standard lints for Rust 1.58+ diff --git a/Cargo.lock b/Cargo.lock index 961c5d17..dcd650d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,6 +854,42 @@ dependencies = [ "winapi", ] +[[package]] +name = "console-api" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06c5fd425783d81668ed68ec98408a80498fb4ae2fd607797539e1a9dfa3618f" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31432bc31ff8883bf6a693a79371862f73087822470c82d6a1ec778781ee3978" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.11", +] + [[package]] name = "const-oid" version = "0.6.2" @@ -5428,9 +5464,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.22" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", "valuable", @@ -6403,6 +6439,7 @@ dependencies = [ "atty", "chrono", "color-eyre", + "console-subscriber", "dirs", "futures", "gumdrop", diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml index 88202f82..550b88b0 100644 --- a/tower-batch/Cargo.toml +++ b/tower-batch/Cargo.toml @@ -18,9 +18,11 @@ tracing-futures = "0.2.5" color-eyre = "0.6.1" ed25519-zebra = "3.0.0" rand = { version = "0.8.5", package = "rand" } -tokio = { version = "1.19.2", features = ["full"] } + +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } tokio-test = "0.4.2" tower-fallback = { path = "../tower-fallback/" } tower-test = "0.4.0" tracing = "0.1.31" + zebra-test = { path = "../zebra-test/" } diff --git a/tower-fallback/Cargo.toml b/tower-fallback/Cargo.toml index 34ba2646..0a617cc2 100644 --- a/tower-fallback/Cargo.toml +++ b/tower-fallback/Cargo.toml @@ -12,5 +12,6 @@ futures-core = "0.3.21" tracing = "0.1.31" [dev-dependencies] +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } + zebra-test = { path = "../zebra-test/" } -tokio = { version = "1.19.2", features = ["full"] } diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index c44ed0bb..6be5f905 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -62,7 +62,8 @@ proptest-derive = { version = "0.3.0", optional = true } rand = { version = "0.8.5", optional = true, package = "rand" } rand_chacha = { version = "0.3.1", optional = true } -tokio = { version = "1.19.2", optional = true } + +tokio = { version = "1.19.2", features = ["tracing"], optional = true } # ZF deps ed25519-zebra = "3.0.0" @@ -79,10 +80,11 @@ tracing = "0.1.31" proptest = "0.10.1" proptest-derive = "0.3.0" + rand = { version = "0.8.5", package = "rand" } rand_chacha = "0.3.1" -tokio = "1.19.2" +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } zebra-test = { path = "../zebra-test/" } diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 894a4ad1..eadbb5ff 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -57,7 +57,8 @@ proptest = "0.10.1" proptest-derive = "0.3.0" rand07 = { package = "rand", version = "0.7" } spandoc = "0.2.2" -tokio = { version = "1.19.2", features = ["full"] } + +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } tracing-error = "0.2.0" tracing-subscriber = "0.3.11" diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index a55e18b8..a24abbbe 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -53,7 +53,7 @@ proptest = "0.10.1" proptest-derive = "0.3.0" static_assertions = "1.1.0" -tokio = { version = "1.19.2", features = ["test-util"] } +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } toml = "0.5.9" zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 66fca1fb..4de951bc 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -25,7 +25,6 @@ use tokio_stream::wrappers::IntervalStream; use tower::{ buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, }; -use tracing::Span; use tracing_futures::Instrument; use zebra_chain::{chain_tip::ChainTip, parameters::Network}; @@ -179,7 +178,7 @@ where listen_handshaker, peerset_tx.clone(), ); - let listen_guard = tokio::spawn(listen_fut.instrument(Span::current())); + let listen_guard = tokio::spawn(listen_fut.in_current_span()); // 2. Initial peers, specified in the config. let initial_peers_fut = add_initial_peers( @@ -188,7 +187,7 @@ where peerset_tx.clone(), address_book_updater, ); - let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); + let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span()); // 3. Outgoing peers we connect to in response to load. let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); @@ -228,7 +227,7 @@ where peerset_tx, active_outbound_connections, ); - let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current())); + let crawl_guard = tokio::spawn(crawl_fut.in_current_span()); handle_tx .send(vec![listen_guard, crawl_guard, address_book_updater_guard]) @@ -646,15 +645,20 @@ enum CrawlerAction { /// /// Uses `active_outbound_connections` to limit the number of active outbound connections /// across both the initial peers and crawler. The limit is based on `config`. -#[instrument(skip( - config, - demand_tx, - demand_rx, - candidates, - outbound_connector, - peerset_tx, - active_outbound_connections, -))] +#[instrument( + skip( + config, + demand_tx, + demand_rx, + candidates, + outbound_connector, + peerset_tx, + active_outbound_connections, + ), + fields( + new_peer_interval = ?config.crawl_new_peer_interval, + ) +)] async fn crawl_and_dial( config: Config, mut demand_tx: futures::channel::mpsc::Sender, @@ -761,7 +765,8 @@ where panic!("panic during handshaking with {:?}: {:?} ", candidate, e); } }) - .instrument(Span::current()); + .in_current_span(); + handshakes.push(Box::pin(hs_join)); } DemandCrawl => { diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index 13a15673..e46c4210 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -48,7 +48,8 @@ proptest = "0.10.1" proptest-derive = "0.3.0" serde_json = "1.0.81" thiserror = "1.0.31" -tokio = { version = "1.19.2", features = ["full", "test-util"] } + +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index b136baab..504cdffd 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -27,7 +27,8 @@ rocksdb = { version = "0.18.0", default_features = false, features = ["lz4"] } serde = { version = "1.0.137", features = ["serde_derive"] } tempfile = "3.3.0" thiserror = "1.0.31" -tokio = { version = "1.19.2", features = ["sync"] } + +tokio = { version = "1.19.2", features = ["sync", "tracing"] } tower = { version = "0.4.12", features = ["buffer", "util"] } tracing = "0.1.31" @@ -48,7 +49,7 @@ proptest-derive = "0.3.0" halo2 = { package = "halo2_proofs", version = "0.1.0" } jubjub = "0.9.0" -tokio = { version = "1.19.2", features = ["full"] } +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } zebra-test = { path = "../zebra-test/" } diff --git a/zebra-test/Cargo.toml b/zebra-test/Cargo.toml index ca6bb2df..0041a980 100644 --- a/zebra-test/Cargo.toml +++ b/zebra-test/Cargo.toml @@ -17,7 +17,7 @@ once_cell = "1.12.0" rand = { version = "0.8.5", package = "rand" } regex = "1.5.6" -tokio = { version = "1.19.2", features = ["full"] } +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } tower = { version = "0.4.12", features = ["util"] } futures = "0.3.21" diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index fff90f60..1c4c3a29 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -42,6 +42,19 @@ proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl", "ze # The gRPC tests also need an installed lightwalletd binary lightwalletd-grpc-tests = ["tonic-build"] +# tokio-console support +# +# To activate this feature, run: +# ```sh +# RUSTFLAGS="--cfg tokio_unstable" cargo build --no-default-features --features="tokio-console" --bin zebrad +# ``` +# +# The console-subscriber is incompatible with the tracing/max_level_* features. +# +# For more details, see: +# https://github.com/tokio-rs/console/blob/main/console-subscriber/README.md#enabling-tokio-instrumentation +tokio-console = ["console-subscriber"] + # TODO: replace with environmental variables that skip the tests when not set (part of #2995) test_sync_to_mandatory_checkpoint_mainnet = [] test_sync_to_mandatory_checkpoint_testnet = [] @@ -105,6 +118,9 @@ log = "0.4.17" proptest = { version = "0.10.1", optional = true } proptest-derive = { version = "0.3.0", optional = true } +# test feature tokio-console +console-subscriber = { version = "0.1.6", optional = true } + [build-dependencies] vergen = { version = "7.2.1", default-features = false, features = ["cargo", "git"] } @@ -121,7 +137,8 @@ semver = "1.0.10" # zebra-rpc needs the preserve_order feature, it also makes test results more stable serde_json = { version = "1.0.81", features = ["preserve_order"] } tempfile = "3.3.0" -tokio = { version = "1.19.2", features = ["full", "test-util"] } + +tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] } tokio-stream = "0.1.9" # test feature lightwalletd-grpc-tests diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index eff9492e..fd23b3cd 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -4,7 +4,6 @@ use abscissa_core::{Component, FrameworkError, FrameworkErrorKind, Shutdown}; use tracing_error::ErrorLayer; use tracing_subscriber::{ fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter, - FmtSubscriber, }; use crate::{application::app_version, config::TracingSection}; @@ -13,7 +12,15 @@ use super::flame; /// Abscissa component for initializing the `tracing` subsystem pub struct Tracing { - filter_handle: Handle, + /// The installed filter reloading handle, if enabled. + // + // TODO: when fmt::Subscriber supports per-layer filtering, remove the Option + filter_handle: Option>, + + /// The originally configured filter. + initial_filter: String, + + /// The installed flame graph collector, if enabled. flamegrapher: Option, } @@ -28,58 +35,129 @@ impl Tracing { let use_color = config.force_use_color || (config.use_color && atty::is(atty::Stream::Stdout)); - // Construct a tracing subscriber with the supplied filter and enable reloading. - let builder = FmtSubscriber::builder() - .with_ansi(use_color) - .with_env_filter(&filter) - .with_filter_reloading(); - let filter_handle = builder.reload_handle(); + // Construct a format subscriber with the supplied global logging filter, and enable reloading. + // TODO: when fmt::Subscriber supports per-layer filtering, always enable this code + #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] + let (subscriber, filter_handle) = { + use tracing_subscriber::FmtSubscriber; + let logger = FmtSubscriber::builder() + .with_ansi(use_color) + .with_env_filter(&filter) + .with_filter_reloading(); + + let filter_handle = logger.reload_handle(); + let subscriber = logger.finish().with(ErrorLayer::default()); + + (subscriber, Some(filter_handle)) + }; + + // Construct a tracing registry with the supplied per-layer logging filter, + // and disable filter reloading. + // + // TODO: when fmt::Subscriber supports per-layer filtering, + // remove this registry code, and layer tokio-console on top of fmt::Subscriber + #[cfg(all(feature = "tokio-console", tokio_unstable))] + let (subscriber, filter_handle) = { + use tracing_subscriber::{fmt, Layer}; + + let subscriber = tracing_subscriber::registry(); + // TODO: find out why crawl_and_dial and try_to_sync evade this filter, + // and why they also don't get the global net/commit span + // + // Using `registry` as the base subscriber, the logs from most other functions get filtered. + // Using `FmtSubscriber` as the base subscriber, all the logs get filtered. + let logger = fmt::Layer::new() + .with_ansi(use_color) + .with_filter(EnvFilter::from(&filter)); + + let subscriber = subscriber.with(logger); + + let span_logger = ErrorLayer::default().with_filter(EnvFilter::from(&filter)); + let subscriber = subscriber.with(span_logger); + + (subscriber, None) + }; + + // Add optional layers based on dynamic and compile-time configs + + // Add a flamegraph let (flamelayer, flamegrapher) = if let Some(path) = flame_root { let (flamelayer, flamegrapher) = flame::layer(path); + (Some(flamelayer), Some(flamegrapher)) } else { (None, None) }; + let subscriber = subscriber.with(flamelayer); let journaldlayer = if config.use_journald { let layer = tracing_journald::layer() .map_err(|e| FrameworkErrorKind::ComponentError.context(e))?; + + // If the global filter can't be used, add a per-layer filter instead. + // TODO: when fmt::Subscriber supports per-layer filtering, always enable this code + #[cfg(all(feature = "tokio-console", tokio_unstable))] + let layer = { + use tracing_subscriber::Layer; + layer.with_filter(EnvFilter::from(&filter)) + }; + Some(layer) } else { None }; - - let subscriber = builder.finish().with(ErrorLayer::default()); + let subscriber = subscriber.with(journaldlayer); #[cfg(feature = "enable-sentry")] let subscriber = subscriber.with(sentry_tracing::layer()); - match (flamelayer, journaldlayer) { - (None, None) => subscriber.init(), - (Some(layer1), None) => subscriber.with(layer1).init(), - (None, Some(layer2)) => subscriber.with(layer2).init(), - (Some(layer1), Some(layer2)) => subscriber.with(layer1).with(layer2).init(), - }; + // spawn the console server in the background, and apply the console layer + // TODO: set Builder::poll_duration_histogram_max() if needed + #[cfg(all(feature = "tokio-console", tokio_unstable))] + let subscriber = subscriber.with(console_subscriber::spawn()); + // Initialise the global tracing subscriber + subscriber.init(); + + // Log the tracing stack we just created tracing::info!( ?filter, TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL, LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL, "started tracing component", ); + if flame_root.is_some() { + info!("installed flamegraph tracing layer"); + } + if config.use_journald { + info!(?filter, "installed journald tracing layer"); + } + #[cfg(feature = "enable-sentry")] + info!("installed sentry tracing layer"); + #[cfg(all(feature = "tokio-console", tokio_unstable))] + info!( + TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL, + LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL, + "installed tokio-console tracing layer", + ); Ok(Self { filter_handle, + initial_filter: filter, flamegrapher, }) } /// Return the currently-active tracing filter. pub fn filter(&self) -> String { - self.filter_handle - .with_current(|filter| filter.to_string()) - .expect("the subscriber is not dropped before the component is") + if let Some(filter_handle) = self.filter_handle.as_ref() { + filter_handle + .with_current(|filter| filter.to_string()) + .expect("the subscriber is not dropped before the component is") + } else { + self.initial_filter.clone() + } } /// Reload the currently-active filter with the supplied value. @@ -87,18 +165,26 @@ impl Tracing { /// This can be used to provide a dynamic tracing filter endpoint. pub fn reload_filter(&self, filter: impl Into) { let filter = filter.into(); - let filter_str = filter.to_string(); - self.filter_handle - .reload(filter) - .expect("the subscriber is not dropped before the component is"); + if let Some(filter_handle) = self.filter_handle.as_ref() { + tracing::info!( + ?filter, + TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL, + LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL, + "reloading tracing filter", + ); - tracing::info!( - filter = ?filter_str, - TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL, - LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL, - "reloaded tracing filter", - ); + filter_handle + .reload(filter) + .expect("the subscriber is not dropped before the component is"); + } else { + tracing::warn!( + ?filter, + TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL, + LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL, + "attempted to reload tracing filter, but filter reloading is disabled", + ); + } } }