From 217c25ef07827f091ea1bf00fe508cd575165a40 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 22 Jun 2020 19:19:27 -0700 Subject: [PATCH] network: propagate tracing Spans through peer connection --- zebra-network/src/peer/client.rs | 34 +++---- zebra-network/src/peer/connection.rs | 107 ++++++++++++++++------- zebra-network/src/peer/handshake.rs | 50 +++++++---- zebra-network/src/peer_set/initialize.rs | 2 +- 4 files changed, 128 insertions(+), 65 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index f9b1e87e..7f785224 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -16,21 +16,22 @@ use super::{ErrorSlot, SharedPeerError}; /// The "client" duplex half of a peer connection. pub struct Client { - pub(super) span: tracing::Span, pub(super) server_tx: mpsc::Sender, pub(super) error_slot: ErrorSlot, } -/// A message from the `peer::Client` to the `peer::Server`, containing both a -/// request and a return message channel. The reason the return channel is -/// included is because `peer::Client::call` returns a future that may be moved -/// around before it resolves, so the future must have ownership of the channel -/// on which it receives the response. +/// A message from the `peer::Client` to the `peer::Server`. #[derive(Debug)] -pub(super) struct ClientRequest( - pub(super) Request, - pub(super) oneshot::Sender>, -); +pub(super) struct ClientRequest { + /// The actual request. + pub request: Request, + /// The return message channel, included because `peer::Client::call` returns a + /// future that may be moved around before it resolves. + pub tx: oneshot::Sender>, + /// The tracing context for the request, so that work the connection task does + /// processing messages in the context of this request will have correct context. + pub span: tracing::Span, +} impl Service for Client { type Response = Response; @@ -49,19 +50,23 @@ impl Service for Client { } } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, request: Request) -> Self::Future { use futures::future::FutureExt; - use tracing_futures::Instrument; let (tx, rx) = oneshot::channel(); - match self.server_tx.try_send(ClientRequest(req, tx)) { + // get the current Span to propagate it to the peer connection task. + // this allows the peer connection to enter the correct tracing context + // when it's handling messages in the context of processing this + // request. + let span = tracing::Span::current(); + + match self.server_tx.try_send(ClientRequest { request, span, tx }) { Err(e) => { if e.is_disconnected() { future::ready(Err(self .error_slot .try_get_error() .expect("failed servers must set their error slot"))) - .instrument(self.span.clone()) .boxed() } else { // sending fails when there's not enough @@ -75,7 +80,6 @@ impl Service for Client { oneshot_recv_result .expect("ClientRequest oneshot sender must not be dropped before send") }) - .instrument(self.span.clone()) .boxed() } } diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index c7b68868..5913f4a4 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -9,6 +9,7 @@ use futures::{ }; use tokio::time::{delay_for, Delay}; use tower::Service; +use tracing_futures::Instrument; use zebra_chain::{ block::{Block, BlockHeaderHash}, @@ -46,7 +47,6 @@ impl Handler { /// contents to responses without additional copies. If the message is not /// interpretable as a response, we return ownership to the caller. fn process_message(&mut self, msg: Message) -> Option { - trace!(?msg); // This function is where we statefully interpret Bitcoin/Zcash messages // into responses to messages in the internal request/response protocol. // This conversion is done by a sequence of (request, message) match arms, @@ -93,6 +93,7 @@ impl Handler { ))), // By default, messages are not responses. (state, msg) => { + trace!(?msg, "did not interpret message as response"); ignored_msg = Some(msg); state } @@ -106,7 +107,11 @@ pub(super) enum State { /// Awaiting a client request or a peer message. AwaitingRequest, /// Awaiting a peer message we can interpret as a client request. - AwaitingResponse(Handler, oneshot::Sender>), + AwaitingResponse { + handler: Handler, + tx: oneshot::Sender>, + span: tracing::Span, + }, /// A failure has occurred and we are shutting down the connection. Failed, } @@ -171,18 +176,26 @@ where Either::Right((None, _)) => { self.fail_with(PeerError::DeadClient); } - Either::Right((Some(req), _)) => self.handle_client_request(req).await, + Either::Right((Some(req), _)) => { + let span = req.span.clone(); + self.handle_client_request(req).instrument(span).await + } } } // We're awaiting a response to a client request, // so wait on either a peer message, or on a request timeout. - State::AwaitingResponse { .. } => { - trace!("awaiting response to client request"); + State::AwaitingResponse { ref span, .. } => { + // we have to get rid of the span reference so we can tamper with the state + let span = span.clone(); + trace!(parent: &span, "awaiting response to client request"); let timer_ref = self .request_timer .as_mut() .expect("timeout must be set while awaiting response"); - match future::select(peer_rx.next(), timer_ref).await { + match future::select(peer_rx.next(), timer_ref) + .instrument(span.clone()) + .await + { Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Ok(peer_msg)), _timer)) => { @@ -194,39 +207,48 @@ where // factor the state required for inbound and // outbound requests. let request_msg = match self.state { - State::AwaitingResponse(ref mut handler, _) => { - handler.process_message(peer_msg) - } + State::AwaitingResponse { + ref mut handler, .. + } => span.in_scope(|| handler.process_message(peer_msg)), _ => unreachable!(), }; // If the message was not consumed, check whether it // should be handled as a request. if let Some(msg) = request_msg { + // do NOT instrument with the request span, this is + // independent work self.handle_message_as_request(msg).await; } else { // Otherwise, check whether the handler is finished // processing messages and update the state. self.state = match self.state { - State::AwaitingResponse(Handler::Finished(response), tx) => { + State::AwaitingResponse { + handler: Handler::Finished(response), + tx, + .. + } => { let _ = tx.send(response); State::AwaitingRequest } - pending @ State::AwaitingResponse(_, _) => pending, + pending @ State::AwaitingResponse { .. } => pending, _ => unreachable!(), }; } } Either::Right(((), _peer_fut)) => { - trace!("client request timed out"); + trace!(parent: &span, "client request timed out"); let e = PeerError::ClientRequestTimeout; self.state = match self.state { // Special case: ping timeouts fail the connection. - State::AwaitingResponse(Handler::Ping(_), _) => { + State::AwaitingResponse { + handler: Handler::Ping(_), + .. + } => { self.fail_with(e); State::Failed } // Other request timeouts fail the request. - State::AwaitingResponse(_, tx) => { + State::AwaitingResponse { tx, .. } => { let _ = tx.send(Err(e.into())); State::AwaitingRequest } @@ -239,7 +261,11 @@ where // requests before we can return and complete the future. State::Failed => { match self.client_rx.next().await { - Some(ClientRequest(_, tx)) => { + Some(ClientRequest { tx, span, .. }) => { + trace!( + parent: &span, + "erroring pending request to failed connection" + ); let e = self .error_slot .try_get_error() @@ -278,7 +304,7 @@ where // we need to deal with it first if it exists. self.client_rx.close(); let old_state = std::mem::replace(&mut self.state, State::Failed); - if let State::AwaitingResponse(_, tx) = old_state { + if let State::AwaitingResponse { tx, .. } = old_state { // We know the slot has Some(e) because we just set it above, // and the error slot is never unset. let e = self.error_slot.try_get_error().unwrap(); @@ -288,15 +314,19 @@ where /// Handle an incoming client request, possibly generating outgoing messages to the /// remote peer. - async fn handle_client_request(&mut self, msg: ClientRequest) { - trace!(?msg); + /// + /// NOTE: the caller should use .instrument(msg.span) to instrument the function. + async fn handle_client_request(&mut self, req: ClientRequest) { + trace!(?req.request); use Request::*; use State::*; - let ClientRequest(req, tx) = msg; + let ClientRequest { request, tx, span } = req; + + // XXX(hdevalence) this is truly horrible, but let's fix it later // Inner match returns Result with the new state or an error. // Outer match updates state or fails. - match match (&self.state, req) { + match match (&self.state, request) { (Failed, _) => panic!("failed connection cannot handle requests"), (AwaitingResponse { .. }, _) => panic!("tried to update pending request"), (AwaitingRequest, Peers) => self @@ -304,13 +334,21 @@ where .send(Message::GetAddr) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(Handler::GetPeers, tx)), + .map(|()| AwaitingResponse { + handler: Handler::GetPeers, + tx, + span, + }), (AwaitingRequest, Ping(nonce)) => self .peer_tx .send(Message::Ping(nonce)) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(Handler::Ping(nonce), tx)), + .map(|()| AwaitingResponse { + handler: Handler::Ping(nonce), + tx, + span, + }), (AwaitingRequest, BlocksByHash(hashes)) => self .peer_tx .send(Message::GetData( @@ -318,14 +356,13 @@ where )) .await .map_err(|e| e.into()) - .map(|()| { - AwaitingResponse( - Handler::GetBlocksByHash { - blocks: Vec::with_capacity(hashes.len()), - hashes, - }, - tx, - ) + .map(|()| AwaitingResponse { + handler: Handler::GetBlocksByHash { + blocks: Vec::with_capacity(hashes.len()), + hashes, + }, + tx, + span, }), (AwaitingRequest, FindBlocks { known_blocks, stop }) => self .peer_tx @@ -335,7 +372,11 @@ where }) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(Handler::FindBlocks, tx)), + .map(|()| AwaitingResponse { + handler: Handler::FindBlocks, + tx, + span, + }), } { Ok(new_state) => { self.state = new_state; @@ -345,6 +386,9 @@ where } } + // This function has its own span, because we're creating a new work + // context (namely, the work of processing the inbound msg as a request) + #[instrument(skip(self))] async fn handle_message_as_request(&mut self, msg: Message) { trace!(?msg); // These messages are transport-related, handle them separately: @@ -358,6 +402,7 @@ where return; } Message::Ping(nonce) => { + trace!(?nonce, "responding to heartbeat"); match self.peer_tx.send(Message::Pong(nonce)).await { Ok(()) => {} Err(e) => self.fail_with(e.into()), diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 7ab0af76..f8d82226 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -212,7 +212,6 @@ where let slot = ErrorSlot::default(); let client = Client { - span: connection_span.clone(), server_tx: server_tx.clone(), error_slot: slot.clone(), }; @@ -268,31 +267,46 @@ where request_timer: None, }; - tokio::spawn(server.run(peer_rx).instrument(connection_span).boxed()); + tokio::spawn( + server + .run(peer_rx) + .instrument(connection_span.clone()) + .boxed(), + ); - tokio::spawn(async move { - use futures::channel::oneshot; + let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat"); + tokio::spawn( + async move { + use futures::channel::oneshot; - use super::client::ClientRequest; + use super::client::ClientRequest; - let mut server_tx = server_tx; + let mut server_tx = server_tx; - let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL); + let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL); - loop { - interval_stream.tick().await; + loop { + interval_stream.tick().await; - // We discard the server handle because our - // heartbeat `Ping`s are a special case, and we - // don't actually care about the response here. - let (request_tx, _) = oneshot::channel(); - let msg = ClientRequest(Request::Ping(Nonce::default()), request_tx); - - if server_tx.send(msg).await.is_err() { - return; + // We discard the server handle because our + // heartbeat `Ping`s are a special case, and we + // don't actually care about the response here. + let (request_tx, _) = oneshot::channel(); + if server_tx + .send(ClientRequest { + request: Request::Ping(Nonce::default()), + tx: request_tx, + span: tracing::Span::current(), + }) + .await + .is_err() + { + return; + } } } - }); + .instrument(heartbeat_span), + ); Ok(client) }; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 6b25481a..859f4405 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -110,9 +110,9 @@ where // `addr` message per connection, and if we only have one initial peer we // need to ensure that its `addr` message is used by the crawler. // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> - let _ = candidates.update().await; info!("Sending initial request for peers"); + let _ = candidates.update().await; for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(());