Extract `TimestampData` to `AddressBook`.

This allows us to hide the `TimestampCollector` and to expose only the
address book data required by the inbound request service.  It also lets
us have a common data structure (the `AddressBook`) for collecting peer
information that can be used to manage information that other peers
report to us.
This commit is contained in:
Henry de Valence 2019-10-17 15:42:19 -07:00 committed by Deirdre Connolly
parent 15a698b23c
commit 53be838d51
5 changed files with 101 additions and 71 deletions

View File

@ -0,0 +1,76 @@
//! The addressbook manages information about what peers exist, when they were
//! seen, and what services they provide.
use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
sync::{Arc, Mutex},
};
use chrono::{DateTime, Utc};
use futures::channel::mpsc;
use tokio::prelude::*;
use crate::{
constants,
types::{MetaAddr, PeerServices},
};
/// A database of peers, their advertised services, and information on when they
/// were last seen.
#[derive(Default, Debug)]
pub struct AddressBook {
by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>,
}
impl AddressBook {
/// Update the address book with `event`, a [`MetaAddr`] representing
/// observation of a peer.
pub fn update(&mut self, event: MetaAddr) {
use chrono::Duration as CD;
use std::collections::hash_map::Entry;
trace!(
?event,
data.total = self.by_time.len(),
// This would be cleaner if it used "variables" but keeping
// it inside the trace! invocation prevents running the range
// query unless the output will actually be used.
data.recent = self
.by_time
.range(
(Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap())..Utc::now()
)
.count()
);
let MetaAddr {
addr,
services,
last_seen,
} = event;
match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => {
let (prev_last_seen, _) = entry.get();
// If the new timestamp event is older than the current
// one, discard it. This is irrelevant for the timestamp
// collector but is important for combining address
// information from different peers.
if *prev_last_seen > last_seen {
return;
}
self.by_time
.remove(prev_last_seen)
.expect("cannot have by_addr entry without by_time entry");
entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services));
}
Entry::Vacant(entry) => {
entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services));
}
}
}
}

View File

@ -44,6 +44,7 @@ extern crate bitflags;
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxedStdError = Box<dyn std::error::Error + Send + Sync + 'static>;
mod address_book;
mod config;
mod constants;
mod meta_addr;
@ -54,11 +55,10 @@ mod protocol;
mod timestamp_collector;
pub use crate::{
address_book::AddressBook,
config::Config,
peer_set::{init, BoxedZebraService},
protocol::internal::{Request, Response},
// XXX replace with `AddressBook`
timestamp_collector::TimestampCollector,
};
/// Types used in the definition of [`Request`] and [`Response`] messages.
@ -68,5 +68,5 @@ pub mod types {
/// This will be removed when we finish encapsulation
pub mod should_be_private {
pub use crate::peer::PeerConnector;
pub use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector};
}

View File

@ -10,7 +10,11 @@ mod unready_service;
pub use discover::PeerDiscover;
pub use set::PeerSet;
use std::{net::SocketAddr, pin::Pin};
use std::{
net::SocketAddr,
pin::Pin,
sync::{Arc, Mutex},
};
use futures::{
channel::mpsc,
@ -29,11 +33,9 @@ use tracing::Level;
use tracing_futures::Instrument;
use crate::{
config::Config,
peer::{HandshakeError, PeerClient, PeerConnector},
protocol::internal::{Request, Response},
timestamp_collector::TimestampCollector,
BoxedStdError,
AddressBook, BoxedStdError, Config, Request, Response,
};
/// A type alias for a boxed [`tower::Service`] used to process [`Request`]s into [`Response`]s.
@ -50,7 +52,7 @@ pub type BoxedZebraService = Box<
type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>;
/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
pub fn init<S>(config: Config, inbound_service: S) -> (BoxedZebraService, TimestampCollector)
pub fn init<S>(config: Config, inbound_service: S) -> (BoxedZebraService, Arc<Mutex<AddressBook>>)
where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send + 'static,
@ -96,7 +98,7 @@ where
// 3. Outgoing peers we connect to in response to load.
(Box::new(peer_set), timestamp_collector)
(Box::new(peer_set), timestamp_collector.address_book())
}
/// Use the provided `peer_connector` to connect to `initial_peers`, then send

View File

@ -1,4 +1,4 @@
//! Management of peer liveness / last-seen information.
//! The timestamp collector collects liveness information from peers.
use std::{
collections::{BTreeMap, HashMap},
@ -13,9 +13,11 @@ use tokio::prelude::*;
use crate::{
constants,
types::{MetaAddr, PeerServices},
AddressBook,
};
/// Maintains a lookup table from peer addresses to last-seen times.
/// The timestamp collector hooks into incoming message streams for each peer and
/// records per-connection last-seen timestamps into an [`AddressBook`].
///
/// On creation, the `TimestampCollector` spawns a worker task to process new
/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
@ -27,70 +29,15 @@ pub struct TimestampCollector {
// We do not expect mutex contention to be a problem, because
// the dominant accessor is the collector worker, and it has a long
// event buffer to hide latency if other tasks block it temporarily.
data: Arc<Mutex<TimestampData>>,
data: Arc<Mutex<AddressBook>>,
shutdown: Arc<ShutdownSignal>,
worker_tx: mpsc::Sender<MetaAddr>,
}
#[derive(Default, Debug)]
struct TimestampData {
by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>,
}
impl TimestampData {
fn update(&mut self, event: MetaAddr) {
use chrono::Duration as CD;
use std::collections::hash_map::Entry;
trace!(
?event,
data.total = self.by_time.len(),
// This would be cleaner if it used "variables" but keeping
// it inside the trace! invocation prevents running the range
// query unless the output will actually be used.
data.recent = self
.by_time
.range(
(Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap())..Utc::now()
)
.count()
);
let MetaAddr {
addr,
services,
last_seen,
} = event;
match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => {
let (prev_last_seen, _) = entry.get();
// If the new timestamp event is older than the current
// one, discard it. This is irrelevant for the timestamp
// collector but is important for combining address
// information from different peers.
if *prev_last_seen > last_seen {
return;
}
self.by_time
.remove(prev_last_seen)
.expect("cannot have by_addr entry without by_time entry");
entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services));
}
Entry::Vacant(entry) => {
entry.insert((last_seen, services));
self.by_time.insert(last_seen, (addr, services));
}
}
}
}
impl TimestampCollector {
/// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
pub fn new() -> TimestampCollector {
let data = Arc::new(Mutex::new(TimestampData::default()));
let data = Arc::new(Mutex::new(AddressBook::default()));
// We need to make a copy now so we can move data into the async block.
let data2 = data.clone();
@ -121,6 +68,11 @@ impl TimestampCollector {
}
}
/// Return a shared reference to the [`AddressBook`] this collector updates.
pub fn address_book(&self) -> Arc<Mutex<AddressBook>> {
self.data.clone()
}
pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> {
self.worker_tx.clone()
}

View File

@ -51,7 +51,7 @@ impl Runnable for ConnectCmd {
impl ConnectCmd {
async fn connect(&self) -> Result<(), failure::Error> {
use zebra_network::{Request, Response, TimestampCollector};
use zebra_network::{Request, Response};
info!("begin tower-based peer handling test stub");
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
@ -77,7 +77,7 @@ impl ConnectCmd {
// Later, this should turn into initial_peers = vec![self.addr];
config.initial_peers = {
use tokio::net::TcpStream;
use zebra_network::should_be_private::PeerConnector;
use zebra_network::should_be_private::{PeerConnector, TimestampCollector};
let collector = TimestampCollector::new();
let mut pc = Buffer::new(
@ -108,7 +108,7 @@ impl ConnectCmd {
addrs.into_iter().map(|meta| meta.addr).collect::<Vec<_>>()
};
let (mut peer_set, _tc) = zebra_network::init(config, node);
let (mut peer_set, _address_book) = zebra_network::init(config, node);
info!("waiting for peer_set ready");
peer_set.ready().await.map_err(Error::from_boxed_compat)?;