diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 4894977d..2f4f7dc8 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -195,21 +195,29 @@ where let (peer_tx, peer_rx) = stream.split(); - use super::connection; - let server = Connection { - state: connection::State::AwaitingRequest, - svc: internal_service, - client_rx: server_rx, - error_slot: slot, - peer_tx, - request_timer: None, - }; + // Instrument the peer's rx and tx streams. - let hooked_peer_rx = peer_rx + let peer_tx = peer_tx.with(move |msg: Message| { + // Add a metric for outbound messages. + metrics::counter!("peer.outbound_messages", 1, "addr" => addr.to_string()); + // We need to use future::ready rather than an async block here, + // because we need the sink to be Unpin, and the With + // returned by .with is Unpin only if Fut is Unpin, and the + // futures generated by async blocks are not Unpin. + future::ready(Ok(msg)) + }); + + let peer_rx = peer_rx .then(move |msg| { + // Add a metric for inbound messages and fire a timestamp event. let mut timestamp_collector = timestamp_collector.clone(); async move { if msg.is_ok() { + metrics::counter!( + "inbound_messages", + 1, + "addr" => addr.to_string(), + ); use futures::sink::SinkExt; let _ = timestamp_collector .send(MetaAddr { @@ -224,12 +232,17 @@ where }) .boxed(); - tokio::spawn( - server - .run(hooked_peer_rx) - .instrument(connection_span) - .boxed(), - ); + use super::connection; + let server = Connection { + state: connection::State::AwaitingRequest, + svc: internal_service, + client_rx: server_rx, + error_slot: slot, + peer_tx, + request_timer: None, + }; + + tokio::spawn(server.run(peer_rx).instrument(connection_span).boxed()); tokio::spawn(async move { use futures::channel::oneshot;