From 0203d1475a95e90eb6fd7c4101caa26aeddece5b Mon Sep 17 00:00:00 2001
From: teor
Date: Mon, 19 Apr 2021 16:04:24 +1000
Subject: [PATCH] Refactor and document correctness for std::sync::Mutex

---
 zebra-network/src/address_book.rs           |  2 +-
 zebra-network/src/peer_set/candidate_set.rs | 65 +++++++++++++--------
 zebra-network/src/peer_set/initialize.rs    |  7 +--
 zebra-network/src/peer_set/set.rs           | 13 +++--
 zebra-network/src/timestamp_collector.rs    | 10 +++-
 zebrad/src/components/inbound.rs            | 17 ++++--
 6 files changed, 73 insertions(+), 41 deletions(-)

diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs
index f18d3654..63b0c0c1 100644
--- a/zebra-network/src/address_book.rs
+++ b/zebra-network/src/address_book.rs
@@ -15,7 +15,7 @@ use crate::{constants, types::MetaAddr, PeerAddrState};
 
 /// A database of peers, their advertised services, and information on when they
 /// were last seen.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct AddressBook {
     /// Each known peer address has a matching `MetaAddr`
     by_addr: HashMap<SocketAddr, MetaAddr>,
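
Note: the `Clone` derive is what lets callers copy the whole address book inside a
brief critical section and then work on the snapshot after the lock is released;
the `zebrad` inbound change at the end of this patch relies on it. A minimal
sketch of that snapshot pattern, using a stand-in `Book` type rather than the
real `AddressBook`:

    use std::sync::{Arc, Mutex};

    // Stand-in for `AddressBook`; any `Clone` type follows the same pattern.
    #[derive(Clone, Debug, Default)]
    struct Book {
        peers: Vec<String>,
    }

    // Hold the threaded mutex only long enough to clone. Method calls
    // auto-deref through the `MutexGuard`, so this invokes `Book::clone`,
    // and the caller then works on the unlocked copy.
    fn snapshot(shared: &Arc<Mutex<Book>>) -> Book {
        shared.lock().expect("mutex should be unpoisoned").clone()
    }
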
diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs
index 602624b8..d296747b 100644
--- a/zebra-network/src/peer_set/candidate_set.rs
+++ b/zebra-network/src/peer_set/candidate_set.rs
@@ -1,8 +1,4 @@
-use std::{
-    mem,
-    sync::{Arc, Mutex},
-    time::Duration,
-};
+use std::{mem, sync::Arc, time::Duration};
 
 use futures::stream::{FuturesUnordered, StreamExt};
 use tokio::time::{sleep, sleep_until, timeout, Sleep};
@@ -105,7 +101,7 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response};
 // * draw arrow from the "peer message" box into the `Responded` state box
 // * make the "disjoint states" box include `AttemptPending`
 pub(super) struct CandidateSet<S> {
-    pub(super) peer_set: Arc<Mutex<AddressBook>>,
+    pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
     pub(super) peer_service: S,
     next_peer_min_wait: Sleep,
 }
@@ -123,10 +119,13 @@ where
     /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
     const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
 
-    /// Uses `peer_set` and `peer_service` to manage a [`CandidateSet`] of peers.
-    pub fn new(peer_set: Arc<Mutex<AddressBook>>, peer_service: S) -> CandidateSet<S> {
+    /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
+    pub fn new(
+        address_book: Arc<std::sync::Mutex<AddressBook>>,
+        peer_service: S,
+    ) -> CandidateSet<S> {
         CandidateSet {
-            peer_set,
+            address_book,
             peer_service,
             next_peer_min_wait: sleep(Duration::from_secs(0)),
         }
     }
@@ -163,9 +162,11 @@ where
        for _ in 0..constants::GET_ADDR_FANOUT {
            // CORRECTNESS
            //
-           // avoid deadlocks when there are no connected peers, and:
+           // Use a timeout to avoid deadlocks when there are no connected
+           // peers, and:
            // - we're waiting on a handshake to complete so there are peers, or
-           // - another task that handles or adds peers is waiting on this task to complete.
+           // - another task that handles or adds peers is waiting on this task
+           //   to complete.
            let peer_service =
                match timeout(constants::REQUEST_TIMEOUT, self.peer_service.ready_and()).await {
                    // update must only return an error for permanent failures
@@ -185,20 +186,31 @@ where
            match rsp {
                Ok(Response::Peers(rsp_addrs)) => {
                    // Filter new addresses to ensure that gossiped addresses are actually new
-                   let peer_set = &self.peer_set;
+                   let address_book = &self.address_book;
+                   // # Correctness
+                   //
+                   // Briefly hold the address book threaded mutex, each time we
+                   // check an address.
+                   //
                    // TODO: reduce mutex contention by moving the filtering into
-                   // the address book itself
+                   // the address book itself (#1976)
                    let new_addrs = rsp_addrs
                        .iter()
-                       .filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
+                       .filter(|meta| !address_book.lock().unwrap().contains_addr(&meta.addr))
                        .collect::<Vec<_>>();
                    trace!(
                        ?rsp_addrs,
                        new_addr_count = ?new_addrs.len(),
                        "got response to GetPeers"
                    );
+
                    // New addresses are deserialized in the `NeverAttempted` state
-                   peer_set
+                   //
+                   // # Correctness
+                   //
+                   // Briefly hold the address book threaded mutex, to extend
+                   // the address list.
+                   address_book
                        .lock()
                        .unwrap()
                        .extend(new_addrs.into_iter().cloned());
@@ -242,9 +254,10 @@ where
        let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
        mem::swap(&mut self.next_peer_min_wait, &mut sleep);
 
-       // CORRECTNESS
+       // # Correctness
        //
-       // In this critical section, we hold the address mutex.
+       // In this critical section, we hold the address mutex, blocking the
+       // current thread, and all async tasks scheduled on that thread.
        //
        // To avoid deadlocks, the critical section:
        // - must not acquire any other locks
        // ...
        // To avoid hangs, any computation in the critical section should
        // be kept to a minimum.
        let reconnect = {
-           let mut peer_set_guard = self.peer_set.lock().unwrap();
-           // It's okay to early return here because we're returning None
-           // instead of yielding the next connection.
-           let reconnect = peer_set_guard.reconnection_peers().next()?;
+           let mut guard = self.address_book.lock().unwrap();
+           // It's okay to return without sleeping here, because we're returning
+           // `None`. We only need to sleep before yielding an address.
+           let reconnect = guard.reconnection_peers().next()?;
 
            let reconnect = MetaAddr::new_reconnect(&reconnect.addr, &reconnect.services);
-           peer_set_guard.update(reconnect);
+           guard.update(reconnect);
            reconnect
        };
 
-       // This is the line that is most relevant to the above ## Security section
+       // SECURITY: rate-limit new candidate connections
        sleep.await;
 
        Some(reconnect)
    }
@@ -272,6 +285,10 @@ where
    /// Mark `addr` as a failed peer.
    pub fn report_failed(&mut self, addr: &MetaAddr) {
        let addr = MetaAddr::new_errored(&addr.addr, &addr.services);
-       self.peer_set.lock().unwrap().update(addr);
+       // # Correctness
+       //
+       // Briefly hold the address book threaded mutex, to update the state for
+       // a single address.
+       self.address_book.lock().unwrap().update(addr);
    }
 }
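
Note: every correctness comment in this file follows the same rule: a
`std::sync::Mutex` guard blocks the whole OS thread, including every async
task scheduled on it, so the guard must be dropped before any `.await`. A
hedged sketch of the scoped-guard shape used in `next()` above, with a plain
`Vec<String>` standing in for the real `AddressBook`:

    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    async fn next_candidate(book: Arc<Mutex<Vec<String>>>) -> Option<String> {
        // Critical section: the guard lives in an inner block, so it is
        // dropped before the `.await` below. The `?` early return also
        // drops the guard, mirroring `reconnection_peers().next()?`.
        let candidate = {
            let mut guard = book.lock().expect("mutex should be unpoisoned");
            guard.pop()?
        };

        // Only await after the lock is released (here: rate-limiting,
        // like the `sleep.await` in `next()`).
        tokio::time::sleep(Duration::from_millis(100)).await;

        Some(candidate)
    }
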
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index 9e6e4a6e..b97cc5aa 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -3,10 +3,7 @@
 // Portions of this submodule were adapted from tower-balance,
 // which is (c) 2019 Tower Contributors (MIT licensed).
 
-use std::{
-    net::SocketAddr,
-    sync::{Arc, Mutex},
-};
+use std::{net::SocketAddr, sync::Arc};
 
 use futures::{
     channel::mpsc,
@@ -65,7 +62,7 @@ pub async fn init(
     inbound_service: S,
 ) -> (
     Buffer<BoxService<Request, Response, BoxError>, Request>,
-    Arc<Mutex<AddressBook>>,
+    Arc<std::sync::Mutex<AddressBook>>,
 )
 where
     S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 8413635b..44d6a878 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -5,7 +5,7 @@ use std::{
     future::Future,
     marker::PhantomData,
     pin::Pin,
-    sync::{Arc, Mutex},
+    sync::Arc,
     task::{Context, Poll},
     time::Instant,
 };
@@ -106,7 +106,7 @@ where
     /// A shared list of peer addresses.
     ///
     /// Used for logging diagnostics.
-    address_book: Arc<Mutex<AddressBook>>,
+    address_book: Arc<std::sync::Mutex<AddressBook>>,
 }
 
 impl<D> PeerSet<D>
 where
@@ -124,7 +124,7 @@ where
         demand_signal: mpsc::Sender<()>,
         handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
         inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
-        address_book: Arc<Mutex<AddressBook>>,
+        address_book: Arc<std::sync::Mutex<AddressBook>>,
     ) -> Self {
         Self {
             discover,
@@ -379,8 +379,13 @@ where
         }
 
         self.last_peer_log = Some(Instant::now());
+
+        // # Correctness
+        //
         // Only log address metrics in exceptional circumstances, to avoid lock contention.
-        // TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`.
+        //
+        // TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`,
+        // or turn the address book into a service (#1976)
         let address_metrics = self.address_book.lock().unwrap().address_metrics();
         if unready_services_len == 0 {
             warn!(
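
Note: one possible shape for the watch channel mentioned in the TODO above.
This is only a sketch of a design that #1976 may or may not adopt, with a
hypothetical `AddressMetrics` struct: the address book would publish a fresh
summary whenever it changes, and `PeerSet` diagnostics would read the latest
value without ever touching the mutex:

    use tokio::sync::watch;

    // Hypothetical summary; the real fields would mirror whatever
    // `AddressBook::address_metrics()` returns.
    #[derive(Clone, Copy, Debug, Default)]
    struct AddressMetrics {
        responded: usize,
        never_attempted: usize,
    }

    fn main() {
        let (tx, rx) = watch::channel(AddressMetrics::default());

        // Writer side, inside the address book after each update:
        let _ = tx.send(AddressMetrics { responded: 1, never_attempted: 9 });

        // Reader side, inside `PeerSet` logging: lock-free.
        let metrics = *rx.borrow();
        println!("{:?}", metrics);
    }
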
diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs
index e78304da..2dfe0bf2 100644
--- a/zebra-network/src/timestamp_collector.rs
+++ b/zebra-network/src/timestamp_collector.rs
@@ -1,6 +1,6 @@
 //! The timestamp collector collects liveness information from peers.
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use futures::{channel::mpsc, prelude::*};
 
@@ -14,11 +14,11 @@ impl TimestampCollector {
     /// Spawn a new [`TimestampCollector`] task, and return handles for the
     /// transmission channel for timestamp events and for the [`AddressBook`] it
     /// updates.
-    pub fn spawn() -> (Arc<Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
+    pub fn spawn() -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
         use tracing::Level;
         const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
         let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
-        let address_book = Arc::new(Mutex::new(AddressBook::new(span!(
+        let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(span!(
             Level::TRACE,
             "timestamp collector"
         ))));
@@ -26,6 +26,10 @@ impl TimestampCollector {
 
         let worker = async move {
             while let Some(event) = worker_rx.next().await {
+                // # Correctness
+                //
+                // Briefly hold the address book threaded mutex, to update the
+                // state for a single address.
                 worker_address_book
                     .lock()
                     .expect("mutex should be unpoisoned")
diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs
index 283bbd7b..d8b8d697 100644
--- a/zebrad/src/components/inbound.rs
+++ b/zebrad/src/components/inbound.rs
@@ -1,7 +1,7 @@
 use std::{
     future::Future,
     pin::Pin,
-    sync::{Arc, Mutex},
+    sync::Arc,
     task::{Context, Poll},
 };
 
@@ -31,7 +31,7 @@
 type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
 type Verifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
 type InboundDownloads = Downloads<Timeout<Outbound>, Timeout<Verifier>, State>;
-pub type NetworkSetupData = (Outbound, Arc<Mutex<AddressBook>>);
+pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);
 
 /// Tracks the internal state of the [`Inbound`] service during network setup.
 pub enum Setup {
@@ -54,7 +54,7 @@ pub enum Setup {
     /// All requests are answered.
     Initialized {
         /// A shared list of peer addresses.
-        address_book: Arc<Mutex<AddressBook>>,
+        address_book: Arc<std::sync::Mutex<AddressBook>>,
 
         /// A `futures::Stream` that downloads and verifies gossipped blocks.
         downloads: Pin<Box<InboundDownloads>>,
@@ -228,11 +228,20 @@ impl Service<zn::Request> for Inbound {
         match req {
             zn::Request::Peers => {
                 if let Setup::Initialized { address_book, .. } = &self.network_setup {
+                    // # Security
+                    //
                     // We could truncate the list to try to not reveal our entire
                     // peer set. But because we don't monitor repeated requests,
                     // this wouldn't actually achieve anything, because a crawler
                     // could just repeatedly query it.
-                    let mut peers = address_book.lock().unwrap().sanitized();
+                    //
+                    // # Correctness
+                    //
+                    // Briefly hold the address book threaded mutex while
+                    // cloning the address book. Then sanitize after releasing
+                    // the lock.
+                    let peers = address_book.lock().unwrap().clone();
+                    let mut peers = peers.sanitized();
                     const MAX_ADDR: usize = 1000; // bitcoin protocol constant
                     peers.truncate(MAX_ADDR);
                     async { Ok(zn::Response::Peers(peers)) }.boxed()
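
Note: throughout this patch, `lock()` is followed by `unwrap()` or
`expect("mutex should be unpoisoned")`, because `std::sync::Mutex::lock`
returns an `Err` only when another thread panicked while holding the lock (a
"poisoned" mutex). Zebra treats poisoning as an unrecoverable bug rather than
a state to recover from. A small illustration of that convention, using a
stand-in payload type:

    use std::sync::{Arc, Mutex};

    fn record(shared: &Arc<Mutex<Vec<u64>>>, value: u64) {
        // A poisoned mutex means another thread panicked mid-update, so the
        // data may be inconsistent; panicking here surfaces the original bug
        // instead of hiding it.
        shared
            .lock()
            .expect("mutex should be unpoisoned")
            .push(value);
    }
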