From 60a0b8c382dfbd9b598d5b06a58e7de0e0909b8a Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 31 Aug 2020 21:32:35 -0700 Subject: [PATCH] network: change Handshake::new to a Builder. This allows more detailed control over the handshake parameters. --- zebra-network/src/peer/handshake.rs | 166 +++++++++++++++++++---- zebra-network/src/peer_set/initialize.rs | 11 +- 2 files changed, 146 insertions(+), 31 deletions(-) diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 9fa06e97..a57d1882 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -36,43 +36,149 @@ use super::{Client, Connection, ErrorSlot, HandshakeError}; /// client/server pair. pub struct Handshake { config: Config, - internal_service: S, + inbound_service: S, timestamp_collector: mpsc::Sender, nonces: Arc>>, + our_addr: SocketAddr, + user_agent: String, + our_services: PeerServices, + relay: bool, } +// This is manually implemented because we can't pass an +// S: Clone bound to #[derive(Clone)]. impl Clone for Handshake { fn clone(&self) -> Self { Handshake { config: self.config.clone(), - internal_service: self.internal_service.clone(), + inbound_service: self.inbound_service.clone(), timestamp_collector: self.timestamp_collector.clone(), nonces: self.nonces.clone(), + our_addr: self.our_addr.clone(), + user_agent: self.user_agent.clone(), + our_services: self.our_services.clone(), + relay: self.relay.clone(), } } } +pub struct Builder { + config: Option, + inbound_service: Option, + timestamp_collector: Option>, + our_addr: Option, + our_services: Option, + user_agent: Option, + relay: Option, +} + +impl Builder +where + S: Service + Clone + Send + 'static, + S::Future: Send, +{ + /// Provide a config. Mandatory. + pub fn with_config(mut self, config: Config) -> Self { + self.config = Some(config); + self + } + + /// Provide a service to handle inbound requests. Mandatory. + pub fn with_inbound_service(mut self, inbound_service: S) -> Self { + self.inbound_service = Some(inbound_service); + self + } + + /// Provide a hook for timestamp collection. Optional. + /// + /// If this is unset, timestamps will not be collected. + pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender) -> Self { + self.timestamp_collector = Some(timestamp_collector); + self + } + + /// Provide this node's address, to send to peers. Optional. + /// + /// If this is unset, the default of 0.0.0.0:8233 will be used. + pub fn with_addr(mut self, addr: SocketAddr) -> Self { + self.our_addr = Some(addr); + self + } + + /// Provide the services this node advertises to other peers. Optional. + /// + /// If this is unset, the node will advertise itself as a client. + pub fn with_advertised_services(mut self, services: PeerServices) -> Self { + self.our_services = Some(services); + self + } + + /// Provide this node's user agent. Optional. + /// + /// This must be a valid BIP14 string. If it is unset, the user-agent will be empty. + pub fn with_user_agent(mut self, user_agent: String) -> Self { + self.user_agent = Some(user_agent); + self + } + + /// Whether to request that peers relay transactions to our node. Optional. + /// + /// If this is unset, the node will not request transactions. + pub fn want_transactions(mut self, relay: bool) -> Self { + self.relay = Some(relay); + self + } + + /// Consume this builder and produce a [`Handshake`]. + /// + /// Returns an error only if any mandatory field was unset. + pub fn finish(self) -> Result, &'static str> { + let config = self.config.ok_or("did not specify config")?; + let inbound_service = self + .inbound_service + .ok_or("did not specify inbound service")?; + let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| { + // No timestamp collector was passed, so create a stub channel. + // Dropping the receiver means sends will fail, but we don't care. + let (tx, _rx) = mpsc::channel(1); + tx + }); + let nonces = Arc::new(Mutex::new(HashSet::new())); + let user_agent = self.user_agent.unwrap_or_else(|| "".to_string()); + let our_addr = self + .our_addr + .unwrap_or_else(|| "0.0.0.0:8233".parse().unwrap()); + let our_services = self.our_services.unwrap_or(PeerServices::empty()); + let relay = self.relay.unwrap_or(false); + Ok(Handshake { + config, + inbound_service, + timestamp_collector, + nonces, + user_agent, + our_addr, + our_services, + relay, + }) + } +} + impl Handshake where S: Service + Clone + Send + 'static, S::Future: Send, { - /// Construct a new `PeerConnector`. - pub fn new( - config: Config, - internal_service: S, - timestamp_collector: mpsc::Sender, - ) -> Self { - // XXX this function has too many parameters, but it's not clear how to - // do a nice builder as all fields are mandatory. Could have Builder1, - // Builder2, ..., with Builder1::with_config() -> Builder2; - // Builder2::with_internal_service() -> ... or use Options in a single - // Builder type or use the derive_builder crate. - Handshake { - config, - internal_service, - timestamp_collector, - nonces: Arc::new(Mutex::new(HashSet::new())), + /// Create a builder that configures a [`Handshake`] service. + pub fn builder() -> Builder { + // can't derive Builder::default without a bound on S :( + Builder { + config: None, + inbound_service: None, + timestamp_collector: None, + user_agent: None, + our_addr: None, + our_services: None, + relay: None, } } } @@ -102,9 +208,13 @@ where // Clone these upfront, so they can be moved into the future. let nonces = self.nonces.clone(); - let internal_service = self.internal_service.clone(); + let inbound_service = self.inbound_service.clone(); let timestamp_collector = self.timestamp_collector.clone(); let network = self.config.network; + let our_addr = self.our_addr.clone(); + let user_agent = self.user_agent.clone(); + let our_services = self.our_services; + let relay = self.relay; let fut = async move { debug!("connecting to remote peer"); @@ -125,20 +235,16 @@ where let version = Message::Version { version: constants::CURRENT_VERSION, - services: PeerServices::NODE_NETWORK, + services: our_services, timestamp: Utc::now(), address_recv: (PeerServices::NODE_NETWORK, addr), - // TODO: when we've implemented block and transaction relaying, - // send our configured address to the peer - address_from: (PeerServices::NODE_NETWORK, "0.0.0.0:8233".parse().unwrap()), + address_from: (our_services, our_addr), nonce: local_nonce, - user_agent: constants::USER_AGENT.to_string(), - // XXX eventually the `PeerConnector` will need to have a handle - // for a service that gets the current block height. Among other - // things we need it to reject peers who don't know about the - // current protocol epoch. + user_agent, + // The protocol works fine if we don't reveal our current block height, + // and not sending it means we don't need to be connected to the chain state. start_height: block::Height(0), - relay: false, + relay, }; debug!(?version, "sending initial version message"); @@ -282,7 +388,7 @@ where use super::connection; let server = Connection { state: connection::State::AwaitingRequest, - svc: internal_service, + svc: inbound_service, client_rx: server_rx, error_slot: slot, peer_tx, diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 908c8a2c..16e7d701 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -57,7 +57,16 @@ where let (listener, connector) = { use tower::timeout::TimeoutLayer; let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT); - let hs = peer::Handshake::new(config.clone(), inbound_service, timestamp_collector); + use crate::protocol::external::types::PeerServices; + let hs = peer::Handshake::builder() + .with_config(config.clone()) + .with_inbound_service(inbound_service) + .with_timestamp_collector(timestamp_collector) + // XXX .with_addr(addr) once we can access our configured address + .with_advertised_services(PeerServices::NODE_NETWORK) + .with_user_agent(crate::constants::USER_AGENT.to_string()) + .finish() + .expect("configured all required parameters"); ( hs_timeout.layer(hs.clone()), hs_timeout.layer(peer::Connector::new(hs)),