diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs
index 104d94b0..fd0f2e6d 100644
--- a/zebra-network/src/peer/connector.rs
+++ b/zebra-network/src/peer/connector.rs
@@ -29,7 +29,7 @@ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, P
 
 pub struct PeerConnector<S> {
     config: Config,
     internal_service: S,
-    sender: mpsc::Sender<MetaAddr>,
+    timestamp_collector: mpsc::Sender<MetaAddr>,
     nonces: Arc<Mutex<HashSet<Nonce>>>,
 }
@@ -39,17 +39,20 @@ where
     S::Future: Send,
 {
     /// Construct a new `PeerConnector`.
-    pub fn new(config: Config, internal_service: S, collector: &TimestampCollector) -> Self {
+    pub fn new(
+        config: Config,
+        internal_service: S,
+        timestamp_collector: mpsc::Sender<MetaAddr>,
+    ) -> Self {
         // XXX this function has too many parameters, but it's not clear how to
         // do a nice builder as all fields are mandatory. Could have Builder1,
         // Builder2, ..., with Builder1::with_config() -> Builder2;
         // Builder2::with_internal_service() -> ... or use Options in a single
         // Builder type or use the derive_builder crate.
-        let sender = collector.sender_handle();
         PeerConnector {
             config,
             internal_service,
-            sender,
+            timestamp_collector,
             nonces: Arc::new(Mutex::new(HashSet::new())),
         }
     }
@@ -78,7 +81,7 @@
         // Clone these upfront, so they can be moved into the future.
         let nonces = self.nonces.clone();
         let internal_service = self.internal_service.clone();
-        let sender = self.sender.clone();
+        let timestamp_collector = self.timestamp_collector.clone();
         let user_agent = self.config.user_agent.clone();
         let network = self.config.network.clone();
 
@@ -181,11 +184,11 @@
 
         let hooked_peer_rx = peer_rx
             .then(move |msg| {
-                let mut sender = sender.clone();
+                let mut timestamp_collector = timestamp_collector.clone();
                 async move {
                     if let Ok(_) = msg {
                         use futures::sink::SinkExt;
-                        let _ = sender
+                        let _ = timestamp_collector
                             .send(MetaAddr {
                                 addr,
                                 services: remote_services,
diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs
index 9cc38023..81c1652d 100644
--- a/zebra-network/src/peer_set.rs
+++ b/zebra-network/src/peer_set.rs
@@ -57,9 +57,9 @@ where
     S: Service<Request> + Clone + Send + 'static,
     S::Future: Send + 'static,
 {
-    let timestamp_collector = TimestampCollector::new();
+    let (address_book, timestamp_collector) = TimestampCollector::spawn();
     let peer_connector = Buffer::new(
-        PeerConnector::new(config.clone(), inbound_service, &timestamp_collector),
+        PeerConnector::new(config.clone(), inbound_service, timestamp_collector),
         1,
     );
 
@@ -98,7 +98,7 @@
 
     // 3. Outgoing peers we connect to in response to load.
 
-    (Box::new(peer_set), timestamp_collector.address_book())
+    (Box::new(peer_set), address_book)
 }
diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs
index 6cc8da34..bf0c459c 100644
--- a/zebra-network/src/timestamp_collector.rs
+++ b/zebra-network/src/timestamp_collector.rs
@@ -1,95 +1,36 @@
 //! The timestamp collector collects liveness information from peers.
 
-use std::{
-    collections::{BTreeMap, HashMap},
-    net::SocketAddr,
-    sync::{Arc, Mutex},
-};
+use std::sync::{Arc, Mutex};
 
-use chrono::{DateTime, Utc};
 use futures::channel::mpsc;
 use tokio::prelude::*;
 
-use crate::{
-    constants,
-    types::{MetaAddr, PeerServices},
-    AddressBook,
-};
+use crate::{types::MetaAddr, AddressBook};
 
 /// The timestamp collector hooks into incoming message streams for each peer and
 /// records per-connection last-seen timestamps into an [`AddressBook`].
-///
-/// On creation, the `TimestampCollector` spawns a worker task to process new
-/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
-/// worker task and state are shared among all of the clones.
-///
-/// XXX add functionality for querying the timestamp data
-#[derive(Clone, Debug)]
-pub struct TimestampCollector {
-    // We do not expect mutex contention to be a problem, because
-    // the dominant accessor is the collector worker, and it has a long
-    // event buffer to hide latency if other tasks block it temporarily.
-    data: Arc<Mutex<AddressBook>>,
-    shutdown: Arc<ShutdownSignal>,
-    worker_tx: mpsc::Sender<MetaAddr>,
-}
+pub struct TimestampCollector {}
 
 impl TimestampCollector {
-    /// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
-    pub fn new() -> TimestampCollector {
-        let data = Arc::new(Mutex::new(AddressBook::default()));
-        // We need to make a copy now so we can move data into the async block.
-        let data2 = data.clone();
-
+    /// Spawn a new [`TimestampCollector`] task, and return handles for the
+    /// transmission channel for timestamp events and for the [`AddressBook`] it
+    /// updates.
+    pub fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
         const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
         let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
-        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(0);
+        let address_book = Arc::new(Mutex::new(AddressBook::default()));
+        let worker_address_book = address_book.clone();
 
-        // Construct and then spawn a worker.
         let worker = async move {
-            use futures::future::{self, Either};
-            loop {
-                match future::select(shutdown_rx.next(), worker_rx.next()).await {
-                    Either::Left((_, _)) => return,     // shutdown signal
-                    Either::Right((None, _)) => return, // all workers are gone
-                    Either::Right((Some(event), _)) => data2
-                        .lock()
-                        .expect("mutex should be unpoisoned")
-                        .update(event),
-                }
+            while let Some(event) = worker_rx.next().await {
+                worker_address_book
+                    .lock()
+                    .expect("mutex should be unpoisoned")
+                    .update(event);
             }
         };
         tokio::spawn(worker.boxed());
 
-        TimestampCollector {
-            data,
-            worker_tx,
-            shutdown: Arc::new(ShutdownSignal { tx: shutdown_tx }),
-        }
-    }
-
-    /// Return a shared reference to the [`AddressBook`] this collector updates.
-    pub fn address_book(&self) -> Arc<Mutex<AddressBook>> {
-        self.data.clone()
-    }
-
-    pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> {
-        self.worker_tx.clone()
-    }
-}
-
-/// Sends a signal when dropped.
-#[derive(Debug)]
-struct ShutdownSignal {
-    /// Sending () signals that the task holding the rx end should terminate.
-    ///
-    /// This should really be a oneshot but calling a oneshot consumes it,
-    /// and we can't move out of self in Drop.
-    tx: mpsc::Sender<()>,
-}
-
-impl Drop for ShutdownSignal {
-    fn drop(&mut self) {
-        self.tx.try_send(()).expect("tx is only used in drop");
+        (address_book, worker_tx)
     }
 }
diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs
index 6957711e..ea14ad74 100644
--- a/zebrad/src/commands/connect.rs
+++ b/zebrad/src/commands/connect.rs
@@ -79,9 +79,9 @@ impl ConnectCmd {
         use tokio::net::TcpStream;
         use zebra_network::should_be_private::{PeerConnector, TimestampCollector};
 
-        let collector = TimestampCollector::new();
+        let (_, collector) = TimestampCollector::spawn();
         let mut pc = Buffer::new(
-            PeerConnector::new(config.clone(), node.clone(), &collector),
+            PeerConnector::new(config.clone(), node.clone(), collector),
             1,
         );
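
Note on the pattern this patch moves to (a sketch, not part of the diff): the
deleted ShutdownSignal worked by sending () from its Drop impl, and the old
worker select!ed between the shutdown channel and the event channel. After the
change, the worker simply drains its event channel and exits when the last
mpsc::Sender is dropped, because rx.next() then yields None. The miniature
below shows that shutdown-by-channel-closure shape in isolation. It assumes
current `futures` and `tokio` crates (the patch itself targets an older tokio),
substitutes u64 for MetaAddr and Vec<u64> for AddressBook, and adds a
JoinHandle return purely so the demo can await worker shutdown:

    use std::sync::{Arc, Mutex};

    use futures::{channel::mpsc, SinkExt, StreamExt};
    use tokio::task::JoinHandle;

    // Mirrors the shape of TimestampCollector::spawn(): the worker owns one
    // handle to the shared state; the caller gets the other handle plus the
    // sender end of the event channel.
    fn spawn_collector() -> (Arc<Mutex<Vec<u64>>>, mpsc::Sender<u64>, JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel(100);
        let state = Arc::new(Mutex::new(Vec::new()));
        let worker_state = state.clone();
        let worker = tokio::spawn(async move {
            // next() yields None once every Sender has been dropped and the
            // buffer is drained, so the task ends with no explicit signal.
            while let Some(event) = rx.next().await {
                worker_state
                    .lock()
                    .expect("mutex should be unpoisoned")
                    .push(event);
            }
        });
        (state, tx, worker)
    }

    #[tokio::main]
    async fn main() {
        let (state, mut tx, worker) = spawn_collector();
        tx.send(42).await.expect("worker is still running");
        drop(tx); // dropping the last sender is the shutdown signal
        worker.await.expect("worker exits cleanly");
        println!("collected: {:?}", state.lock().unwrap());
    }

Returning (address_book, worker_tx) directly from spawn() is also what lets the
patch delete the address_book() and sender_handle() accessors and the
Clone/Debug derives: callers never hold a TimestampCollector value at all, only
the two handles they actually use.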