network: propagate tracing Spans through peer connection

This commit is contained in:
Henry de Valence 2020-06-22 19:19:27 -07:00
parent ff4e722cd7
commit 217c25ef07
4 changed files with 128 additions and 65 deletions

View File

@ -16,21 +16,22 @@ use super::{ErrorSlot, SharedPeerError};
/// The "client" duplex half of a peer connection. /// The "client" duplex half of a peer connection.
pub struct Client { pub struct Client {
pub(super) span: tracing::Span,
pub(super) server_tx: mpsc::Sender<ClientRequest>, pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot, pub(super) error_slot: ErrorSlot,
} }
/// A message from the `peer::Client` to the `peer::Server`, containing both a /// A message from the `peer::Client` to the `peer::Server`.
/// request and a return message channel. The reason the return channel is
/// included is because `peer::Client::call` returns a future that may be moved
/// around before it resolves, so the future must have ownership of the channel
/// on which it receives the response.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct ClientRequest( pub(super) struct ClientRequest {
pub(super) Request, /// The actual request.
pub(super) oneshot::Sender<Result<Response, SharedPeerError>>, pub request: Request,
); /// The return message channel, included because `peer::Client::call` returns a
/// future that may be moved around before it resolves.
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,
}
impl Service<Request> for Client { impl Service<Request> for Client {
type Response = Response; type Response = Response;
@ -49,19 +50,23 @@ impl Service<Request> for Client {
} }
} }
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
use futures::future::FutureExt; use futures::future::FutureExt;
use tracing_futures::Instrument;
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
match self.server_tx.try_send(ClientRequest(req, tx)) { // get the current Span to propagate it to the peer connection task.
// this allows the peer connection to enter the correct tracing context
// when it's handling messages in the context of processing this
// request.
let span = tracing::Span::current();
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
Err(e) => { Err(e) => {
if e.is_disconnected() { if e.is_disconnected() {
future::ready(Err(self future::ready(Err(self
.error_slot .error_slot
.try_get_error() .try_get_error()
.expect("failed servers must set their error slot"))) .expect("failed servers must set their error slot")))
.instrument(self.span.clone())
.boxed() .boxed()
} else { } else {
// sending fails when there's not enough // sending fails when there's not enough
@ -75,7 +80,6 @@ impl Service<Request> for Client {
oneshot_recv_result oneshot_recv_result
.expect("ClientRequest oneshot sender must not be dropped before send") .expect("ClientRequest oneshot sender must not be dropped before send")
}) })
.instrument(self.span.clone())
.boxed() .boxed()
} }
} }

View File

@ -9,6 +9,7 @@ use futures::{
}; };
use tokio::time::{delay_for, Delay}; use tokio::time::{delay_for, Delay};
use tower::Service; use tower::Service;
use tracing_futures::Instrument;
use zebra_chain::{ use zebra_chain::{
block::{Block, BlockHeaderHash}, block::{Block, BlockHeaderHash},
@ -46,7 +47,6 @@ impl Handler {
/// contents to responses without additional copies. If the message is not /// contents to responses without additional copies. If the message is not
/// interpretable as a response, we return ownership to the caller. /// interpretable as a response, we return ownership to the caller.
fn process_message(&mut self, msg: Message) -> Option<Message> { fn process_message(&mut self, msg: Message) -> Option<Message> {
trace!(?msg);
// This function is where we statefully interpret Bitcoin/Zcash messages // This function is where we statefully interpret Bitcoin/Zcash messages
// into responses to messages in the internal request/response protocol. // into responses to messages in the internal request/response protocol.
// This conversion is done by a sequence of (request, message) match arms, // This conversion is done by a sequence of (request, message) match arms,
@ -93,6 +93,7 @@ impl Handler {
))), ))),
// By default, messages are not responses. // By default, messages are not responses.
(state, msg) => { (state, msg) => {
trace!(?msg, "did not interpret message as response");
ignored_msg = Some(msg); ignored_msg = Some(msg);
state state
} }
@ -106,7 +107,11 @@ pub(super) enum State {
/// Awaiting a client request or a peer message. /// Awaiting a client request or a peer message.
AwaitingRequest, AwaitingRequest,
/// Awaiting a peer message we can interpret as a client request. /// Awaiting a peer message we can interpret as a client request.
AwaitingResponse(Handler, oneshot::Sender<Result<Response, SharedPeerError>>), AwaitingResponse {
handler: Handler,
tx: oneshot::Sender<Result<Response, SharedPeerError>>,
span: tracing::Span,
},
/// A failure has occurred and we are shutting down the connection. /// A failure has occurred and we are shutting down the connection.
Failed, Failed,
} }
@ -171,18 +176,26 @@ where
Either::Right((None, _)) => { Either::Right((None, _)) => {
self.fail_with(PeerError::DeadClient); self.fail_with(PeerError::DeadClient);
} }
Either::Right((Some(req), _)) => self.handle_client_request(req).await, Either::Right((Some(req), _)) => {
let span = req.span.clone();
self.handle_client_request(req).instrument(span).await
}
} }
} }
// We're awaiting a response to a client request, // We're awaiting a response to a client request,
// so wait on either a peer message, or on a request timeout. // so wait on either a peer message, or on a request timeout.
State::AwaitingResponse { .. } => { State::AwaitingResponse { ref span, .. } => {
trace!("awaiting response to client request"); // we have to get rid of the span reference so we can tamper with the state
let span = span.clone();
trace!(parent: &span, "awaiting response to client request");
let timer_ref = self let timer_ref = self
.request_timer .request_timer
.as_mut() .as_mut()
.expect("timeout must be set while awaiting response"); .expect("timeout must be set while awaiting response");
match future::select(peer_rx.next(), timer_ref).await { match future::select(peer_rx.next(), timer_ref)
.instrument(span.clone())
.await
{
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Ok(peer_msg)), _timer)) => { Either::Left((Some(Ok(peer_msg)), _timer)) => {
@ -194,39 +207,48 @@ where
// factor the state required for inbound and // factor the state required for inbound and
// outbound requests. // outbound requests.
let request_msg = match self.state { let request_msg = match self.state {
State::AwaitingResponse(ref mut handler, _) => { State::AwaitingResponse {
handler.process_message(peer_msg) ref mut handler, ..
} } => span.in_scope(|| handler.process_message(peer_msg)),
_ => unreachable!(), _ => unreachable!(),
}; };
// If the message was not consumed, check whether it // If the message was not consumed, check whether it
// should be handled as a request. // should be handled as a request.
if let Some(msg) = request_msg { if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(msg).await; self.handle_message_as_request(msg).await;
} else { } else {
// Otherwise, check whether the handler is finished // Otherwise, check whether the handler is finished
// processing messages and update the state. // processing messages and update the state.
self.state = match self.state { self.state = match self.state {
State::AwaitingResponse(Handler::Finished(response), tx) => { State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
let _ = tx.send(response); let _ = tx.send(response);
State::AwaitingRequest State::AwaitingRequest
} }
pending @ State::AwaitingResponse(_, _) => pending, pending @ State::AwaitingResponse { .. } => pending,
_ => unreachable!(), _ => unreachable!(),
}; };
} }
} }
Either::Right(((), _peer_fut)) => { Either::Right(((), _peer_fut)) => {
trace!("client request timed out"); trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout; let e = PeerError::ClientRequestTimeout;
self.state = match self.state { self.state = match self.state {
// Special case: ping timeouts fail the connection. // Special case: ping timeouts fail the connection.
State::AwaitingResponse(Handler::Ping(_), _) => { State::AwaitingResponse {
handler: Handler::Ping(_),
..
} => {
self.fail_with(e); self.fail_with(e);
State::Failed State::Failed
} }
// Other request timeouts fail the request. // Other request timeouts fail the request.
State::AwaitingResponse(_, tx) => { State::AwaitingResponse { tx, .. } => {
let _ = tx.send(Err(e.into())); let _ = tx.send(Err(e.into()));
State::AwaitingRequest State::AwaitingRequest
} }
@ -239,7 +261,11 @@ where
// requests before we can return and complete the future. // requests before we can return and complete the future.
State::Failed => { State::Failed => {
match self.client_rx.next().await { match self.client_rx.next().await {
Some(ClientRequest(_, tx)) => { Some(ClientRequest { tx, span, .. }) => {
trace!(
parent: &span,
"erroring pending request to failed connection"
);
let e = self let e = self
.error_slot .error_slot
.try_get_error() .try_get_error()
@ -278,7 +304,7 @@ where
// we need to deal with it first if it exists. // we need to deal with it first if it exists.
self.client_rx.close(); self.client_rx.close();
let old_state = std::mem::replace(&mut self.state, State::Failed); let old_state = std::mem::replace(&mut self.state, State::Failed);
if let State::AwaitingResponse(_, tx) = old_state { if let State::AwaitingResponse { tx, .. } = old_state {
// We know the slot has Some(e) because we just set it above, // We know the slot has Some(e) because we just set it above,
// and the error slot is never unset. // and the error slot is never unset.
let e = self.error_slot.try_get_error().unwrap(); let e = self.error_slot.try_get_error().unwrap();
@ -288,15 +314,19 @@ where
/// Handle an incoming client request, possibly generating outgoing messages to the /// Handle an incoming client request, possibly generating outgoing messages to the
/// remote peer. /// remote peer.
async fn handle_client_request(&mut self, msg: ClientRequest) { ///
trace!(?msg); /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
async fn handle_client_request(&mut self, req: ClientRequest) {
trace!(?req.request);
use Request::*; use Request::*;
use State::*; use State::*;
let ClientRequest(req, tx) = msg; let ClientRequest { request, tx, span } = req;
// XXX(hdevalence) this is truly horrible, but let's fix it later
// Inner match returns Result with the new state or an error. // Inner match returns Result with the new state or an error.
// Outer match updates state or fails. // Outer match updates state or fails.
match match (&self.state, req) { match match (&self.state, request) {
(Failed, _) => panic!("failed connection cannot handle requests"), (Failed, _) => panic!("failed connection cannot handle requests"),
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"), (AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
(AwaitingRequest, Peers) => self (AwaitingRequest, Peers) => self
@ -304,13 +334,21 @@ where
.send(Message::GetAddr) .send(Message::GetAddr)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse(Handler::GetPeers, tx)), .map(|()| AwaitingResponse {
handler: Handler::GetPeers,
tx,
span,
}),
(AwaitingRequest, Ping(nonce)) => self (AwaitingRequest, Ping(nonce)) => self
.peer_tx .peer_tx
.send(Message::Ping(nonce)) .send(Message::Ping(nonce))
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse(Handler::Ping(nonce), tx)), .map(|()| AwaitingResponse {
handler: Handler::Ping(nonce),
tx,
span,
}),
(AwaitingRequest, BlocksByHash(hashes)) => self (AwaitingRequest, BlocksByHash(hashes)) => self
.peer_tx .peer_tx
.send(Message::GetData( .send(Message::GetData(
@ -318,14 +356,13 @@ where
)) ))
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| { .map(|()| AwaitingResponse {
AwaitingResponse( handler: Handler::GetBlocksByHash {
Handler::GetBlocksByHash { blocks: Vec::with_capacity(hashes.len()),
blocks: Vec::with_capacity(hashes.len()), hashes,
hashes, },
}, tx,
tx, span,
)
}), }),
(AwaitingRequest, FindBlocks { known_blocks, stop }) => self (AwaitingRequest, FindBlocks { known_blocks, stop }) => self
.peer_tx .peer_tx
@ -335,7 +372,11 @@ where
}) })
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse(Handler::FindBlocks, tx)), .map(|()| AwaitingResponse {
handler: Handler::FindBlocks,
tx,
span,
}),
} { } {
Ok(new_state) => { Ok(new_state) => {
self.state = new_state; self.state = new_state;
@ -345,6 +386,9 @@ where
} }
} }
// This function has its own span, because we're creating a new work
// context (namely, the work of processing the inbound msg as a request)
#[instrument(skip(self))]
async fn handle_message_as_request(&mut self, msg: Message) { async fn handle_message_as_request(&mut self, msg: Message) {
trace!(?msg); trace!(?msg);
// These messages are transport-related, handle them separately: // These messages are transport-related, handle them separately:
@ -358,6 +402,7 @@ where
return; return;
} }
Message::Ping(nonce) => { Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat");
match self.peer_tx.send(Message::Pong(nonce)).await { match self.peer_tx.send(Message::Pong(nonce)).await {
Ok(()) => {} Ok(()) => {}
Err(e) => self.fail_with(e.into()), Err(e) => self.fail_with(e.into()),

View File

@ -212,7 +212,6 @@ where
let slot = ErrorSlot::default(); let slot = ErrorSlot::default();
let client = Client { let client = Client {
span: connection_span.clone(),
server_tx: server_tx.clone(), server_tx: server_tx.clone(),
error_slot: slot.clone(), error_slot: slot.clone(),
}; };
@ -268,31 +267,46 @@ where
request_timer: None, request_timer: None,
}; };
tokio::spawn(server.run(peer_rx).instrument(connection_span).boxed()); tokio::spawn(
server
.run(peer_rx)
.instrument(connection_span.clone())
.boxed(),
);
tokio::spawn(async move { let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
use futures::channel::oneshot; tokio::spawn(
async move {
use futures::channel::oneshot;
use super::client::ClientRequest; use super::client::ClientRequest;
let mut server_tx = server_tx; let mut server_tx = server_tx;
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL); let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop { loop {
interval_stream.tick().await; interval_stream.tick().await;
// We discard the server handle because our // We discard the server handle because our
// heartbeat `Ping`s are a special case, and we // heartbeat `Ping`s are a special case, and we
// don't actually care about the response here. // don't actually care about the response here.
let (request_tx, _) = oneshot::channel(); let (request_tx, _) = oneshot::channel();
let msg = ClientRequest(Request::Ping(Nonce::default()), request_tx); if server_tx
.send(ClientRequest {
if server_tx.send(msg).await.is_err() { request: Request::Ping(Nonce::default()),
return; tx: request_tx,
span: tracing::Span::current(),
})
.await
.is_err()
{
return;
}
} }
} }
}); .instrument(heartbeat_span),
);
Ok(client) Ok(client)
}; };

View File

@ -110,9 +110,9 @@ where
// `addr` message per connection, and if we only have one initial peer we // `addr` message per connection, and if we only have one initial peer we
// need to ensure that its `addr` message is used by the crawler. // need to ensure that its `addr` message is used by the crawler.
// XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>
let _ = candidates.update().await;
info!("Sending initial request for peers"); info!("Sending initial request for peers");
let _ = candidates.update().await;
for _ in 0..config.peerset_initial_target_size { for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(());