diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 1a162f2a..07d40a6c 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -2,7 +2,7 @@ //! seen, and what services they provide. use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeSet, HashMap}, iter::Extend, net::SocketAddr, sync::{Arc, Mutex}, @@ -22,20 +22,31 @@ use crate::{ #[derive(Default, Debug)] pub struct AddressBook { by_addr: HashMap, PeerServices)>, - by_time: BTreeMap, (SocketAddr, PeerServices)>, + by_time: BTreeSet, } impl AddressBook { + fn assert_consistency(&self) { + for (a, (t, s)) in self.by_addr.iter() { + for meta in self.by_time.iter().filter(|meta| meta.addr == *a) { + if meta.last_seen != *t || meta.services != *s { + panic!("meta {:?} is not {:?}, {:?}, {:?}", meta, a, t, s); + } + } + } + } + /// Update the address book with `event`, a [`MetaAddr`] representing /// observation of a peer. pub fn update(&mut self, event: MetaAddr) { use std::collections::hash_map::Entry; - debug!( + trace!( ?event, data.total = self.by_time.len(), data.recent = (self.by_time.len() - self.disconnected_peers().count()), ); + //self.assert_consistency(); let MetaAddr { addr, @@ -45,37 +56,40 @@ impl AddressBook { match self.by_addr.entry(addr) { Entry::Occupied(mut entry) => { - let (prev_last_seen, _) = entry.get(); - // If the new timestamp event is older than the current - // one, discard it. This is irrelevant for the timestamp - // collector but is important for combining address - // information from different peers. - if *prev_last_seen > last_seen { + let (prev_last_seen, prev_services) = entry.get().clone(); + // Ignore stale entries. + if prev_last_seen > last_seen { return; } self.by_time - .remove(prev_last_seen) + .take(&MetaAddr { + addr, + services: prev_services, + last_seen: prev_last_seen, + }) .expect("cannot have by_addr entry without by_time entry"); entry.insert((last_seen, services)); - self.by_time.insert(last_seen, (addr, services)); + self.by_time.insert(event); } Entry::Vacant(entry) => { entry.insert((last_seen, services)); - self.by_time.insert(last_seen, (addr, services)); + self.by_time.insert(event); } } + //self.assert_consistency(); } /// Return an iterator over all peers, ordered from most recently seen to /// least recently seen. pub fn peers<'a>(&'a self) -> impl Iterator + 'a { - self.by_time.iter().rev().map(from_by_time_kv) + self.by_time.iter().rev().cloned() } /// Return an iterator over peers known to be disconnected, ordered from most /// recently seen to least recently seen. pub fn disconnected_peers<'a>(&'a self) -> impl Iterator + 'a { use chrono::Duration as CD; + use std::net::{IpAddr, Ipv4Addr}; use std::ops::Bound::{Excluded, Unbounded}; // LIVE_PEER_DURATION represents the time interval in which we are @@ -83,11 +97,18 @@ impl AddressBook { // connection. Therefore, if the last-seen timestamp is older than // LIVE_PEER_DURATION ago, we know we must have disconnected from it. let cutoff = Utc::now() - CD::from_std(constants::LIVE_PEER_DURATION).unwrap(); + let cutoff_meta = MetaAddr { + last_seen: cutoff, + // The ordering on MetaAddrs is newest-first, then arbitrary, + // so any fields will do here. + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + services: PeerServices::default(), + }; self.by_time - .range((Unbounded, Excluded(cutoff))) + .range((Excluded(cutoff_meta), Unbounded)) .rev() - .map(from_by_time_kv) + .cloned() } /// Returns an iterator that drains entries from the address book, removing @@ -97,17 +118,6 @@ impl AddressBook { } } -// Helper impl to convert by_time Iterator Items back to MetaAddrs -// This could easily be a From impl, but trait impls are public, and this shouldn't be. -fn from_by_time_kv(by_time_kv: (&DateTime, &(SocketAddr, PeerServices))) -> MetaAddr { - let (last_seen, (addr, services)) = by_time_kv; - MetaAddr { - last_seen: last_seen.clone(), - addr: addr.clone(), - services: services.clone(), - } -} - impl Extend for AddressBook { fn extend(&mut self, iter: T) where @@ -127,17 +137,12 @@ impl<'a> Iterator for Drain<'a> { type Item = MetaAddr; fn next(&mut self) -> Option { - let most_recent = self.book.by_time.keys().rev().next()?.clone(); - let (addr, services) = self - .book - .by_time - .remove(&most_recent) - .expect("key from keys() must be present in btreemap"); - self.book.by_addr.remove(&addr); - Some(MetaAddr { - addr, - services, - last_seen: most_recent, - }) + let most_recent = self.book.by_time.iter().next()?.clone(); + self.book.by_time.remove(&most_recent); + self.book + .by_addr + .remove(&most_recent.addr) + .expect("cannot have by_time entry without by_addr entry"); + Some(most_recent) } -} \ No newline at end of file +} diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 91eda3b4..20d6491c 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -1,7 +1,10 @@ //! An address-with-metadata type used in Bitcoin networking. -use std::io::{Read, Write}; -use std::net::SocketAddr; +use std::{ + cmp::{Ord, Ordering}, + io::{Read, Write}, + net::SocketAddr, +}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use chrono::{DateTime, TimeZone, Utc}; @@ -28,6 +31,34 @@ pub struct MetaAddr { pub last_seen: DateTime, } +impl Ord for MetaAddr { + /// `MetaAddr`s are sorted newest-first, and then in an arbitrary + /// but determinate total order. + fn cmp(&self, other: &Self) -> Ordering { + let newest_first = self.last_seen.cmp(&other.last_seen).reverse(); + newest_first.then_with(|| { + // The remainder is meaningless as an ordering, but required so that we + // have a total order on `MetaAddr` values: self and other must compare + // as Ordering::Equal iff they are equal. + use std::net::IpAddr::{V4, V6}; + match (self.addr.ip(), other.addr.ip()) { + (V4(a), V4(b)) => a.octets().cmp(&b.octets()), + (V6(a), V6(b)) => a.octets().cmp(&b.octets()), + (V4(_), V6(_)) => Ordering::Less, + (V6(_), V4(_)) => Ordering::Greater, + } + .then(self.addr.port().cmp(&other.addr.port())) + .then(self.services.bits().cmp(&other.services.bits())) + }) + } +} + +impl PartialOrd for MetaAddr { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl ZcashSerialize for MetaAddr { fn zcash_serialize(&self, mut writer: W) -> Result<(), SerializationError> { writer.write_u32::(self.last_seen.timestamp() as u32)?; diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index dc046509..6bc5893f 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -142,6 +142,11 @@ impl ConnectCmd { let addrs = all_addrs.drain_recent().collect::>(); info!(addrs.len = addrs.len(), ab.len = all_addrs.peers().count()); + let mut head = Vec::new(); + head.extend_from_slice(&addrs[0..5]); + let mut tail = Vec::new(); + tail.extend_from_slice(&addrs[addrs.len() - 5..]); + info!(addrs.first = ?head, addrs.last = ?tail); loop { // empty loop ensures we don't exit the application,