diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs new file mode 100644 index 00000000..4e75cb3e --- /dev/null +++ b/zebra-network/src/address_book.rs @@ -0,0 +1,76 @@ +//! The addressbook manages information about what peers exist, when they were +//! seen, and what services they provide. + +use std::{ + collections::{BTreeMap, HashMap}, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use chrono::{DateTime, Utc}; +use futures::channel::mpsc; +use tokio::prelude::*; + +use crate::{ + constants, + types::{MetaAddr, PeerServices}, +}; + +/// A database of peers, their advertised services, and information on when they +/// were last seen. +#[derive(Default, Debug)] +pub struct AddressBook { + by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>, + by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>, +} + +impl AddressBook { + /// Update the address book with `event`, a [`MetaAddr`] representing + /// observation of a peer. + pub fn update(&mut self, event: MetaAddr) { + use chrono::Duration as CD; + use std::collections::hash_map::Entry; + + trace!( + ?event, + data.total = self.by_time.len(), + // This would be cleaner if it used "variables" but keeping + // it inside the trace! invocation prevents running the range + // query unless the output will actually be used. + data.recent = self + .by_time + .range( + (Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap())..Utc::now() + ) + .count() + ); + + let MetaAddr { + addr, + services, + last_seen, + } = event; + + match self.by_addr.entry(addr) { + Entry::Occupied(mut entry) => { + let (prev_last_seen, _) = entry.get(); + // If the new timestamp event is older than the current + // one, discard it. This is irrelevant for the timestamp + // collector but is important for combining address + // information from different peers. 
+ if *prev_last_seen > last_seen { + return; + } + self.by_time + .remove(prev_last_seen) + .expect("cannot have by_addr entry without by_time entry"); + entry.insert((last_seen, services)); + self.by_time.insert(last_seen, (addr, services)); + } + Entry::Vacant(entry) => { + entry.insert((last_seen, services)); + self.by_time.insert(last_seen, (addr, services)); + } + } + } +} diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index d040df5e..38460d8a 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -44,6 +44,7 @@ extern crate bitflags; /// parameterized by 'a), *not* that the object itself has 'static lifetime. pub type BoxedStdError = Box<dyn std::error::Error + Send + Sync + 'static>; +mod address_book; mod config; mod constants; mod meta_addr; @@ -54,11 +55,10 @@ mod protocol; mod timestamp_collector; pub use crate::{ + address_book::AddressBook, config::Config, peer_set::{init, BoxedZebraService}, protocol::internal::{Request, Response}, - // XXX replace with `AddressBook` - timestamp_collector::TimestampCollector, }; /// Types used in the definition of [`Request`] and [`Response`] messages. 
@@ -68,5 +68,5 @@ pub mod types { /// This will be removed when we finish encapsulation pub mod should_be_private { - pub use crate::peer::PeerConnector; + pub use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector}; } diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 074b698f..9cc38023 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -10,7 +10,11 @@ mod unready_service; pub use discover::PeerDiscover; pub use set::PeerSet; -use std::{net::SocketAddr, pin::Pin}; +use std::{ + net::SocketAddr, + pin::Pin, + sync::{Arc, Mutex}, +}; use futures::{ channel::mpsc, @@ -29,11 +33,9 @@ use tracing::Level; use tracing_futures::Instrument; use crate::{ - config::Config, peer::{HandshakeError, PeerClient, PeerConnector}, - protocol::internal::{Request, Response}, timestamp_collector::TimestampCollector, - BoxedStdError, + AddressBook, BoxedStdError, Config, Request, Response, }; /// A type alias for a boxed [`tower::Service`] used to process [`Request`]s into [`Response`]s. pub type BoxedZebraService = Box< @@ -50,7 +52,7 @@ type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>; /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. -pub fn init<S>(config: Config, inbound_service: S) -> (BoxedZebraService, TimestampCollector) +pub fn init<S>(config: Config, inbound_service: S) -> (BoxedZebraService, Arc<Mutex<AddressBook>>) where S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static, S::Future: Send + 'static, @@ -96,7 +98,7 @@ where // 3. Outgoing peers we connect to in response to load. 
- (Box::new(peer_set), timestamp_collector) + (Box::new(peer_set), timestamp_collector.address_book()) } /// Use the provided `peer_connector` to connect to `initial_peers`, then send diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs index 586320af..6cc8da34 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/timestamp_collector.rs @@ -1,4 +1,4 @@ -//! Management of peer liveness / last-seen information. +//! The timestamp collector collects liveness information from peers. use std::{ collections::{BTreeMap, HashMap}, @@ -13,9 +13,11 @@ use tokio::prelude::*; use crate::{ constants, types::{MetaAddr, PeerServices}, + AddressBook, }; -/// Maintains a lookup table from peer addresses to last-seen times. +/// The timestamp collector hooks into incoming message streams for each peer and +/// records per-connection last-seen timestamps into an [`AddressBook`]. /// /// On creation, the `TimestampCollector` spawns a worker task to process new /// timestamp events. The resulting `TimestampCollector` can be cloned, and the @@ -27,70 +29,15 @@ pub struct TimestampCollector { // We do not expect mutex contention to be a problem, because // the dominant accessor is the collector worker, and it has a long // event buffer to hide latency if other tasks block it temporarily. - data: Arc<Mutex<TimestampData>>, + data: Arc<Mutex<AddressBook>>, shutdown: Arc, worker_tx: mpsc::Sender<MetaAddr>, } -#[derive(Default, Debug)] -struct TimestampData { - by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>, - by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>, -} - -impl TimestampData { - fn update(&mut self, event: MetaAddr) { - use chrono::Duration as CD; - use std::collections::hash_map::Entry; - - trace!( - ?event, - data.total = self.by_time.len(), - // This would be cleaner if it used "variables" but keeping - // it inside the trace! invocation prevents running the range - // query unless the output will actually be used. 
- data.recent = self - .by_time - .range( - (Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap())..Utc::now() - ) - .count() - ); - - let MetaAddr { - addr, - services, - last_seen, - } = event; - - match self.by_addr.entry(addr) { - Entry::Occupied(mut entry) => { - let (prev_last_seen, _) = entry.get(); - // If the new timestamp event is older than the current - // one, discard it. This is irrelevant for the timestamp - // collector but is important for combining address - // information from different peers. - if *prev_last_seen > last_seen { - return; - } - self.by_time - .remove(prev_last_seen) - .expect("cannot have by_addr entry without by_time entry"); - entry.insert((last_seen, services)); - self.by_time.insert(last_seen, (addr, services)); - } - Entry::Vacant(entry) => { - entry.insert((last_seen, services)); - self.by_time.insert(last_seen, (addr, services)); - } - } - } -} - impl TimestampCollector { /// Constructs a new `TimestampCollector`, spawning a worker task to process updates. pub fn new() -> TimestampCollector { - let data = Arc::new(Mutex::new(TimestampData::default())); + let data = Arc::new(Mutex::new(AddressBook::default())); // We need to make a copy now so we can move data into the async block. let data2 = data.clone(); @@ -121,6 +68,11 @@ impl TimestampCollector { } } + /// Return a shared reference to the [`AddressBook`] this collector updates. 
+ pub fn address_book(&self) -> Arc<Mutex<AddressBook>> { + self.data.clone() + } + pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> { self.worker_tx.clone() } diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index 70a5b1f6..6957711e 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -51,7 +51,7 @@ impl Runnable for ConnectCmd { impl ConnectCmd { async fn connect(&self) -> Result<(), failure::Error> { - use zebra_network::{Request, Response, TimestampCollector}; + use zebra_network::{Request, Response}; info!("begin tower-based peer handling test stub"); use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; @@ -77,7 +77,7 @@ impl ConnectCmd { // Later, this should turn into initial_peers = vec![self.addr]; config.initial_peers = { use tokio::net::TcpStream; - use zebra_network::should_be_private::PeerConnector; + use zebra_network::should_be_private::{PeerConnector, TimestampCollector}; let collector = TimestampCollector::new(); let mut pc = Buffer::new( @@ -108,7 +108,7 @@ addrs.into_iter().map(|meta| meta.addr).collect::<Vec<_>>() }; - let (mut peer_set, _tc) = zebra_network::init(config, node); + let (mut peer_set, _address_book) = zebra_network::init(config, node); info!("waiting for peer_set ready"); peer_set.ready().await.map_err(Error::from_boxed_compat)?;