Track the number of active inbound and outbound peer connections (#2912)

* Count the number of active inbound and outbound peer connections

And reduce the count when each connection fails.

* Fix a comment typo

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-10-22 07:36:42 +10:00 committed by GitHub
parent 86d05c5e90
commit 4cdd12e2c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 367 additions and 93 deletions

View File

@ -16,7 +16,8 @@ use tower::{
use zebra_chain::chain_tip::NoChainTip; use zebra_chain::chain_tip::NoChainTip;
use crate::{ use crate::{
peer::{self, ConnectedAddr}, peer::{self, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, Request, Response, BoxError, Config, Request, Response,
}; };
@ -62,11 +63,19 @@ pub fn connect_isolated(
.finish() .finish()
.expect("provided mandatory builder parameters"); .expect("provided mandatory builder parameters");
// Don't send any metadata about the connection // Don't send or track any metadata about the connection
let connected_addr = ConnectedAddr::new_isolated(); let connected_addr = ConnectedAddr::new_isolated();
let connection_tracker = ActiveConnectionCounter::new_counter().track_connection();
Oneshot::new(handshake, (conn, connected_addr)) Oneshot::new(
.map_ok(|client| BoxService::new(Wrapper(client))) handshake,
HandshakeRequest {
tcp_stream: conn,
connected_addr,
connection_tracker,
},
)
.map_ok(|client| BoxService::new(Wrapper(client)))
} }
// This can be deleted when a new version of Tower with map_err is released. // This can be deleted when a new version of Tower with map_err is released.

View File

@ -11,14 +11,11 @@ mod error;
/// Performs peer handshakes. /// Performs peer handshakes.
mod handshake; mod handshake;
use client::ClientRequest; use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use error::ErrorSlot; use error::ErrorSlot;
pub use client::Client; pub use client::Client;
pub use connection::Connection; pub use connection::Connection;
pub use connector::Connector; pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError}; pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest}; pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};

View File

@ -26,6 +26,11 @@ use zebra_chain::{
use crate::{ use crate::{
constants, constants,
peer::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{ protocol::{
external::{types::Nonce, InventoryHash, Message}, external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response}, internal::{Request, Response},
@ -33,11 +38,6 @@ use crate::{
BoxError, BoxError,
}; };
use super::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
};
#[derive(Debug)] #[derive(Debug)]
pub(super) enum Handler { pub(super) enum Handler {
/// Indicates that the handler has finished processing the request. /// Indicates that the handler has finished processing the request.
@ -314,19 +314,41 @@ pub(super) enum State {
/// The state associated with a peer connection. /// The state associated with a peer connection.
pub struct Connection<S, Tx> { pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
pub(super) state: State, pub(super) state: State,
/// A timeout for a client request. This is stored separately from /// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of /// State so that we can move the future out of it independently of
/// other state handling. /// other state handling.
pub(super) request_timer: Option<Sleep>, pub(super) request_timer: Option<Sleep>,
/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S, pub(super) svc: S,
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
/// `InProgressClientRequest` /// A channel that receives network requests from the rest of Zebra.
///
/// This channel produces `InProgressClientRequest`s.
pub(super) client_rx: ClientRequestReceiver, pub(super) client_rx: ClientRequestReceiver,
/// A slot for an error shared between the Connection and the Client that uses it. /// A slot for an error shared between the Connection and the Client that uses it.
pub(super) error_slot: ErrorSlot, pub(super) error_slot: ErrorSlot,
//pub(super) peer_rx: Rx,
/// A channel for sending requests to the connected peer.
pub(super) peer_tx: Tx, pub(super) peer_tx: Tx,
/// A connection tracker that reduces the open connection count when dropped.
/// Used to limit the number of open connections in Zebra.
///
/// This field does nothing until it is dropped.
///
/// # Security
///
/// If this connection tracker or `Connection`s are leaked,
/// the number of active connections will appear higher than it actually is.
///
/// Eventually, Zebra could stop making connections entirely.
#[allow(dead_code)]
pub(super) connection_tracker: ConnectionTracker,
} }
impl<S, Tx> Connection<S, Tx> impl<S, Tx> Connection<S, Tx>

View File

@ -12,9 +12,11 @@ use tracing_futures::Instrument;
use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use zebra_chain::chain_tip::{ChainTip, NoChainTip};
use crate::{BoxError, Request, Response}; use crate::{
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
use super::{Client, ConnectedAddr, Handshake}; peer_set::ConnectionTracker,
BoxError, Request, Response,
};
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before /// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own /// forwarding to the inner handshake service. Writing this as its own
@ -37,7 +39,19 @@ impl<S, C> Connector<S, C> {
} }
} }
impl<S, C> Service<SocketAddr> for Connector<S, C> /// A connector request.
/// Contains the information needed to make an outbound connection to the peer.
pub struct OutboundConnectorRequest {
/// The Zcash listener address of the peer.
pub addr: SocketAddr,
/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}
impl<S, C> Service<OutboundConnectorRequest> for Connector<S, C>
where where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
@ -52,14 +66,26 @@ where
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, addr: SocketAddr) -> Self::Future { fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
let OutboundConnectorRequest {
addr,
connection_tracker,
}: OutboundConnectorRequest = req;
let mut hs = self.handshaker.clone(); let mut hs = self.handshaker.clone();
let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr); let connector_span = info_span!("connector", peer = ?connected_addr);
async move { async move {
let stream = TcpStream::connect(addr).await?; let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?; hs.ready_and().await?;
let client = hs.call((stream, connected_addr)).await?; let client = hs
.call(HandshakeRequest {
tcp_stream: stream,
connected_addr,
connection_tracker,
})
.await?;
Ok(Change::Insert(addr, client)) Ok(Change::Insert(addr, client))
} }
.instrument(connector_span) .instrument(connector_span)

View File

@ -29,6 +29,8 @@ use zebra_chain::{
use crate::{ use crate::{
constants, constants,
meta_addr::MetaAddrChange, meta_addr::MetaAddrChange,
peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError},
peer_set::ConnectionTracker,
protocol::{ protocol::{
external::{types::*, Codec, InventoryHash, Message}, external::{types::*, Codec, InventoryHash, Message},
internal::{Request, Response}, internal::{Request, Response},
@ -37,8 +39,6 @@ use crate::{
BoxError, Config, BoxError, Config,
}; };
use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError};
/// A [`Service`] that handshakes with a remote peer and constructs a /// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair. /// client/server pair.
/// ///
@ -658,7 +658,20 @@ pub async fn negotiate_version(
Ok((remote_version, remote_services, remote_canonical_addr)) Ok((remote_version, remote_services, remote_canonical_addr))
} }
pub type HandshakeRequest = (TcpStream, ConnectedAddr); /// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest {
/// The TCP connection to the peer.
pub tcp_stream: TcpStream,
/// The address of the peer, and other related information.
pub connected_addr: ConnectedAddr,
/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}
impl<S, C> Service<HandshakeRequest> for Handshake<S, C> impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
where where
@ -676,7 +689,11 @@ where
} }
fn call(&mut self, req: HandshakeRequest) -> Self::Future { fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req; let HandshakeRequest {
tcp_stream,
connected_addr,
connection_tracker,
} = req;
let negotiator_span = debug_span!("negotiator", peer = ?connected_addr); let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it // set the peer connection span's parent to the global span, as it
@ -892,11 +909,12 @@ where
use super::connection; use super::connection;
let server = Connection { let server = Connection {
state: connection::State::AwaitingRequest, state: connection::State::AwaitingRequest,
request_timer: None,
svc: inbound_service, svc: inbound_service,
client_rx: server_rx.into(), client_rx: server_rx.into(),
error_slot: slot, error_slot: slot,
peer_tx, peer_tx,
request_timer: None, connection_tracker,
}; };
tokio::spawn( tokio::spawn(

View File

@ -1,10 +1,13 @@
pub(crate) mod candidate_set; pub(crate) mod candidate_set;
mod initialize; mod initialize;
mod inventory_registry; mod inventory_registry;
mod limit;
mod set; mod set;
mod unready_service; mod unready_service;
pub(crate) use candidate_set::CandidateSet; pub(crate) use candidate_set::CandidateSet;
pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker};
use inventory_registry::InventoryRegistry; use inventory_registry::InventoryRegistry;
use set::PeerSet; use set::PeerSet;

View File

@ -20,20 +20,23 @@ use tower::{
use tracing::Span; use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use crate::{
constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
BoxError, Config, Request, Response,
};
use zebra_chain::{chain_tip::ChainTip, parameters::Network}; use zebra_chain::{chain_tip::ChainTip, parameters::Network};
use super::{CandidateSet, PeerSet}; use crate::{
constants,
use peer::Client; meta_addr::MetaAddr,
peer::{self, HandshakeRequest, OutboundConnectorRequest},
peer_set::{ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
timestamp_collector::TimestampCollector,
AddressBook, BoxError, Config, Request, Response,
};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// The result of an outbound peer connection attempt or inbound connection handshake.
///
/// This result comes from the [`Handshaker`].
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>; type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// Initialize a peer set, using a network `config`, `inbound_service`, /// Initialize a peer set, using a network `config`, `inbound_service`,
@ -125,6 +128,8 @@ where
); );
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
// Connect peerset_tx to the 3 peer sources:
//
// 1. Incoming peer connections, via a listener. // 1. Incoming peer connections, via a listener.
let listen_guard = tokio::spawn( let listen_guard = tokio::spawn(
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone()) accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
@ -132,33 +137,38 @@ where
); );
// 2. Initial peers, specified in the config. // 2. Initial peers, specified in the config.
let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
let initial_peers_fut = { let initial_peers_fut = {
let config = config.clone(); let config = config.clone();
let outbound_connector = outbound_connector.clone(); let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone(); let peerset_tx = peerset_tx.clone();
async move { async move {
let initial_peers = config.initial_peers().await; let initial_peers = config.initial_peers().await;
let _ = initial_peer_count_tx.send(initial_peers.len());
// Connect the tx end to the 3 peer sources:
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
} }
.boxed() .boxed()
}; };
let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current())); let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
// 3. Outgoing peers we connect to in response to load. // 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
// Wait for the initial seed peer count
let mut active_outbound_connections = initial_peers_join
.await
.expect("unexpected panic in spawned initial peers task")
.expect("unexpected error connecting to initial peers");
let active_initial_peer_count = active_outbound_connections.update_count();
// We need to await candidates.update() here, because zcashd only sends one // We need to await candidates.update() here, because zcashd only sends one
// `addr` message per connection, and if we only have one initial peer we // `addr` message per connection, and if we only have one initial peer we
// need to ensure that its `addr` message is used by the crawler. // need to ensure that its `addr` message is used by the crawler.
info!("Sending initial request for peers"); info!(
let _ = candidates ?active_initial_peer_count,
.update_initial(initial_peer_count_rx.await.expect("value sent before drop")) "sending initial request for peers"
.await; );
let _ = candidates.update_initial(active_initial_peer_count).await;
for _ in 0..config.peerset_initial_target_size { for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(());
@ -172,33 +182,38 @@ where
candidates, candidates,
outbound_connector, outbound_connector,
peerset_tx, peerset_tx,
active_outbound_connections,
) )
.instrument(Span::current()), .instrument(Span::current()),
); );
handle_tx handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
.send(vec![add_guard, listen_guard, crawl_guard])
.unwrap();
(peer_set, address_book) (peer_set, address_book)
} }
/// Use the provided `handshaker` to connect to `initial_peers`, then send /// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`. /// the results over `peerset_tx`.
#[instrument(skip(initial_peers, outbound_connector, tx))] #[instrument(skip(initial_peers, outbound_connector, peerset_tx))]
async fn add_initial_peers<S>( async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>, initial_peers: std::collections::HashSet<SocketAddr>,
outbound_connector: S, outbound_connector: S,
mut tx: mpsc::Sender<PeerChange>, mut peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError> ) -> Result<ActiveConnectionCounter, BoxError>
where where
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone, S: Service<
OutboundConnectorRequest,
Response = Change<SocketAddr, peer::Client>,
Error = BoxError,
> + Clone,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
let initial_peer_count = initial_peers.len(); let initial_peer_count = initial_peers.len();
let mut handshake_success_total: usize = 0; let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0; let mut handshake_error_total: usize = 0;
let mut active_outbound_connections = ActiveConnectionCounter::new_counter();
info!( info!(
?initial_peer_count, ?initial_peer_count,
?initial_peers, ?initial_peers,
@ -218,9 +233,15 @@ where
let mut handshakes: FuturesUnordered<_> = initial_peers let mut handshakes: FuturesUnordered<_> = initial_peers
.into_iter() .into_iter()
.map(|addr| { .map(|addr| {
let connection_tracker = active_outbound_connections.track_connection();
let req = OutboundConnectorRequest {
addr,
connection_tracker,
};
outbound_connector outbound_connector
.clone() .clone()
.oneshot(addr) .oneshot(req)
.map_err(move |e| (addr, e)) .map_err(move |e| (addr, e))
}) })
.collect(); .collect();
@ -237,8 +258,8 @@ where
); );
} }
Err((addr, ref e)) => { Err((addr, ref e)) => {
// this is verbose, but it's better than just hanging with no output when there are errors
handshake_error_total += 1; handshake_error_total += 1;
// this is verbose, but it's better than just hanging with no output when there are errors
info!( info!(
?handshake_success_total, ?handshake_success_total,
?handshake_error_total, ?handshake_error_total,
@ -249,16 +270,20 @@ where
} }
} }
tx.send(handshake_result.map_err(|(_addr, e)| e)).await?; peerset_tx
.send(handshake_result.map_err(|(_addr, e)| e))
.await?;
} }
let outbound_connections = active_outbound_connections.update_count();
info!( info!(
?handshake_success_total, ?handshake_success_total,
?handshake_error_total, ?handshake_error_total,
?outbound_connections,
"finished connecting to initial seed peers" "finished connecting to initial seed peers"
); );
Ok(()) Ok(active_outbound_connections)
} }
/// Open a peer connection listener on `config.listen_addr`, /// Open a peer connection listener on `config.listen_addr`,
@ -317,19 +342,28 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
/// Zcash peer. /// Zcash peer.
/// ///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`Client`][peer::Client] result over `tx`. /// the [`peer::Client`] result over `peerset_tx`.
#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))] #[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>( async fn accept_inbound_connections<S>(
listener: TcpListener, listener: TcpListener,
mut handshaker: S, mut handshaker: S,
tx: mpsc::Sender<PeerChange>, peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone, S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
loop { loop {
if let Ok((tcp_stream, addr)) = listener.accept().await { if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let connection_tracker = active_inbound_connections.track_connection();
info!(
inbound_connections = ?active_inbound_connections.update_count(),
"handshaking on an open inbound peer connection"
);
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
let accept_span = info_span!("listen_accept", peer = ?connected_addr); let accept_span = info_span!("listen_accept", peer = ?connected_addr);
let _guard = accept_span.enter(); let _guard = accept_span.enter();
@ -338,18 +372,25 @@ where
handshaker.ready_and().await?; handshaker.ready_and().await?;
// TODO: distinguish between proxied listeners and direct listeners // TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
// Construct a handshake future but do not drive it yet.... // Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, connected_addr)); let handshake = handshaker.call(HandshakeRequest {
tcp_stream,
connected_addr,
connection_tracker,
});
// ... instead, spawn a new task to handle this connection // ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone(); {
tokio::spawn( let mut peerset_tx = peerset_tx.clone();
async move { tokio::spawn(
if let Ok(client) = handshake.await { async move {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await; if let Ok(client) = handshake.await {
let _ = peerset_tx.send(Ok(Change::Insert(addr, client))).await;
}
} }
} .instrument(handshaker_span),
.instrument(handshaker_span), );
); }
} }
} }
} }
@ -368,7 +409,7 @@ enum CrawlerAction {
TimerCrawl { tick: Instant }, TimerCrawl { tick: Instant },
/// Handle a successfully connected handshake `peer_set_change`. /// Handle a successfully connected handshake `peer_set_change`.
HandshakeConnected { HandshakeConnected {
peer_set_change: Change<SocketAddr, Client>, peer_set_change: Change<SocketAddr, peer::Client>,
}, },
/// Handle a handshake failure to `failed_addr`. /// Handle a handshake failure to `failed_addr`.
HandshakeFailed { failed_addr: MetaAddr }, HandshakeFailed { failed_addr: MetaAddr },
@ -376,7 +417,7 @@ enum CrawlerAction {
/// Given a channel `demand_rx` that signals a need for new peers, try to find /// Given a channel `demand_rx` that signals a need for new peers, try to find
/// and connect to new peers, and send the resulting `peer::Client`s through the /// and connect to new peers, and send the resulting `peer::Client`s through the
/// `success_tx` channel. /// `peerset_tx` channel.
/// ///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is /// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to /// demand, but no new peers in `candidates`. After crawling, try to connect to
@ -385,21 +426,28 @@ enum CrawlerAction {
/// If a handshake fails, restore the unused demand signal by sending it to /// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`. /// `demand_tx`.
/// ///
/// The crawler terminates when `candidates.update()` or `success_tx` returns a /// The crawler terminates when `candidates.update()` or `peerset_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should /// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler. /// be handled within the crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))] ///
/// Uses `active_outbound_connections` to track active outbound connections
/// in both the initial peers and crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration, crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>, mut demand_tx: mpsc::Sender<()>,
mut demand_rx: mpsc::Receiver<()>, mut demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
outbound_connector: C, outbound_connector: C,
mut success_tx: mpsc::Sender<PeerChange>, mut peerset_tx: mpsc::Sender<PeerChange>,
mut active_outbound_connections: ActiveConnectionCounter,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> C: Service<
+ Clone OutboundConnectorRequest,
Response = Change<SocketAddr, peer::Client>,
Error = BoxError,
> + Clone
+ Send + Send
+ 'static, + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
@ -465,16 +513,27 @@ where
continue; continue;
} }
DemandHandshake { candidate } => { DemandHandshake { candidate } => {
// spawn each handshake into an independent task, so it can make // Increment the connection count before we spawn the connection.
// progress independently of the crawls let outbound_connection_tracker = active_outbound_connections.track_connection();
let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone())) info!(
.map(move |res| match res { outbound_connections = ?active_outbound_connections.update_count(),
Ok(crawler_action) => crawler_action, "opening an outbound peer connection"
Err(e) => { );
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
} // Spawn each handshake into an independent task, so it can make
}) // progress independently of the crawls.
.instrument(Span::current()); let hs_join = tokio::spawn(dial(
candidate,
outbound_connector.clone(),
outbound_connection_tracker,
))
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
})
.instrument(Span::current());
handshakes.push(Box::pin(hs_join)); handshakes.push(Box::pin(hs_join));
} }
DemandCrawl => { DemandCrawl => {
@ -506,9 +565,11 @@ where
} }
// successes are handled by an independent task, so they // successes are handled by an independent task, so they
// shouldn't hang // shouldn't hang
success_tx.send(Ok(peer_set_change)).await?; peerset_tx.send(Ok(peer_set_change)).await?;
} }
HandshakeFailed { failed_addr } => { HandshakeFailed { failed_addr } => {
// The connection was never opened, or it failed the handshake and was dropped.
debug!(?failed_addr.addr, "marking candidate as failed"); debug!(?failed_addr.addr, "marking candidate as failed");
candidates.report_failed(&failed_addr); candidates.report_failed(&failed_addr);
// The demand signal that was taken out of the queue // The demand signal that was taken out of the queue
@ -521,14 +582,22 @@ where
} }
/// Try to connect to `candidate` using `outbound_connector`. /// Try to connect to `candidate` using `outbound_connector`.
/// Uses `outbound_connection_tracker` to track the active connection count.
/// ///
/// Returns a `HandshakeConnected` action on success, and a /// Returns a `HandshakeConnected` action on success, and a
/// `HandshakeFailed` action on error. /// `HandshakeFailed` action on error.
#[instrument(skip(outbound_connector,))] #[instrument(skip(outbound_connector, outbound_connection_tracker))]
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction async fn dial<C>(
candidate: MetaAddr,
mut outbound_connector: C,
outbound_connection_tracker: ConnectionTracker,
) -> CrawlerAction
where where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> C: Service<
+ Clone OutboundConnectorRequest,
Response = Change<SocketAddr, peer::Client>,
Error = BoxError,
> + Clone
+ Send + Send
+ 'static, + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
@ -547,16 +616,21 @@ where
.await .await
.expect("outbound connector never errors"); .expect("outbound connector never errors");
let req = OutboundConnectorRequest {
addr: candidate.addr,
connection_tracker: outbound_connection_tracker,
};
// the handshake has timeouts, so it shouldn't hang // the handshake has timeouts, so it shouldn't hang
outbound_connector outbound_connector
.call(candidate.addr) .call(req)
.map_err(|e| (candidate, e)) .map_err(|e| (candidate, e))
.map(Into::into) .map(Into::into)
.await .await
} }
impl From<Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>> for CrawlerAction { impl From<Result<Change<SocketAddr, peer::Client>, (MetaAddr, BoxError)>> for CrawlerAction {
fn from(dial_result: Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>) -> Self { fn from(dial_result: Result<Change<SocketAddr, peer::Client>, (MetaAddr, BoxError)>) -> Self {
use CrawlerAction::*; use CrawlerAction::*;
match dial_result { match dial_result {
Ok(peer_set_change) => HandshakeConnected { peer_set_change }, Ok(peer_set_change) => HandshakeConnected { peer_set_change },

View File

@ -0,0 +1,125 @@
//! Counting active connections used by Zebra.
//!
//! These types can be used to count any kind of active resource.
//! But they are currently used to track the number of open connections.
use std::fmt;
use tokio::sync::mpsc;
/// A notification that a tracked connection has closed.
///
/// A [`ConnectionTracker`] sends one of these when it is dropped, and
/// [`ActiveConnectionCounter`] reads them to keep its count of open
/// connections up to date.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct ConnectionClosed;
/// A counter for active connections.
///
/// Creates a [`ConnectionTracker`] to track each active connection.
/// When these trackers are dropped, the counter gets notified.
///
/// The stored count only changes when a tracker is created, or when
/// [`ActiveConnectionCounter::update_count`] drains pending close notifications.
pub struct ActiveConnectionCounter {
/// The number of active peers tracked using this counter.
///
/// Incremented when a tracker is created, and decremented once for each
/// close notification read by `update_count`.
count: usize,
/// The channel used to send closed connection notifications.
///
/// Each [`ConnectionTracker`] holds a clone of this sender.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
/// The channel used to receive closed connection notifications.
close_notification_rx: mpsc::UnboundedReceiver<ConnectionClosed>,
}
impl fmt::Debug for ActiveConnectionCounter {
    /// Formats the counter, showing only its `count`.
    ///
    /// The notification channels are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut counter_fmt = f.debug_struct("ActiveConnectionCounter");
        counter_fmt.field("count", &self.count);
        counter_fmt.finish()
    }
}
impl ActiveConnectionCounter {
/// Create and return a new active connection counter.
pub fn new_counter() -> Self {
// TODO: This channel will be bounded by the connection limit (#1850, #1851, #2902).
let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel();
Self {
count: 0,
close_notification_rx,
close_notification_tx,
}
}
/// Create and return a new [`ConnectionTracker`], and add 1 to this counter.
///
/// When the returned tracker is dropped, this counter will be notified, and decreased by 1.
pub fn track_connection(&mut self) -> ConnectionTracker {
ConnectionTracker::new(self)
}
/// Check for closed connection notifications, and return the current connection count.
pub fn update_count(&mut self) -> usize {
let previous_connections = self.count;
// We ignore errors here:
// - TryRecvError::Empty means that there are no pending close notifications
// - TryRecvError::Closed is unreachable, because we hold a sender
while let Ok(ConnectionClosed) = self.close_notification_rx.try_recv() {
self.count -= 1;
debug!(
open_connections = ?self.count,
?previous_connections,
"a peer connection was closed"
);
}
debug!(
open_connections = ?self.count,
?previous_connections,
"updated active connection count"
);
self.count
}
}
/// A per-connection tracker.
///
/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
/// When these trackers are dropped, the counter gets notified.
///
/// A tracker does nothing while it is alive: its only effect happens in its `Drop` impl.
pub struct ConnectionTracker {
/// The channel used to send closed connection notifications on drop.
///
/// This is a clone of the sender held by the [`ActiveConnectionCounter`]
/// that created this tracker.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
}
impl fmt::Debug for ConnectionTracker {
    /// Formats the tracker as its type name only; the notification sender is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tracker_fmt = f.debug_struct("ConnectionTracker");
        tracker_fmt.finish()
    }
}
impl ConnectionTracker {
/// Create and return a new active connection tracker, and add 1 to `counter`.
///
/// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
fn new(counter: &mut ActiveConnectionCounter) -> Self {
counter.count += 1;
info!(open_connections = ?counter.count, "opening a new peer connection");
Self {
close_notification_tx: counter.close_notification_tx.clone(),
}
}
}
impl Drop for ConnectionTracker {
    /// Tells the connection counter that created this tracker that the connection has closed.
    fn drop(&mut self) {
        // TODO: This channel will be bounded by the connection limit (#1850, #1851, #2902).
        match self.close_notification_tx.send(ConnectionClosed) {
            // The counter reads this notification the next time it updates its count.
            Ok(()) => {}
            // Disconnected errors are ignored, because the receiver can be
            // dropped before some connections are dropped.
            Err(_) => {}
        }
    }
}