From b67ead665abf4a3707d8ad0710778e950cee61c0 Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Wed, 17 Jun 2020 17:44:17 -0700 Subject: [PATCH] cleaning --- tower-batch/src/future.rs | 20 +++++++++--------- tower-batch/src/layer.rs | 18 ++++++++-------- tower-batch/src/service.rs | 32 ++++++++++++++-------------- tower-batch/src/worker.rs | 43 +++++++++++++++++++------------------- 4 files changed, 57 insertions(+), 56 deletions(-) diff --git a/tower-batch/src/future.rs b/tower-batch/src/future.rs index 1a5bd30e..f41301fa 100644 --- a/tower-batch/src/future.rs +++ b/tower-batch/src/future.rs @@ -13,20 +13,20 @@ use tower::Service; /// Future that completes when the batch processing is complete. #[pin_project] -pub struct ResponseFuture +pub struct ResponseFuture where S: Service>, { #[pin] - state: ResponseState, + state: ResponseState, } -impl Debug for ResponseFuture +impl Debug for ResponseFuture where S: Service>, S::Future: Debug, S::Error: Debug, - E: Debug, + E2: Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ResponseFuture") @@ -36,21 +36,21 @@ where } #[pin_project(project = ResponseStateProj)] -enum ResponseState +enum ResponseState where S: Service>, { - Failed(Option), + Failed(Option), Rx(#[pin] message::Rx), Poll(#[pin] S::Future), } -impl Debug for ResponseState +impl Debug for ResponseState where S: Service>, S::Future: Debug, S::Error: Debug, - E: Debug, + E2: Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -61,7 +61,7 @@ where } } -impl ResponseFuture +impl ResponseFuture where S: Service>, { @@ -71,7 +71,7 @@ where } } - pub(crate) fn failed(err: E) -> Self { + pub(crate) fn failed(err: E2) -> Self { ResponseFuture { state: ResponseState::Failed(Some(err)), } diff --git a/tower-batch/src/layer.rs b/tower-batch/src/layer.rs index 187b6f5c..b137d19f 100644 --- a/tower-batch/src/layer.rs +++ b/tower-batch/src/layer.rs @@ -9,14 +9,14 @@ use tower::Service; /// which means that this layer can only be used on the Tokio runtime. /// /// See the module documentation for more details. -pub struct BatchLayer { +pub struct BatchLayer { max_items: usize, max_latency: std::time::Duration, _p: PhantomData, - _e: PhantomData, + _e: PhantomData, } -impl BatchLayer { +impl BatchLayer { /// Creates a new `BatchLayer`. /// /// The wrapper is responsible for telling the inner service when to flush a @@ -34,23 +34,23 @@ impl BatchLayer { } } -impl Layer for BatchLayer +impl Layer for BatchLayer where S: Service> + Send + 'static, S::Future: Send, - S::Error: Clone + Into + Send + Sync, + S::Error: Clone + Into + Send + Sync, Request: Send + 'static, - E: Send + 'static, - crate::error::Closed: Into, + E2: Send + 'static, + crate::error::Closed: Into, { - type Service = Batch; + type Service = Batch; fn layer(&self, service: S) -> Self::Service { Batch::new(service, self.max_items, self.max_latency) } } -impl fmt::Debug for BatchLayer { +impl fmt::Debug for BatchLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("BufferLayer") .field("max_items", &self.max_items) diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs index 1454fad6..44ed9cc8 100644 --- a/tower-batch/src/service.rs +++ b/tower-batch/src/service.rs @@ -17,21 +17,21 @@ use tower::Service; /// /// See the module documentation for more details. #[derive(Debug)] -pub struct Batch +pub struct Batch where T: Service>, { tx: mpsc::Sender>, - handle: Handle, - _error_type: PhantomData, + handle: Handle, + _e: PhantomData, } -impl Batch +impl Batch where T: Service>, - T::Error: Into + Clone, - E: Send + 'static, - crate::error::Closed: Into, + T::Error: Into + Clone, + E2: Send + 'static, + crate::error::Closed: Into, // crate::error::Closed: Into<>::Error> + Send + Sync + 'static, // crate::error::ServiceError: Into<>::Error> + Send + Sync + 'static, { @@ -59,25 +59,25 @@ where Batch { tx, handle, - _error_type: PhantomData, + _e: PhantomData, } } - fn get_worker_error(&self) -> E { + fn get_worker_error(&self) -> E2 { self.handle.get_error_on_closed() } } -impl Service for Batch +impl Service for Batch where T: Service>, - crate::error::Closed: Into, - T::Error: Into + Clone, - E: Send + 'static, + crate::error::Closed: Into, + T::Error: Into + Clone, + E2: Send + 'static, { type Response = T::Response; - type Error = E; - type Future = ResponseFuture; + type Error = E2; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // If the inner service has errored, then we error here. @@ -127,7 +127,7 @@ where Self { tx: self.tx.clone(), handle: self.handle.clone(), - _error_type: PhantomData, + _e: PhantomData, } } } diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs index 6fdcf4f4..3d4121a1 100644 --- a/tower-batch/src/worker.rs +++ b/tower-batch/src/worker.rs @@ -26,18 +26,18 @@ use tracing_futures::Instrument; /// implement (only call). #[pin_project] #[derive(Debug)] -pub struct Worker +pub struct Worker where - T: Service>, - T::Error: Into, + S: Service>, + S::Error: Into, { - rx: mpsc::Receiver>, - service: T, - failed: Option, - handle: Handle, + rx: mpsc::Receiver>, + service: S, + failed: Option, + handle: Handle, max_items: usize, max_latency: std::time::Duration, - _error_type: PhantomData, + _e: PhantomData, } /// Get the error out @@ -47,17 +47,17 @@ pub(crate) struct Handle { _e: PhantomData, } -impl Worker +impl Worker where - T: Service>, - T::Error: Into + Clone, + S: Service>, + S::Error: Into + Clone, { pub(crate) fn new( - service: T, - rx: mpsc::Receiver>, + service: S, + rx: mpsc::Receiver>, max_items: usize, max_latency: std::time::Duration, - ) -> (Handle, Worker) { + ) -> (Handle, Worker) { let handle = Handle { inner: Arc::new(Mutex::new(None)), _e: PhantomData, @@ -70,13 +70,13 @@ where failed: None, max_items, max_latency, - _error_type: PhantomData, + _e: PhantomData, }; (handle, worker) } - async fn process_req(&mut self, req: Request, tx: message::Tx) { + async fn process_req(&mut self, req: Request, tx: message::Tx) { if let Some(failed) = self.failed.clone() { tracing::trace!("notifying caller about worker failure"); let _ = tx.send(Err(failed)); @@ -171,11 +171,12 @@ where } } - fn failed(&mut self, error: T::Error) { - // The underlying service failed when we called `poll_ready` on it with the given `error`. We - // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in - // an `Arc`, send that `Arc` to all pending requests, and store it so that subsequent - // requests will also fail with the same error. + fn failed(&mut self, error: S::Error) { + // The underlying service failed when we called `poll_ready` on it with + // the given `error`. We need to communicate this to all the `Buffer` + // handles. To do so, we require that `S::Error` implements `Clone`, + // clone the error to send to all pending requests, and store it so that + // subsequent requests will also fail with the same error. // Note that we need to handle the case where some handle is concurrently trying to send us // a request. We need to make sure that *either* the send of the request fails *or* it