diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 16da7436..b82f3d99 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -11,7 +11,7 @@ mod handshake; /// Handles inbound requests from the network to our node. mod server; -pub use client::PeerClient; +pub use client::Client; pub use connector::PeerConnector; pub use error::{HandshakeError, PeerError, SharedPeerError}; pub use handshake::PeerHandshake; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 239103ed..4889bd0a 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -15,15 +15,15 @@ use crate::protocol::internal::{Request, Response}; use super::{error::ErrorSlot, SharedPeerError}; /// The "client" duplex half of a peer connection. -pub struct PeerClient { +pub struct Client { pub(super) span: tracing::Span, pub(super) server_tx: mpsc::Sender, pub(super) error_slot: ErrorSlot, } -/// A message from the `PeerClient` to the `PeerServer`, containing both a +/// A message from the `peer::Client` to the `PeerServer`, containing both a /// request and a return message channel. The reason the return channel is -/// included is because `PeerClient::call` returns a future that may be moved +/// included is because `peer::Client::call` returns a future that may be moved /// around before it resolves, so the future must have ownership of the channel /// on which it receives the response. #[derive(Debug)] @@ -32,7 +32,7 @@ pub(super) struct ClientRequest( pub(super) oneshot::Sender>, ); -impl Service for PeerClient { +impl Service for Client { type Response = Response; type Error = SharedPeerError; type Future = diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index a9845a68..abd2be3b 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -9,7 +9,7 @@ use tower::{discover::Change, Service, ServiceExt}; use crate::{BoxedStdError, Request, Response}; -use super::{HandshakeError, PeerClient, PeerHandshake}; +use super::{HandshakeError, Client, PeerHandshake}; /// A wrapper around [`PeerHandshake`] that opens a TCP connection before /// forwarding to the inner handshake service. Writing this as its own @@ -37,7 +37,7 @@ where S: Service + Clone + Send + 'static, S::Future: Send, { - type Response = Change; + type Response = Change; type Error = HandshakeError; type Future = Pin> + Send + 'static>>; diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 379da4c4..8f7b20ea 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -15,15 +15,15 @@ pub enum PeerError { /// The remote peer closed the connection. #[error("Peer closed connection")] ConnectionClosed, - /// The [`PeerClient`] half of the [`PeerClient`]/[`PeerServer`] pair died before - /// the [`PeerServer`] half did. - #[error("PeerClient instance died")] - DeadPeerClient, - /// The [`PeerServer`] half of the [`PeerServer`]/[`PeerClient`] pair died before - /// the [`PeerClient`] half did. + /// The [`peer::Client`] half of the [`peer::Client`]/[`PeerServer`] pair died before + /// the [`Server`] half did. + #[error("peer::Client instance died")] + DeadClient, + /// The [`PeerServer`] half of the [`PeerServer`]/[`peer::Client`] pair died before + /// the [`peer::Client`] half did. #[error("PeerServer instance died")] DeadPeerServer, - /// The remote peer did not respond to a [`PeerClient`] request in time. + /// The remote peer did not respond to a [`peer::Client`] request in time. #[error("Client request timed out")] ClientRequestTimeout, /// A serialization error occurred while reading or writing a message. diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index d76e9638..39bece0e 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -25,7 +25,7 @@ use crate::{ BoxedStdError, Config, }; -use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer}; +use super::{error::ErrorSlot, server::ServerState, HandshakeError, Client, PeerServer}; /// A [`Service`] that handshakes with a remote peer and constructs a /// client/server pair. @@ -77,7 +77,7 @@ where S: Service + Clone + Send + 'static, S::Future: Send, { - type Response = PeerClient; + type Response = Client; type Error = HandshakeError; type Future = Pin> + Send + 'static>>; @@ -177,14 +177,14 @@ where // two versions, etc. -- actually is it possible to edit the `Codec` // after using it to make a framed adapter? - debug!("constructing PeerClient, spawning PeerServer"); + debug!("constructing client, spawning server"); // These channels should not be cloned more than they are // in this block, see constants.rs for more. let (server_tx, server_rx) = mpsc::channel(0); let slot = ErrorSlot::default(); - let client = PeerClient { + let client = Client { span: connection_span.clone(), server_tx: server_tx.clone(), error_slot: slot.clone(), diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index dc32c487..e5309e52 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -42,7 +42,7 @@ pub struct PeerServer { pub(super) request_timer: Option, pub(super) svc: S, pub(super) client_rx: mpsc::Receiver, - /// A slot shared between the PeerServer and PeerClient for storing an error. + /// A slot shared between the client and server for storing an error. pub(super) error_slot: ErrorSlot, //pub(super) peer_rx: Rx, pub(super) peer_tx: Tx, @@ -65,7 +65,7 @@ where // request from the remote peer to our node. // // We also need to handle those client requests in the first place. The client - // requests are received from the corresponding `PeerClient` over a bounded + // requests are received from the corresponding `peer::Client` over a bounded // channel (with bound 1, to minimize buffering), but there is no relationship // between the stream of client requests and the stream of peer messages, so we // cannot ignore one kind while waiting on the other. Moreover, we cannot accept @@ -93,7 +93,7 @@ where self.handle_message_as_request(msg).await } Either::Right((None, _)) => { - self.fail_with(PeerError::DeadPeerClient); + self.fail_with(PeerError::DeadClient); } Either::Right((Some(req), _)) => self.handle_client_request(req).await, } diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 8e74a35e..d3dad586 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -70,7 +70,7 @@ use crate::{types::MetaAddr, AddressBook, BoxedStdError, Request, Response}; /// ▼ /// ┌────────────┐ /// │ send │ -/// │ PeerClient │ +/// │peer::Client│ /// │to Discover │ /// └────────────┘ /// ``` diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index bc0b4d6a..3c48ddfb 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -27,7 +27,8 @@ use tower::{ use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; use crate::{ - peer::{PeerClient, PeerConnector, PeerHandshake}, + peer, + peer::{PeerConnector, PeerHandshake}, timestamp_collector::TimestampCollector, AddressBook, BoxedStdError, Config, Request, Response, }; @@ -35,7 +36,7 @@ use crate::{ use super::CandidateSet; use super::PeerSet; -type PeerChange = Result, BoxedStdError>; +type PeerChange = Result, BoxedStdError>; /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. pub async fn init( @@ -150,7 +151,7 @@ async fn add_initial_peers( connector: S, mut tx: mpsc::Sender, ) where - S: Service, Error = BoxedStdError> + S: Service, Error = BoxedStdError> + Clone, S::Future: Send + 'static, { @@ -172,7 +173,7 @@ async fn listen( tx: mpsc::Sender, ) -> Result<(), BoxedStdError> where - S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxedStdError> + Clone, S::Future: Send + 'static, { let mut listener = TcpListener::bind(addr).await?; @@ -194,7 +195,7 @@ where } /// Given a channel that signals a need for new peers, try to connect to a peer -/// and send the resulting `PeerClient` through a channel. +/// and send the resulting `peer::Client` through a channel. /// #[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))] async fn crawl_and_dial( @@ -205,7 +206,7 @@ async fn crawl_and_dial( mut success_tx: mpsc::Sender, ) -> Result<(), BoxedStdError> where - C: Service, Error = BoxedStdError> + C: Service, Error = BoxedStdError> + Clone, C::Future: Send + 'static, S: Service,