diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index e2fc0854..19c0e21f 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -6,6 +6,8 @@ use crate::network::Network; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct Config { + /// The address on which this node should listen for connections. + pub listen_addr: SocketAddr, /// The network to connect to. pub network: Network, /// The user-agent to advertise. @@ -23,6 +25,9 @@ pub struct Config { impl Default for Config { fn default() -> Config { Config { + listen_addr: "127.0.0.1:28233" + .parse() + .expect("Hardcoded address should be parseable"), user_agent: crate::constants::USER_AGENT.to_owned(), network: Network::Mainnet, initial_peers: Vec::new(), diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 4577a958..3d34402c 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -10,15 +10,31 @@ mod unready_service; pub use discover::PeerDiscover; pub use set::PeerSet; -use std::pin::Pin; +use std::{net::SocketAddr, pin::Pin}; -use futures::future::Future; -use tower::Service; +use futures::{ + channel::mpsc, + future::{self, Future, FutureExt}, + sink::SinkExt, + stream::{FuturesUnordered, StreamExt}, +}; +use tokio::net::{TcpListener, TcpStream}; +use tower::{ + buffer::Buffer, + discover::{Change, ServiceStream}, + Service, ServiceExt, +}; +use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; +use tracing::Level; +use tracing_futures::Instrument; -use crate::config::Config; -use crate::protocol::internal::{Request, Response}; -use crate::BoxedStdError; -use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector}; +use crate::{ + config::Config, + peer::{HandshakeError, PeerClient, PeerConnector}, + protocol::internal::{Request, Response}, + timestamp_collector::TimestampCollector, + BoxedStdError, +}; type BoxedZebraService = Box< dyn Service< @@ -30,55 +46,113 @@ type BoxedZebraService = Box< + 'static, >; +type PeerChange = Result, BoxedStdError>; + /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. pub fn init(config: Config, inbound_service: S) -> (BoxedZebraService, TimestampCollector) where S: Service + Clone + Send + 'static, - S::Future: Send, + S::Future: Send + 'static, { - use futures::{ - future, - stream::{FuturesUnordered, StreamExt}, - }; - use tokio::net::TcpStream; - use tower::{ - buffer::Buffer, - discover::{Change, ServiceStream}, - ServiceExt, - }; - use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; + let timestamp_collector = TimestampCollector::new(); + let peer_connector = Buffer::new( + PeerConnector::new(config.clone(), inbound_service, ×tamp_collector), + 1, + ); - let tc = TimestampCollector::new(); - let pc = Buffer::new(PeerConnector::new(config.clone(), inbound_service, &tc), 1); + // Create an mpsc channel for peer changes, with a generous buffer. + let (peerset_tx, peerset_rx) = mpsc::channel::(100); - // construct a stream of services XXX currently the stream is based on a - // static stream from config.initial_peers; this should be replaced with a - // channel that starts with initial_peers but also accetps incoming, dials - // new, etc. - let client_stream = PeakEwmaDiscover::new( + // Connect the rx end to a PeerSet, wrapping new peers in load instruments. + let peer_set = PeerSet::new(PeakEwmaDiscover::new( ServiceStream::new( - config - .initial_peers - .into_iter() - .map(|addr| { - let mut pc = pc.clone(); - async move { - let stream = TcpStream::connect(addr).await?; - pc.ready().await?; - let client = pc.call((stream, addr)).await?; - Ok::<_, BoxedStdError>(Change::Insert(addr, client)) - } - }) - .collect::>() - // Discard any errored connections... - .filter(|result| future::ready(result.is_ok())), + // ServiceStream interprets an error as stream termination, + // so discard any errored connections... + peerset_rx.filter(|result| future::ready(result.is_ok())), ), config.ewma_default_rtt, config.ewma_decay_time, NoInstrument, + )); + + // Connect the tx end to the 3 peer sources: + + // 1. Initial peers, specified in the config. + tokio::spawn(add_initial_peers( + config.initial_peers.clone(), + peer_connector.clone(), + peerset_tx.clone(), + )); + + // 2. Incoming peer connections, via a listener. + tokio::spawn( + listen(config.listen_addr, peer_connector, peerset_tx).map(|result| { + if let Err(e) = result { + error!(%e); + } + }), ); - let peer_set = PeerSet::new(client_stream); + // 3. Outgoing peers we connect to in response to load. - (Box::new(peer_set), tc) + (Box::new(peer_set), timestamp_collector) +} + +/// Use the provided `peer_connector` to connect to `initial_peers`, then send +/// the results over `tx`. +#[instrument(skip(initial_peers, tx, peer_connector))] +async fn add_initial_peers( + initial_peers: Vec, + peer_connector: S, + mut tx: mpsc::Sender, +) where + S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + S::Future: Send + 'static, +{ + info!(?initial_peers, "Connecting to initial peer set"); + let mut handshakes = initial_peers + .into_iter() + .map(|addr| { + let mut pc = peer_connector.clone(); + async move { + let stream = TcpStream::connect(addr).await?; + pc.ready().await?; + let client = pc.call((stream, addr)).await?; + Ok::<_, BoxedStdError>(Change::Insert(addr, client)) + } + }) + .collect::>(); + while let Some(handshake_result) = handshakes.next().await { + let _ = tx.send(handshake_result).await; + } +} + +/// Bind to `addr`, listen for peers using `peer_connector`, then send the +/// results over `tx`. +#[instrument(skip(tx, peer_connector))] +async fn listen( + addr: SocketAddr, + mut peer_connector: S, + tx: mpsc::Sender, +) -> Result<(), BoxedStdError> +where + S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + S::Future: Send + 'static, +{ + let mut listener = TcpListener::bind(addr).await?; + loop { + if let Ok((tcp_stream, addr)) = listener.accept().await { + debug!(?addr, "got incoming connection"); + peer_connector.ready().await?; + // Construct a handshake future but do not drive it yet.... + let handshake = peer_connector.call((tcp_stream, addr)); + // ... instead, spawn a new task to handle this connection + let mut tx2 = tx.clone(); + tokio::spawn(async move { + if let Ok(client) = handshake.await { + let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + } + }); + } + } }