Rename `PeerClient` to `peer::Client`.
This commit is contained in:
parent
35d0ce3143
commit
da78603d3a
|
|
@ -11,7 +11,7 @@ mod handshake;
|
||||||
/// Handles inbound requests from the network to our node.
|
/// Handles inbound requests from the network to our node.
|
||||||
mod server;
|
mod server;
|
||||||
|
|
||||||
pub use client::PeerClient;
|
pub use client::Client;
|
||||||
pub use connector::PeerConnector;
|
pub use connector::PeerConnector;
|
||||||
pub use error::{HandshakeError, PeerError, SharedPeerError};
|
pub use error::{HandshakeError, PeerError, SharedPeerError};
|
||||||
pub use handshake::PeerHandshake;
|
pub use handshake::PeerHandshake;
|
||||||
|
|
|
||||||
|
|
@ -15,15 +15,15 @@ use crate::protocol::internal::{Request, Response};
|
||||||
use super::{error::ErrorSlot, SharedPeerError};
|
use super::{error::ErrorSlot, SharedPeerError};
|
||||||
|
|
||||||
/// The "client" duplex half of a peer connection.
|
/// The "client" duplex half of a peer connection.
|
||||||
pub struct PeerClient {
|
pub struct Client {
|
||||||
pub(super) span: tracing::Span,
|
pub(super) span: tracing::Span,
|
||||||
pub(super) server_tx: mpsc::Sender<ClientRequest>,
|
pub(super) server_tx: mpsc::Sender<ClientRequest>,
|
||||||
pub(super) error_slot: ErrorSlot,
|
pub(super) error_slot: ErrorSlot,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A message from the `PeerClient` to the `PeerServer`, containing both a
|
/// A message from the `peer::Client` to the `PeerServer`, containing both a
|
||||||
/// request and a return message channel. The reason the return channel is
|
/// request and a return message channel. The reason the return channel is
|
||||||
/// included is because `PeerClient::call` returns a future that may be moved
|
/// included is because `peer::Client::call` returns a future that may be moved
|
||||||
/// around before it resolves, so the future must have ownership of the channel
|
/// around before it resolves, so the future must have ownership of the channel
|
||||||
/// on which it receives the response.
|
/// on which it receives the response.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -32,7 +32,7 @@ pub(super) struct ClientRequest(
|
||||||
pub(super) oneshot::Sender<Result<Response, SharedPeerError>>,
|
pub(super) oneshot::Sender<Result<Response, SharedPeerError>>,
|
||||||
);
|
);
|
||||||
|
|
||||||
impl Service<Request> for PeerClient {
|
impl Service<Request> for Client {
|
||||||
type Response = Response;
|
type Response = Response;
|
||||||
type Error = SharedPeerError;
|
type Error = SharedPeerError;
|
||||||
type Future =
|
type Future =
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ use tower::{discover::Change, Service, ServiceExt};
|
||||||
|
|
||||||
use crate::{BoxedStdError, Request, Response};
|
use crate::{BoxedStdError, Request, Response};
|
||||||
|
|
||||||
use super::{HandshakeError, PeerClient, PeerHandshake};
|
use super::{HandshakeError, Client, PeerHandshake};
|
||||||
|
|
||||||
/// A wrapper around [`PeerHandshake`] that opens a TCP connection before
|
/// A wrapper around [`PeerHandshake`] that opens a TCP connection before
|
||||||
/// forwarding to the inner handshake service. Writing this as its own
|
/// forwarding to the inner handshake service. Writing this as its own
|
||||||
|
|
@ -37,7 +37,7 @@ where
|
||||||
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
{
|
{
|
||||||
type Response = Change<SocketAddr, PeerClient>;
|
type Response = Change<SocketAddr, Client>;
|
||||||
type Error = HandshakeError;
|
type Error = HandshakeError;
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
|
||||||
|
|
@ -15,15 +15,15 @@ pub enum PeerError {
|
||||||
/// The remote peer closed the connection.
|
/// The remote peer closed the connection.
|
||||||
#[error("Peer closed connection")]
|
#[error("Peer closed connection")]
|
||||||
ConnectionClosed,
|
ConnectionClosed,
|
||||||
/// The [`PeerClient`] half of the [`PeerClient`]/[`PeerServer`] pair died before
|
/// The [`peer::Client`] half of the [`peer::Client`]/[`PeerServer`] pair died before
|
||||||
/// the [`PeerServer`] half did.
|
/// the [`Server`] half did.
|
||||||
#[error("PeerClient instance died")]
|
#[error("peer::Client instance died")]
|
||||||
DeadPeerClient,
|
DeadClient,
|
||||||
/// The [`PeerServer`] half of the [`PeerServer`]/[`PeerClient`] pair died before
|
/// The [`PeerServer`] half of the [`PeerServer`]/[`peer::Client`] pair died before
|
||||||
/// the [`PeerClient`] half did.
|
/// the [`peer::Client`] half did.
|
||||||
#[error("PeerServer instance died")]
|
#[error("PeerServer instance died")]
|
||||||
DeadPeerServer,
|
DeadPeerServer,
|
||||||
/// The remote peer did not respond to a [`PeerClient`] request in time.
|
/// The remote peer did not respond to a [`peer::Client`] request in time.
|
||||||
#[error("Client request timed out")]
|
#[error("Client request timed out")]
|
||||||
ClientRequestTimeout,
|
ClientRequestTimeout,
|
||||||
/// A serialization error occurred while reading or writing a message.
|
/// A serialization error occurred while reading or writing a message.
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ use crate::{
|
||||||
BoxedStdError, Config,
|
BoxedStdError, Config,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer};
|
use super::{error::ErrorSlot, server::ServerState, HandshakeError, Client, PeerServer};
|
||||||
|
|
||||||
/// A [`Service`] that handshakes with a remote peer and constructs a
|
/// A [`Service`] that handshakes with a remote peer and constructs a
|
||||||
/// client/server pair.
|
/// client/server pair.
|
||||||
|
|
@ -77,7 +77,7 @@ where
|
||||||
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
{
|
{
|
||||||
type Response = PeerClient;
|
type Response = Client;
|
||||||
type Error = HandshakeError;
|
type Error = HandshakeError;
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
@ -177,14 +177,14 @@ where
|
||||||
// two versions, etc. -- actually is it possible to edit the `Codec`
|
// two versions, etc. -- actually is it possible to edit the `Codec`
|
||||||
// after using it to make a framed adapter?
|
// after using it to make a framed adapter?
|
||||||
|
|
||||||
debug!("constructing PeerClient, spawning PeerServer");
|
debug!("constructing client, spawning server");
|
||||||
|
|
||||||
// These channels should not be cloned more than they are
|
// These channels should not be cloned more than they are
|
||||||
// in this block, see constants.rs for more.
|
// in this block, see constants.rs for more.
|
||||||
let (server_tx, server_rx) = mpsc::channel(0);
|
let (server_tx, server_rx) = mpsc::channel(0);
|
||||||
let slot = ErrorSlot::default();
|
let slot = ErrorSlot::default();
|
||||||
|
|
||||||
let client = PeerClient {
|
let client = Client {
|
||||||
span: connection_span.clone(),
|
span: connection_span.clone(),
|
||||||
server_tx: server_tx.clone(),
|
server_tx: server_tx.clone(),
|
||||||
error_slot: slot.clone(),
|
error_slot: slot.clone(),
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,7 @@ pub struct PeerServer<S, Tx> {
|
||||||
pub(super) request_timer: Option<Delay>,
|
pub(super) request_timer: Option<Delay>,
|
||||||
pub(super) svc: S,
|
pub(super) svc: S,
|
||||||
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
|
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
|
||||||
/// A slot shared between the PeerServer and PeerClient for storing an error.
|
/// A slot shared between the client and server for storing an error.
|
||||||
pub(super) error_slot: ErrorSlot,
|
pub(super) error_slot: ErrorSlot,
|
||||||
//pub(super) peer_rx: Rx,
|
//pub(super) peer_rx: Rx,
|
||||||
pub(super) peer_tx: Tx,
|
pub(super) peer_tx: Tx,
|
||||||
|
|
@ -65,7 +65,7 @@ where
|
||||||
// request from the remote peer to our node.
|
// request from the remote peer to our node.
|
||||||
//
|
//
|
||||||
// We also need to handle those client requests in the first place. The client
|
// We also need to handle those client requests in the first place. The client
|
||||||
// requests are received from the corresponding `PeerClient` over a bounded
|
// requests are received from the corresponding `peer::Client` over a bounded
|
||||||
// channel (with bound 1, to minimize buffering), but there is no relationship
|
// channel (with bound 1, to minimize buffering), but there is no relationship
|
||||||
// between the stream of client requests and the stream of peer messages, so we
|
// between the stream of client requests and the stream of peer messages, so we
|
||||||
// cannot ignore one kind while waiting on the other. Moreover, we cannot accept
|
// cannot ignore one kind while waiting on the other. Moreover, we cannot accept
|
||||||
|
|
@ -93,7 +93,7 @@ where
|
||||||
self.handle_message_as_request(msg).await
|
self.handle_message_as_request(msg).await
|
||||||
}
|
}
|
||||||
Either::Right((None, _)) => {
|
Either::Right((None, _)) => {
|
||||||
self.fail_with(PeerError::DeadPeerClient);
|
self.fail_with(PeerError::DeadClient);
|
||||||
}
|
}
|
||||||
Either::Right((Some(req), _)) => self.handle_client_request(req).await,
|
Either::Right((Some(req), _)) => self.handle_client_request(req).await,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ use crate::{types::MetaAddr, AddressBook, BoxedStdError, Request, Response};
|
||||||
/// ▼
|
/// ▼
|
||||||
/// ┌────────────┐
|
/// ┌────────────┐
|
||||||
/// │ send │
|
/// │ send │
|
||||||
/// │ PeerClient │
|
/// │peer::Client│
|
||||||
/// │to Discover │
|
/// │to Discover │
|
||||||
/// └────────────┘
|
/// └────────────┘
|
||||||
/// ```
|
/// ```
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,8 @@ use tower::{
|
||||||
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
|
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
peer::{PeerClient, PeerConnector, PeerHandshake},
|
peer,
|
||||||
|
peer::{PeerConnector, PeerHandshake},
|
||||||
timestamp_collector::TimestampCollector,
|
timestamp_collector::TimestampCollector,
|
||||||
AddressBook, BoxedStdError, Config, Request, Response,
|
AddressBook, BoxedStdError, Config, Request, Response,
|
||||||
};
|
};
|
||||||
|
|
@ -35,7 +36,7 @@ use crate::{
|
||||||
use super::CandidateSet;
|
use super::CandidateSet;
|
||||||
use super::PeerSet;
|
use super::PeerSet;
|
||||||
|
|
||||||
type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>;
|
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxedStdError>;
|
||||||
|
|
||||||
/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
|
/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
|
||||||
pub async fn init<S>(
|
pub async fn init<S>(
|
||||||
|
|
@ -150,7 +151,7 @@ async fn add_initial_peers<S>(
|
||||||
connector: S,
|
connector: S,
|
||||||
mut tx: mpsc::Sender<PeerChange>,
|
mut tx: mpsc::Sender<PeerChange>,
|
||||||
) where
|
) where
|
||||||
S: Service<SocketAddr, Response = Change<SocketAddr, PeerClient>, Error = BoxedStdError>
|
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxedStdError>
|
||||||
+ Clone,
|
+ Clone,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
|
|
@ -172,7 +173,7 @@ async fn listen<S>(
|
||||||
tx: mpsc::Sender<PeerChange>,
|
tx: mpsc::Sender<PeerChange>,
|
||||||
) -> Result<(), BoxedStdError>
|
) -> Result<(), BoxedStdError>
|
||||||
where
|
where
|
||||||
S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone,
|
S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxedStdError> + Clone,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
let mut listener = TcpListener::bind(addr).await?;
|
let mut listener = TcpListener::bind(addr).await?;
|
||||||
|
|
@ -194,7 +195,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Given a channel that signals a need for new peers, try to connect to a peer
|
/// Given a channel that signals a need for new peers, try to connect to a peer
|
||||||
/// and send the resulting `PeerClient` through a channel.
|
/// and send the resulting `peer::Client` through a channel.
|
||||||
///
|
///
|
||||||
#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))]
|
#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))]
|
||||||
async fn crawl_and_dial<C, S>(
|
async fn crawl_and_dial<C, S>(
|
||||||
|
|
@ -205,7 +206,7 @@ async fn crawl_and_dial<C, S>(
|
||||||
mut success_tx: mpsc::Sender<PeerChange>,
|
mut success_tx: mpsc::Sender<PeerChange>,
|
||||||
) -> Result<(), BoxedStdError>
|
) -> Result<(), BoxedStdError>
|
||||||
where
|
where
|
||||||
C: Service<SocketAddr, Response = Change<SocketAddr, PeerClient>, Error = BoxedStdError>
|
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxedStdError>
|
||||||
+ Clone,
|
+ Clone,
|
||||||
C::Future: Send + 'static,
|
C::Future: Send + 'static,
|
||||||
S: Service<Request, Response = Response, Error = BoxedStdError>,
|
S: Service<Request, Response = Response, Error = BoxedStdError>,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue