Spawn tasks for handshake futures.
Previously, we relied on the owner of the handshake future to drive it to completion. This meant that there were cases where handshakes might never be completed, just because nothing was actively polling them.
This commit is contained in:
parent
43b2d35dda
commit
8c938af579
|
|
@ -11,7 +11,7 @@ use tower::{discover::Change, Service, ServiceExt};
|
||||||
|
|
||||||
use crate::{BoxedStdError, Request, Response};
|
use crate::{BoxedStdError, Request, Response};
|
||||||
|
|
||||||
use super::{Client, Handshake, HandshakeError};
|
use super::{Client, Handshake};
|
||||||
|
|
||||||
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
|
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
|
||||||
/// forwarding to the inner handshake service. Writing this as its own
|
/// forwarding to the inner handshake service. Writing this as its own
|
||||||
|
|
@ -40,7 +40,7 @@ where
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
{
|
{
|
||||||
type Response = Change<SocketAddr, Client>;
|
type Response = Change<SocketAddr, Client>;
|
||||||
type Error = HandshakeError;
|
type Error = BoxedStdError;
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,7 @@ where
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
{
|
{
|
||||||
type Response = Client;
|
type Response = Client;
|
||||||
type Error = HandshakeError;
|
type Error = BoxedStdError;
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
|
|
@ -257,6 +257,16 @@ where
|
||||||
|
|
||||||
Ok(client)
|
Ok(client)
|
||||||
};
|
};
|
||||||
fut.instrument(connector_span).boxed()
|
|
||||||
|
// Spawn a new task to drive this handshake.
|
||||||
|
tokio::spawn(fut.instrument(connector_span))
|
||||||
|
// This is required to get error types to line up.
|
||||||
|
// Probably there's a nicer way to express this using combinators.
|
||||||
|
.map(|x| match x {
|
||||||
|
Ok(Ok(client)) => Ok(client),
|
||||||
|
Ok(Err(handshake_err)) => Err(handshake_err.into()),
|
||||||
|
Err(join_err) => Err(join_err.into()),
|
||||||
|
})
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue