From 92828bbb29554daddc433e03303dc033605f82a0 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 7 May 2021 11:08:06 +1000 Subject: [PATCH] Reliability: send local listener address to peers When peers ask for peer addresses, add our local listener address to the set of addresses, sanitize, then truncate. Sanitize shuffles addresses, so if there are lots of addresses in the address book, our address will only be sent to some peers. --- zebra-network/src/address_book.rs | 19 ++++++++++++++----- zebra-network/src/meta_addr.rs | 15 +++++++++++++-- zebra-network/src/peer_set/initialize.rs | 2 +- zebra-network/src/timestamp_collector.rs | 12 ++++++------ zebrad/src/components/inbound.rs | 8 +++++++- 5 files changed, 41 insertions(+), 15 deletions(-) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 3061e5f5..8fa4864a 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -11,7 +11,7 @@ use std::{ use chrono::{DateTime, Utc}; use tracing::Span; -use crate::{constants, types::MetaAddr, PeerAddrState}; +use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// A database of peer listener addresses, their advertised services, and /// information on when they were last seen. @@ -48,13 +48,16 @@ use crate::{constants, types::MetaAddr, PeerAddrState}; /// - the canonical address of any connection. #[derive(Clone, Debug)] pub struct AddressBook { - /// Each known peer address has a matching `MetaAddr` + /// Each known peer address has a matching `MetaAddr`. by_addr: HashMap, + /// The local listener address. + local_listener: SocketAddr, + /// The span for operations on this address book. span: Span, - /// The last time we logged a message about the address metrics + /// The last time we logged a message about the address metrics. last_address_log: Option, } @@ -85,13 +88,14 @@ pub struct AddressMetrics { #[allow(clippy::len_without_is_empty)] impl AddressBook { - /// Construct an `AddressBook` with the given [`tracing::Span`]. - pub fn new(span: Span) -> AddressBook { + /// Construct an `AddressBook` with the given `config` and [`tracing::Span`]. + pub fn new(config: &Config, span: Span) -> AddressBook { let constructor_span = span.clone(); let _guard = constructor_span.enter(); let mut new_book = AddressBook { by_addr: HashMap::default(), + local_listener: config.listen_addr, span, last_address_log: None, }; @@ -100,6 +104,11 @@ impl AddressBook { new_book } + /// Get the local listener address. + pub fn get_local_listener(&self) -> MetaAddr { + MetaAddr::new_local_listener(&self.local_listener) + } + /// Get the contents of `self` in random order with sanitized timestamps. pub fn sanitized(&self) -> Vec { use rand::seq::SliceRandom; diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 39b258fb..ea4b4b00 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -194,6 +194,17 @@ impl MetaAddr { } } + /// Create a new `MetaAddr` for our own listener address. + pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr { + MetaAddr { + addr: *addr, + // TODO: create a "local services" constant + services: PeerServices::NODE_NETWORK, + last_seen: Utc::now(), + last_connection_state: Responded, + } + } + /// Create a new `MetaAddr` for a peer that has just had an error. pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { MetaAddr { @@ -251,8 +262,8 @@ impl MetaAddr { let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0); MetaAddr { addr: self.addr, - // services are sanitized during parsing, so we don't need to make - // any changes here + // services are sanitized during parsing, or set to a fixed valued by + // new_local_listener, so we don't need to sanitize here services: self.services, last_seen, // the state isn't sent to the remote peer, but sanitize it anyway diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 8841375f..26e9190e 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -64,7 +64,7 @@ where S: Service + Clone + Send + 'static, S::Future: Send + 'static, { - let (address_book, timestamp_collector) = TimestampCollector::spawn(); + let (address_book, timestamp_collector) = TimestampCollector::spawn(&config); let (inv_sender, inv_receiver) = broadcast::channel(100); // Construct services that handle inbound handshakes and perform outbound diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs index 2dfe0bf2..143fe7cd 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/timestamp_collector.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::{channel::mpsc, prelude::*}; -use crate::{types::MetaAddr, AddressBook}; +use crate::{types::MetaAddr, AddressBook, Config}; /// The timestamp collector hooks into incoming message streams for each peer and /// records per-connection last-seen timestamps into an [`AddressBook`]. @@ -14,14 +14,14 @@ impl TimestampCollector { /// Spawn a new [`TimestampCollector`] task, and return handles for the /// transmission channel for timestamp events and for the [`AddressBook`] it /// updates. - pub fn spawn() -> (Arc>, mpsc::Sender) { + pub fn spawn(config: &Config) -> (Arc>, mpsc::Sender) { use tracing::Level; const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100; let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE); - let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(span!( - Level::TRACE, - "timestamp collector" - )))); + let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( + config, + span!(Level::TRACE, "timestamp collector"), + ))); let worker_address_book = address_book.clone(); let worker = async move { diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 0e91dbcd..e1e96561 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -240,7 +240,13 @@ impl Service for Inbound { // Briefly hold the address book threaded mutex while // cloning the address book. Then sanitize after releasing // the lock. - let peers = address_book.lock().unwrap().clone(); + let mut peers = address_book.lock().unwrap().clone(); + + // Add our local listener address to the advertised peers + let local_listener = address_book.lock().unwrap().get_local_listener(); + peers.update(local_listener); + + // Send a sanitized response let mut peers = peers.sanitized(); const MAX_ADDR: usize = 1000; // bitcoin protocol constant peers.truncate(MAX_ADDR);