diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 76006672..f3126e6a 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -125,7 +125,11 @@ mod tests; // When we add the Seed state: // * show that seed peers that transition to other never attempted // states are already in the address book -pub(crate) struct CandidateSet { +pub(crate) struct CandidateSet +where + S: Service + Send, + S::Future: Send + 'static, +{ // Correctness: the address book must be private, // so all operations are performed on a blocking thread (see #1976). address_book: Arc>, @@ -136,7 +140,7 @@ pub(crate) struct CandidateSet { impl CandidateSet where - S: Service, + S: Service + Send, S::Future: Send + 'static, { /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers. @@ -180,8 +184,6 @@ where /// The handshaker sets up the peer message receiver so it also sends a /// [`Responded`] peer address update. /// - /// [`report_failed`][Self::report_failed] puts peers into the [`Failed`] state. - /// /// [`next`][Self::next] puts peers into the [`AttemptPending`] state. /// /// ## Security @@ -411,21 +413,9 @@ where Some(next_peer) } - /// Mark `addr` as a failed peer. - pub async fn report_failed(&mut self, addr: &MetaAddr) { - let addr = MetaAddr::new_errored(addr.addr, addr.services); - - // # Correctness - // - // Spawn address book accesses on a blocking thread, - // to avoid deadlocks (see #1976). - let address_book = self.address_book.clone(); - let span = Span::current(); - tokio::task::spawn_blocking(move || { - span.in_scope(|| address_book.lock().unwrap().update(addr)) - }) - .await - .expect("panic in peer failure address book update task"); + /// Returns the address book for this `CandidateSet`. + pub async fn address_book(&self) -> Arc> { + self.address_book.clone() } } diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index 394e35df..d77b190c 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -139,7 +139,7 @@ proptest! { /// - if no reconnection peer is returned at all. async fn check_candidates_rate_limiting(candidate_set: &mut CandidateSet, candidates: u32) where - S: tower::Service, + S: tower::Service + Send, S::Future: Send + 'static, { let mut now = Instant::now(); diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 72e1b887..d306475b 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -5,6 +5,7 @@ use std::{ collections::{BTreeMap, HashSet}, + convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, @@ -13,7 +14,7 @@ use std::{ use futures::{ future::{self, FutureExt}, sink::SinkExt, - stream::{FuturesUnordered, StreamExt, TryStreamExt}, + stream::{FuturesUnordered, StreamExt}, TryFutureExt, }; use rand::seq::SliceRandom; @@ -26,6 +27,7 @@ use tokio_stream::wrappers::IntervalStream; use tower::{ buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, }; +use tracing::Span; use tracing_futures::Instrument; use zebra_chain::chain_tip::ChainTip; @@ -46,11 +48,15 @@ use crate::{ #[cfg(test)] mod tests; -/// The result of an outbound peer connection attempt or inbound connection -/// handshake. +/// A successful outbound peer connection attempt or inbound connection handshake. /// -/// This result comes from the `Handshaker`. -type DiscoveredPeer = Result<(PeerSocketAddr, peer::Client), BoxError>; +/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections +/// should be sent on the channel. Errors should be logged or ignored. +/// +/// We don't allow any errors in this type, because: +/// - The connection limits don't include failed connections +/// - tower::Discover interprets an error as stream termination +type DiscoveredPeer = (PeerSocketAddr, peer::Client); /// Initialize a peer set, using a network `config`, `inbound_service`, /// and `latest_chain_tip`. @@ -146,14 +152,15 @@ where // Create an mpsc channel for peer changes, // based on the maximum number of inbound and outbound peers. + // + // The connection limit does not apply to errors, + // so they need to be handled before sending to this channel. let (peerset_tx, peerset_rx) = futures::channel::mpsc::channel::(config.peerset_total_connection_limit()); - let discovered_peers = peerset_rx - // Discover interprets an error as stream termination, - // so discard any errored connections... - .filter(|result| future::ready(result.is_ok())) - .map_ok(|(address, client)| Change::Insert(address, client.into())); + let discovered_peers = peerset_rx.map(|(address, client)| { + Result::<_, Infallible>::Ok(Change::Insert(address, client.into())) + }); // Create an mpsc channel for peerset demand signaling, // based on the maximum number of outbound peers. @@ -210,6 +217,9 @@ where // because zcashd rate-limits `addr`/`addrv2` messages per connection, // and if we only have one initial peer, // we need to ensure that its `Response::Addr` is used by the crawler. + // + // TODO: this might not be needed after we added the Connection peer address cache, + // try removing it in a future release? info!( ?active_initial_peer_count, "sending initial request for peers" @@ -342,7 +352,7 @@ where let handshake_result = handshake_result.expect("unexpected panic in initial peer handshake"); match handshake_result { - Ok(ref change) => { + Ok(change) => { handshake_success_total += 1; debug!( ?handshake_success_total, @@ -350,6 +360,9 @@ where ?change, "an initial peer handshake succeeded" ); + + // The connection limit makes sure this send doesn't block + peerset_tx.send(change).await?; } Err((addr, ref e)) => { handshake_error_total += 1; @@ -384,10 +397,6 @@ where } } - peerset_tx - .send(handshake_result.map_err(|(_addr, e)| e)) - .await?; - // Security: Let other tasks run after each connection is processed. // // Avoids remote peers starving other Zebra tasks using initial connection successes or errors. @@ -617,7 +626,8 @@ where let handshake_result = handshake.await; if let Ok(client) = handshake_result { - let _ = peerset_tx.send(Ok((addr, client))).await; + // The connection limit makes sure this send doesn't block + let _ = peerset_tx.send((addr, client)).await; } else { debug!(?handshake_result, "error handshaking with inbound peer"); } @@ -660,20 +670,18 @@ where enum CrawlerAction { /// Drop the demand signal because there are too many pending handshakes. DemandDrop, - /// Initiate a handshake to `candidate` in response to demand. - DemandHandshake { candidate: MetaAddr }, - /// Crawl existing peers for more peers in response to demand, because there - /// are no available candidates. - DemandCrawl, + /// Initiate a handshake to the next candidate peer in response to demand. + /// + /// If there are no available candidates, crawl existing peers. + DemandHandshakeOrCrawl, /// Crawl existing peers for more peers in response to a timer `tick`. TimerCrawl { tick: Instant }, - /// Handle a successfully connected handshake `peer_set_change`. - HandshakeConnected { - address: PeerSocketAddr, - client: peer::Client, - }, - /// Handle a handshake failure to `failed_addr`. - HandshakeFailed { failed_addr: MetaAddr }, + /// Clear a finished handshake. + HandshakeFinished, + /// Clear a finished demand crawl (DemandHandshakeOrCrawl with no peers). + DemandCrawlFinished, + /// Clear a finished TimerCrawl. + TimerCrawlFinished, } /// Given a channel `demand_rx` that signals a need for new peers, try to find @@ -709,11 +717,11 @@ enum CrawlerAction { )] async fn crawl_and_dial( config: Config, - mut demand_tx: futures::channel::mpsc::Sender, + demand_tx: futures::channel::mpsc::Sender, mut demand_rx: futures::channel::mpsc::Receiver, - mut candidates: CandidateSet, + candidates: CandidateSet, outbound_connector: C, - mut peerset_tx: futures::channel::mpsc::Sender, + peerset_tx: futures::channel::mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where @@ -725,31 +733,30 @@ where + Send + 'static, C::Future: Send + 'static, - S: Service, + S: Service + Send + Sync + 'static, S::Future: Send + 'static, { use CrawlerAction::*; - // CORRECTNESS - // - // To avoid hangs and starvation, the crawler must: - // - spawn a separate task for each crawl and handshake, so they can make - // progress independently (and avoid deadlocking each other) - // - use the `select!` macro for all actions, because the `select` function - // is biased towards the first ready future - info!( crawl_new_peer_interval = ?config.crawl_new_peer_interval, outbound_connections = ?active_outbound_connections.update_count(), "starting the peer address crawler", ); + let address_book = candidates.address_book().await; + + // # Concurrency + // + // Allow tasks using the candidate set to be spawned, so they can run concurrently. + // Previously, Zebra has had deadlocks and long hangs caused by running dependent + // candidate set futures in the same async task. + let candidates = Arc::new(futures::lock::Mutex::new(candidates)); + + // This contains both crawl and handshake tasks. let mut handshakes = FuturesUnordered::new(); // returns None when empty. - // Keeping an unresolved future in the pool means the stream - // never terminates. - // We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse` - // prevents us from adding items to the stream and checking its length. + // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval); @@ -759,6 +766,10 @@ where let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick }); + // # Concurrency + // + // To avoid hangs and starvation, the crawler must spawn a separate task for each crawl + // and handshake, so they can make progress independently (and avoid deadlocking each other). loop { metrics::gauge!( "crawler.in_flight_handshakes", @@ -769,33 +780,45 @@ where ); let crawler_action = tokio::select! { + biased; + // Check for completed handshakes first, because the rest of the app needs them. + // Pending handshakes are limited by the connection limit. next_handshake_res = handshakes.next() => next_handshake_res.expect( "handshakes never terminates, because it contains a future that never resolves" ), - next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"), - // turn the demand into an action, based on the crawler's current state - _ = demand_rx.next() => { + // The timer is rate-limited + next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")), + // Turn any new demand into an action, based on the crawler's current state. + // + // # Concurrency + // + // Demand is potentially unlimited, so it must go last in a biased select!. + next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{ if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() { // Too many open outbound connections or pending handshakes already DemandDrop - } else if let Some(candidate) = candidates.next().await { - // candidates.next has a short delay, and briefly holds the address - // book lock, so it shouldn't hang - DemandHandshake { candidate } } else { - DemandCrawl + DemandHandshakeOrCrawl } - } + }) }; match crawler_action { - DemandDrop => { + // Dummy actions + Ok(DemandDrop) => { // This is set to trace level because when the peerset is - // congested it can generate a lot of demand signal very - // rapidly. + // congested it can generate a lot of demand signal very rapidly. trace!("too many open connections or in-flight handshakes, dropping demand signal"); } - DemandHandshake { candidate } => { + + // Spawned tasks + Ok(DemandHandshakeOrCrawl) => { + let candidates = candidates.clone(); + let outbound_connector = outbound_connector.clone(); + let peerset_tx = peerset_tx.clone(); + let address_book = address_book.clone(); + let demand_tx = demand_tx.clone(); + // Increment the connection count before we spawn the connection. let outbound_connection_tracker = active_outbound_connections.track_connection(); debug!( @@ -803,74 +826,91 @@ where "opening an outbound peer connection" ); - // Spawn each handshake into an independent task, so it can make - // progress independently of the crawls. - let hs_join = tokio::spawn(dial( - candidate, - outbound_connector.clone(), - outbound_connection_tracker, - )) + // Spawn each handshake or crawl into an independent task, so handshakes can make + // progress while crawls are running. + let handshake_or_crawl_handle = tokio::spawn(async move { + // Try to get the next available peer for a handshake. + // + // candidates.next() has a short timeout, and briefly holds the address + // book lock, so it shouldn't hang. + // + // Hold the lock for as short a time as possible. + let candidate = { candidates.lock().await.next().await }; + + if let Some(candidate) = candidate { + // we don't need to spawn here, because there's nothing running concurrently + dial( + candidate, + outbound_connector, + outbound_connection_tracker, + peerset_tx, + address_book, + demand_tx, + ) + .await?; + + Ok(HandshakeFinished) + } else { + // There weren't any peers, so try to get more peers. + debug!("demand for peers but no available candidates"); + + crawl(candidates, demand_tx).await?; + + Ok(DemandCrawlFinished) + } + }) .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during handshaking with {candidate:?}: {e:?} "); + panic!("panic during handshaking: {e:?}"); } }) .in_current_span(); - handshakes.push(Box::pin(hs_join)); + handshakes.push(Box::pin(handshake_or_crawl_handle)); } - DemandCrawl => { - debug!("demand for peers but no available candidates"); - // update has timeouts, and briefly holds the address book - // lock, so it shouldn't hang - // - // TODO: refactor candidates into a buffered service, so we can - // spawn independent tasks to avoid deadlocks - let more_peers = candidates.update().await?; + Ok(TimerCrawl { tick }) => { + let candidates = candidates.clone(); + let demand_tx = demand_tx.clone(); - // If we got more peers, try to connect to a new peer. - // - // # Security - // - // Update attempts are rate-limited by the candidate set. - // - // We only try peers if there was actually an update. - // So if all peers have had a recent attempt, - // and there was recent update with no peers, - // the channel will drain. - // This prevents useless update attempt loops. - if let Some(more_peers) = more_peers { - let _ = demand_tx.try_send(more_peers); - } - } - TimerCrawl { tick } => { - debug!( - ?tick, - "crawling for more peers in response to the crawl timer" - ); - // TODO: spawn independent tasks to avoid deadlocks - candidates.update().await?; - // Try to connect to a new peer. - let _ = demand_tx.try_send(MorePeers); - } - HandshakeConnected { address, client } => { - debug!(candidate.addr = ?address, "successfully dialed new peer"); - // successes are handled by an independent task, except for `candidates.update` in - // this task, which has a timeout, so they shouldn't hang - peerset_tx.send(Ok((address, client))).await?; - } - HandshakeFailed { failed_addr } => { - // The connection was never opened, or it failed the handshake and was dropped. + let crawl_handle = tokio::spawn(async move { + debug!( + ?tick, + "crawling for more peers in response to the crawl timer" + ); - debug!(?failed_addr.addr, "marking candidate as failed"); - candidates.report_failed(&failed_addr).await; - // The demand signal that was taken out of the queue - // to attempt to connect to the failed candidate never - // turned into a connection, so add it back: - // - // Security: handshake failures are rate-limited by peer attempt timeouts. - let _ = demand_tx.try_send(MorePeers); + crawl(candidates, demand_tx).await?; + + Ok(TimerCrawlFinished) + }) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during TimerCrawl: {tick:?} {e:?}"); + } + }) + .in_current_span(); + + handshakes.push(Box::pin(crawl_handle)); + } + + // Completed spawned tasks + Ok(HandshakeFinished) => { + // Already logged in dial() + } + Ok(DemandCrawlFinished) => { + // This is set to trace level because when the peerset is + // congested it can generate a lot of demand signal very rapidly. + trace!("demand-based crawl finished"); + } + Ok(TimerCrawlFinished) => { + debug!("timer-based crawl finished"); + } + + // Fatal errors and shutdowns + Err(error) => { + info!(?error, "crawler task exiting due to an error"); + return Err(error); } } @@ -881,17 +921,79 @@ where } } +/// Try to get more peers using `candidates`, then queue a connection attempt using `demand_tx`. +/// If there were no new peers, the connection attempt is skipped. +#[instrument(skip(candidates, demand_tx))] +async fn crawl( + candidates: Arc>>, + mut demand_tx: futures::channel::mpsc::Sender, +) -> Result<(), BoxError> +where + S: Service + Send + Sync + 'static, + S::Future: Send + 'static, +{ + // update() has timeouts, and briefly holds the address book + // lock, so it shouldn't hang. + // Try to get new peers, holding the lock for as short a time as possible. + let result = { + let result = candidates.lock().await.update().await; + std::mem::drop(candidates); + result + }; + let more_peers = match result { + Ok(more_peers) => more_peers, + Err(e) => { + info!( + ?e, + "candidate set returned an error, is Zebra shutting down?" + ); + return Err(e); + } + }; + + // If we got more peers, try to connect to a new peer on our next loop. + // + // # Security + // + // Update attempts are rate-limited by the candidate set, + // and we only try peers if there was actually an update. + // + // So if all peers have had a recent attempt, and there was recent update + // with no peers, the channel will drain. This prevents useless update attempt + // loops. + if let Some(more_peers) = more_peers { + if let Err(send_error) = demand_tx.try_send(more_peers) { + if send_error.is_disconnected() { + // Zebra is shutting down + return Err(send_error.into()); + } + } + } + + Ok(()) +} + /// Try to connect to `candidate` using `outbound_connector`. /// Uses `outbound_connection_tracker` to track the active connection count. /// -/// Returns a `HandshakeConnected` action on success, and a -/// `HandshakeFailed` action on error. -#[instrument(skip(outbound_connector, outbound_connection_tracker))] +/// On success, sends peers to `peerset_tx`. +/// On failure, marks the peer as failed in the address book, +/// then re-adds demand to `demand_tx`. +#[instrument(skip( + outbound_connector, + outbound_connection_tracker, + peerset_tx, + address_book, + demand_tx +))] async fn dial( candidate: MetaAddr, mut outbound_connector: C, outbound_connection_tracker: ConnectionTracker, -) -> CrawlerAction + mut peerset_tx: futures::channel::mpsc::Sender, + address_book: Arc>, + mut demand_tx: futures::channel::mpsc::Sender, +) -> Result<(), BoxError> where C: Service< OutboundConnectorRequest, @@ -902,7 +1004,7 @@ where + 'static, C::Future: Send + 'static, { - // CORRECTNESS + // # Correctness // // To avoid hangs, the dialer must only await: // - functions that return immediately, or @@ -911,10 +1013,7 @@ where debug!(?candidate.addr, "attempting outbound connection in response to demand"); // the connector is always ready, so this can't hang - let outbound_connector = outbound_connector - .ready() - .await - .expect("outbound connector never errors"); + let outbound_connector = outbound_connector.ready().await?; let req = OutboundConnectorRequest { addr: candidate.addr, @@ -922,24 +1021,51 @@ where }; // the handshake has timeouts, so it shouldn't hang - outbound_connector - .call(req) - .map_err(|e| (candidate, e)) - .map(Into::into) - .await -} + let handshake_result = outbound_connector.call(req).map(Into::into).await; -impl From> for CrawlerAction { - fn from(dial_result: Result<(PeerSocketAddr, peer::Client), (MetaAddr, BoxError)>) -> Self { - use CrawlerAction::*; - match dial_result { - Ok((address, client)) => HandshakeConnected { address, client }, - Err((candidate, e)) => { - debug!(?candidate.addr, ?e, "failed to connect to candidate"); - HandshakeFailed { - failed_addr: candidate, + match handshake_result { + Ok((address, client)) => { + debug!(?candidate.addr, "successfully dialed new peer"); + + // The connection limit makes sure this send doesn't block. + peerset_tx.send((address, client)).await?; + } + // The connection was never opened, or it failed the handshake and was dropped. + Err(error) => { + debug!(?error, ?candidate.addr, "failed to make outbound connection to peer"); + report_failed(address_book.clone(), candidate).await; + + // The demand signal that was taken out of the queue to attempt to connect to the + // failed candidate never turned into a connection, so add it back. + // + // # Security + // + // Handshake failures are rate-limited by peer attempt timeouts. + if let Err(send_error) = demand_tx.try_send(MorePeers) { + if send_error.is_disconnected() { + // Zebra is shutting down + return Err(send_error.into()); } } } } + + Ok(()) +} + +/// Mark `addr` as a failed peer in `address_book`. +#[instrument(skip(address_book))] +async fn report_failed(address_book: Arc>, addr: MetaAddr) { + let addr = MetaAddr::new_errored(addr.addr, addr.services); + + // # Correctness + // + // Spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976). + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(|| address_book.lock().unwrap().update(addr)) + }) + .await + .expect("panic in peer failure address book update task"); } diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index d9fdf9a1..f949506c 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -459,15 +459,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -521,15 +513,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -631,15 +615,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -694,15 +670,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -834,15 +802,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -900,15 +860,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1019,15 +971,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { - assert!( - matches!(peer_result, Ok((_, _))), - "unexpected connection error: {peer_result:?}\n\ - {peer_count} previous peers succeeded", - ); - peer_count += 1; - } - + Ok(Some(_peer_change)) => peer_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1085,15 +1029,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { let peer_change_result = peerset_rx.try_next(); match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_change_result)) => { - assert!( - matches!(peer_change_result, Ok((_, _))), - "unexpected connection error: {peer_change_result:?}\n\ - {peer_change_count} previous peers succeeded", - ); - peer_change_count += 1; - } - + Ok(Some(_peer_change)) => peer_change_count += 1, // The channel is closed and there are no messages left in the channel. Ok(None) => break, // The channel is still open, but there are no messages left in the channel. @@ -1158,7 +1094,8 @@ async fn add_initial_peers_is_rate_limited() { let elapsed = Instant::now() - before; - assert_eq!(connections.len(), PEER_COUNT); + // Errors are ignored, so we don't expect any peers here + assert_eq!(connections.len(), 0); // Make sure the rate limiting worked by checking if it took long enough assert!( elapsed