From 15a698b23cea5c8d8cb5d913ec6b1551033c3096 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Thu, 17 Oct 2019 14:33:45 -0700 Subject: [PATCH] Use MetaAddr in the timestamp collector. We will need service bits information to report on peer addresses, so we need to collect it in the timestamp collector. --- zebra-network/src/peer/connector.rs | 22 ++++++++--- zebra-network/src/timestamp_collector.rs | 49 +++++++++++++++--------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index aa069a96..104d94b0 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -18,8 +18,9 @@ use zebra_chain::types::BlockHeight; use crate::{ constants, protocol::{codec::*, internal::*, message::*, types::*}, - timestamp_collector::{PeerLastSeen, TimestampCollector}, - BoxedStdError, Config, types::Network, + timestamp_collector::TimestampCollector, + types::{MetaAddr, Network}, + BoxedStdError, Config, }; use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer}; @@ -28,7 +29,7 @@ use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, P pub struct PeerConnector { config: Config, internal_service: S, - sender: mpsc::Sender, + sender: mpsc::Sender, nonces: Arc>>, } @@ -120,8 +121,11 @@ where // Check that we got a Version and destructure its fields into the local scope. debug!(?remote_msg, "got message from remote peer"); - let remote_nonce = if let Message::Version { nonce, .. } = remote_msg { - nonce + let (remote_nonce, remote_services) = if let Message::Version { + nonce, services, .. + } = remote_msg + { + (nonce, services) } else { return Err(HandshakeError::UnexpectedMessage(remote_msg)); }; @@ -181,7 +185,13 @@ where async move { if let Ok(_) = msg { use futures::sink::SinkExt; - let _ = sender.send((addr, Utc::now())).await; + let _ = sender + .send(MetaAddr { + addr, + services: remote_services, + last_seen: Utc::now(), + }) + .await; } msg } diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs index a0125793..586320af 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/timestamp_collector.rs @@ -10,10 +10,10 @@ use chrono::{DateTime, Utc}; use futures::channel::mpsc; use tokio::prelude::*; -use crate::constants; - -/// A type alias for a timestamp event sent to a `TimestampCollector`. -pub(crate) type PeerLastSeen = (SocketAddr, DateTime); +use crate::{ + constants, + types::{MetaAddr, PeerServices}, +}; /// Maintains a lookup table from peer addresses to last-seen times. /// @@ -29,23 +29,22 @@ pub struct TimestampCollector { // event buffer to hide latency if other tasks block it temporarily. data: Arc>, shutdown: Arc, - worker_tx: mpsc::Sender, + worker_tx: mpsc::Sender, } #[derive(Default, Debug)] struct TimestampData { - by_addr: HashMap>, - by_time: BTreeMap, SocketAddr>, + by_addr: HashMap, PeerServices)>, + by_time: BTreeMap, (SocketAddr, PeerServices)>, } impl TimestampData { - fn update(&mut self, event: PeerLastSeen) { + fn update(&mut self, event: MetaAddr) { use chrono::Duration as CD; use std::collections::hash_map::Entry; - let (addr, timestamp) = event; + trace!( - ?addr, - ?timestamp, + ?event, data.total = self.by_time.len(), // This would be cleaner if it used "variables" but keeping // it inside the trace! invocation prevents running the range @@ -57,18 +56,32 @@ impl TimestampData { ) .count() ); + + let MetaAddr { + addr, + services, + last_seen, + } = event; + match self.by_addr.entry(addr) { Entry::Occupied(mut entry) => { - let last_timestamp = entry.get(); + let (prev_last_seen, _) = entry.get(); + // If the new timestamp event is older than the current + // one, discard it. This is irrelevant for the timestamp + // collector but is important for combining address + // information from different peers. + if *prev_last_seen > last_seen { + return; + } self.by_time - .remove(last_timestamp) + .remove(prev_last_seen) .expect("cannot have by_addr entry without by_time entry"); - entry.insert(timestamp); - self.by_time.insert(timestamp, addr); + entry.insert((last_seen, services)); + self.by_time.insert(last_seen, (addr, services)); } Entry::Vacant(entry) => { - entry.insert(timestamp); - self.by_time.insert(timestamp, addr); + entry.insert((last_seen, services)); + self.by_time.insert(last_seen, (addr, services)); } } } @@ -108,7 +121,7 @@ impl TimestampCollector { } } - pub(crate) fn sender_handle(&self) -> mpsc::Sender { + pub(crate) fn sender_handle(&self) -> mpsc::Sender { self.worker_tx.clone() } }