diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index 38460d8a..b7af274d 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -68,5 +68,5 @@ pub mod types { /// This will be removed when we finish encapsulation pub mod should_be_private { - pub use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector}; + pub use crate::{peer::PeerHandshake, timestamp_collector::TimestampCollector}; } diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index c61b7bd9..c0cbdac7 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -2,14 +2,14 @@ /// Handles outbound requests from our node to the network. mod client; -/// Asynchronously connects to peers. -mod connector; /// Peer-related errors. mod error; +/// Performs peer handshakes. +mod handshake; /// Handles inbound requests from the network to our node. mod server; pub use client::PeerClient; -pub use connector::PeerConnector; pub use error::{HandshakeError, PeerError, SharedPeerError}; +pub use handshake::PeerHandshake; pub use server::PeerServer; diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/handshake.rs similarity index 97% rename from zebra-network/src/peer/connector.rs rename to zebra-network/src/peer/handshake.rs index 876e48c0..2bd3351c 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/handshake.rs @@ -25,15 +25,16 @@ use crate::{ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer}; -/// A [`Service`] that connects to a remote peer and constructs a client/server pair. -pub struct PeerConnector { +/// A [`Service`] that handshakes with a remote peer and constructs a +/// client/server pair. +pub struct PeerHandshake { config: Config, internal_service: S, timestamp_collector: mpsc::Sender, nonces: Arc>>, } -impl PeerConnector +impl PeerHandshake where S: Service + Clone + Send + 'static, S::Future: Send, @@ -49,7 +50,7 @@ where // Builder2, ..., with Builder1::with_config() -> Builder2; // Builder2::with_internal_service() -> ... or use Options in a single // Builder type or use the derive_builder crate. - PeerConnector { + PeerHandshake { config, internal_service, timestamp_collector, @@ -58,7 +59,7 @@ where } } -impl Service<(TcpStream, SocketAddr)> for PeerConnector +impl Service<(TcpStream, SocketAddr)> for PeerHandshake where S: Service + Clone + Send + 'static, S::Future: Send, diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index c4c6469f..ae6cd9b0 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -28,7 +28,7 @@ use tracing::Level; use tracing_futures::Instrument; use crate::{ - peer::{HandshakeError, PeerClient, PeerConnector}, + peer::{HandshakeError, PeerClient, PeerHandshake}, timestamp_collector::TimestampCollector, AddressBook, BoxedStdError, Config, Request, Response, }; @@ -75,9 +75,9 @@ where S::Future: Send + 'static, { let (address_book, timestamp_collector) = TimestampCollector::spawn(); - let peer_connector = Buffer::new( + let handshaker = Buffer::new( Timeout::new( - PeerConnector::new(config.clone(), inbound_service, timestamp_collector), + PeerHandshake::new(config.clone(), inbound_service, timestamp_collector), config.handshake_timeout, ), 1, @@ -111,18 +111,13 @@ where // 1. Initial peers, specified in the config. tokio::spawn(add_initial_peers( config.initial_peers.clone(), - peer_connector.clone(), + handshaker.clone(), peerset_tx.clone(), )); // 2. Incoming peer connections, via a listener. tokio::spawn( - listen( - config.listen_addr, - peer_connector.clone(), - peerset_tx.clone(), - ) - .map(|result| { + listen(config.listen_addr, handshaker.clone(), peerset_tx.clone()).map(|result| { if let Err(e) = result { error!(%e); } @@ -145,7 +140,7 @@ where config.new_peer_interval, demand_rx, candidates, - peer_connector, + handshaker, peerset_tx, ) .map(|result| { @@ -158,12 +153,12 @@ where (peer_set, address_book) } -/// Use the provided `peer_connector` to connect to `initial_peers`, then send +/// Use the provided `handshaker` to connect to `initial_peers`, then send /// the results over `tx`. -#[instrument(skip(initial_peers, tx, peer_connector))] +#[instrument(skip(initial_peers, tx, handshaker))] async fn add_initial_peers( initial_peers: Vec, - peer_connector: S, + handshaker: S, mut tx: mpsc::Sender, ) where S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, @@ -173,11 +168,11 @@ async fn add_initial_peers( let mut handshakes = initial_peers .into_iter() .map(|addr| { - let mut pc = peer_connector.clone(); + let mut hs = handshaker.clone(); async move { let stream = TcpStream::connect(addr).await?; - pc.ready().await?; - let client = pc.call((stream, addr)).await?; + hs.ready().await?; + let client = hs.call((stream, addr)).await?; Ok::<_, BoxedStdError>(Change::Insert(addr, client)) } }) @@ -187,12 +182,12 @@ async fn add_initial_peers( } } -/// Bind to `addr`, listen for peers using `peer_connector`, then send the +/// Bind to `addr`, listen for peers using `handshaker`, then send the /// results over `tx`. -#[instrument(skip(tx, peer_connector))] +#[instrument(skip(tx, handshaker))] async fn listen( addr: SocketAddr, - mut peer_connector: S, + mut handshaker: S, tx: mpsc::Sender, ) -> Result<(), BoxedStdError> where @@ -203,9 +198,9 @@ where loop { if let Ok((tcp_stream, addr)) = listener.accept().await { debug!(?addr, "got incoming connection"); - peer_connector.ready().await?; + handshaker.ready().await?; // Construct a handshake future but do not drive it yet.... - let handshake = peer_connector.call((tcp_stream, addr)); + let handshake = handshaker.call((tcp_stream, addr)); // ... instead, spawn a new task to handle this connection let mut tx2 = tx.clone(); tokio::spawn(async move { @@ -220,18 +215,12 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `PeerClient` through a channel. /// -#[instrument(skip( - new_peer_interval, - demand_signal, - candidates, - peer_connector, - success_tx -))] +#[instrument(skip(new_peer_interval, demand_signal, candidates, handshaker, success_tx))] async fn crawl_and_dial( new_peer_interval: Duration, demand_signal: mpsc::Receiver<()>, mut candidates: CandidateSet, - peer_connector: C, + handshaker: C, mut success_tx: mpsc::Sender, ) -> Result<(), BoxedStdError> where @@ -251,11 +240,11 @@ where use crate::types::MetaAddr; use futures::TryFutureExt; let try_connect = |candidate: MetaAddr| { - let mut pc = peer_connector.clone(); + let mut hs = handshaker.clone(); async move { let stream = TcpStream::connect(candidate.addr).await?; - pc.ready().await?; - pc.call((stream, candidate.addr)) + hs.ready().await?; + hs.call((stream, candidate.addr)) .await .map(|client| Change::Insert(candidate.addr, client)) }