Spawn initial handshakes in separated task (#3189)
* spawn connector * expand comment Co-authored-by: teor <teor@riseup.net> * fix error handling Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
37808eaadb
commit
f750535961
|
|
@ -246,7 +246,9 @@ async fn add_initial_peers<S>(
|
||||||
) -> Result<ActiveConnectionCounter, BoxError>
|
) -> Result<ActiveConnectionCounter, BoxError>
|
||||||
where
|
where
|
||||||
S: Service<OutboundConnectorRequest, Response = (SocketAddr, peer::Client), Error = BoxError>
|
S: Service<OutboundConnectorRequest, Response = (SocketAddr, peer::Client), Error = BoxError>
|
||||||
+ Clone,
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
let initial_peers = limit_initial_peers(&config, address_book_updater).await;
|
let initial_peers = limit_initial_peers(&config, address_book_updater).await;
|
||||||
|
|
@ -285,20 +287,26 @@ where
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
};
|
};
|
||||||
|
|
||||||
let outbound_connector = outbound_connector.clone();
|
// Construct a connector future but do not drive it yet ...
|
||||||
async move {
|
let outbound_connector_future = outbound_connector
|
||||||
// Rate-limit the connection, sleeping for an interval according
|
.clone()
|
||||||
// to its index in the list.
|
.oneshot(req)
|
||||||
|
.map_err(move |e| (addr, e));
|
||||||
|
|
||||||
|
// ... instead, spawn a new task to handle this connector
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let task = outbound_connector_future.await;
|
||||||
|
// Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`,
|
||||||
|
// sleeping for an interval according to its index in the list.
|
||||||
sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await;
|
sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await;
|
||||||
outbound_connector
|
task
|
||||||
.oneshot(req)
|
})
|
||||||
.map_err(move |e| (addr, e))
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
while let Some(handshake_result) = handshakes.next().await {
|
while let Some(handshake_result) = handshakes.next().await {
|
||||||
|
let handshake_result =
|
||||||
|
handshake_result.expect("unexpected panic in initial peer handshake");
|
||||||
match handshake_result {
|
match handshake_result {
|
||||||
Ok(ref change) => {
|
Ok(ref change) => {
|
||||||
handshake_success_total += 1;
|
handshake_success_total += 1;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue