Use MetaAddr in the timestamp collector.

We will need service bits information to report on peer addresses, so we
need to collect it in the timestamp collector.
This commit is contained in:
Henry de Valence 2019-10-17 14:33:45 -07:00 committed by Deirdre Connolly
parent 9ec1e01c8f
commit 15a698b23c
2 changed files with 47 additions and 24 deletions

View File

@ -18,8 +18,9 @@ use zebra_chain::types::BlockHeight;
use crate::{ use crate::{
constants, constants,
protocol::{codec::*, internal::*, message::*, types::*}, protocol::{codec::*, internal::*, message::*, types::*},
timestamp_collector::{PeerLastSeen, TimestampCollector}, timestamp_collector::TimestampCollector,
BoxedStdError, Config, types::Network, types::{MetaAddr, Network},
BoxedStdError, Config,
}; };
use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer}; use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer};
@ -28,7 +29,7 @@ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, P
pub struct PeerConnector<S> { pub struct PeerConnector<S> {
config: Config, config: Config,
internal_service: S, internal_service: S,
sender: mpsc::Sender<PeerLastSeen>, sender: mpsc::Sender<MetaAddr>,
nonces: Arc<Mutex<HashSet<Nonce>>>, nonces: Arc<Mutex<HashSet<Nonce>>>,
} }
@ -120,8 +121,11 @@ where
// Check that we got a Version and destructure its fields into the local scope. // Check that we got a Version and destructure its fields into the local scope.
debug!(?remote_msg, "got message from remote peer"); debug!(?remote_msg, "got message from remote peer");
let remote_nonce = if let Message::Version { nonce, .. } = remote_msg { let (remote_nonce, remote_services) = if let Message::Version {
nonce nonce, services, ..
} = remote_msg
{
(nonce, services)
} else { } else {
return Err(HandshakeError::UnexpectedMessage(remote_msg)); return Err(HandshakeError::UnexpectedMessage(remote_msg));
}; };
@ -181,7 +185,13 @@ where
async move { async move {
if let Ok(_) = msg { if let Ok(_) = msg {
use futures::sink::SinkExt; use futures::sink::SinkExt;
let _ = sender.send((addr, Utc::now())).await; let _ = sender
.send(MetaAddr {
addr,
services: remote_services,
last_seen: Utc::now(),
})
.await;
} }
msg msg
} }

View File

@ -10,10 +10,10 @@ use chrono::{DateTime, Utc};
use futures::channel::mpsc; use futures::channel::mpsc;
use tokio::prelude::*; use tokio::prelude::*;
use crate::constants; use crate::{
constants,
/// A type alias for a timestamp event sent to a `TimestampCollector`. types::{MetaAddr, PeerServices},
pub(crate) type PeerLastSeen = (SocketAddr, DateTime<Utc>); };
/// Maintains a lookup table from peer addresses to last-seen times. /// Maintains a lookup table from peer addresses to last-seen times.
/// ///
@ -29,23 +29,22 @@ pub struct TimestampCollector {
// event buffer to hide latency if other tasks block it temporarily. // event buffer to hide latency if other tasks block it temporarily.
data: Arc<Mutex<TimestampData>>, data: Arc<Mutex<TimestampData>>,
shutdown: Arc<ShutdownSignal>, shutdown: Arc<ShutdownSignal>,
worker_tx: mpsc::Sender<PeerLastSeen>, worker_tx: mpsc::Sender<MetaAddr>,
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct TimestampData { struct TimestampData {
by_addr: HashMap<SocketAddr, DateTime<Utc>>, by_addr: HashMap<SocketAddr, (DateTime<Utc>, PeerServices)>,
by_time: BTreeMap<DateTime<Utc>, SocketAddr>, by_time: BTreeMap<DateTime<Utc>, (SocketAddr, PeerServices)>,
} }
impl TimestampData { impl TimestampData {
fn update(&mut self, event: PeerLastSeen) { fn update(&mut self, event: MetaAddr) {
use chrono::Duration as CD; use chrono::Duration as CD;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
let (addr, timestamp) = event;
trace!( trace!(
?addr, ?event,
?timestamp,
data.total = self.by_time.len(), data.total = self.by_time.len(),
// This would be cleaner if it used "variables" but keeping // This would be cleaner if it used "variables" but keeping
// it inside the trace! invocation prevents running the range // it inside the trace! invocation prevents running the range
@ -57,18 +56,32 @@ impl TimestampData {
) )
.count() .count()
); );
let MetaAddr {
addr,
services,
last_seen,
} = event;
match self.by_addr.entry(addr) { match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let last_timestamp = entry.get(); let (prev_last_seen, _) = entry.get();
// If the new timestamp event is older than the current
// one, discard it. This is irrelevant for the timestamp
// collector but is important for combining address
// information from different peers.
if *prev_last_seen > last_seen {
return;
}
self.by_time self.by_time
.remove(last_timestamp) .remove(prev_last_seen)
.expect("cannot have by_addr entry without by_time entry"); .expect("cannot have by_addr entry without by_time entry");
entry.insert(timestamp); entry.insert((last_seen, services));
self.by_time.insert(timestamp, addr); self.by_time.insert(last_seen, (addr, services));
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
entry.insert(timestamp); entry.insert((last_seen, services));
self.by_time.insert(timestamp, addr); self.by_time.insert(last_seen, (addr, services));
} }
} }
} }
@ -108,7 +121,7 @@ impl TimestampCollector {
} }
} }
pub(crate) fn sender_handle(&self) -> mpsc::Sender<PeerLastSeen> { pub(crate) fn sender_handle(&self) -> mpsc::Sender<MetaAddr> {
self.worker_tx.clone() self.worker_tx.clone()
} }
} }