Change TracingEndpoint to forward to the Abscissa Tracing component.

This commit is contained in:
Henry de Valence 2019-12-26 11:56:41 -08:00
parent 45eb81a204
commit ab3db201ee
2 changed files with 52 additions and 98 deletions

View File

@ -76,20 +76,6 @@ impl Application for ZebradApp {
&mut self.state &mut self.state
} }
/// Override the provided impl to skip the default logging component.
///
/// We want to use tracing as the log subscriber in our tracing component,
/// so only initialize the abscissa Terminal component.
fn framework_components(
&mut self,
_command: &Self::Cmd,
) -> Result<Vec<Box<dyn Component<Self>>>, FrameworkError> {
use abscissa_core::terminal::{component::Terminal, ColorChoice};
// XXX abscissa uses self.term_colors(command), check if we should match
let terminal = Terminal::new(ColorChoice::Auto);
Ok(vec![Box::new(terminal)])
}
/// Register all components used by this application. /// Register all components used by this application.
/// ///
/// If you would like to add additional components to your application /// If you would like to add additional components to your application

View File

@ -1,24 +1,16 @@
//! An HTTP endpoint for dynamically setting tracing filters. //! An HTTP endpoint for dynamically setting tracing filters.
use crate::components::tokio::TokioComponent; use crate::{components::tokio::TokioComponent, prelude::*};
use abscissa_core::{format_err, Component, FrameworkError, FrameworkErrorKind}; use abscissa_core::{Component, FrameworkError};
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
use tracing::Subscriber;
use tracing_log::LogTracer;
use tracing_subscriber::{reload::Handle, EnvFilter, FmtSubscriber};
/// Abscissa component which runs a tracing filter endpoint. /// Abscissa component which runs a tracing filter endpoint.
#[derive(Component)] #[derive(Component)]
#[component(inject = "init_tokio(zebrad::components::tokio::TokioComponent)")] #[component(inject = "init_tokio(zebrad::components::tokio::TokioComponent)")]
// XXX ideally this would be TracingEndpoint<S: Subscriber> pub struct TracingEndpoint {}
// but this doesn't seem to play well with derive(Component)
pub struct TracingEndpoint {
filter_handle: Handle<EnvFilter, tracing_subscriber::fmt::Formatter>,
}
impl ::std::fmt::Debug for TracingEndpoint { impl ::std::fmt::Debug for TracingEndpoint {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
@ -27,54 +19,28 @@ impl ::std::fmt::Debug for TracingEndpoint {
} }
} }
async fn read_filter(req: Request<Body>) -> Result<String, String> {
std::str::from_utf8(
&hyper::body::to_bytes(req.into_body())
.await
.map_err(|_| "Error reading body".to_owned())?,
)
.map(|s| s.to_owned())
.map_err(|_| "Filter must be UTF-8".to_owned())
}
impl TracingEndpoint { impl TracingEndpoint {
/// Create the component. /// Create the component.
pub fn new() -> Result<Self, FrameworkError> { pub fn new() -> Result<Self, FrameworkError> {
// Set the global logger for the log crate to emit tracing events. Ok(Self {})
// XXX this is only required if we have a dependency that uses log;
// currently this is maybe only abscissa itself?
LogTracer::init().map_err(|e| {
format_err!(
FrameworkErrorKind::ComponentError,
"could not set log subscriber: {}",
e
)
})?;
let builder = FmtSubscriber::builder()
.with_ansi(true)
// Set the initial filter from the RUST_LOG env variable
// XXX pull from config file?
.with_env_filter(EnvFilter::from_default_env())
.with_filter_reloading();
let filter_handle = builder.reload_handle();
let subscriber = builder.finish();
// Set that subscriber to be the global tracing subscriber
tracing::subscriber::set_global_default(subscriber).map_err(|e| {
format_err!(
FrameworkErrorKind::ComponentError,
"could not set tracing subscriber: {}",
e
)
})?;
Ok(Self { filter_handle })
} }
/// Do setup after receiving a tokio runtime. /// Do setup after receiving a tokio runtime.
pub fn init_tokio(&mut self, tokio_component: &TokioComponent) -> Result<(), FrameworkError> { pub fn init_tokio(&mut self, tokio_component: &TokioComponent) -> Result<(), FrameworkError> {
info!("Initializing tracing endpoint"); info!("Initializing tracing endpoint");
// Clone the filter handle so it can be moved into make_service_fn closure let service =
let handle = self.filter_handle.clone(); make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(request_handler)) });
let service = make_service_fn(move |_| {
// Clone again to move into the service_fn closure
let handle = handle.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| filter_handler(handle.clone(), req)))
}
});
// XXX load tracing addr from config // XXX load tracing addr from config
let addr = "127.0.0.1:3000" let addr = "127.0.0.1:3000"
@ -103,50 +69,52 @@ impl TracingEndpoint {
} }
} }
fn reload_filter_from_bytes<S: Subscriber>( #[instrument]
handle: Handle<EnvFilter, S>, async fn request_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
bytes: hyper::body::Bytes,
) -> Result<(), String> {
let body = std::str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
trace!(request.body = ?body);
let filter = body.parse::<EnvFilter>().map_err(|e| format!("{}", e))?;
handle.reload(filter).map_err(|e| format!("{}", e))
}
async fn filter_handler<S: Subscriber>(
handle: Handle<EnvFilter, S>,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
// XXX see below
//use futures_util::TryStreamExt;
use hyper::{Method, StatusCode}; use hyper::{Method, StatusCode};
// We can't use #[instrument] because Handle<_,_> is not Debug,
// so we create a span manually.
let handler_span =
info_span!("filter_handler", method = ?req.method(), path = ?req.uri().path());
let _enter = handler_span.enter(); // dropping _enter closes the span
let rsp = match (req.method(), req.uri().path()) { let rsp = match (req.method(), req.uri().path()) {
(&Method::GET, "/") => Response::new(Body::from( (&Method::GET, "/") => Response::new(Body::from(
r#" r#"
This HTTP endpoint allows dynamic control of the filter applied to This HTTP endpoint allows dynamic control of the filter applied to
tracing events. To set the filter, POST it to /filter: tracing events.
curl -X POST localhost:3000/filter -d "zebrad=trace" To get the current filter, GET /filter:
curl -X GET localhost:3000/filter
To set the filter, POST the new filter string to /filter:
curl -X POST localhost:3000/filter -d "zebrad=trace"
"#, "#,
)), )),
(&Method::POST, "/filter") => { (&Method::GET, "/filter") => Response::builder()
// Combine all HTTP request chunks into one .status(StatusCode::OK)
let body_bytes = hyper::body::to_bytes(req.into_body()).await?; .body(Body::from(
match reload_filter_from_bytes(handle, body_bytes) { app_reader()
.state()
.components
.get_downcast_ref::<abscissa_core::trace::Tracing>()
.expect("Tracing component should be available")
.filter(),
))
.expect("response with known status code cannot fail"),
(&Method::POST, "/filter") => match read_filter(req).await {
Ok(filter) => {
app_writer()
.state_mut()
.components
.get_downcast_mut::<abscissa_core::trace::Tracing>()
.expect("Tracing component should be available")
.reload_filter(filter);
Response::new(Body::from(""))
}
Err(e) => Response::builder() Err(e) => Response::builder()
.status(StatusCode::BAD_REQUEST) .status(StatusCode::BAD_REQUEST)
.body(Body::from(e)) .body(Body::from(e))
.expect("response with known status code cannot fail"), .expect("response with known status code cannot fail"),
Ok(()) => Response::new(Body::from("")), },
}
}
_ => Response::builder() _ => Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::NOT_FOUND)
.body(Body::from("")) .body(Body::from(""))