This commit is contained in:
Jane Lusby 2020-06-17 17:44:17 -07:00 committed by Deirdre Connolly
parent b727beb778
commit b67ead665a
4 changed files with 57 additions and 56 deletions

View File

@ -13,20 +13,20 @@ use tower::Service;
/// Future that completes when the batch processing is complete. /// Future that completes when the batch processing is complete.
#[pin_project] #[pin_project]
pub struct ResponseFuture<S, E, Response> pub struct ResponseFuture<S, E2, Response>
where where
S: Service<crate::BatchControl<Response>>, S: Service<crate::BatchControl<Response>>,
{ {
#[pin] #[pin]
state: ResponseState<S, E, Response>, state: ResponseState<S, E2, Response>,
} }
impl<S, E, Response> Debug for ResponseFuture<S, E, Response> impl<S, E2, Response> Debug for ResponseFuture<S, E2, Response>
where where
S: Service<crate::BatchControl<Response>>, S: Service<crate::BatchControl<Response>>,
S::Future: Debug, S::Future: Debug,
S::Error: Debug, S::Error: Debug,
E: Debug, E2: Debug,
{ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ResponseFuture") f.debug_struct("ResponseFuture")
@ -36,21 +36,21 @@ where
} }
#[pin_project(project = ResponseStateProj)] #[pin_project(project = ResponseStateProj)]
enum ResponseState<S, E, Response> enum ResponseState<S, E2, Response>
where where
S: Service<crate::BatchControl<Response>>, S: Service<crate::BatchControl<Response>>,
{ {
Failed(Option<E>), Failed(Option<E2>),
Rx(#[pin] message::Rx<S::Future, S::Error>), Rx(#[pin] message::Rx<S::Future, S::Error>),
Poll(#[pin] S::Future), Poll(#[pin] S::Future),
} }
impl<S, E, Response> Debug for ResponseState<S, E, Response> impl<S, E2, Response> Debug for ResponseState<S, E2, Response>
where where
S: Service<crate::BatchControl<Response>>, S: Service<crate::BatchControl<Response>>,
S::Future: Debug, S::Future: Debug,
S::Error: Debug, S::Error: Debug,
E: Debug, E2: Debug,
{ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
@ -61,7 +61,7 @@ where
} }
} }
impl<S, E, Response> ResponseFuture<S, E, Response> impl<S, E2, Response> ResponseFuture<S, E2, Response>
where where
S: Service<crate::BatchControl<Response>>, S: Service<crate::BatchControl<Response>>,
{ {
@ -71,7 +71,7 @@ where
} }
} }
pub(crate) fn failed(err: E) -> Self { pub(crate) fn failed(err: E2) -> Self {
ResponseFuture { ResponseFuture {
state: ResponseState::Failed(Some(err)), state: ResponseState::Failed(Some(err)),
} }

View File

@ -9,14 +9,14 @@ use tower::Service;
/// which means that this layer can only be used on the Tokio runtime. /// which means that this layer can only be used on the Tokio runtime.
/// ///
/// See the module documentation for more details. /// See the module documentation for more details.
pub struct BatchLayer<Request, E> { pub struct BatchLayer<Request, E2> {
max_items: usize, max_items: usize,
max_latency: std::time::Duration, max_latency: std::time::Duration,
_p: PhantomData<fn(Request)>, _p: PhantomData<fn(Request)>,
_e: PhantomData<E>, _e: PhantomData<E2>,
} }
impl<Request, E> BatchLayer<Request, E> { impl<Request, E2> BatchLayer<Request, E2> {
/// Creates a new `BatchLayer`. /// Creates a new `BatchLayer`.
/// ///
/// The wrapper is responsible for telling the inner service when to flush a /// The wrapper is responsible for telling the inner service when to flush a
@ -34,23 +34,23 @@ impl<Request, E> BatchLayer<Request, E> {
} }
} }
impl<S, Request, E> Layer<S> for BatchLayer<Request, E> impl<S, Request, E2> Layer<S> for BatchLayer<Request, E2>
where where
S: Service<BatchControl<Request>> + Send + 'static, S: Service<BatchControl<Request>> + Send + 'static,
S::Future: Send, S::Future: Send,
S::Error: Clone + Into<E> + Send + Sync, S::Error: Clone + Into<E2> + Send + Sync,
Request: Send + 'static, Request: Send + 'static,
E: Send + 'static, E2: Send + 'static,
crate::error::Closed: Into<E>, crate::error::Closed: Into<E2>,
{ {
type Service = Batch<S, Request, E>; type Service = Batch<S, Request, E2>;
fn layer(&self, service: S) -> Self::Service { fn layer(&self, service: S) -> Self::Service {
Batch::new(service, self.max_items, self.max_latency) Batch::new(service, self.max_items, self.max_latency)
} }
} }
impl<Request, E> fmt::Debug for BatchLayer<Request, E> { impl<Request, E2> fmt::Debug for BatchLayer<Request, E2> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer") f.debug_struct("BufferLayer")
.field("max_items", &self.max_items) .field("max_items", &self.max_items)

View File

@ -17,21 +17,21 @@ use tower::Service;
/// ///
/// See the module documentation for more details. /// See the module documentation for more details.
#[derive(Debug)] #[derive(Debug)]
pub struct Batch<T, Request, E = crate::BoxError> pub struct Batch<T, Request, E2 = crate::BoxError>
where where
T: Service<BatchControl<Request>>, T: Service<BatchControl<Request>>,
{ {
tx: mpsc::Sender<Message<Request, T::Future, T::Error>>, tx: mpsc::Sender<Message<Request, T::Future, T::Error>>,
handle: Handle<T::Error, E>, handle: Handle<T::Error, E2>,
_error_type: PhantomData<E>, _e: PhantomData<E2>,
} }
impl<T, Request, E> Batch<T, Request, E> impl<T, Request, E2> Batch<T, Request, E2>
where where
T: Service<BatchControl<Request>>, T: Service<BatchControl<Request>>,
T::Error: Into<E> + Clone, T::Error: Into<E2> + Clone,
E: Send + 'static, E2: Send + 'static,
crate::error::Closed: Into<E>, crate::error::Closed: Into<E2>,
// crate::error::Closed: Into<<Self as Service<Request>>::Error> + Send + Sync + 'static, // crate::error::Closed: Into<<Self as Service<Request>>::Error> + Send + Sync + 'static,
// crate::error::ServiceError: Into<<Self as Service<Request>>::Error> + Send + Sync + 'static, // crate::error::ServiceError: Into<<Self as Service<Request>>::Error> + Send + Sync + 'static,
{ {
@ -59,25 +59,25 @@ where
Batch { Batch {
tx, tx,
handle, handle,
_error_type: PhantomData, _e: PhantomData,
} }
} }
fn get_worker_error(&self) -> E { fn get_worker_error(&self) -> E2 {
self.handle.get_error_on_closed() self.handle.get_error_on_closed()
} }
} }
impl<T, Request, E> Service<Request> for Batch<T, Request, E> impl<T, Request, E2> Service<Request> for Batch<T, Request, E2>
where where
T: Service<BatchControl<Request>>, T: Service<BatchControl<Request>>,
crate::error::Closed: Into<E>, crate::error::Closed: Into<E2>,
T::Error: Into<E> + Clone, T::Error: Into<E2> + Clone,
E: Send + 'static, E2: Send + 'static,
{ {
type Response = T::Response; type Response = T::Response;
type Error = E; type Error = E2;
type Future = ResponseFuture<T, E, Request>; type Future = ResponseFuture<T, E2, Request>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If the inner service has errored, then we error here. // If the inner service has errored, then we error here.
@ -127,7 +127,7 @@ where
Self { Self {
tx: self.tx.clone(), tx: self.tx.clone(),
handle: self.handle.clone(), handle: self.handle.clone(),
_error_type: PhantomData, _e: PhantomData,
} }
} }
} }

View File

@ -26,18 +26,18 @@ use tracing_futures::Instrument;
/// implement (only call). /// implement (only call).
#[pin_project] #[pin_project]
#[derive(Debug)] #[derive(Debug)]
pub struct Worker<T, Request, E> pub struct Worker<S, Request, E2>
where where
T: Service<BatchControl<Request>>, S: Service<BatchControl<Request>>,
T::Error: Into<E>, S::Error: Into<E2>,
{ {
rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>, rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
service: T, service: S,
failed: Option<T::Error>, failed: Option<S::Error>,
handle: Handle<T::Error, E>, handle: Handle<S::Error, E2>,
max_items: usize, max_items: usize,
max_latency: std::time::Duration, max_latency: std::time::Duration,
_error_type: PhantomData<E>, _e: PhantomData<E2>,
} }
/// Get the error out /// Get the error out
@ -47,17 +47,17 @@ pub(crate) struct Handle<E, E2> {
_e: PhantomData<E2>, _e: PhantomData<E2>,
} }
impl<T, Request, E> Worker<T, Request, E> impl<S, Request, E2> Worker<S, Request, E2>
where where
T: Service<BatchControl<Request>>, S: Service<BatchControl<Request>>,
T::Error: Into<E> + Clone, S::Error: Into<E2> + Clone,
{ {
pub(crate) fn new( pub(crate) fn new(
service: T, service: S,
rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>, rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
max_items: usize, max_items: usize,
max_latency: std::time::Duration, max_latency: std::time::Duration,
) -> (Handle<T::Error, E>, Worker<T, Request, E>) { ) -> (Handle<S::Error, E2>, Worker<S, Request, E2>) {
let handle = Handle { let handle = Handle {
inner: Arc::new(Mutex::new(None)), inner: Arc::new(Mutex::new(None)),
_e: PhantomData, _e: PhantomData,
@ -70,13 +70,13 @@ where
failed: None, failed: None,
max_items, max_items,
max_latency, max_latency,
_error_type: PhantomData, _e: PhantomData,
}; };
(handle, worker) (handle, worker)
} }
async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future, T::Error>) { async fn process_req(&mut self, req: Request, tx: message::Tx<S::Future, S::Error>) {
if let Some(failed) = self.failed.clone() { if let Some(failed) = self.failed.clone() {
tracing::trace!("notifying caller about worker failure"); tracing::trace!("notifying caller about worker failure");
let _ = tx.send(Err(failed)); let _ = tx.send(Err(failed));
@ -171,11 +171,12 @@ where
} }
} }
fn failed(&mut self, error: T::Error) { fn failed(&mut self, error: S::Error) {
// The underlying service failed when we called `poll_ready` on it with the given `error`. We // The underlying service failed when we called `poll_ready` on it with
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in // the given `error`. We need to communicate this to all the `Buffer`
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent // handles. To do so, we require that `S::Error` implements `Clone`,
// requests will also fail with the same error. // clone the error to send to all pending requests, and store it so that
// subsequent requests will also fail with the same error.
// Note that we need to handle the case where some handle is concurrently trying to send us // Note that we need to handle the case where some handle is concurrently trying to send us
// a request. We need to make sure that *either* the send of the request fails *or* it // a request. We need to make sure that *either* the send of the request fails *or* it