diff --git a/.github/workflows/continous-integration-docker.patch-always.yml b/.github/workflows/continous-integration-docker.patch-always.yml
index 82b7acec..5a9e1154 100644
--- a/.github/workflows/continous-integration-docker.patch-always.yml
+++ b/.github/workflows/continous-integration-docker.patch-always.yml
@@ -7,8 +7,6 @@ name: CI Docker
 
 on:
   pull_request:
-    branches:
-      - main
 
 jobs:
   regenerate-stateful-disks:
diff --git a/.github/workflows/continous-integration-docker.patch.yml b/.github/workflows/continous-integration-docker.patch.yml
index 87c10930..520e08dd 100644
--- a/.github/workflows/continous-integration-docker.patch.yml
+++ b/.github/workflows/continous-integration-docker.patch.yml
@@ -4,8 +4,6 @@ name: CI Docker
 # so they can be skipped when the modified files make the actual workflow run.
 on:
   pull_request:
-    branches:
-      - main
     paths-ignore:
       # code and tests
       - '**/*.rs'
diff --git a/.github/workflows/continous-integration-docker.yml b/.github/workflows/continous-integration-docker.yml
index f6fea3d8..800da62d 100644
--- a/.github/workflows/continous-integration-docker.yml
+++ b/.github/workflows/continous-integration-docker.yml
@@ -23,8 +23,6 @@ on:
         required: true
 
   pull_request:
-    branches:
-      - main
     paths:
       # code and tests
       - '**/*.rs'
diff --git a/Cargo.lock b/Cargo.lock
index 8cf1a2b4..17570527 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3792,9 +3792,9 @@ dependencies = [
 
 [[package]]
 name = "rayon"
-version = "1.5.1"
+version = "1.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
+checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
 dependencies = [
  "autocfg 1.1.0",
  "crossbeam-deque",
@@ -3804,14 +3804,13 @@ dependencies = [
 
 [[package]]
 name = "rayon-core"
-version = "1.9.1"
+version = "1.9.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
+checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
 dependencies = [
  "crossbeam-channel",
  "crossbeam-deque",
  "crossbeam-utils",
- "lazy_static",
  "num_cpus",
 ]
 
@@ -5447,6 +5446,7 @@ dependencies = [
  "futures-core",
  "pin-project 1.0.11",
  "rand 0.8.5",
+ "rayon",
  "tokio",
  "tokio-test",
  "tokio-util 0.7.3",
@@ -6368,6 +6368,7 @@ dependencies = [
  "proptest-derive",
  "rand 0.7.3",
  "rand 0.8.5",
+ "rayon",
  "serde",
  "spandoc",
  "thiserror",
@@ -6576,6 +6577,7 @@ dependencies = [
  "proptest-derive",
  "prost",
  "rand 0.8.5",
+ "rayon",
  "regex",
  "reqwest",
  "semver 1.0.11",
diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml
index 9b962cc5..117219a9 100644
--- a/tower-batch/Cargo.toml
+++ b/tower-batch/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
 futures = "0.3.21"
 futures-core = "0.3.21"
 pin-project = "1.0.10"
+rayon = "1.5.3"
 tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "macros"] }
 tokio-util = "0.7.3"
 tower = { version = "0.4.13", features = ["util", "buffer"] }
@@ -24,7 +25,6 @@ tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
 tokio-test = "0.4.2"
 tower-fallback = { path = "../tower-fallback/" }
 tower-test = "0.4.0"
-tracing = "0.1.31"
 zebra-consensus = { path = "../zebra-consensus/" }
 zebra-test = { path = "../zebra-test/" }
diff --git a/tower-batch/src/layer.rs b/tower-batch/src/layer.rs
index 8fda7ba3..c3757eef 100644
--- a/tower-batch/src/layer.rs
+++ b/tower-batch/src/layer.rs
@@ -1,8 +1,12 @@
-use super::{service::Batch, BatchControl};
+//! Tower service layer for batch processing.
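+//!
+//! A hypothetical usage sketch (an editor's illustration, not code from this
+//! change; `MyVerifier` is a placeholder service):
+//!
+//! ```ignore
+//! use std::time::Duration;
+//! use tower::Layer;
+//!
+//! // Batch up to 64 items, flush after at most 10ms, and let `None`
+//! // default `max_batches` to one batch per rayon thread.
+//! let layer = BatchLayer::new(64, None, Duration::from_millis(10));
+//! let verifier = layer.layer(MyVerifier::default());
+//! ```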
+
 use std::{fmt, marker::PhantomData};
+
 use tower::layer::Layer;
 use tower::Service;
 
+use super::{service::Batch, BatchControl};
+
 /// Adds a layer performing batch processing of requests.
 ///
 /// The default Tokio executor is used to run the given service,
@@ -10,24 +14,31 @@ use tower::Service;
 ///
 /// See the module documentation for more details.
 pub struct BatchLayer<Request> {
-    max_items: usize,
+    max_items_in_batch: usize,
+    max_batches: Option<usize>,
     max_latency: std::time::Duration,
-    _p: PhantomData<fn(Request)>,
+
+    // TODO: is the variance correct here?
+    // https://doc.rust-lang.org/1.33.0/nomicon/subtyping.html#variance
+    // https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
+    _handles_requests: PhantomData<fn(Request)>,
 }
 
 impl<Request> BatchLayer<Request> {
     /// Creates a new `BatchLayer`.
     ///
     /// The wrapper is responsible for telling the inner service when to flush a
-    /// batch of requests. Two parameters control this policy:
-    ///
-    /// * `max_items` gives the maximum number of items per batch.
-    /// * `max_latency` gives the maximum latency for a batch item.
-    pub fn new(max_items: usize, max_latency: std::time::Duration) -> Self {
+    /// batch of requests. See [`Batch::new()`] for details.
+    pub fn new(
+        max_items_in_batch: usize,
+        max_batches: impl Into<Option<usize>>,
+        max_latency: std::time::Duration,
+    ) -> Self {
         BatchLayer {
-            max_items,
+            max_items_in_batch,
+            max_batches: max_batches.into(),
             max_latency,
-            _p: PhantomData,
+            _handles_requests: PhantomData,
         }
     }
 }
@@ -36,20 +47,27 @@ impl<S, Request> Layer<S> for BatchLayer<Request>
 where
     S: Service<BatchControl<Request>> + Send + 'static,
     S::Future: Send,
+    S::Response: Send,
     S::Error: Into<crate::BoxError> + Send + Sync,
     Request: Send + 'static,
 {
     type Service = Batch<S, Request>;
 
     fn layer(&self, service: S) -> Self::Service {
-        Batch::new(service, self.max_items, self.max_latency)
+        Batch::new(
+            service,
+            self.max_items_in_batch,
+            self.max_batches,
+            self.max_latency,
+        )
     }
 }
 
 impl<Request> fmt::Debug for BatchLayer<Request> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("BufferLayer")
-            .field("max_items", &self.max_items)
+            .field("max_items_in_batch", &self.max_items_in_batch)
+            .field("max_batches", &self.max_batches)
             .field("max_latency", &self.max_latency)
             .finish()
     }
diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs
index 89456d55..29df281f 100644
--- a/tower-batch/src/service.rs
+++ b/tower-batch/src/service.rs
@@ -1,6 +1,7 @@
 //! Wrapper service for batching items to an underlying service.
 
 use std::{
+    cmp::max,
     fmt,
     future::Future,
     pin::Pin,
@@ -25,6 +26,11 @@ use super::{
     BatchControl,
 };
 
+/// The maximum number of batches in the queue.
+///
+/// This avoids having very large queues on machines with hundreds or thousands of cores.
+pub const QUEUE_BATCH_LIMIT: usize = 64;
+
 /// Allows batch processing of requests.
 ///
 /// See the crate documentation for more details.
@@ -32,6 +38,8 @@ pub struct Batch<T, Request>
 where
     T: Service<BatchControl<Request>>,
 {
+    // Batch management
+    //
     /// A custom-bounded channel for sending requests to the batch worker.
     ///
     /// Note: this actually _is_ bounded, but rather than using Tokio's unbounded
@@ -53,6 +61,8 @@ where
     /// A semaphore permit that allows this service to send one message on `tx`.
     permit: Option<OwnedSemaphorePermit>,
 
+    // Errors
+    //
     /// An error handle shared between all service clones for the same worker.
     error_handle: ErrorHandle,
 
@@ -71,6 +81,7 @@ where
         f.debug_struct(name)
             .field("tx", &self.tx)
             .field("semaphore", &self.semaphore)
+            .field("permit", &self.permit)
             .field("error_handle", &self.error_handle)
             .field("worker_handle", &self.worker_handle)
             .finish()
@@ -80,26 +91,37 @@ where
 impl<T, Request> Batch<T, Request>
 where
     T: Service<BatchControl<Request>>,
+    T::Future: Send + 'static,
     T::Error: Into<crate::BoxError>,
 {
     /// Creates a new `Batch` wrapping `service`.
     ///
     /// The wrapper is responsible for telling the inner service when to flush a
-    /// batch of requests. Two parameters control this policy:
+    /// batch of requests. These parameters control this policy:
     ///
-    /// * `max_items` gives the maximum number of items per batch.
-    /// * `max_latency` gives the maximum latency for a batch item.
+    /// * `max_items_in_batch` gives the maximum number of items per batch.
+    /// * `max_batches` is an upper bound on the number of batches in the queue,
+    ///   and the number of concurrently executing batches.
+    ///   If this is `None`, we use the current number of [`rayon`] threads.
+    ///   The number of batches in the queue is also limited by [`QUEUE_BATCH_LIMIT`].
+    /// * `max_latency` gives the maximum latency for a batch item to start verifying.
     ///
     /// The default Tokio executor is used to run the given service, which means
     /// that this method must be called while on the Tokio runtime.
-    pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
+    pub fn new(
+        service: T,
+        max_items_in_batch: usize,
+        max_batches: impl Into<Option<usize>>,
+        max_latency: std::time::Duration,
+    ) -> Self
     where
         T: Send + 'static,
         T::Future: Send,
+        T::Response: Send,
         T::Error: Send + Sync,
         Request: Send + 'static,
     {
-        let (mut batch, worker) = Self::pair(service, max_items, max_latency);
+        let (mut batch, worker) = Self::pair(service, max_items_in_batch, max_batches, max_latency);
 
         let span = info_span!("batch worker", kind = std::any::type_name::<T>());
@@ -131,7 +153,8 @@ where
     /// `Batch` and the background `Worker` that you can then spawn.
     pub fn pair(
         service: T,
-        max_items: usize,
+        max_items_in_batch: usize,
+        max_batches: impl Into<Option<usize>>,
         max_latency: std::time::Duration,
     ) -> (Self, Worker<T, Request>)
     where
@@ -141,16 +164,32 @@ where
     {
         let (tx, rx) = mpsc::unbounded_channel();
 
+        // Clamp config to sensible values.
+        let max_items_in_batch = max(max_items_in_batch, 1);
+        let max_batches = max_batches
+            .into()
+            .unwrap_or_else(rayon::current_num_threads);
+        let max_batches_in_queue = max_batches.clamp(1, QUEUE_BATCH_LIMIT);
+
         // The semaphore bound limits the maximum number of concurrent requests
         // (specifically, requests which got a `Ready` from `poll_ready`, but haven't
         // used their semaphore reservation in a `call` yet).
-        // We choose a bound that allows callers to check readiness for every item in
-        // a batch, then actually submit those items.
-        let semaphore = Semaphore::new(max_items);
+        //
+        // We choose a bound that allows callers to check readiness for one batch per rayon CPU thread.
+        // This helps keep all CPUs filled with work: there is one batch executing, and another ready to go.
+        // Often there is only one verifier running; when that happens, we want it to take all the cores.
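+        //
+        // For example, with `max_items_in_batch = 64` and 8 rayon threads,
+        // the semaphore is created with 64 * 8 = 512 reservations
+        // (the batch count is clamped to `QUEUE_BATCH_LIMIT` first).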
+        let semaphore = Semaphore::new(max_items_in_batch * max_batches_in_queue);
         let semaphore = PollSemaphore::new(Arc::new(semaphore));
 
-        let (error_handle, worker) =
-            Worker::new(service, rx, max_items, max_latency, semaphore.clone());
+        let (error_handle, worker) = Worker::new(
+            service,
+            rx,
+            max_items_in_batch,
+            max_batches,
+            max_latency,
+            semaphore.clone(),
+        );
+
         let batch = Batch {
             tx,
             semaphore,
@@ -182,6 +221,7 @@ where
 impl<T, Request> Service<Request> for Batch<T, Request>
 where
     T: Service<BatchControl<Request>>,
+    T::Future: Send + 'static,
     T::Error: Into<crate::BoxError>,
 {
     type Response = T::Response;
diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs
index 7f55dee6..2613f2ea 100644
--- a/tower-batch/src/worker.rs
+++ b/tower-batch/src/worker.rs
@@ -5,7 +5,11 @@ use std::{
     sync::{Arc, Mutex},
 };
 
-use futures::future::TryFutureExt;
+use futures::{
+    future::{BoxFuture, OptionFuture},
+    stream::FuturesUnordered,
+    FutureExt, StreamExt,
+};
 use pin_project::pin_project;
 use tokio::{
     sync::mpsc,
@@ -33,28 +37,52 @@ use super::{
 pub struct Worker<T, Request>
 where
     T: Service<BatchControl<Request>>,
+    T::Future: Send + 'static,
     T::Error: Into<crate::BoxError>,
 {
+    // Batch management
+    //
     /// A semaphore-bounded channel for receiving requests from the batch wrapper service.
     rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
 
     /// The wrapped service that processes batches.
     service: T,
 
+    /// The number of pending items sent to `service`, since the last batch flush.
+    pending_items: usize,
+
+    /// The timer for the pending batch, if it has any items.
+    ///
+    /// The timer is started when the first entry of a new batch is
+    /// submitted, so that the batch latency of all entries is at most
+    /// self.max_latency. However, we don't keep the timer running unless
+    /// there is a pending request to prevent wakeups on idle services.
+    pending_batch_timer: Option<Pin<Box<Sleep>>>,
+
+    /// The batches that the worker is concurrently executing.
+    concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
+
+    // Errors and termination
+    //
     /// An error that's populated on permanent service failure.
     failed: Option<ServiceError>,
 
     /// A shared error handle that's populated on permanent service failure.
     error_handle: ErrorHandle,
 
-    /// The maximum number of items allowed in a batch.
-    max_items: usize,
-
-    /// The maximum delay before processing a batch with fewer than `max_items`.
-    max_latency: std::time::Duration,
-
     /// A cloned copy of the wrapper service's semaphore, used to close the semaphore.
     close: PollSemaphore,
+
+    // Config
+    //
+    /// The maximum number of items allowed in a batch.
+    max_items_in_batch: usize,
+
+    /// The maximum number of batches that are allowed to run concurrently.
+    max_concurrent_batches: usize,
+
+    /// The maximum delay before processing a batch with fewer than `max_items_in_batch`.
+    max_latency: std::time::Duration,
 }
 
 /// Get the error out
@@ -66,12 +94,17 @@ pub(crate) struct ErrorHandle {
 }
 
 impl<T, Request> Worker<T, Request>
 where
     T: Service<BatchControl<Request>>,
+    T::Future: Send + 'static,
     T::Error: Into<crate::BoxError>,
 {
+    /// Creates a new batch worker.
+    ///
+    /// See [`Service::new()`](crate::Service::new) for details.
     pub(crate) fn new(
         service: T,
         rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
-        max_items: usize,
+        max_items_in_batch: usize,
+        max_concurrent_batches: usize,
         max_latency: std::time::Duration,
         close: PollSemaphore,
     ) -> (ErrorHandle, Worker<T, Request>) {
@@ -82,134 +115,184 @@ where
         let worker = Worker {
             rx,
             service,
-            error_handle: error_handle.clone(),
+            pending_items: 0,
+            pending_batch_timer: None,
+            concurrent_batches: FuturesUnordered::new(),
             failed: None,
-            max_items,
-            max_latency,
+            error_handle: error_handle.clone(),
             close,
+            max_items_in_batch,
+            max_concurrent_batches,
+            max_latency,
         };
 
         (error_handle, worker)
     }
 
+    /// Process a single worker request.
     async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
-        if let Some(ref failed) = self.failed {
-            tracing::trace!("notifying caller about worker failure");
-            let _ = tx.send(Err(failed.clone()));
-        } else {
-            match self.service.ready().await {
-                Ok(svc) => {
-                    let rsp = svc.call(req.into());
-                    let _ = tx.send(Ok(rsp));
-                }
-                Err(e) => {
-                    self.failed(e.into());
-                    let _ = tx.send(Err(self
-                        .failed
-                        .as_ref()
-                        .expect("Worker::failed did not set self.failed?")
-                        .clone()));
+        if let Some(ref error) = self.failed {
+            tracing::trace!(
+                ?error,
+                "notifying batch request caller about worker failure",
+            );
+            let _ = tx.send(Err(error.clone()));
+            return;
+        }
 
-                    // Wake any tasks waiting on channel capacity.
-                    tracing::debug!("waking pending tasks");
-                    self.close.close();
-                }
+        match self.service.ready().await {
+            Ok(svc) => {
+                let rsp = svc.call(req.into());
+                let _ = tx.send(Ok(rsp));
+
+                self.pending_items += 1;
+            }
+            Err(e) => {
+                self.failed(e.into());
+                let _ = tx.send(Err(self
+                    .failed
+                    .as_ref()
+                    .expect("Worker::failed did not set self.failed?")
+                    .clone()));
             }
         }
     }
 
+    /// Tell the inner service to flush the current batch.
+    ///
+    /// Waits until the inner service is ready,
+    /// then stores a future which resolves when the batch finishes.
     async fn flush_service(&mut self) {
-        if let Err(e) = self
-            .service
-            .ready()
-            .and_then(|svc| svc.call(BatchControl::Flush))
-            .await
-        {
-            self.failed(e.into());
+        if self.failed.is_some() {
+            tracing::trace!("worker failure: skipping flush");
+            return;
         }
 
-        // Correctness: allow other tasks to run at the end of every batch.
-        tokio::task::yield_now().await;
+        match self.service.ready().await {
+            Ok(ready_service) => {
+                let flush_future = ready_service.call(BatchControl::Flush);
+                self.concurrent_batches.push(flush_future.boxed());
+
+                // Now we have an empty batch.
+                self.pending_items = 0;
+                self.pending_batch_timer = None;
+            }
+            Err(error) => {
+                self.failed(error.into());
+            }
+        }
     }
 
+    /// Is the number of concurrently executing batches below the configured limit?
+    fn can_spawn_new_batches(&self) -> bool {
+        self.concurrent_batches.len() < self.max_concurrent_batches
+    }
+
+    /// Run loop for batch requests, which implements the batch policies.
+    ///
+    /// See [`Service::new()`](crate::Service::new) for details.
     pub async fn run(mut self) {
-        // The timer is started when the first entry of a new batch is
-        // submitted, so that the batch latency of all entries is at most
-        // self.max_latency. However, we don't keep the timer running unless
-        // there is a pending request to prevent wakeups on idle services.
-        let mut timer: Option<Pin<Box<Sleep>>> = None;
-        let mut pending_items = 0usize;
         loop {
-            match timer.as_mut() {
-                None => match self.rx.recv().await {
-                    // The first message in a new batch.
-                    Some(msg) => {
-                        let span = msg.span;
-                        self.process_req(msg.request, msg.tx)
-                            // Apply the provided span to request processing
-                            .instrument(span)
-                            .await;
-                        timer = Some(Box::pin(sleep(self.max_latency)));
-                        pending_items = 1;
-                    }
-                    // No more messages, ever.
-                    None => return,
-                },
-                Some(sleep) => {
-                    // Wait on either a new message or the batch timer.
-                    //
-                    // If both are ready, end the batch now, because the timer has elapsed.
-                    // If the timer elapses, any pending messages are preserved:
-                    // https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
-                    tokio::select! {
-                        biased;
-                        // The batch timer elapsed.
-                        () = sleep => {
-                            self.flush_service().await;
-
-                            // Now we have an empty batch.
-                            timer = None;
-                            pending_items = 0;
-                        }
-
-                        maybe_msg = self.rx.recv() => match maybe_msg {
-                            Some(msg) => {
-                                let span = msg.span;
-                                self.process_req(msg.request, msg.tx)
-                                    // Apply the provided span to request processing.
-                                    .instrument(span)
-                                    .await;
-                                pending_items += 1;
-                                // Check whether we have too many pending items.
-                                if pending_items >= self.max_items {
-                                    // TODO: use a batch-specific span to instrument this future.
-                                    self.flush_service().await;
-
-                                    // Now we have an empty batch.
-                                    timer = None;
-                                    pending_items = 0;
-                                } else {
-                                    // The timer is still running.
-                                }
-                            }
-                            None => {
-                                // No more messages, ever.
-                                return;
-                            }
-                        },
-                    }
-                }
-            }
+            // Wait on either a new message or the batch timer.
+            //
+            // If both are ready, end the batch now, because the timer has elapsed.
+            // If the timer elapses, any pending messages are preserved:
+            // https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
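+            //
+            // With `biased`, the arms below are polled in source order when
+            // several are ready: finished batches are drained first, then the
+            // batch timer, and only then are new messages accepted.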
+            tokio::select! {
+                biased;
+
+                batch_result = self.concurrent_batches.next(), if !self.concurrent_batches.is_empty() => match batch_result.expect("only returns None when empty") {
+                    Ok(_response) => {
+                        tracing::trace!(
+                            pending_items = self.pending_items,
+                            batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                            running_batches = self.concurrent_batches.len(),
+                            "batch finished executing",
+                        );
+                    }
+                    Err(error) => {
+                        let error = error.into();
+                        tracing::trace!(?error, "batch execution failed");
+                        self.failed(error);
+                    }
+                },
+
+                Some(()) = OptionFuture::from(self.pending_batch_timer.as_mut()), if self.pending_batch_timer.as_ref().is_some() => {
+                    tracing::trace!(
+                        pending_items = self.pending_items,
+                        batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                        running_batches = self.concurrent_batches.len(),
+                        "batch timer expired",
+                    );
+
+                    // TODO: use a batch-specific span to instrument this future.
+                    self.flush_service().await;
+                },
+
+                maybe_msg = self.rx.recv(), if self.can_spawn_new_batches() => match maybe_msg {
+                    Some(msg) => {
+                        tracing::trace!(
+                            pending_items = self.pending_items,
+                            batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                            running_batches = self.concurrent_batches.len(),
+                            "batch message received",
+                        );
+
+                        let span = msg.span;
+
+                        self.process_req(msg.request, msg.tx)
+                            // Apply the provided span to request processing.
+                            .instrument(span)
+                            .await;
+
+                        // Check whether we have too many pending items.
+                        if self.pending_items >= self.max_items_in_batch {
+                            tracing::trace!(
+                                pending_items = self.pending_items,
+                                batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                                running_batches = self.concurrent_batches.len(),
+                                "batch is full",
+                            );
+
+                            // TODO: use a batch-specific span to instrument this future.
+                            self.flush_service().await;
+                        } else if self.pending_items == 1 {
+                            tracing::trace!(
+                                pending_items = self.pending_items,
+                                batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                                running_batches = self.concurrent_batches.len(),
+                                "batch is new, starting timer",
+                            );
+
+                            // The first message in a new batch.
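+                            // `Sleep` is not `Unpin`, so the timer is boxed and
+                            // pinned, letting the timer arm above poll it by
+                            // reference on later loop iterations.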
+                            self.pending_batch_timer = Some(Box::pin(sleep(self.max_latency)));
+                        } else {
+                            tracing::trace!(
+                                pending_items = self.pending_items,
+                                batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+                                running_batches = self.concurrent_batches.len(),
+                                "waiting for full batch or batch timer",
+                            );
+                        }
+                    }
+                    None => {
+                        tracing::trace!("batch channel closed and emptied, exiting worker task");
+
+                        return;
+                    }
+                },
+            }
         }
     }
 
+    /// Register an inner service failure.
+    ///
+    /// The underlying service failed when we called `poll_ready` on it with the given `error`. We
+    /// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
+    /// an `Arc`, send that `Arc` to all pending requests, and store it so that subsequent
+    /// requests will also fail with the same error.
     fn failed(&mut self, error: crate::BoxError) {
-        // The underlying service failed when we called `poll_ready` on it with the given `error`. We
-        // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
-        // an `Arc`, send that `Arc` to all pending requests, and store it so that subsequent
-        // requests will also fail with the same error.
+        tracing::debug!(?error, "batch worker error");
 
         // Note that we need to handle the case where some error_handle is concurrently trying to send us
         // a request. We need to make sure that *either* the send of the request fails *or* it
@@ -222,15 +305,23 @@ where
 
         let mut inner = self.error_handle.inner.lock().unwrap();
 
+        // Ignore duplicate failures.
         if inner.is_some() {
-            // Future::poll was called after we've already errored out!
             return;
         }
 
         *inner = Some(error.clone());
         drop(inner);
 
+        tracing::trace!(
+            ?error,
+            "worker failure: waking pending requests so they can be failed",
+        );
         self.rx.close();
+        self.close.close();
+
+        // We don't schedule any batches on an errored service.
+        self.pending_batch_timer = None;
 
         // By closing the mpsc::Receiver, we know that the run() loop will
         // drain all pending requests. We just need to make sure that any
@@ -263,16 +354,39 @@ impl Clone for ErrorHandle {
 impl<T, Request> PinnedDrop for Worker<T, Request>
 where
     T: Service<BatchControl<Request>>,
+    T::Future: Send + 'static,
     T::Error: Into<crate::BoxError>,
 {
     fn drop(mut self: Pin<&mut Self>) {
+        tracing::trace!(
+            pending_items = self.pending_items,
+            batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
+            running_batches = self.concurrent_batches.len(),
+            error = ?self.failed,
+            "dropping batch worker",
+        );
+
+        // Fail pending tasks
         self.failed(Closed::new().into());
 
-        // Clear queued requests
-        while self.rx.try_recv().is_ok() {}
+        // Fail queued requests
+        while let Ok(msg) = self.rx.try_recv() {
+            let _ = msg
+                .tx
+                .send(Err(self.failed.as_ref().expect("just set failed").clone()));
+        }
 
-        // Stop accepting reservations
-        self.close.close();
+        // Clear any finished batches, ignoring any errors.
+        // Ignore any batches that are still executing, because we can't cancel them.
+        //
+        // now_or_never() can stop futures waking up, but that's ok here,
+        // because we're manually polling, then dropping the stream.
+        while let Some(Some(_)) = self
+            .as_mut()
+            .project()
+            .concurrent_batches
+            .next()
+            .now_or_never()
+        {}
     }
 }
diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs
index 02dc16b7..76fa1538 100644
--- a/tower-batch/tests/ed25519.rs
+++ b/tower-batch/tests/ed25519.rs
@@ -61,7 +61,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
     // flushing is happening based on hitting max_items.
     //
     // Create our own verifier, so we don't shut down a shared verifier used by other tests.
-    let verifier = Batch::new(Ed25519Verifier::default(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000));
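+    // (The new third argument is the `max_batches` bound; see `Batch::new()` for details.)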
     timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
         .await
         .map_err(|e| eyre!(e))?
@@ -79,7 +79,12 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
     // flushing is happening based on hitting max_latency.
     //
     // Create our own verifier, so we don't shut down a shared verifier used by other tests.
-    let verifier = Batch::new(Ed25519Verifier::default(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(
+        Ed25519Verifier::default(),
+        100,
+        10,
+        Duration::from_millis(500),
+    );
     timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
         .await
         .map_err(|e| eyre!(e))?
@@ -94,7 +99,12 @@ async fn fallback_verification() -> Result<(), Report> {
     // Create our own verifier, so we don't shut down a shared verifier used by other tests.
     let verifier = Fallback::new(
-        Batch::new(Ed25519Verifier::default(), 10, Duration::from_millis(100)),
+        Batch::new(
+            Ed25519Verifier::default(),
+            10,
+            1,
+            Duration::from_millis(100),
+        ),
         tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
     );
diff --git a/tower-batch/tests/worker.rs b/tower-batch/tests/worker.rs
index c2b0d214..7b591480 100644
--- a/tower-batch/tests/worker.rs
+++ b/tower-batch/tests/worker.rs
@@ -13,7 +13,7 @@ async fn wakes_pending_waiters_on_close() {
     let (service, mut handle) = mock::pair::<_, ()>();
 
-    let (mut service, worker) = Batch::pair(service, 1, Duration::from_secs(1));
+    let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
     let mut worker = task::spawn(worker.run());
 
     // keep the request in the worker
@@ -72,7 +72,7 @@ async fn wakes_pending_waiters_on_failure() {
     let (service, mut handle) = mock::pair::<_, ()>();
 
-    let (mut service, worker) = Batch::pair(service, 1, Duration::from_secs(1));
+    let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
     let mut worker = task::spawn(worker.run());
 
     // keep the request in the worker
diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml
index abda323c..b55b6d4b 100644
--- a/zebra-consensus/Cargo.toml
+++ b/zebra-consensus/Cargo.toml
@@ -13,10 +13,10 @@ proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl", "ze
 blake2b_simd = "1.0.0"
 bellman = "0.13.0"
 bls12_381 = "0.7.0"
+halo2 = { package = "halo2_proofs", version = "0.2.0" }
 jubjub = "0.9.0"
 rand = { version = "0.8.5", package = "rand" }
-
-halo2 = { package = "halo2_proofs", version = "0.2.0" }
+rayon = "1.5.3"
 
 chrono = "0.4.19"
 dirs = "4.0.0"
diff --git a/zebra-consensus/src/primitives/ed25519.rs b/zebra-consensus/src/primitives/ed25519.rs
index 0dbac374..22b7f766 100644
--- a/zebra-consensus/src/primitives/ed25519.rs
+++ b/zebra-consensus/src/primitives/ed25519.rs
@@ -11,6 +11,7 @@ use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
+use rayon::prelude::*;
 use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 use tower_batch::{Batch, BatchControl};
@@ -48,6 +49,7 @@ pub static VERIFIER: Lazy<
     Batch::new(
         Verifier::default(),
         super::MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification fails,
@@ -106,38 +108,48 @@ impl Verifier {
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
-    /// This function blocks until the batch is completed on the thread pool.
+    /// This returns immediately, usually before the batch is completed.
     fn flush_blocking(&mut self) {
         let (batch, tx) = self.take();
 
-        // # Correctness
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
         //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: replace with the rayon thread pool
-        tokio::task::block_in_place(|| Self::verify(batch, tx));
+        // We don't care about execution order here, because this method is only called on drop.
+        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
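+
+        // Note that `rayon::spawn_fifo()` is fire-and-forget: unlike the
+        // `scope_fifo()` call in `flush_spawning()`, it does not wait for
+        // `verify()` to finish, which is why this method returns immediately.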
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
     /// This function returns a future that becomes ready when the batch is completed.
     fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
-        // # Correctness
-        //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| Self::verify(batch, tx))
-            .map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        tokio::task::spawn_blocking(|| {
+            // TODO:
+            // - spawn batches so rayon executes them in FIFO order
+            //   possible implementation: return a closure in a Future,
+            //   then run it using scope_fifo() in the worker task,
+            //   limiting the number of concurrent batches to the number of rayon threads
+            rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
+        })
+        .map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
     }
 
     /// Verify a single item using a thread pool, and return the result.
     /// This function returns a future that becomes ready when the item is completed.
     fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
         // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| item.verify_single())
-            .map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
+        tokio::task::spawn_blocking(|| {
+            // Rayon doesn't have a spawn function that returns a value,
+            // so we use a parallel iterator instead.
+            //
+            // TODO:
+            // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
+            // - spawn fallback individual verifications so rayon executes them in FIFO order,
+            //   if possible
+            rayon::iter::once(item)
+                .map(|item| item.verify_single())
+                .collect()
+        })
+        .map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
     }
 }
 
@@ -192,9 +204,7 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        // This blocks the current thread and any futures running on it, until the batch is complete.
-        //
-        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        // This returns immediately, usually before the batch is completed.
         self.flush_blocking();
     }
 }
diff --git a/zebra-consensus/src/primitives/ed25519/tests.rs b/zebra-consensus/src/primitives/ed25519/tests.rs
index ff68311a..4c13a5d6 100644
--- a/zebra-consensus/src/primitives/ed25519/tests.rs
+++ b/zebra-consensus/src/primitives/ed25519/tests.rs
@@ -44,7 +44,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
     // Use a very long max_latency and a short timeout to check that
     // flushing is happening based on hitting max_items.
-    let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
         .await?
         .map_err(|e| eyre!(e))?;
@@ -63,7 +63,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
     // Use a very high max_items and a short timeout to check that
     // flushing is happening based on hitting max_latency.
-    let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
         .await?
         .map_err(|e| eyre!(e))?;
diff --git a/zebra-consensus/src/primitives/groth16.rs b/zebra-consensus/src/primitives/groth16.rs
index 3b358640..29e325fa 100644
--- a/zebra-consensus/src/primitives/groth16.rs
+++ b/zebra-consensus/src/primitives/groth16.rs
@@ -17,6 +17,8 @@ use bls12_381::Bls12;
 use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
+
+use rayon::prelude::*;
 use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 
@@ -78,6 +80,7 @@ pub static SPEND_VERIFIER: Lazy<
     Batch::new(
         Verifier::new(&GROTH16_PARAMETERS.sapling.spend.vk),
         super::MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification fails,
@@ -116,6 +119,7 @@ pub static OUTPUT_VERIFIER: Lazy<
     Batch::new(
         Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
         super::MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification
@@ -401,16 +405,14 @@ impl Verifier {
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
-    /// This function blocks until the batch is completed on the thread pool.
+    /// This returns immediately, usually before the batch is completed.
     fn flush_blocking(&mut self) {
         let (batch, vk, tx) = self.take();
 
-        // # Correctness
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
         //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: replace with the rayon thread pool
-        tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
+        // We don't care about execution order here, because this method is only called on drop.
+        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
@@ -420,13 +422,16 @@ impl Verifier {
         vk: &'static BatchVerifyingKey,
         tx: Sender,
     ) -> impl Future<Output = ()> {
-        // # Correctness
-        //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
-            .map(|join_result| join_result.expect("panic in groth16 batch verifier"))
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        tokio::task::spawn_blocking(move || {
+            // TODO:
+            // - spawn batches so rayon executes them in FIFO order
+            //   possible implementation: return a closure in a Future,
+            //   then run it using scope_fifo() in the worker task,
+            //   limiting the number of concurrent batches to the number of rayon threads
+            rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
+        })
+        .map(|join_result| join_result.expect("panic in groth16 batch verifier"))
     }
 
     /// Verify a single item using a thread pool, and return the result.
@@ -436,10 +441,19 @@ impl Verifier {
         pvk: &'static ItemVerifyingKey,
     ) -> impl Future<Output = VerifyResult> {
         // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(move || item.verify_single(pvk))
-            .map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
+        tokio::task::spawn_blocking(move || {
+            // Rayon doesn't have a spawn function that returns a value,
+            // so we use a parallel iterator instead.
+            //
+            // TODO:
+            // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
+            // - spawn fallback individual verifications so rayon executes them in FIFO order,
+            //   if possible
+            rayon::iter::once(item)
+                .map(move |item| item.verify_single(pvk))
+                .collect()
+        })
+        .map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
    }
 }
 
@@ -510,9 +524,7 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        // This blocks the current thread and any futures running on it, until the batch is complete.
-        //
-        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        // This returns immediately, usually before the batch is completed.
         self.flush_blocking()
     }
 }
diff --git a/zebra-consensus/src/primitives/groth16/tests.rs b/zebra-consensus/src/primitives/groth16/tests.rs
index 9b2b0cca..cb349f04 100644
--- a/zebra-consensus/src/primitives/groth16/tests.rs
+++ b/zebra-consensus/src/primitives/groth16/tests.rs
@@ -75,6 +75,7 @@ async fn verify_sapling_groth16() {
         Batch::new(
             Verifier::new(&GROTH16_PARAMETERS.sapling.spend.vk),
             crate::primitives::MAX_BATCH_SIZE,
+            None,
             crate::primitives::MAX_BATCH_LATENCY,
         ),
         tower::service_fn(
@@ -87,6 +88,7 @@ async fn verify_sapling_groth16() {
         Batch::new(
             Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
             crate::primitives::MAX_BATCH_SIZE,
+            None,
             crate::primitives::MAX_BATCH_LATENCY,
         ),
         tower::service_fn(
@@ -179,6 +181,7 @@ async fn correctly_err_on_invalid_output_proof() {
         Batch::new(
             Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
             crate::primitives::MAX_BATCH_SIZE,
+            None,
             crate::primitives::MAX_BATCH_LATENCY,
         ),
         tower::service_fn(
diff --git a/zebra-consensus/src/primitives/halo2.rs b/zebra-consensus/src/primitives/halo2.rs
index 2b121006..a4c0d47c 100644
--- a/zebra-consensus/src/primitives/halo2.rs
+++ b/zebra-consensus/src/primitives/halo2.rs
@@ -12,6 +12,8 @@ use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use orchard::circuit::VerifyingKey;
 use rand::{thread_rng, CryptoRng, RngCore};
+
+use rayon::prelude::*;
 use thiserror::Error;
 use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
@@ -203,6 +205,7 @@ pub static VERIFIER: Lazy<
     Batch::new(
         Verifier::new(&VERIFYING_KEY),
         HALO2_MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification fails,
@@ -269,16 +272,14 @@ impl Verifier {
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
-    /// This function blocks until the batch is completed on the thread pool.
+    /// This returns immediately, usually before the batch is completed.
     fn flush_blocking(&mut self) {
         let (batch, vk, tx) = self.take();
 
-        // # Correctness
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
         //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: replace with the rayon thread pool
-        tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
+        // We don't care about execution order here, because this method is only called on drop.
+        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
@@ -288,13 +289,16 @@ impl Verifier {
         vk: &'static BatchVerifyingKey,
         tx: Sender,
     ) -> impl Future<Output = ()> {
-        // # Correctness
-        //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
-            .map(|join_result| join_result.expect("panic in halo2 batch verifier"))
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        tokio::task::spawn_blocking(move || {
+            // TODO:
+            // - spawn batches so rayon executes them in FIFO order
+            //   possible implementation: return a closure in a Future,
+            //   then run it using scope_fifo() in the worker task,
+            //   limiting the number of concurrent batches to the number of rayon threads
+            rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
+        })
+        .map(|join_result| join_result.expect("panic in halo2 batch verifier"))
     }
 
     /// Verify a single item using a thread pool, and return the result.
@@ -304,10 +308,19 @@ impl Verifier {
         pvk: &'static ItemVerifyingKey,
     ) -> impl Future<Output = Result<(), Halo2Error>> {
         // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(move || item.verify_single(pvk).map_err(Halo2Error::from))
-            .map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
+        tokio::task::spawn_blocking(move || {
+            // Rayon doesn't have a spawn function that returns a value,
+            // so we use a parallel iterator instead.
+            //
+            // TODO:
+            // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
+            // - spawn fallback individual verifications so rayon executes them in FIFO order,
+            //   if possible
+            rayon::iter::once(item)
+                .map(move |item| item.verify_single(pvk).map_err(Halo2Error::from))
+                .collect()
+        })
+        .map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
     }
 }
 
@@ -377,9 +390,7 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        // This blocks the current thread and any futures running on it, until the batch is complete.
-        //
-        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        // This returns immediately, usually before the batch is completed.
         self.flush_blocking()
     }
 }
diff --git a/zebra-consensus/src/primitives/halo2/tests.rs b/zebra-consensus/src/primitives/halo2/tests.rs
index 163400df..6c454f94 100644
--- a/zebra-consensus/src/primitives/halo2/tests.rs
+++ b/zebra-consensus/src/primitives/halo2/tests.rs
@@ -151,6 +151,7 @@ async fn verify_generated_halo2_proofs() {
         Batch::new(
             Verifier::new(&VERIFYING_KEY),
             crate::primitives::MAX_BATCH_SIZE,
+            None,
             crate::primitives::MAX_BATCH_LATENCY,
         ),
         tower::service_fn(
@@ -217,6 +218,7 @@ async fn correctly_err_on_invalid_halo2_proofs() {
         Batch::new(
             Verifier::new(&VERIFYING_KEY),
             crate::primitives::MAX_BATCH_SIZE,
+            None,
             crate::primitives::MAX_BATCH_LATENCY,
         ),
         tower::service_fn(
diff --git a/zebra-consensus/src/primitives/redjubjub.rs b/zebra-consensus/src/primitives/redjubjub.rs
index 60b59962..1f11e662 100644
--- a/zebra-consensus/src/primitives/redjubjub.rs
+++ b/zebra-consensus/src/primitives/redjubjub.rs
@@ -11,6 +11,7 @@ use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
+use rayon::prelude::*;
 use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 use tower_batch::{Batch, BatchControl};
@@ -49,6 +50,7 @@ pub static VERIFIER: Lazy<
     Batch::new(
         Verifier::default(),
         super::MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification fails,
@@ -107,38 +109,48 @@ impl Verifier {
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
-    /// This function blocks until the batch is completed on the thread pool.
+    /// This returns immediately, usually before the batch is completed.
     fn flush_blocking(&mut self) {
         let (batch, tx) = self.take();
 
-        // # Correctness
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
         //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: replace with the rayon thread pool
-        tokio::task::block_in_place(|| Self::verify(batch, tx));
+        // We don't care about execution order here, because this method is only called on drop.
+        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
     /// This function returns a future that becomes ready when the batch is completed.
     fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
-        // # Correctness
-        //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| Self::verify(batch, tx))
-            .map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        tokio::task::spawn_blocking(|| {
+            // TODO:
+            // - spawn batches so rayon executes them in FIFO order
+            //   possible implementation: return a closure in a Future,
+            //   then run it using scope_fifo() in the worker task,
+            //   limiting the number of concurrent batches to the number of rayon threads
+            rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
+        })
+        .map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
     }
 
     /// Verify a single item using a thread pool, and return the result.
     /// This function returns a future that becomes ready when the item is completed.
     fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
         // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| item.verify_single())
-            .map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
+        tokio::task::spawn_blocking(|| {
+            // Rayon doesn't have a spawn function that returns a value,
+            // so we use a parallel iterator instead.
+            //
+            // TODO:
+            // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
+            // - spawn fallback individual verifications so rayon executes them in FIFO order,
+            //   if possible
+            rayon::iter::once(item)
+                .map(|item| item.verify_single())
+                .collect()
+        })
+        .map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
     }
 }
 
@@ -194,9 +206,7 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        // This blocks the current thread and any futures running on it, until the batch is complete.
-        //
-        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        // This returns immediately, usually before the batch is completed.
         self.flush_blocking();
     }
 }
diff --git a/zebra-consensus/src/primitives/redjubjub/tests.rs b/zebra-consensus/src/primitives/redjubjub/tests.rs
index 0314c458..8c29e318 100644
--- a/zebra-consensus/src/primitives/redjubjub/tests.rs
+++ b/zebra-consensus/src/primitives/redjubjub/tests.rs
@@ -56,7 +56,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
     // Use a very long max_latency and a short timeout to check that
     // flushing is happening based on hitting max_items.
-    let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
         .await?
         .map_err(|e| eyre!(e))?;
@@ -75,7 +75,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
     // Use a very high max_items and a short timeout to check that
     // flushing is happening based on hitting max_latency.
-    let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
         .await?
         .map_err(|e| eyre!(e))?;
diff --git a/zebra-consensus/src/primitives/redpallas.rs b/zebra-consensus/src/primitives/redpallas.rs
index fbf72881..f13d1c05 100644
--- a/zebra-consensus/src/primitives/redpallas.rs
+++ b/zebra-consensus/src/primitives/redpallas.rs
@@ -10,6 +10,8 @@ use std::{
 use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
+
+use rayon::prelude::*;
 use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 use tower_batch::{Batch, BatchControl};
@@ -48,6 +50,7 @@ pub static VERIFIER: Lazy<
     Batch::new(
         Verifier::default(),
         super::MAX_BATCH_SIZE,
+        None,
         super::MAX_BATCH_LATENCY,
     ),
     // We want to fallback to individual verification if batch verification fails,
@@ -106,38 +109,48 @@ impl Verifier {
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
-    /// This function blocks until the batch is completed on the thread pool.
+    /// This returns immediately, usually before the batch is completed.
    fn flush_blocking(&mut self) {
         let (batch, tx) = self.take();
 
-        // # Correctness
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
         //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: replace with the rayon thread pool
-        tokio::task::block_in_place(|| Self::verify(batch, tx));
+        // We don't care about execution order here, because this method is only called on drop.
+        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
     }
 
     /// Flush the batch using a thread pool, and return the result via the channel.
     /// This function returns a future that becomes ready when the batch is completed.
     fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
-        // # Correctness
-        //
-        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| Self::verify(batch, tx))
-            .map(|join_result| join_result.expect("panic in redpallas batch verifier"))
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        tokio::task::spawn_blocking(|| {
+            // TODO:
+            // - spawn batches so rayon executes them in FIFO order
+            //   possible implementation: return a closure in a Future,
+            //   then run it using scope_fifo() in the worker task,
+            //   limiting the number of concurrent batches to the number of rayon threads
+            rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
+        })
+        .map(|join_result| join_result.expect("panic in redpallas batch verifier"))
     }
 
     /// Verify a single item using a thread pool, and return the result.
     /// This function returns a future that becomes ready when the item is completed.
     fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
         // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
-        //
-        // TODO: spawn on the rayon thread pool inside spawn_blocking
-        tokio::task::spawn_blocking(|| item.verify_single())
-            .map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
+        tokio::task::spawn_blocking(|| {
+            // Rayon doesn't have a spawn function that returns a value,
+            // so we use a parallel iterator instead.
+            //
+            // TODO:
+            // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
+            // - spawn fallback individual verifications so rayon executes them in FIFO order,
+            //   if possible
+            rayon::iter::once(item)
+                .map(|item| item.verify_single())
+                .collect()
+        })
+        .map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
     }
 }
 
@@ -192,9 +205,7 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        // This blocks the current thread and any futures running on it, until the batch is complete.
-        //
-        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        // This returns immediately, usually before the batch is completed.
         self.flush_blocking();
     }
 }
diff --git a/zebra-consensus/src/primitives/redpallas/tests.rs b/zebra-consensus/src/primitives/redpallas/tests.rs
index 79e9553a..beacb3ff 100644
--- a/zebra-consensus/src/primitives/redpallas/tests.rs
+++ b/zebra-consensus/src/primitives/redpallas/tests.rs
@@ -51,7 +51,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
     // Use a very long max_latency and a short timeout to check that
     // flushing is happening based on hitting max_items.
-    let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
+    let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
         .await?
         .map_err(|e| eyre!(e))?;
@@ -65,7 +65,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
     // Use a very high max_items and a short timeout to check that
     // flushing is happening based on hitting max_latency.
-    let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
+    let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
     timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
         .await?
         .map_err(|e| eyre!(e))?;
diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index cb174143..c3d4dd93 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -86,6 +86,7 @@ serde = { version = "1.0.137", features = ["serde_derive"] }
 toml = "0.5.9"
 
 futures = "0.3.21"
+rayon = "1.5.3"
 tokio = { version = "1.19.2", features = ["time", "rt-multi-thread", "macros", "tracing", "signal"] }
 tower = { version = "0.4.13", features = ["hedge", "limit"] }
 pin-project = "1.0.10"
diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs
index 46891e09..a3e78844 100644
--- a/zebrad/src/application.rs
+++ b/zebrad/src/application.rs
@@ -343,6 +343,19 @@ impl Application for ZebradApp {
             }
         }));
 
+        // Apply the configured number of threads to the thread pool.
+        //
+        // TODO:
+        // - set rayon panic handler to a function that takes `Box<dyn Any + Send + 'static>`,
+        //   which forwards to sentry. If possible, use eyre's panic report for formatting.
+        // - do we also need to call this code in `zebra_consensus::init()`,
+        //   when that crate is being used by itself?
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(config.sync.parallel_cpu_threads)
+            .thread_name(|thread_index| format!("rayon {}", thread_index))
+            .build_global()
+            .expect("unable to initialize rayon thread pool");
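+
+        // `build_global()` can only succeed once per process: later calls
+        // return an error. So the global rayon pool is configured once,
+        // here at application startup.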
+
         self.config = Some(config);
 
         let cfg_ref = self
@@ -389,6 +402,11 @@ impl Application for ZebradApp {
         // leak the global span, to make sure it stays active
         std::mem::forget(global_guard);
 
+        tracing::info!(
+            num_threads = rayon::current_num_threads(),
+            "initialized rayon thread pool for CPU-bound tasks",
+        );
+
         // Launch network and async endpoints only for long-running commands.
         if is_server {
             components.push(Box::new(TokioComponent::new()?));
diff --git a/zebrad/src/components/tokio.rs b/zebrad/src/components/tokio.rs
index d942e288..9c1f481c 100644
--- a/zebrad/src/components/tokio.rs
+++ b/zebrad/src/components/tokio.rs
@@ -1,11 +1,20 @@
 //! A component owning the Tokio runtime.
+//!
+//! The tokio runtime is used for:
+//! - non-blocking async tasks, via [`Future`]s, and
+//! - blocking network and file tasks, via [`spawn_blocking`](tokio::task::spawn_blocking).
+//!
+//! The rayon thread pool is used for:
+//! - long-running CPU-bound tasks like cryptography, via [`rayon::spawn_fifo`].
+
+use std::{future::Future, time::Duration};
 
-use crate::prelude::*;
 use abscissa_core::{Application, Component, FrameworkError, Shutdown};
 use color_eyre::Report;
-use std::{future::Future, time::Duration};
 use tokio::runtime::Runtime;
 
+use crate::prelude::*;
+
 /// When Zebra is shutting down, wait this long for tokio tasks to finish.
 const TOKIO_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs
index 47d50735..5b42a0c8 100644
--- a/zebrad/src/config.rs
+++ b/zebrad/src/config.rs
@@ -209,6 +209,12 @@ pub struct SyncSection {
     /// This is set to a low value by default, to avoid verification timeouts on large blocks.
     /// Increasing this value may improve performance on machines with many cores.
     pub full_verify_concurrency_limit: usize,
+
+    /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
+    ///
+    /// Set to `0` by default, which uses one thread per available CPU core.
+    /// For details, see [the rayon documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
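+    ///
+    /// For example, setting `parallel_cpu_threads = 4` in the `[sync]`
+    /// section of `zebrad.toml` limits verification to four threads.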
+    pub parallel_cpu_threads: usize,
 }
 
 impl Default for SyncSection {
@@ -223,11 +229,20 @@ impl Default for SyncSection {
             // This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
             // even on machines with only a few cores.
             //
-            // This lets users see the committed block height changing in every progress log.
+            // This lets users see the committed block height changing in every progress log,
+            // and avoids hangs due to out-of-order verifications flooding the CPUs.
             //
-            // TODO: when we implement orchard proof batching, try increasing to 20 or more
-            //       limit full verification concurrency based on block transaction counts?
-            full_verify_concurrency_limit: 5,
+            // TODO:
+            // - limit full verification concurrency based on block transaction counts?
+            // - move more disk work to blocking tokio threads,
+            //   and CPU work to the rayon thread pool inside blocking tokio threads
+            full_verify_concurrency_limit: 20,
+
+            // Use one thread per CPU.
+            //
+            // If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
+            // or reserve a few cores for tokio threads, based on `num_cpus()`.
+            parallel_cpu_threads: 0,
         }
     }
 }