diff --git a/Cargo.lock b/Cargo.lock
index 768b68b6..45ac6fdd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1927,11 +1927,13 @@ dependencies = [
 name = "tower-batch"
 version = "0.1.0"
 dependencies = [
+ "futures",
  "futures-core",
  "pin-project",
  "tokio",
  "tower",
  "tracing",
+ "tracing-futures",
 ]
 
 [[package]]
diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml
index 8290e58c..4419b90e 100644
--- a/tower-batch/Cargo.toml
+++ b/tower-batch/Cargo.toml
@@ -6,8 +6,10 @@ license = "MIT"
 edition = "2018"
 
 [dependencies]
-tokio = { version = "0.2", features = ["time"] }
+tokio = { version = "0.2", features = ["time", "sync", "stream"] }
 tower = "0.3"
 futures-core = "0.3.5"
 pin-project = "0.4.20"
 tracing = "0.1.15"
+tracing-futures = "0.2.4"
+futures = "0.3.5"
diff --git a/tower-batch/src/error.rs b/tower-batch/src/error.rs
index f8753902..418957fc 100644
--- a/tower-batch/src/error.rs
+++ b/tower-batch/src/error.rs
@@ -1,15 +1,15 @@
-//! Error types for the `Buffer` middleware.
+//! Error types for the `Batch` middleware.
 
 use crate::BoxError;
 use std::{fmt, sync::Arc};
 
-/// An error produced by a `Service` wrapped by a `Buffer`
+/// An error produced by a `Service` wrapped by a `Batch`.
 #[derive(Debug)]
 pub struct ServiceError {
     inner: Arc<BoxError>,
 }
 
-/// An error produced when the a buffer's worker closes unexpectedly.
+/// An error produced when the batch worker closes unexpectedly.
 pub struct Closed {
     _p: (),
 }
@@ -32,7 +32,7 @@ impl ServiceError {
 
 impl fmt::Display for ServiceError {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        write!(fmt, "buffered service failed: {}", self.inner)
+        write!(fmt, "batching service failed: {}", self.inner)
     }
 }
 
@@ -58,7 +58,7 @@ impl fmt::Debug for Closed {
 
 impl fmt::Display for Closed {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.write_str("buffer's worker closed unexpectedly")
+        fmt.write_str("batch worker closed unexpectedly")
     }
 }
 
diff --git a/tower-batch/src/future.rs b/tower-batch/src/future.rs
index 6b5ae641..cd1292ba 100644
--- a/tower-batch/src/future.rs
+++ b/tower-batch/src/future.rs
@@ -1,4 +1,4 @@
-//! Future types for the `Buffer` middleware.
+//! Future types for the `Batch` middleware.
 
 use super::{error::Closed, message};
 use futures_core::ready;
@@ -9,7 +9,7 @@ use std::{
     task::{Context, Poll},
 };
 
-/// Future that completes when the buffered service eventually services the submitted request.
+/// Future that completes when the batch processing is complete.
 #[pin_project]
 #[derive(Debug)]
 pub struct ResponseFuture<T> {
diff --git a/tower-batch/src/layer.rs b/tower-batch/src/layer.rs
index 5bddc924..8fda7ba3 100644
--- a/tower-batch/src/layer.rs
+++ b/tower-batch/src/layer.rs
@@ -1,60 +1,56 @@
-use super::service::Buffer;
+use super::{service::Batch, BatchControl};
 use std::{fmt, marker::PhantomData};
-use tower_layer::Layer;
-use tower_service::Service;
+use tower::layer::Layer;
+use tower::Service;
 
-/// Adds an mpsc buffer in front of an inner service.
+/// Adds a layer performing batch processing of requests.
 ///
 /// The default Tokio executor is used to run the given service,
 /// which means that this layer can only be used on the Tokio runtime.
 ///
 /// See the module documentation for more details.
-pub struct BufferLayer<Request> {
-    bound: usize,
+pub struct BatchLayer<Request> {
+    max_items: usize,
+    max_latency: std::time::Duration,
     _p: PhantomData<fn(Request)>,
 }
 
-impl<Request> BufferLayer<Request> {
-    /// Creates a new `BufferLayer` with the provided `bound`.
+impl<Request> BatchLayer<Request> {
+    /// Creates a new `BatchLayer`.
     ///
-    /// `bound` gives the maximal number of requests that can be queued for the service before
-    /// backpressure is applied to callers.
+    /// The wrapper is responsible for telling the inner service when to flush a
+    /// batch of requests. Two parameters control this policy:
     ///
-    /// # A note on choosing a `bound`
-    ///
-    /// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
-    /// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
-    /// this reserved slot may be held up for a long time. As a result, it's advisable to set
-    /// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
-    /// If you do not, all the slots in the buffer may be held up by futures that have just called
-    /// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
-    /// requests.
-    pub fn new(bound: usize) -> Self {
-        BufferLayer {
-            bound,
+    /// * `max_items` gives the maximum number of items per batch.
+    /// * `max_latency` gives the maximum latency for a batch item.
+    pub fn new(max_items: usize, max_latency: std::time::Duration) -> Self {
+        BatchLayer {
+            max_items,
+            max_latency,
             _p: PhantomData,
         }
     }
 }
 
-impl<S, Request> Layer<S> for BufferLayer<Request>
+impl<S, Request> Layer<S> for BatchLayer<Request>
 where
-    S: Service<Request> + Send + 'static,
+    S: Service<BatchControl<Request>> + Send + 'static,
     S::Future: Send,
     S::Error: Into<crate::BoxError> + Send + Sync,
     Request: Send + 'static,
 {
-    type Service = Buffer<S, Request>;
+    type Service = Batch<S, Request>;
 
     fn layer(&self, service: S) -> Self::Service {
-        Buffer::new(service, self.bound)
+        Batch::new(service, self.max_items, self.max_latency)
     }
 }
 
-impl<Request> fmt::Debug for BufferLayer<Request> {
+impl<Request> fmt::Debug for BatchLayer<Request> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct("BufferLayer")
-            .field("bound", &self.bound)
+        f.debug_struct("BatchLayer")
+            .field("max_items", &self.max_items)
+            .field("max_latency", &self.max_latency)
             .finish()
     }
 }
diff --git a/tower-batch/src/lib.rs b/tower-batch/src/lib.rs
index eebee513..a76a0b56 100644
--- a/tower-batch/src/lib.rs
+++ b/tower-batch/src/lib.rs
@@ -7,5 +7,16 @@ mod worker;
 
 type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
 
-pub use self::layer::BufferLayer;
-pub use self::service::Buffer;
+pub enum BatchControl<R> {
+    Item(R),
+    Flush,
+}
+
+impl<R> From<R> for BatchControl<R> {
+    fn from(req: R) -> BatchControl<R> {
+        BatchControl::Item(req)
+    }
+}
+
+pub use self::layer::BatchLayer;
+pub use self::service::Batch;
diff --git a/tower-batch/src/message.rs b/tower-batch/src/message.rs
index 6d13aa12..dc73a6ad 100644
--- a/tower-batch/src/message.rs
+++ b/tower-batch/src/message.rs
@@ -1,7 +1,7 @@
 use super::error::ServiceError;
 use tokio::sync::oneshot;
 
-/// Message sent over buffer
+/// Message sent to the batch worker
 #[derive(Debug)]
 pub(crate) struct Message<Request, Fut> {
     pub(crate) request: Request,
diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs
index 14d11ab3..0b9b7cf9 100644
--- a/tower-batch/src/service.rs
+++ b/tower-batch/src/service.rs
@@ -2,6 +2,7 @@ use super::{
     future::ResponseFuture,
     message::Message,
     worker::{Handle, Worker},
+    BatchControl,
 };
 
 use futures_core::ready;
@@ -9,67 +10,45 @@ use std::task::{Context, Poll};
 use tokio::sync::{mpsc, oneshot};
 use tower::Service;
 
-/// Adds an mpsc buffer in front of an inner service.
+/// Allows batch processing of requests.
 ///
 /// See the module documentation for more details.
 #[derive(Debug)]
-pub struct Buffer<T, Request>
+pub struct Batch<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
 {
     tx: mpsc::Sender<Message<Request, T::Future>>,
     handle: Handle,
 }
 
-impl<T, Request> Buffer<T, Request>
+impl<T, Request> Batch<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
     T::Error: Into<crate::BoxError>,
 {
-    /// Creates a new `Buffer` wrapping `service`.
+    /// Creates a new `Batch` wrapping `service`.
     ///
-    /// `bound` gives the maximal number of requests that can be queued for the service before
-    /// backpressure is applied to callers.
+    /// The wrapper is responsible for telling the inner service when to flush a
+    /// batch of requests. Two parameters control this policy:
     ///
-    /// The default Tokio executor is used to run the given service, which means that this method
-    /// must be called while on the Tokio runtime.
+    /// * `max_items` gives the maximum number of items per batch.
+    /// * `max_latency` gives the maximum latency for a batch item.
     ///
-    /// # A note on choosing a `bound`
-    ///
-    /// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
-    /// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
-    /// this reserved slot may be held up for a long time. As a result, it's advisable to set
-    /// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
-    /// If you do not, all the slots in the buffer may be held up by futures that have just called
-    /// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
-    /// requests.
-    pub fn new(service: T, bound: usize) -> Self
+    /// The default Tokio executor is used to run the given service, which means
+    /// that this method must be called while on the Tokio runtime.
+    pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
     where
         T: Send + 'static,
         T::Future: Send,
         T::Error: Send + Sync,
         Request: Send + 'static,
     {
-        let (tx, rx) = mpsc::channel(bound);
-        let (handle, worker) = Worker::new(service, rx);
-        tokio::spawn(worker);
-        Buffer { tx, handle }
-    }
-
-    /// Creates a new `Buffer` wrapping `service`, but returns the background worker.
-    ///
-    /// This is useful if you do not want to spawn directly onto the `tokio` runtime
-    /// but instead want to use your own executor. This will return the `Buffer` and
-    /// the background `Worker` that you can then spawn.
-    pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
-    where
-        T: Send + 'static,
-        T::Error: Send + Sync,
-        Request: Send + 'static,
-    {
-        let (tx, rx) = mpsc::channel(bound);
-        let (handle, worker) = Worker::new(service, rx);
-        (Buffer { tx, handle }, worker)
+        // XXX(hdevalence): is this bound good?
+        let (tx, rx) = mpsc::channel(1);
+        let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
+        tokio::spawn(worker.run());
+        Batch { tx, handle }
     }
 
     fn get_worker_error(&self) -> crate::BoxError {
@@ -77,9 +56,9 @@ where
     }
 }
 
-impl<T, Request> Service<Request> for Buffer<T, Request>
+impl<T, Request> Service<Request> for Batch<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
     T::Error: Into<crate::BoxError>,
 {
     type Response = T::Response;
@@ -106,7 +85,7 @@ where
         // if we didn't do this, events on the worker related to this span wouldn't be counted
         // towards that span since the worker would have no way of entering it.
         let span = tracing::Span::current();
-        tracing::trace!(parent: &span, "sending request to buffer worker");
+        tracing::trace!(parent: &span, "sending request to batch worker");
         match self.tx.try_send(Message { request, span, tx }) {
             Err(mpsc::error::TrySendError::Closed(_)) => {
                 ResponseFuture::failed(self.get_worker_error())
@@ -126,9 +105,9 @@ where
     }
 }
 
-impl<T, Request> Clone for Buffer<T, Request>
+impl<T, Request> Clone for Batch<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
 {
     fn clone(&self) -> Self {
         Self {
diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs
index b77b6dc4..8d4ab367 100644
--- a/tower-batch/src/worker.rs
+++ b/tower-batch/src/worker.rs
@@ -1,17 +1,18 @@
 use super::{
     error::{Closed, ServiceError},
-    message::Message,
+    message::{self, Message},
+    BatchControl,
 };
-use futures_core::ready;
+use futures::future::TryFutureExt;
 use pin_project::pin_project;
 use std::sync::{Arc, Mutex};
-use std::{
-    future::Future,
-    pin::Pin,
-    task::{Context, Poll},
+use tokio::{
+    stream::StreamExt,
+    sync::mpsc,
+    time::{delay_for, Delay},
 };
-use tokio::sync::mpsc;
-use tower::Service;
+use tower::{Service, ServiceExt};
+use tracing_futures::Instrument;
 
 /// Task that handles processing the buffer. This type should not be used
 /// directly, instead `Buffer` requires an `Executor` that can accept this task.
@@ -24,15 +25,15 @@ use tower::Service;
 #[derive(Debug)]
 pub struct Worker<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
     T::Error: Into<crate::BoxError>,
 {
-    current_message: Option<Message<Request, T::Future>>,
     rx: mpsc::Receiver<Message<Request, T::Future>>,
     service: T,
-    finish: bool,
     failed: Option<ServiceError>,
     handle: Handle,
+    max_items: usize,
+    max_latency: std::time::Duration,
 }
 
 /// Get the error out
@@ -43,66 +44,125 @@ pub(crate) struct Handle {
 impl<T, Request> Worker<T, Request>
 where
-    T: Service<Request>,
+    T: Service<BatchControl<Request>>,
     T::Error: Into<crate::BoxError>,
 {
     pub(crate) fn new(
         service: T,
         rx: mpsc::Receiver<Message<Request, T::Future>>,
+        max_items: usize,
+        max_latency: std::time::Duration,
     ) -> (Handle, Worker<T, Request>) {
         let handle = Handle {
             inner: Arc::new(Mutex::new(None)),
         };
 
         let worker = Worker {
-            current_message: None,
-            finish: false,
-            failed: None,
             rx,
             service,
             handle: handle.clone(),
+            failed: None,
+            max_items,
+            max_latency,
         };
 
         (handle, worker)
     }
 
-    /// Return the next queued Message that hasn't been canceled.
-    ///
-    /// If a `Message` is returned, the `bool` is true if this is the first time we received this
-    /// message, and false otherwise (i.e., we tried to forward it to the backing service before).
-    fn poll_next_msg(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
-        if self.finish {
-            // We've already received None and are shutting down
-            return Poll::Ready(None);
-        }
-
-        tracing::trace!("worker polling for next message");
-        if let Some(mut msg) = self.current_message.take() {
-            // poll_closed returns Poll::Ready is the receiver is dropped.
-            // Returning Pending means it is still alive, so we should still
-            // use it.
-            if msg.tx.poll_closed(cx).is_pending() {
-                tracing::trace!("resuming buffered request");
-                return Poll::Ready(Some((msg, false)));
+    async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
+        if let Some(ref failed) = self.failed {
+            tracing::trace!("notifying caller about worker failure");
+            let _ = tx.send(Err(failed.clone()));
+        } else {
+            match self.service.ready_and().await {
+                Ok(svc) => {
+                    let rsp = svc.call(req.into());
+                    let _ = tx.send(Ok(rsp));
+                }
+                Err(e) => {
+                    self.failed(e.into());
+                    let _ = tx.send(Err(self
+                        .failed
+                        .as_ref()
+                        .expect("Worker::failed did not set self.failed?")
+                        .clone()));
+                }
             }
-
-            tracing::trace!("dropping cancelled buffered request");
         }
+    }
 
-        // Get the next request
-        while let Some(mut msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
-            if msg.tx.poll_closed(cx).is_pending() {
-                tracing::trace!("processing new request");
-                return Poll::Ready(Some((msg, true)));
+    async fn flush_service(&mut self) {
+        if let Err(e) = self
+            .service
+            .ready_and()
+            .and_then(|svc| svc.call(BatchControl::Flush))
+            .await
+        {
+            self.failed(e.into());
+        }
+    }
+
+    pub async fn run(mut self) {
+        use futures::future::Either::{Left, Right};
+        // The timer is started when the first entry of a new batch is
+        // submitted, so that the batch latency of all entries is at most
+        // self.max_latency. However, we don't keep the timer running unless
+        // there is a pending request, to prevent wakeups on idle services.
+        let mut timer: Option<Delay> = None;
+        let mut pending_items = 0usize;
+        loop {
+            match timer {
+                None => match self.rx.next().await {
+                    // The first message in a new batch.
+                    Some(msg) => {
+                        let span = msg.span;
+                        self.process_req(msg.request, msg.tx)
+                            // Apply the provided span to request processing.
+                            .instrument(span)
+                            .await;
+                        timer = Some(delay_for(self.max_latency));
+                        pending_items = 1;
+                    }
+                    // No more messages, ever.
+                    None => return,
+                },
+                Some(delay) => {
+                    // Wait on either a new message or the batch timer.
+                    match futures::future::select(self.rx.next(), delay).await {
+                        Left((Some(msg), delay)) => {
+                            let span = msg.span;
+                            self.process_req(msg.request, msg.tx)
+                                // Apply the provided span to request processing.
+                                .instrument(span)
+                                .await;
+                            pending_items += 1;
+                            // Check whether we have too many pending items.
+                            if pending_items >= self.max_items {
+                                // XXX(hdevalence): what span should instrument this?
+                                self.flush_service().await;
+                                // Now we have an empty batch.
+                                timer = None;
+                                pending_items = 0;
+                            } else {
+                                // The timer is still running; set it back!
+                                timer = Some(delay);
+                            }
+                        }
+                        // No more messages, ever.
+                        Left((None, _delay)) => {
+                            return;
+                        }
+                        // The batch timer elapsed.
+                        Right(((), _next)) => {
+                            // XXX(hdevalence): what span should instrument this?
+                            self.flush_service().await;
+                            timer = None;
+                            pending_items = 0;
+                        }
+                    }
+                }
             }
-            // Otherwise, request is canceled, so pop the next one.
-            tracing::trace!("dropping cancelled request");
         }
-
-        Poll::Ready(None)
     }
 
     fn failed(&mut self, error: crate::BoxError) {
@@ -132,82 +192,14 @@ where
 
         self.rx.close();
 
-        // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None),
-        // which will trigger the `self.finish == true` phase. We just need to make sure that any
-        // requests that we receive before we've exhausted the receiver receive the error:
+        // By closing the mpsc::Receiver, we know that the run() loop will
+        // drain all pending requests. We just need to make sure that any
+        // requests that we receive before we've exhausted the receiver receive
+        // the error:
         self.failed = Some(error);
     }
 }
 
-impl<T, Request> Future for Worker<T, Request>
-where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
-{
-    type Output = ();
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        if self.finish {
-            return Poll::Ready(());
-        }
-
-        loop {
-            match ready!(self.poll_next_msg(cx)) {
-                Some((msg, first)) => {
-                    let _guard = msg.span.enter();
-                    if let Some(ref failed) = self.failed {
-                        tracing::trace!("notifying caller about worker failure");
-                        let _ = msg.tx.send(Err(failed.clone()));
-                        continue;
-                    }
-
-                    // Wait for the service to be ready
-                    tracing::trace!(
-                        resumed = !first,
-                        message = "worker received request; waiting for service readiness"
-                    );
-                    match self.service.poll_ready(cx) {
-                        Poll::Ready(Ok(())) => {
-                            tracing::debug!(service.ready = true, message = "processing request");
-                            let response = self.service.call(msg.request);
-
-                            // Send the response future back to the sender.
-                            //
-                            // An error means the request had been canceled in-between
-                            // our calls, the response future will just be dropped.
-                            tracing::trace!("returning response future");
-                            let _ = msg.tx.send(Ok(response));
-                        }
-                        Poll::Pending => {
-                            tracing::trace!(service.ready = false, message = "delay");
-                            // Put out current message back in its slot.
-                            drop(_guard);
-                            self.current_message = Some(msg);
-                            return Poll::Pending;
-                        }
-                        Poll::Ready(Err(e)) => {
-                            let error = e.into();
-                            tracing::debug!({ %error }, "service failed");
-                            drop(_guard);
-                            self.failed(error);
-                            let _ = msg.tx.send(Err(self
-                                .failed
-                                .as_ref()
-                                .expect("Worker::failed did not set self.failed?")
-                                .clone()));
-                        }
-                    }
-                }
-                None => {
-                    // No more more requests _ever_.
-                    self.finish = true;
-                    return Poll::Ready(());
-                }
-            }
-        }
-    }
-}
-
 impl Handle {
     pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
         self.inner
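
The `BatchControl` enum added in `lib.rs` defines the contract between the wrapper and the wrapped service: the worker submits each request as `BatchControl::Item(req)` (via the `From` impl, which is what `req.into()` uses in `process_req`), and sends `BatchControl::Flush` when the batch should be processed. A minimal sketch of a conforming inner service follows; `SumWorker` is hypothetical and not part of this diff, and the sketch assumes the futures 0.3 / tower 0.3 APIs pinned in `Cargo.toml` above.

```rust
use std::task::{Context, Poll};

use tower::Service;
use tower_batch::BatchControl;

/// Hypothetical inner service: accumulates u64 items and sums them on flush.
struct SumWorker {
    pending: Vec<u64>,
}

impl Service<BatchControl<u64>> for SumWorker {
    type Response = ();
    type Error = std::convert::Infallible;
    type Future = futures::future::Ready<Result<(), Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready; a real service might exert backpressure here.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, control: BatchControl<u64>) -> Self::Future {
        match control {
            // One item handed over by the worker's process_req().
            BatchControl::Item(n) => self.pending.push(n),
            // The worker hit max_items or max_latency; process the whole batch.
            BatchControl::Flush => {
                let total: u64 = self.pending.drain(..).sum();
                tracing::info!(total, "flushed batch");
            }
        }
        futures::future::ready(Ok(()))
    }
}
```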
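From the caller's side, `Batch` is used like any other `tower::Service`. Note that, as written in `service.rs` and `worker.rs`, each `call` resolves with the inner service's per-`Item` response; the flush itself happens on the spawned worker task once `max_items` requests accumulate or `max_latency` elapses, whichever comes first. This usage sketch continues the hypothetical `SumWorker` above (same module) and assumes a tokio 0.2 runtime with the `macros` and `rt-core` features enabled; the library itself only needs the features listed in `Cargo.toml`.

```rust
use std::time::Duration;

use tower::{Service, ServiceExt};
use tower_batch::Batch;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Flush after 32 items, or after 10ms if the batch is still partial.
    let mut batch = Batch::new(
        SumWorker { pending: Vec::new() },
        32,
        Duration::from_millis(10),
    );

    for n in 0u64..100 {
        // ready_and() waits for a free slot in the worker's channel;
        // call() enqueues the item and returns a future for its response.
        batch.ready_and().await?.call(n).await?;
    }
    Ok(())
}
```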