diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index b7af274d..4ef6fe6f 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -57,7 +57,7 @@ mod timestamp_collector; pub use crate::{ address_book::AddressBook, config::Config, - peer_set::{init, BoxedZebraService}, + peer_set::init, protocol::internal::{Request, Response}, }; diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index c0cbdac7..16da7436 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -2,6 +2,8 @@ /// Handles outbound requests from our node to the network. mod client; +/// Wrapper around handshake logic that also opens a TCP connection. +mod connector; /// Peer-related errors. mod error; /// Performs peer handshakes. @@ -10,6 +12,7 @@ mod handshake; mod server; pub use client::PeerClient; +pub use connector::PeerConnector; pub use error::{HandshakeError, PeerError, SharedPeerError}; pub use handshake::PeerHandshake; pub use server::PeerServer; diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs new file mode 100644 index 00000000..aae7f5f1 --- /dev/null +++ b/zebra-network/src/peer/connector.rs @@ -0,0 +1,59 @@ +use std::{ + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::{net::TcpStream, prelude::*}; +use tower::{discover::Change, Service, ServiceExt}; + +use crate::{BoxedStdError, Request, Response}; + +use super::{HandshakeError, PeerClient, PeerHandshake}; + +/// A wrapper around [`PeerHandshake`] that opens a TCP connection before +/// forwarding to the inner handshake service. Writing this as its own +/// [`tower::Service`] lets us apply unified timeout policies, etc. +pub struct PeerConnector { + handshaker: PeerHandshake, +} + +impl Clone for PeerConnector { + fn clone(&self) -> Self { + Self { + handshaker: self.handshaker.clone(), + } + } +} + +impl PeerConnector { + pub fn new(handshaker: PeerHandshake) -> Self { + Self { handshaker } + } +} + +impl Service for PeerConnector +where + S: Service + Clone + Send + 'static, + S::Future: Send, +{ + type Response = Change; + type Error = HandshakeError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, addr: SocketAddr) -> Self::Future { + let mut hs = self.handshaker.clone(); + async move { + let stream = TcpStream::connect(addr).await?; + hs.ready().await?; + let client = hs.call((stream, addr)).await?; + Ok(Change::Insert(addr, client)) + } + .boxed() + } +} diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index e4aa54ad..7c0e751e 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -67,6 +67,9 @@ pub enum HandshakeError { /// The remote peer closed the connection. #[error("Peer closed connection")] ConnectionClosed, + /// An error occurred while performing an IO operation. + #[error("Underlying IO error")] + Io(#[from] std::io::Error), /// A serialization error occurred while reading or writing a message. #[error("Serialization error")] Serialization(#[from] SerializationError), diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 2bd3351c..c6cf5d78 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -4,13 +4,12 @@ use std::{ pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, - time::Duration, }; use chrono::Utc; use futures::channel::mpsc; use tokio::{codec::Framed, net::TcpStream, prelude::*, timer::Interval}; -use tower::{Service, ServiceExt}; +use tower::Service; use tracing::{span, Level}; use tracing_futures::Instrument; @@ -34,6 +33,17 @@ pub struct PeerHandshake { nonces: Arc>>, } +impl Clone for PeerHandshake { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + internal_service: self.internal_service.clone(), + timestamp_collector: self.timestamp_collector.clone(), + nonces: self.nonces.clone(), + } + } +} + impl PeerHandshake where S: Service + Clone + Send + 'static, diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index ae6cd9b0..c77213db 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -3,6 +3,8 @@ // Portions of this submodule were adapted from tower-balance, // which is (c) 2019 Tower Contributors (MIT licensed). +// XXX these imports should go in a peer_set::initialize submodule + use std::{ net::SocketAddr, pin::Pin, @@ -20,7 +22,7 @@ use tokio::net::{TcpListener, TcpStream}; use tower::{ buffer::Buffer, discover::{Change, ServiceStream}, - timeout::Timeout, + layer::Layer, Service, ServiceExt, }; use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; @@ -28,7 +30,7 @@ use tracing::Level; use tracing_futures::Instrument; use crate::{ - peer::{HandshakeError, PeerClient, PeerHandshake}, + peer::{HandshakeError, PeerClient, PeerConnector, PeerHandshake}, timestamp_collector::TimestampCollector, AddressBook, BoxedStdError, Config, Request, Response, }; @@ -39,19 +41,7 @@ mod set; mod unready_service; use candidate_set::CandidateSet; -pub use discover::PeerDiscover; -pub use set::PeerSet; - -/// A type alias for a boxed [`tower::Service`] used to process [`Request`]s into [`Response`]s. -pub type BoxedZebraService = Box< - dyn Service< - Request, - Response = Response, - Error = BoxedStdError, - Future = Pin> + Send>>, - > + Send - + 'static, ->; +use set::PeerSet; type PeerChange = Result, BoxedStdError>; @@ -75,13 +65,20 @@ where S::Future: Send + 'static, { let (address_book, timestamp_collector) = TimestampCollector::spawn(); - let handshaker = Buffer::new( - Timeout::new( - PeerHandshake::new(config.clone(), inbound_service, timestamp_collector), - config.handshake_timeout, - ), - 1, - ); + + // Construct services that handle inbound handshakes and perform outbound + // handshakes. These use the same handshake service internally to detect + // self-connection attempts. Both are decorated with a tower TimeoutLayer to + // enforce timeouts as specified in the Config. + let (listener, connector) = { + use tower::timeout::TimeoutLayer; + let hs_timeout = TimeoutLayer::new(config.handshake_timeout); + let hs = PeerHandshake::new(config.clone(), inbound_service, timestamp_collector); + ( + hs_timeout.layer(hs.clone()), + hs_timeout.layer(PeerConnector::new(hs)), + ) + }; // Create an mpsc channel for peer changes, with a generous buffer. let (peerset_tx, peerset_rx) = mpsc::channel::(100); @@ -111,13 +108,13 @@ where // 1. Initial peers, specified in the config. tokio::spawn(add_initial_peers( config.initial_peers.clone(), - handshaker.clone(), + connector.clone(), peerset_tx.clone(), )); // 2. Incoming peer connections, via a listener. tokio::spawn( - listen(config.listen_addr, handshaker.clone(), peerset_tx.clone()).map(|result| { + listen(config.listen_addr, listener, peerset_tx.clone()).map(|result| { if let Err(e) = result { error!(%e); } @@ -140,7 +137,7 @@ where config.new_peer_interval, demand_rx, candidates, - handshaker, + connector, peerset_tx, ) .map(|result| { @@ -155,28 +152,20 @@ where /// Use the provided `handshaker` to connect to `initial_peers`, then send /// the results over `tx`. -#[instrument(skip(initial_peers, tx, handshaker))] +#[instrument(skip(initial_peers, connector, tx))] async fn add_initial_peers( initial_peers: Vec, - handshaker: S, + connector: S, mut tx: mpsc::Sender, ) where - S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + S: Service, Error = BoxedStdError> + + Clone, S::Future: Send + 'static, { info!(?initial_peers, "Connecting to initial peer set"); - let mut handshakes = initial_peers - .into_iter() - .map(|addr| { - let mut hs = handshaker.clone(); - async move { - let stream = TcpStream::connect(addr).await?; - hs.ready().await?; - let client = hs.call((stream, addr)).await?; - Ok::<_, BoxedStdError>(Change::Insert(addr, client)) - } - }) - .collect::>(); + use tower::util::CallAllUnordered; + let addr_stream = futures::stream::iter(initial_peers.into_iter()); + let mut handshakes = CallAllUnordered::new(connector, addr_stream); while let Some(handshake_result) = handshakes.next().await { let _ = tx.send(handshake_result).await; } @@ -215,43 +204,22 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `PeerClient` through a channel. /// -#[instrument(skip(new_peer_interval, demand_signal, candidates, handshaker, success_tx))] +#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))] async fn crawl_and_dial( new_peer_interval: Duration, demand_signal: mpsc::Receiver<()>, mut candidates: CandidateSet, - handshaker: C, + mut connector: C, mut success_tx: mpsc::Sender, ) -> Result<(), BoxedStdError> where - C: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + C: Service, Error = BoxedStdError> + + Clone, C::Future: Send + 'static, S: Service, S::Future: Send + 'static, { - // XXX this kind of boilerplate didn't exist before we made PeerConnector - // take (TcpStream, SocketAddr), which made it so that we could share code - // between inbound and outbound handshakes. Probably the cleanest way to - // make it go away again is to rename "Connector" to "Handshake" (since it - // is really responsible just for the handshake) and to have a "Connector" - // Service wrapper around "Handshake" that opens a TCP stream. - // We could also probably make the Handshake service `Clone` directly, - // which might be more efficient than using a Buffer wrapper. - use crate::types::MetaAddr; use futures::TryFutureExt; - let try_connect = |candidate: MetaAddr| { - let mut hs = handshaker.clone(); - async move { - let stream = TcpStream::connect(candidate.addr).await?; - hs.ready().await?; - hs.call((stream, candidate.addr)) - .await - .map(|client| Change::Insert(candidate.addr, client)) - } - // Use map_err to tag failed connections with the MetaAddr, - // so they can be reported to the CandidateSet. - .map_err(move |_| candidate) - }; // On creation, we are likely to have very few peers, so try to get more // connections quickly by concurrently connecting to a large number of @@ -259,7 +227,14 @@ where let mut handshakes = FuturesUnordered::new(); for _ in 0..50usize { if let Some(candidate) = candidates.next() { - handshakes.push(try_connect(candidate)) + connector.ready().await?; + handshakes.push( + connector + .call(candidate.addr) + // Use map_err to tag failed connections with the MetaAddr, + // so they can be reported to the CandidateSet. + .map_err(move |_| candidate), + ) } } while let Some(handshake) = handshakes.next().await { @@ -292,7 +267,12 @@ where } }; - match try_connect(candidate).await { + connector.ready().await?; + match connector + .call(candidate.addr) + .map_err(move |_| candidate) + .await + { Ok(change) => { debug!("Successfully dialed new peer, sending to peerset"); success_tx.send(Ok(change)).await?; diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 60b5dc20..17cfe8e2 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -25,10 +25,7 @@ use crate::{ BoxedStdError, }; -use super::{ - unready_service::{Error as UnreadyError, UnreadyService}, - PeerDiscover, -}; +use super::unready_service::{Error as UnreadyError, UnreadyService}; /// A [`tower::Service`] that abstractly represents "the rest of the network". ///