From d9fae6e31141bb98d97671178b3c5b559d5da9b9 Mon Sep 17 00:00:00 2001 From: Arya Date: Wed, 7 Sep 2022 03:39:30 -0400 Subject: [PATCH] fix(test) switch zebrad to a non-blocking tracing logger (#5032) * adds non-blocking writer for tracing subscriber * use non_blocking writer for the fmt::Layer with the tokio-console feature as well * adds doc comment to _guard field * adds acceptance test * update filter_handle type to use NonBlocking * adds more detail on lossy non-blocking writer and sets tracing.filter to "trace" in acceptance test * drops ZebradApp before process::exit(1) in the event of a FrameworkError * reduces buffered lines limit to 8000 * adds tracing.buffer_limit config and some comments * update acceptance.rs * fix acceptance test * fixes ambigious phrasing in comment * updates zebrad/src/application.rs * Find out what the join error is in the GitHub runner tests * updates acceptance test to use recv_timeout instead of always waiting 10 seconds, removes unnecessary echo command, and reduces # of rpc requests to 500 * see if sleeping for a few seconds before exiting helps the macOS test pass * Expand exit sleep docs Co-authored-by: Arya Co-authored-by: teor --- Cargo.lock | 12 ++++ zebra-state/src/service/finalized_state.rs | 17 ++++++ zebrad/Cargo.toml | 1 + zebrad/src/application.rs | 15 ++++- zebrad/src/components/tracing/component.rs | 29 ++++++++- zebrad/src/config.rs | 7 +++ zebrad/tests/acceptance.rs | 71 ++++++++++++++++++++++ 7 files changed, 148 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 011bc5f5..f6c18880 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5532,6 +5532,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" +dependencies = [ + "crossbeam-channel", + "time 0.3.14", + "tracing-subscriber 0.3.11", +] + [[package]] name = "tracing-attributes" version = "0.1.19" @@ -6638,6 +6649,7 @@ dependencies = [ "tonic-build", "tower", "tracing", + "tracing-appender", "tracing-error", "tracing-flame", "tracing-futures", diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 2e86d194..611c7fcf 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -108,6 +108,11 @@ impl FinalizedState { // So we want to drop it before we exit. std::mem::drop(new_state); + // Drops tracing log output that's hasn't already been written to stdout + // since this exits before calling drop on the WorkerGuard for the logger thread. + // This is okay for now because this is test-only code + // + // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout Self::exit_process(); } } @@ -356,6 +361,11 @@ impl FinalizedState { // We're just about to do a forced exit, so it's ok to do a forced db shutdown self.db.shutdown(true); + // Drops tracing log output that's hasn't already been written to stdout + // since this exits before calling drop on the WorkerGuard for the logger thread. + // This is okay for now because this is test-only code + // + // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout Self::exit_process(); } @@ -392,6 +402,13 @@ impl FinalizedState { let _ = stdout().lock().flush(); let _ = stderr().lock().flush(); + // Give some time to logger thread to flush out any remaining lines to stdout + // and yield so that tests pass on MacOS + std::thread::sleep(std::time::Duration::from_secs(3)); + + // Exits before calling drop on the WorkerGuard for the logger thread, + // dropping any lines that haven't already been written to stdout. + // This is okay for now because this is test-only code std::process::exit(0); } } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index c660b43f..e8583b46 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -99,6 +99,7 @@ tinyvec = { version = "1.6.0", features = ["rustc_1_55"] } thiserror = "1.0.34" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } +tracing-appender = "0.2.2" tracing-error = "0.2.0" tracing-futures = "0.2.5" tracing = "0.1.31" diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs index 6ebd51c9..521fec67 100644 --- a/zebrad/src/application.rs +++ b/zebrad/src/application.rs @@ -6,7 +6,7 @@ use self::entry_point::EntryPoint; use std::{fmt::Write as _, io::Write as _, process}; use abscissa_core::{ - application::{self, fatal_error, AppCell}, + application::{self, AppCell}, config::{self, Configurable}, status_err, terminal::{component::Terminal, stderr, stdout, ColorChoice}, @@ -18,6 +18,13 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR}; use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig}; +/// See +/// Print a fatal error message and exit +fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! { + status_err!("{} fatal error: {}", app_name, err); + process::exit(1) +} + /// Application state pub static APPLICATION: AppCell = AppCell::new(); @@ -462,7 +469,11 @@ impl Application for ZebradApp { let _ = stderr().lock().flush(); if let Err(e) = self.state().components.shutdown(self, shutdown) { - fatal_error(self, &e) + let app_name = self.name().to_string(); + + // Swap out a fake app so we can trigger the destructor on the original + let _ = std::mem::take(self); + fatal_error(app_name, &e); } // Swap out a fake app so we can trigger the destructor on the original diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index bab5675d..d2bca164 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -3,9 +3,15 @@ use abscissa_core::{Component, FrameworkError, Shutdown}; use tracing_error::ErrorLayer; use tracing_subscriber::{ - fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter, + fmt::{format, Formatter}, + layer::SubscriberExt, + reload::Handle, + util::SubscriberInitExt, + EnvFilter, }; +use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard}; + use crate::{application::app_version, config::TracingSection}; #[cfg(feature = "flamegraph")] @@ -16,7 +22,12 @@ pub struct Tracing { /// The installed filter reloading handle, if enabled. // // TODO: when fmt::Subscriber supports per-layer filtering, remove the Option - filter_handle: Option>, + filter_handle: Option< + Handle< + EnvFilter, + Formatter, NonBlocking>, + >, + >, /// The originally configured filter. initial_filter: String, @@ -24,6 +35,10 @@ pub struct Tracing { /// The installed flame graph collector, if enabled. #[cfg(feature = "flamegraph")] flamegrapher: Option, + + /// Drop guard for worker thread of non-blocking logger, + /// responsible for flushing any remaining logs when the program terminates + _guard: WorkerGuard, } impl Tracing { @@ -32,6 +47,13 @@ impl Tracing { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; + // Builds a lossy NonBlocking logger with a default line limit of 128_000 or an explicit buffer_limit. + // The write method queues lines down a bounded channel with this capacity to a worker thread that writes to stdout. + // Increments error_counter and drops lines when the buffer is full. + let (non_blocking, _guard) = NonBlockingBuilder::default() + .buffered_lines_limit(config.buffer_limit.max(100)) + .finish(std::io::stdout()); + // Only use color if tracing output is being sent to a terminal or if it was explicitly // forced to. let use_color = @@ -47,6 +69,7 @@ impl Tracing { let logger = FmtSubscriber::builder() .with_ansi(use_color) + .with_writer(non_blocking) .with_env_filter(&filter); // Enable reloading if that feature is selected. @@ -82,6 +105,7 @@ impl Tracing { // Using `FmtSubscriber` as the base subscriber, all the logs get filtered. let logger = fmt::Layer::new() .with_ansi(use_color) + .with_writer(non_blocking) .with_filter(EnvFilter::from(&filter)); let subscriber = subscriber.with(logger); @@ -185,6 +209,7 @@ impl Tracing { initial_filter: filter, #[cfg(feature = "flamegraph")] flamegrapher, + _guard, }) } diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs index da5c2322..86228577 100644 --- a/zebrad/src/config.rs +++ b/zebrad/src/config.rs @@ -93,6 +93,12 @@ pub struct TracingSection { /// verification of every 1000th block. pub filter: Option, + /// The buffer_limit size sets the number of log lines that can be queued by the tracing subscriber + /// to be written to stdout before logs are dropped. + /// + /// Defaults to 128,000 with a minimum of 100. + pub buffer_limit: usize, + /// The address used for an ad-hoc RPC endpoint allowing dynamic control of the tracing filter. /// /// Install Zebra using `cargo install --features=filter-reload` to enable this config. @@ -140,6 +146,7 @@ impl Default for TracingSection { use_color: true, force_use_color: false, filter: None, + buffer_limit: 128_000, endpoint_addr: None, flamegraph: None, use_journald: false, diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 50584b84..4a797d4b 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1239,6 +1239,77 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { Ok(()) } +#[test] +fn non_blocking_logger() -> Result<()> { + use futures::FutureExt; + use std::{sync::mpsc, time::Duration}; + + let rt = tokio::runtime::Runtime::new().unwrap(); + let (done_tx, done_rx) = mpsc::channel(); + + let test_task_handle: tokio::task::JoinHandle> = rt.spawn(async move { + let _init_guard = zebra_test::init(); + + // Write a configuration that has RPC listen_addr set + // [Note on port conflict](#Note on port conflict) + let mut config = random_known_rpc_port_config(false)?; + config.tracing.filter = Some("trace".to_string()); + config.tracing.buffer_limit = 100; + let zebra_rpc_address = config.rpc.listen_addr.unwrap(); + + let dir = testdir()?.with_config(&mut config)?; + let mut child = dir.spawn_child(args!["start"])?; + // Wait until port is open. + child.expect_stdout_line_matches( + format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), + )?; + + // Create an http client + let client = reqwest::Client::new(); + + // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, + // fill the channel that tracing logs are queued onto, and drop logs rather than block execution. + for _ in 0..500 { + let res = client + .post(format!("http://{}", &zebra_rpc_address)) + .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) + .header("Content-Type", "application/json") + .send() + .await?; + + // Test that zebrad rpc endpoint is still responding to requests + assert!(res.status().is_success()); + } + + child.kill(false)?; + + let output = child.wait_with_output()?; + let output = output.assert_failure()?; + + // [Note on port conflict](#Note on port conflict) + output + .assert_was_killed() + .wrap_err("Possible port conflict. Are there other acceptance tests running?")?; + + done_tx.send(())?; + + Ok(()) + }); + + // Wait until the spawned task finishes or return an error in 45 seconds + if done_rx.recv_timeout(Duration::from_secs(45)).is_err() { + return Err(eyre!("unexpected test task hang")); + } + + rt.shutdown_timeout(Duration::from_secs(3)); + + match test_task_handle.now_or_never() { + Some(Ok(result)) => result, + Some(Err(error)) => Err(eyre!("join error: {:?}", error)), + None => Err(eyre!("unexpected test task hang")), + } +} + /// Make sure `lightwalletd` works with Zebra, when both their states are empty. /// /// This test only runs when the `ZEBRA_TEST_LIGHTWALLETD` env var is set.