From 9b9cd5509756b07c6128b6e39c013ca9927d8ce4 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 18 Jul 2022 08:41:18 +1000 Subject: [PATCH] fix(batch): Improve batch verifier async, correctness, and performance (#4750) * Use a new channel for each batch * Prefer the batch timer if there are also new batch requests * Allow other tasks to run after each batch * Label each batch worker with the verifier's type * Rename Handle to ErrorHandle, and fix up some docs * Check batch worker tasks for panics and task termination * Use tokio's PollSemaphore instead of an outdated Semaphore impl * Run all verifier cryptography on a blocking thread Also use a new verifier channel for each batch. * Make flush and drop behaviour consistent for all verifiers * Partly fix an incorrect NU5 test * Switch batch tests to the multi-threaded runtime * Export all verifier primitive modules from zebra-consensus * Remove outdated test code in tower-batch * Use a watch channel to send batch verifier results * Use spawn_blocking for batch fallback verifiers * Spawn cryptography batches onto blocking tokio threads * Use smaller batches for halo2 * Minor tower-batch cleanups * Fix doc link in zebra-test * Drop previous permit before acquiring another to avoid a deadlock edge case --- Cargo.lock | 2 + tower-batch/Cargo.toml | 2 + tower-batch/src/lib.rs | 11 +- tower-batch/src/message.rs | 7 +- tower-batch/src/semaphore.rs | 128 ---------- tower-batch/src/service.rs | 194 ++++++++++---- tower-batch/src/worker.rs | 97 ++++--- tower-batch/tests/ed25519.rs | 97 ++----- tower-batch/tests/worker.rs | 21 +- zebra-consensus/Cargo.toml | 2 +- zebra-consensus/src/lib.rs | 2 +- zebra-consensus/src/primitives.rs | 7 - zebra-consensus/src/primitives/ed25519.rs | 155 +++++++++--- .../src/primitives/ed25519/tests.rs | 4 +- zebra-consensus/src/primitives/groth16.rs | 239 +++++++++++++----- .../src/primitives/groth16/tests.rs | 17 +- zebra-consensus/src/primitives/halo2.rs | 219 ++++++++++++---- zebra-consensus/src/primitives/halo2/tests.rs | 6 +- zebra-consensus/src/primitives/redjubjub.rs | 156 +++++++++--- .../src/primitives/redjubjub/tests.rs | 4 +- zebra-consensus/src/primitives/redpallas.rs | 155 +++++++++--- .../src/primitives/redpallas/tests.rs | 4 +- zebra-consensus/src/transaction.rs | 3 +- zebra-consensus/src/transaction/tests.rs | 131 +++++----- zebra-consensus/src/transaction/tests/prop.rs | 4 +- zebra-test/src/lib.rs | 17 +- 26 files changed, 1035 insertions(+), 649 deletions(-) delete mode 100644 tower-batch/src/semaphore.rs diff --git a/Cargo.lock b/Cargo.lock index 0c389ee1..8cf1a2b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5449,11 +5449,13 @@ dependencies = [ "rand 0.8.5", "tokio", "tokio-test", + "tokio-util 0.7.3", "tower", "tower-fallback", "tower-test", "tracing", "tracing-futures", + "zebra-consensus", "zebra-test", ] diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml index 504e8d87..9b962cc5 100644 --- a/tower-batch/Cargo.toml +++ b/tower-batch/Cargo.toml @@ -10,6 +10,7 @@ futures = "0.3.21" futures-core = "0.3.21" pin-project = "1.0.10" tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "macros"] } +tokio-util = "0.7.3" tower = { version = "0.4.13", features = ["util", "buffer"] } tracing = "0.1.31" tracing-futures = "0.2.5" @@ -25,4 +26,5 @@ tower-fallback = { path = "../tower-fallback/" } tower-test = "0.4.0" tracing = "0.1.31" +zebra-consensus = { path = "../zebra-consensus/" } zebra-test = { path = "../zebra-test/" } diff --git a/tower-batch/src/lib.rs b/tower-batch/src/lib.rs 
index 2cf9f770..855b1a96 100644 --- a/tower-batch/src/lib.rs +++ b/tower-batch/src/lib.rs @@ -89,22 +89,23 @@ pub mod error; pub mod future; mod layer; mod message; -mod semaphore; mod service; mod worker; type BoxError = Box; /// Signaling mechanism for batchable services that allows explicit flushing. -pub enum BatchControl { +/// +/// This request type is a generic wrapper for the inner `Req` type. +pub enum BatchControl { /// A new batch item. - Item(R), + Item(Req), /// The current batch should be flushed. Flush, } -impl From for BatchControl { - fn from(req: R) -> BatchControl { +impl From for BatchControl { + fn from(req: Req) -> BatchControl { BatchControl::Item(req) } } diff --git a/tower-batch/src/message.rs b/tower-batch/src/message.rs index 287076f2..05415e2b 100644 --- a/tower-batch/src/message.rs +++ b/tower-batch/src/message.rs @@ -1,5 +1,8 @@ +//! Batch message types. + +use tokio::sync::{oneshot, OwnedSemaphorePermit}; + use super::error::ServiceError; -use tokio::sync::oneshot; /// Message sent to the batch worker #[derive(Debug)] @@ -7,7 +10,7 @@ pub(crate) struct Message { pub(crate) request: Request, pub(crate) tx: Tx, pub(crate) span: tracing::Span, - pub(super) _permit: crate::semaphore::Permit, + pub(super) _permit: OwnedSemaphorePermit, } /// Response sender diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs deleted file mode 100644 index f09e31bc..00000000 --- a/tower-batch/src/semaphore.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copied from tower/src/semaphore.rs, commit: -// d4d1c67 hedge: use auto-resizing histograms (#484) -// -// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's: -// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556) - -// Ignore lints on this copied code -#![allow(dead_code)] - -pub(crate) use self::sync::OwnedSemaphorePermit as Permit; -use futures::FutureExt; -use futures_core::ready; -use std::{ - fmt, - future::Future, - mem, - pin::Pin, - sync::{Arc, Weak}, - task::{Context, Poll}, -}; -use tokio::sync; - -#[derive(Debug)] -pub(crate) struct Semaphore { - semaphore: Arc, - state: State, -} - -#[derive(Debug)] -pub(crate) struct Close { - semaphore: Weak, - permits: usize, -} - -enum State { - Waiting(Pin + Send + Sync + 'static>>), - Ready(Permit), - Empty, -} - -impl Semaphore { - pub(crate) fn new_with_close(permits: usize) -> (Self, Close) { - let semaphore = Arc::new(sync::Semaphore::new(permits)); - let close = Close { - semaphore: Arc::downgrade(&semaphore), - permits, - }; - let semaphore = Self { - semaphore, - state: State::Empty, - }; - (semaphore, close) - } - - pub(crate) fn new(permits: usize) -> Self { - Self { - semaphore: Arc::new(sync::Semaphore::new(permits)), - state: State::Empty, - } - } - - pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> { - loop { - self.state = match self.state { - State::Ready(_) => return Poll::Ready(()), - State::Waiting(ref mut fut) => { - let permit = ready!(Pin::new(fut).poll(cx)); - State::Ready(permit) - } - State::Empty => State::Waiting(Box::pin( - self.semaphore - .clone() - .acquire_owned() - .map(|result| result.expect("internal semaphore is never closed")), - )), - }; - } - } - - pub(crate) fn take_permit(&mut self) -> Option { - if let State::Ready(permit) = mem::replace(&mut self.state, State::Empty) { - return Some(permit); - } - None - } -} - -impl Clone for Semaphore { - fn clone(&self) -> Self { - Self { - semaphore: self.semaphore.clone(), - state: State::Empty, - } - } -} - -impl fmt::Debug 
for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - State::Waiting(_) => f - .debug_tuple("State::Waiting") - .field(&format_args!("...")) - .finish(), - State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(), - State::Empty => f.debug_tuple("State::Empty").finish(), - } - } -} - -impl Close { - /// Close the semaphore, waking any remaining tasks currently awaiting a permit. - pub(crate) fn close(self) { - // The maximum number of permits that a `tokio::sync::Semaphore` - // can hold is usize::MAX >> 3. If we attempt to add more than that - // number of permits, the semaphore will panic. - // XXX(eliza): another shift is kinda janky but if we add (usize::MAX - // > 3 - initial permits) the semaphore impl panics (I think due to a - // bug in tokio?). - // TODO(eliza): Tokio should _really_ just expose `Semaphore::close` - // publicly so we don't have to do this nonsense... - const MAX: usize = std::usize::MAX >> 4; - if let Some(semaphore) = self.semaphore.upgrade() { - // If we added `MAX - available_permits`, any tasks that are - // currently holding permits could drop them, overflowing the max. - semaphore.add_permits(MAX - self.permits); - } - } -} diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs index 2cc7b374..89456d55 100644 --- a/tower-batch/src/service.rs +++ b/tower-batch/src/service.rs @@ -1,40 +1,65 @@ +//! Wrapper service for batching items to an underlying service. + +use std::{ + fmt, + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use futures_core::ready; +use tokio::{ + pin, + sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}, + task::JoinHandle, +}; +use tokio_util::sync::PollSemaphore; +use tower::Service; +use tracing::{info_span, Instrument}; + use super::{ future::ResponseFuture, message::Message, - worker::{Handle, Worker}, + worker::{ErrorHandle, Worker}, BatchControl, }; -use crate::semaphore::Semaphore; -use futures_core::ready; -use std::{ - fmt, - task::{Context, Poll}, -}; -use tokio::sync::{mpsc, oneshot}; -use tower::Service; - /// Allows batch processing of requests. /// -/// See the module documentation for more details. +/// See the crate documentation for more details. pub struct Batch where T: Service>, { - // Note: this actually _is_ bounded, but rather than using Tokio's unbounded - // channel, we use tokio's semaphore separately to implement the bound. + /// A custom-bounded channel for sending requests to the batch worker. + /// + /// Note: this actually _is_ bounded, but rather than using Tokio's unbounded + /// channel, we use tokio's semaphore separately to implement the bound. tx: mpsc::UnboundedSender>, - // When the buffer's channel is full, we want to exert backpressure in - // `poll_ready`, so that callers such as load balancers could choose to call - // another service rather than waiting for buffer capacity. - // - // Unfortunately, this can't be done easily using Tokio's bounded MPSC - // channel, because it doesn't expose a polling-based interface, only an - // `async fn ready`, which borrows the sender. Therefore, we implement our - // own bounded MPSC on top of the unbounded channel, using a semaphore to - // limit how many items are in the channel. - semaphore: Semaphore, - handle: Handle, + + /// A semaphore used to bound the channel. 
+ /// + /// When the buffer's channel is full, we want to exert backpressure in + /// `poll_ready`, so that callers such as load balancers could choose to call + /// another service rather than waiting for buffer capacity. + /// + /// Unfortunately, this can't be done easily using Tokio's bounded MPSC + /// channel, because it doesn't wake pending tasks on close. Therefore, we implement our + /// own bounded MPSC on top of the unbounded channel, using a semaphore to + /// limit how many items are in the channel. + semaphore: PollSemaphore, + + /// A semaphore permit that allows this service to send one message on `tx`. + permit: Option, + + /// An error handle shared between all service clones for the same worker. + error_handle: ErrorHandle, + + /// A worker task handle shared between all service clones for the same worker. + /// + /// Only used when the worker is spawned on the tokio runtime. + worker_handle: Arc>>>, } impl fmt::Debug for Batch @@ -46,7 +71,8 @@ where f.debug_struct(name) .field("tx", &self.tx) .field("semaphore", &self.semaphore) - .field("handle", &self.handle) + .field("error_handle", &self.error_handle) + .field("worker_handle", &self.worker_handle) .finish() } } @@ -73,8 +99,28 @@ where T::Error: Send + Sync, Request: Send + 'static, { - let (batch, worker) = Self::pair(service, max_items, max_latency); - tokio::spawn(worker.run()); + let (mut batch, worker) = Self::pair(service, max_items, max_latency); + + let span = info_span!("batch worker", kind = std::any::type_name::()); + + #[cfg(tokio_unstable)] + let worker_handle = { + let batch_kind = std::any::type_name::(); + + // TODO: identify the unique part of the type name generically, + // or make it an argument to this method + let batch_kind = batch_kind.trim_start_matches("zebra_consensus::primitives::"); + let batch_kind = batch_kind.trim_end_matches("::Verifier"); + + tokio::task::Builder::new() + .name(&format!("{} batch", batch_kind)) + .spawn(worker.run().instrument(span)) + }; + #[cfg(not(tokio_unstable))] + let worker_handle = tokio::spawn(worker.run().instrument(span)); + + batch.register_worker(worker_handle); + batch } @@ -100,21 +146,36 @@ where // used their semaphore reservation in a `call` yet). // We choose a bound that allows callers to check readiness for every item in // a batch, then actually submit those items. - let bound = max_items; - let (semaphore, close) = Semaphore::new_with_close(bound); + let semaphore = Semaphore::new(max_items); + let semaphore = PollSemaphore::new(Arc::new(semaphore)); - let (handle, worker) = Worker::new(service, rx, max_items, max_latency, close); + let (error_handle, worker) = + Worker::new(service, rx, max_items, max_latency, semaphore.clone()); let batch = Batch { tx, semaphore, - handle, + permit: None, + error_handle, + worker_handle: Arc::new(Mutex::new(None)), }; (batch, worker) } + /// Ask the `Batch` to monitor the spawned worker task's [`JoinHandle`](tokio::task::JoinHandle). + /// + /// Only used when the task is spawned on the tokio runtime. + pub fn register_worker(&mut self, worker_handle: JoinHandle<()>) { + *self + .worker_handle + .lock() + .expect("previous task panicked while holding the worker handle mutex") = + Some(worker_handle); + } + + /// Returns the error from the batch worker's `error_handle`. 
fn get_worker_error(&self) -> crate::BoxError { - self.handle.get_error_on_closed() + self.error_handle.get_error_on_closed() } } @@ -128,26 +189,59 @@ where type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // First, check if the worker is still alive. - if self.tx.is_closed() { - // If the inner service has errored, then we error here. + // Check to see if the worker has returned or panicked. + // + // Correctness: Registers this task for wakeup when the worker finishes. + if let Some(worker_handle) = self + .worker_handle + .lock() + .expect("previous task panicked while holding the worker handle mutex") + .as_mut() + { + match Pin::new(worker_handle).poll(cx) { + Poll::Ready(Ok(())) => return Poll::Ready(Err(self.get_worker_error())), + Poll::Ready(task_panic) => { + task_panic.expect("unexpected panic in batch worker task") + } + Poll::Pending => {} + } + } + + // Check if the worker has set an error and closed its channels. + // + // Correctness: Registers this task for wakeup when the channel is closed. + let tx = self.tx.clone(); + let closed = tx.closed(); + pin!(closed); + if closed.poll(cx).is_ready() { return Poll::Ready(Err(self.get_worker_error())); } + // Poll to acquire a semaphore permit. + // // CORRECTNESS // - // Poll to acquire a semaphore permit. If we acquire a permit, then - // there's enough buffer capacity to send a new request. Otherwise, we - // need to wait for capacity. + // If we acquire a permit, then there's enough buffer capacity to send a new request. + // Otherwise, we need to wait for capacity. When that happens, `poll_acquire()` registers + // this task for wakeup when the next permit is available, or when the semaphore is closed. // - // In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an - // error, so we don't need to handle errors until we upgrade to - // tokio 1.0. + // When `poll_ready()` is called multiple times, and channel capacity is 1, + // avoid deadlocks by dropping any previous permit before acquiring another one. + // This also stops tasks holding a permit after an error. // - // The current task must be scheduled for wakeup every time we return - // `Poll::Pending`. If it returns Pending, the semaphore also schedules - // the task for wakeup when the next permit is available. - ready!(self.semaphore.poll_acquire(cx)); + // Calling `poll_ready()` multiple times can make tasks lose their previous permit + // to another concurrent task. + self.permit = None; + + let permit = ready!(self.semaphore.poll_acquire(cx)); + if let Some(permit) = permit { + // Calling poll_ready() more than once will drop any previous permit, + // releasing its capacity back to the semaphore. + self.permit = Some(permit); + } else { + // The semaphore has been closed. 
+ return Poll::Ready(Err(self.get_worker_error())); + } Poll::Ready(Ok(())) } @@ -155,9 +249,9 @@ where fn call(&mut self, request: Request) -> Self::Future { tracing::trace!("sending request to buffer worker"); let _permit = self - .semaphore - .take_permit() - .expect("buffer full; poll_ready must be called first"); + .permit + .take() + .expect("poll_ready must be called before a batch request"); // get the current Span so that we can explicitly propagate it to the worker // if we didn't do this, events on the worker related to this span wouldn't be counted @@ -187,8 +281,10 @@ where fn clone(&self) -> Self { Self { tx: self.tx.clone(), - handle: self.handle.clone(), semaphore: self.semaphore.clone(), + permit: None, + error_handle: self.error_handle.clone(), + worker_handle: self.worker_handle.clone(), } } } diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs index 75c39215..7f55dee6 100644 --- a/tower-batch/src/worker.rs +++ b/tower-batch/src/worker.rs @@ -1,3 +1,5 @@ +//! Batch worker item handling and run loop implementation. + use std::{ pin::Pin, sync::{Arc, Mutex}, @@ -9,11 +11,10 @@ use tokio::{ sync::mpsc, time::{sleep, Sleep}, }; +use tokio_util::sync::PollSemaphore; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; -use crate::semaphore; - use super::{ error::{Closed, ServiceError}, message::{self, Message}, @@ -34,18 +35,31 @@ where T: Service>, T::Error: Into, { + /// A semaphore-bounded channel for receiving requests from the batch wrapper service. rx: mpsc::UnboundedReceiver>, + + /// The wrapped service that processes batches. service: T, + + /// An error that's populated on permanent service failure. failed: Option, - handle: Handle, + + /// A shared error handle that's populated on permanent service failure. + error_handle: ErrorHandle, + + /// The maximum number of items allowed in a batch. max_items: usize, + + /// The maximum delay before processing a batch with fewer than `max_items`. max_latency: std::time::Duration, - close: Option, + + /// A cloned copy of the wrapper service's semaphore, used to close the semaphore. + close: PollSemaphore, } /// Get the error out #[derive(Debug)] -pub(crate) struct Handle { +pub(crate) struct ErrorHandle { inner: Arc>>, } @@ -59,23 +73,23 @@ where rx: mpsc::UnboundedReceiver>, max_items: usize, max_latency: std::time::Duration, - close: semaphore::Close, - ) -> (Handle, Worker) { - let handle = Handle { + close: PollSemaphore, + ) -> (ErrorHandle, Worker) { + let error_handle = ErrorHandle { inner: Arc::new(Mutex::new(None)), }; let worker = Worker { rx, service, - handle: handle.clone(), + error_handle: error_handle.clone(), failed: None, max_items, max_latency, - close: Some(close), + close, }; - (handle, worker) + (error_handle, worker) } async fn process_req(&mut self, req: Request, tx: message::Tx) { @@ -97,10 +111,8 @@ where .clone())); // Wake any tasks waiting on channel capacity. - if let Some(close) = self.close.take() { - tracing::debug!("waking pending tasks"); - close.close(); - } + tracing::debug!("waking pending tasks"); + self.close.close(); } } } @@ -115,6 +127,9 @@ where { self.failed(e.into()); } + + // Correctness: allow other tasks to run at the end of every batch. + tokio::task::yield_now().await; } pub async fn run(mut self) { @@ -142,8 +157,23 @@ where }, Some(sleep) => { // Wait on either a new message or the batch timer. - // If both are ready, select! chooses one of them at random. + // + // If both are ready, end the batch now, because the timer has elapsed. 
+ // If the timer elapses, any pending messages are preserved: + // https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety tokio::select! { + biased; + + // The batch timer elapsed. + () = sleep => { + // TODO: use a batch-specific span to instrument this future. + self.flush_service().await; + + // Now we have an empty batch. + timer = None; + pending_items = 0; + } + maybe_msg = self.rx.recv() => match maybe_msg { Some(msg) => { let span = msg.span; @@ -154,8 +184,9 @@ where pending_items += 1; // Check whether we have too many pending items. if pending_items >= self.max_items { - // XXX(hdevalence): what span should instrument this? + // TODO: use a batch-specific span to instrument this future. self.flush_service().await; + // Now we have an empty batch. timer = None; pending_items = 0; @@ -168,13 +199,6 @@ where return; } }, - () = sleep => { - // The batch timer elapsed. - // XXX(hdevalence): what span should instrument this? - self.flush_service().await; - timer = None; - pending_items = 0; - } } } } @@ -187,7 +211,7 @@ where // an `Arc`, send that `Arc` to all pending requests, and store it so that subsequent // requests will also fail with the same error. - // Note that we need to handle the case where some handle is concurrently trying to send us + // Note that we need to handle the case where some error_handle is concurrently trying to send us // a request. We need to make sure that *either* the send of the request fails *or* it // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the // case where we send errors to all outstanding requests, and *then* the caller sends its @@ -196,7 +220,7 @@ where // sending the error to all outstanding requests. let error = ServiceError::new(error); - let mut inner = self.handle.inner.lock().unwrap(); + let mut inner = self.error_handle.inner.lock().unwrap(); if inner.is_some() { // Future::poll was called after we've already errored out! @@ -216,20 +240,20 @@ where } } -impl Handle { +impl ErrorHandle { pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { self.inner .lock() - .unwrap() + .expect("previous task panicked while holding the error handle mutex") .as_ref() .map(|svc_err| svc_err.clone().into()) .unwrap_or_else(|| Closed::new().into()) } } -impl Clone for Handle { - fn clone(&self) -> Handle { - Handle { +impl Clone for ErrorHandle { + fn clone(&self) -> ErrorHandle { + ErrorHandle { inner: self.inner.clone(), } } @@ -242,8 +266,13 @@ where T::Error: Into, { fn drop(mut self: Pin<&mut Self>) { - if let Some(close) = self.as_mut().close.take() { - close.close(); - } + // Fail pending tasks + self.failed(Closed::new().into()); + + // Clear queued requests + while self.rx.try_recv().is_ok() {} + + // Stop accepting reservations + self.close.close(); } } diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs index 7bef3971..02dc16b7 100644 --- a/tower-batch/tests/ed25519.rs +++ b/tower-batch/tests/ed25519.rs @@ -1,88 +1,18 @@ -use std::{ - future::Future, - mem, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; +//! Test batching using ed25519 verification. 
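// A minimal, self-contained sketch of the worker loop pattern in the
// worker.rs hunk above: `tokio::select!` with `biased;` polls its branches
// top-to-bottom, so when the batch timer and a new request are both ready,
// the timer wins and the batch is flushed first. The names (`pending`,
// `rx`) and the fixed 100ms latency are illustrative stand-ins, not the
// real worker API.
async fn worker_loop_sketch(mut rx: tokio::sync::mpsc::UnboundedReceiver<u32>) {
    let mut pending: Vec<u32> = Vec::new();
    loop {
        // Simplified: the real worker only arms the timer once the current
        // batch has at least one item, and keeps it armed across iterations.
        let sleep = tokio::time::sleep(std::time::Duration::from_millis(100));
        tokio::pin!(sleep);

        tokio::select! {
            biased;

            // Prefer the elapsed timer over new requests.
            () = &mut sleep => {
                // Flush the batch here, then start an empty batch.
                pending.clear();
            }

            maybe_item = rx.recv() => match maybe_item {
                Some(item) => pending.push(item),
                // All senders dropped: shut down the worker.
                None => return,
            },
        }
    }
}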
+ +use std::time::Duration; use color_eyre::{eyre::eyre, Report}; use ed25519_zebra::*; use futures::stream::{FuturesUnordered, StreamExt}; use rand::thread_rng; -use tokio::sync::broadcast::{channel, error::RecvError, Sender}; use tower::{Service, ServiceExt}; -use tower_batch::{Batch, BatchControl}; +use tower_batch::Batch; use tower_fallback::Fallback; // ============ service impl ============ -pub struct Ed25519Verifier { - batch: batch::Verifier, - // This uses a "broadcast" channel, which is an mpmc channel. Tokio also - // provides a spmc channel, "watch", but it only keeps the latest value, so - // using it would require thinking through whether it was possible for - // results from one batch to be mixed with another. - tx: Sender>, -} - -#[allow(clippy::new_without_default)] -impl Ed25519Verifier { - pub fn new() -> Self { - let batch = batch::Verifier::default(); - // XXX(hdevalence) what's a reasonable choice here? - let (tx, _) = channel(10); - Self { batch, tx } - } -} - -pub type Ed25519Item = batch::Item; - -impl Service> for Ed25519Verifier { - type Response = (); - type Error = Error; - type Future = Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: BatchControl) -> Self::Future { - match req { - BatchControl::Item(item) => { - tracing::trace!("got item"); - self.batch.queue(item); - let mut rx = self.tx.subscribe(); - Box::pin(async move { - match rx.recv().await { - Ok(result) => result, - Err(RecvError::Lagged(_)) => { - tracing::warn!( - "missed channel updates for the correct signature batch!" - ); - Err(Error::InvalidSignature) - } - Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), - } - }) - } - BatchControl::Flush => { - tracing::trace!("got flush command"); - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); - Box::pin(async { Ok(()) }) - } - } - } -} - -impl Drop for Ed25519Verifier { - fn drop(&mut self) { - // We need to flush the current batch in case there are still any pending futures. - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); - } -} +use zebra_consensus::ed25519::{Item as Ed25519Item, Verifier as Ed25519Verifier}; // =============== testing code ======== @@ -122,14 +52,16 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_items() -> Result<(), Report> { use tokio::time::timeout; zebra_test::init(); // Use a very long max_latency and a short timeout to check that // flushing is happening based on hitting max_items. - let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000)); + // + // Create our own verifier, so we don't shut down a shared verifier used by other tests. + let verifier = Batch::new(Ed25519Verifier::default(), 10, Duration::from_secs(1000)); timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None)) .await .map_err(|e| eyre!(e))? @@ -138,14 +70,16 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_latency() -> Result<(), Report> { use tokio::time::timeout; zebra_test::init(); // Use a very high max_items and a short timeout to check that // flushing is happening based on hitting max_latency. 
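// A self-contained sketch of the result channel pattern that the reworked
// verifiers used by these tests rely on: each batch's single result is
// broadcast over a fresh `tokio::sync::watch` channel, which acts as a
// oneshot multi-consumer channel. `VerifyResult` is a simplified stand-in
// for the real per-verifier result type.
type VerifyResult = Result<(), String>;

async fn watch_as_oneshot_multicast_sketch() {
    let (tx, rx) = tokio::sync::watch::channel(None::<VerifyResult>);

    // Each queued item holds a receiver clone and waits for the result.
    let mut item_rx = rx.clone();
    let item = tokio::spawn(async move {
        item_rx
            .changed()
            .await
            .expect("verifier dropped without flushing");
        item_rx
            .borrow()
            .clone()
            .expect("completed batch must send a value")
    });

    // Flushing the batch sends one result to every waiting item future.
    // Because a new channel is created for the next batch, results from
    // different batches can never be mixed.
    let _ = tx.send(Some(Ok(())));
    assert_eq!(item.await.expect("item task panicked"), Ok(()));
}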
- let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500)); + // + // Create our own verifier, so we don't shut down a shared verifier used by other tests. + let verifier = Batch::new(Ed25519Verifier::default(), 100, Duration::from_millis(500)); timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None)) .await .map_err(|e| eyre!(e))? @@ -154,12 +88,13 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn fallback_verification() -> Result<(), Report> { zebra_test::init(); + // Create our own verifier, so we don't shut down a shared verifier used by other tests. let verifier = Fallback::new( - Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)), + Batch::new(Ed25519Verifier::default(), 10, Duration::from_millis(100)), tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }), ); diff --git a/tower-batch/tests/worker.rs b/tower-batch/tests/worker.rs index 7bec206b..c2b0d214 100644 --- a/tower-batch/tests/worker.rs +++ b/tower-batch/tests/worker.rs @@ -1,4 +1,7 @@ +//! Fixed test cases for batch worker tasks. + use std::time::Duration; + use tokio_test::{assert_pending, assert_ready, assert_ready_err, task}; use tower::{Service, ServiceExt}; use tower_batch::{error, Batch}; @@ -37,29 +40,29 @@ async fn wakes_pending_waiters_on_close() { assert!( err.is::(), "response should fail with a Closed, got: {:?}", - err + err, ); assert!( ready1.is_woken(), - "dropping worker should wake ready task 1" + "dropping worker should wake ready task 1", ); let err = assert_ready_err!(ready1.poll()); assert!( - err.is::(), - "ready 1 should fail with a Closed, got: {:?}", - err + err.is::(), + "ready 1 should fail with a ServiceError {{ Closed }}, got: {:?}", + err, ); assert!( ready2.is_woken(), - "dropping worker should wake ready task 2" + "dropping worker should wake ready task 2", ); let err = assert_ready_err!(ready1.poll()); assert!( - err.is::(), - "ready 2 should fail with a Closed, got: {:?}", - err + err.is::(), + "ready 2 should fail with a ServiceError {{ Closed }}, got: {:?}", + err, ); } diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 99ab5d3e..abda323c 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -29,7 +29,7 @@ futures = "0.3.21" futures-util = "0.3.21" metrics = "0.18.1" thiserror = "1.0.31" -tokio = { version = "1.19.2", features = ["time", "sync", "tracing"] } +tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "rt-multi-thread"] } tower = { version = "0.4.13", features = ["timeout", "util", "buffer"] } tracing = "0.1.31" tracing-futures = "0.2.5" diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index 392971e1..df40c972 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -53,7 +53,7 @@ pub use checkpoint::{ }; pub use config::Config; pub use error::BlockError; -pub use primitives::groth16; +pub use primitives::{ed25519, groth16, halo2, redjubjub, redpallas}; /// A boxed [`std::error::Error`]. pub type BoxError = Box; diff --git a/zebra-consensus/src/primitives.rs b/zebra-consensus/src/primitives.rs index ee25e718..333ff115 100644 --- a/zebra-consensus/src/primitives.rs +++ b/zebra-consensus/src/primitives.rs @@ -11,10 +11,3 @@ const MAX_BATCH_SIZE: usize = 64; /// The maximum latency bound for any of the batch verifiers. 
const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100); - -/// The size of the buffer in the broadcast channels used by batch verifiers. -/// -/// This bound limits the number of concurrent batches for each verifier. -/// If tasks delay checking for verifier results, and the bound is too small, -/// new batches will be rejected with `RecvError`s. -const BROADCAST_BUFFER_SIZE: usize = 512; diff --git a/zebra-consensus/src/primitives/ed25519.rs b/zebra-consensus/src/primitives/ed25519.rs index 82f7b988..0dbac374 100644 --- a/zebra-consensus/src/primitives/ed25519.rs +++ b/zebra-consensus/src/primitives/ed25519.rs @@ -1,8 +1,5 @@ //! Async Ed25519 batch verifier service -#[cfg(test)] -mod tests; - use std::{ future::Future, mem, @@ -10,16 +7,32 @@ use std::{ task::{Context, Poll}, }; -use futures::future::{ready, Ready}; +use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use tokio::sync::broadcast::{channel, error::RecvError, Sender}; +use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch::{Batch, BatchControl}; use tower_fallback::Fallback; use zebra_chain::primitives::ed25519::{batch, *}; +#[cfg(test)] +mod tests; + +/// The type of the batch verifier. +type BatchVerifier = batch::Verifier; + +/// The type of verification results. +type VerifyResult = Result<(), Error>; + +/// The type of the batch sender channel. +type Sender = watch::Sender>; + +/// The type of the batch item. +/// This is an `Ed25519Item`. +pub type Item = batch::Item; + /// Global batch verification context for Ed25519 signatures. /// /// This service transparently batches contemporaneous signature verifications, @@ -29,7 +42,7 @@ use zebra_chain::primitives::ed25519::{batch, *}; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static VERIFIER: Lazy< - Fallback, ServiceFn Ready>>>, + Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -37,43 +50,101 @@ pub static VERIFIER: Lazy< super::MAX_BATCH_SIZE, super::MAX_BATCH_LATENCY, ), - // We want to fallback to individual verification if batch verification - // fails, so we need a Service to use. The obvious way to do this would - // be to write a closure that returns an async block. But because we - // have to specify the type of a static, we need to be able to write the - // type of the closure and its return value, and both closures and async - // blocks have eldritch types whose names cannot be written. So instead, - // we use a Ready to avoid an async block and cast the closure to a - // function (which is possible because it doesn't capture any state). - tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), + // We want to fallback to individual verification if batch verification fails, + // so we need a Service to use. + // + // Because we have to specify the type of a static, we need to be able to + // write the type of the closure and its return value. But both closures and + // async blocks have unnameable types. So instead we cast the closure to a function + // (which is possible because it doesn't capture any state), and use a BoxFuture + // to erase the result type. + // (We can't use BoxCloneService to erase the service type, because it is !Sync.) 
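// A standalone sketch of the pattern described in the comment above: the
// closure is cast to a plain `fn` pointer (legal because it captures no
// state), and `FutureExt::boxed` erases the async block's unnameable
// future type, so the full service type can be spelled out in a `static`.
// The name `SKETCH_SERVICE` and the `u64` request type are illustrative
// only, not part of the patch.
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use tower::util::ServiceFn;

static SKETCH_SERVICE: Lazy<ServiceFn<fn(u64) -> BoxFuture<'static, Result<u64, ()>>>> =
    Lazy::new(|| {
        tower::service_fn((|n: u64| async move { Ok(n + 1) }.boxed()) as fn(_) -> _)
    });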
+ tower::service_fn( + (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _, + ), ) }); /// Ed25519 signature verifier service pub struct Verifier { - batch: batch::Verifier, - // This uses a "broadcast" channel, which is an mpmc channel. Tokio also - // provides a spmc channel, "watch", but it only keeps the latest value, so - // using it would require thinking through whether it was possible for - // results from one batch to be mixed with another. - tx: Sender>, + /// A batch verifier for ed25519 signatures. + batch: BatchVerifier, + + /// A channel for broadcasting the result of a batch to the futures for each batch item. + /// + /// Each batch gets a newly created channel, so there is only ever one result sent per channel. + /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel. + tx: Sender, } impl Default for Verifier { fn default() -> Self { - let batch = batch::Verifier::default(); - let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + let batch = BatchVerifier::default(); + let (tx, _) = watch::channel(None); Self { batch, tx } } } -/// Type alias to clarify that this `batch::Item` is a `Ed25519Item` -pub type Item = batch::Item; +impl Verifier { + /// Returns the batch verifier and channel sender from `self`, + /// replacing them with a new empty batch. + fn take(&mut self) -> (BatchVerifier, Sender) { + // Use a new verifier and channel for each batch. + let batch = mem::take(&mut self.batch); + + let (tx, _) = watch::channel(None); + let tx = mem::replace(&mut self.tx, tx); + + (batch, tx) + } + + /// Synchronously process the batch, and send the result using the channel sender. + /// This function blocks until the batch is completed. + fn verify(batch: BatchVerifier, tx: Sender) { + let result = batch.verify(thread_rng()); + let _ = tx.send(Some(result)); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This function blocks until the batch is completed on the thread pool. + fn flush_blocking(&mut self) { + let (batch, tx) = self.take(); + + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: replace with the rayon thread pool + tokio::task::block_in_place(|| Self::verify(batch, tx)); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This function returns a future that becomes ready when the batch is completed. + fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future { + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: spawn on the rayon thread pool inside spawn_blocking + tokio::task::spawn_blocking(|| Self::verify(batch, tx)) + .map(|join_result| join_result.expect("panic in ed25519 batch verifier")) + } + + /// Verify a single item using a thread pool, and return the result. + /// This function returns a future that becomes ready when the item is completed. + fn verify_single_spawning(item: Item) -> impl Future { + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. 
+ // + // TODO: spawn on the rayon thread pool inside spawn_blocking + tokio::task::spawn_blocking(|| item.verify_single()) + .map(|join_result| join_result.expect("panic in ed25519 fallback verifier")) + } +} impl Service> for Verifier { type Response = (); type Error = Error; - type Future = Pin> + Send + 'static>>; + type Future = Pin + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -85,9 +156,14 @@ impl Service> for Verifier { tracing::trace!("got ed25519 item"); self.batch.queue(item); let mut rx = self.tx.subscribe(); + Box::pin(async move { - match rx.recv().await { - Ok(result) => { + match rx.changed().await { + Ok(()) => { + // We use a new channel for each batch, + // so we always get the correct batch result here. + let result = rx.borrow().expect("completed batch must send a value"); + if result.is_ok() { tracing::trace!(?result, "validated ed25519 signature"); metrics::counter!("signatures.ed25519.validated", 1); @@ -97,24 +173,17 @@ impl Service> for Verifier { } result } - Err(RecvError::Lagged(_)) => { - tracing::error!( - "ed25519 batch verification receiver lagged and lost verification results" - ); - Err(Error::InvalidSignature) - } - Err(RecvError::Closed) => { - panic!("ed25519 verifier was dropped without flushing") - } + Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"), } }) } BatchControl::Flush => { tracing::trace!("got ed25519 flush command"); - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); - Box::pin(async { Ok(()) }) + + let (batch, tx) = self.take(); + + Box::pin(Self::flush_spawning(batch, tx).map(Ok)) } } } @@ -123,7 +192,9 @@ impl Service> for Verifier { impl Drop for Verifier { fn drop(&mut self) { // We need to flush the current batch in case there are still any pending futures. - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); + // This blocks the current thread and any futures running on it, until the batch is complete. + // + // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately. + self.flush_blocking(); } } diff --git a/zebra-consensus/src/primitives/ed25519/tests.rs b/zebra-consensus/src/primitives/ed25519/tests.rs index a95fd3bc..ff68311a 100644 --- a/zebra-consensus/src/primitives/ed25519/tests.rs +++ b/zebra-consensus/src/primitives/ed25519/tests.rs @@ -33,7 +33,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_items_test() -> Result<()> { batch_flushes_on_max_items().await } @@ -52,7 +52,7 @@ async fn batch_flushes_on_max_items() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_latency_test() -> Result<()> { batch_flushes_on_max_latency().await } diff --git a/zebra-consensus/src/primitives/groth16.rs b/zebra-consensus/src/primitives/groth16.rs index 6a5d294b..3b358640 100644 --- a/zebra-consensus/src/primitives/groth16.rs +++ b/zebra-consensus/src/primitives/groth16.rs @@ -1,7 +1,6 @@ //! 
Async Groth16 batch verifier service use std::{ - convert::{TryFrom, TryInto}, fmt, future::Future, mem, @@ -11,14 +10,14 @@ use std::{ use bellman::{ gadgets::multipack, - groth16::{batch, VerifyingKey}, + groth16::{batch, PreparedVerifyingKey, VerifyingKey}, VerificationError, }; use bls12_381::Bls12; -use futures::future::{ready, Ready}; +use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use tokio::sync::broadcast::{channel, error::RecvError, Sender}; +use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch::{Batch, BatchControl}; @@ -43,6 +42,27 @@ pub use params::{Groth16Parameters, GROTH16_PARAMETERS}; use crate::error::TransactionError; +/// The type of the batch verifier. +type BatchVerifier = batch::Verifier; + +/// The type of verification results. +type VerifyResult = Result<(), VerificationError>; + +/// The type of the batch sender channel. +type Sender = watch::Sender>; + +/// The type of the batch item. +/// This is a Groth16 verification item. +pub type Item = batch::Item; + +/// The type of a raw verifying key. +/// This is the key used to verify batches. +pub type BatchVerifyingKey = VerifyingKey; + +/// The type of a prepared verifying key. +/// This is the key used to verify individual items. +pub type ItemVerifyingKey = PreparedVerifyingKey; + /// Global batch verification context for Groth16 proofs of Spend statements. /// /// This service transparently batches contemporaneous proof verifications, @@ -52,7 +72,7 @@ use crate::error::TransactionError; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static SPEND_VERIFIER: Lazy< - Fallback, ServiceFn Ready>>>, + Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -60,17 +80,22 @@ pub static SPEND_VERIFIER: Lazy< super::MAX_BATCH_SIZE, super::MAX_BATCH_LATENCY, ), - // We want to fallback to individual verification if batch verification - // fails, so we need a Service to use. The obvious way to do this would - // be to write a closure that returns an async block. But because we - // have to specify the type of a static, we need to be able to write the - // type of the closure and its return value, and both closures and async - // blocks have eldritch types whose names cannot be written. So instead, - // we use a Ready to avoid an async block and cast the closure to a - // function (which is possible because it doesn't capture any state). + // We want to fallback to individual verification if batch verification fails, + // so we need a Service to use. + // + // Because we have to specify the type of a static, we need to be able to + // write the type of the closure and its return value. But both closures and + // async blocks have unnameable types. So instead we cast the closure to a function + // (which is possible because it doesn't capture any state), and use a BoxFuture + // to erase the result type. + // (We can't use BoxCloneService to erase the service type, because it is !Sync.) tower::service_fn( (|item: Item| { - ready(item.verify_single(&GROTH16_PARAMETERS.sapling.spend_prepared_verifying_key)) + Verifier::verify_single_spawning( + item, + &GROTH16_PARAMETERS.sapling.spend_prepared_verifying_key, + ) + .boxed() }) as fn(_) -> _, ), ) @@ -85,7 +110,7 @@ pub static SPEND_VERIFIER: Lazy< /// you should call `.clone()` on the global handle to create a local, mutable /// handle. 
pub static OUTPUT_VERIFIER: Lazy< - Fallback, ServiceFn Ready>>>, + Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -94,16 +119,16 @@ pub static OUTPUT_VERIFIER: Lazy< super::MAX_BATCH_LATENCY, ), // We want to fallback to individual verification if batch verification - // fails, so we need a Service to use. The obvious way to do this would - // be to write a closure that returns an async block. But because we - // have to specify the type of a static, we need to be able to write the - // type of the closure and its return value, and both closures and async - // blocks have eldritch types whose names cannot be written. So instead, - // we use a Ready to avoid an async block and cast the closure to a - // function (which is possible because it doesn't capture any state). + // fails, so we need a Service to use. + // + // See the note on [`SPEND_VERIFIER`] for details. tower::service_fn( (|item: Item| { - ready(item.verify_single(&GROTH16_PARAMETERS.sapling.output_prepared_verifying_key)) + Verifier::verify_single_spawning( + item, + &GROTH16_PARAMETERS.sapling.output_prepared_verifying_key, + ) + .boxed() }) as fn(_) -> _, ), ) @@ -117,25 +142,27 @@ pub static OUTPUT_VERIFIER: Lazy< /// Note that making a `Service` call requires mutable access to the service, so /// you should call `.clone()` on the global handle to create a local, mutable /// handle. -pub static JOINSPLIT_VERIFIER: Lazy Ready>>> = - Lazy::new(|| { - // We need a Service to use. The obvious way to do this would - // be to write a closure that returns an async block. But because we - // have to specify the type of a static, we need to be able to write the - // type of the closure and its return value, and both closures and async - // blocks have eldritch types whose names cannot be written. So instead, - // we use a Ready to avoid an async block and cast the closure to a - // function (which is possible because it doesn't capture any state). - tower::service_fn( - (|item: Item| { - ready( - item.verify_single(&GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key) - .map_err(|e| TransactionError::Groth16(e.to_string())) - .map_err(tower_fallback::BoxedError::from), - ) - }) as fn(_) -> _, - ) - }); +pub static JOINSPLIT_VERIFIER: Lazy< + ServiceFn BoxFuture<'static, Result<(), BoxedError>>>, +> = Lazy::new(|| { + // We just need a Service to use: there is no batch verification for JoinSplits. + // + // See the note on [`SPEND_VERIFIER`] for details. + tower::service_fn( + (|item: Item| { + Verifier::verify_single_spawning( + item, + &GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key, + ) + .map(|result| { + result + .map_err(|e| TransactionError::Groth16(e.to_string())) + .map_err(tower_fallback::BoxedError::from) + }) + .boxed() + }) as fn(_) -> _, + ) +}); /// A Groth16 Description (JoinSplit, Spend, or Output) with a Groth16 proof /// and its inputs encoded as scalars. @@ -297,9 +324,6 @@ impl Description for (&JoinSplit, &ed25519::VerificationKeyBytes) } } -/// A Groth16 verification item, used as the request type of the service. -pub type Item = batch::Item; - /// A wrapper to allow a TryFrom blanket implementation of the [`Description`] /// trait for the [`Item`] struct. /// See for more details. @@ -334,20 +358,89 @@ where /// verifier. It handles batching incoming requests, driving batches to /// completion, and reporting results. pub struct Verifier { - batch: batch::Verifier, - // Making this 'static makes managing lifetimes much easier. 
- vk: &'static VerifyingKey, - /// Broadcast sender used to send the result of a batch verification to each - /// request source in the batch. - tx: Sender>, + /// A batch verifier for groth16 proofs. + batch: BatchVerifier, + + /// The proof verification key. + /// + /// Making this 'static makes managing lifetimes much easier. + vk: &'static BatchVerifyingKey, + + /// A channel for broadcasting the result of a batch to the futures for each batch item. + /// + /// Each batch gets a newly created channel, so there is only ever one result sent per channel. + /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel. + tx: Sender, } impl Verifier { - fn new(vk: &'static VerifyingKey) -> Self { - let batch = batch::Verifier::default(); - let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + /// Create and return a new verifier using the verification key `vk`. + fn new(vk: &'static BatchVerifyingKey) -> Self { + let batch = BatchVerifier::default(); + let (tx, _) = watch::channel(None); Self { batch, vk, tx } } + + /// Returns the batch verifier and channel sender from `self`, + /// replacing them with a new empty batch. + fn take(&mut self) -> (BatchVerifier, &'static BatchVerifyingKey, Sender) { + // Use a new verifier and channel for each batch. + let batch = mem::take(&mut self.batch); + + let (tx, _) = watch::channel(None); + let tx = mem::replace(&mut self.tx, tx); + + (batch, self.vk, tx) + } + + /// Synchronously process the batch, and send the result using the channel sender. + /// This function blocks until the batch is completed. + fn verify(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) { + let result = batch.verify(thread_rng(), vk); + let _ = tx.send(Some(result)); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This function blocks until the batch is completed on the thread pool. + fn flush_blocking(&mut self) { + let (batch, vk, tx) = self.take(); + + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: replace with the rayon thread pool + tokio::task::block_in_place(|| Self::verify(batch, vk, tx)); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This function returns a future that becomes ready when the batch is completed. + fn flush_spawning( + batch: BatchVerifier, + vk: &'static BatchVerifyingKey, + tx: Sender, + ) -> impl Future { + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: spawn on the rayon thread pool inside spawn_blocking + tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx)) + .map(|join_result| join_result.expect("panic in groth16 batch verifier")) + } + + /// Verify a single item using a thread pool, and return the result. + /// This function returns a future that becomes ready when the item is completed. + fn verify_single_spawning( + item: Item, + pvk: &'static ItemVerifyingKey, + ) -> impl Future { + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. 
+ // + // TODO: spawn on the rayon thread pool inside spawn_blocking + tokio::task::spawn_blocking(move || item.verify_single(pvk)) + .map(|join_result| join_result.expect("panic in groth16 fallback verifier")) + } } impl fmt::Debug for Verifier { @@ -364,7 +457,7 @@ impl fmt::Debug for Verifier { impl Service> for Verifier { type Response = (); type Error = VerificationError; - type Future = Pin> + Send + 'static>>; + type Future = Pin + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -376,9 +469,18 @@ impl Service> for Verifier { tracing::trace!("got item"); self.batch.queue(item); let mut rx = self.tx.subscribe(); + Box::pin(async move { - match rx.recv().await { - Ok(result) => { + match rx.changed().await { + Ok(()) => { + // We use a new channel for each batch, + // so we always get the correct batch result here. + let result = rx + .borrow() + .as_ref() + .expect("completed batch must send a value") + .clone(); + if result.is_ok() { tracing::trace!(?result, "verified groth16 proof"); metrics::counter!("proofs.groth16.verified", 1); @@ -389,22 +491,17 @@ impl Service> for Verifier { result } - Err(RecvError::Lagged(_)) => { - tracing::error!( - "missed channel updates, BROADCAST_BUFFER_SIZE is too low!!" - ); - Err(VerificationError::InvalidProof) - } - Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), + Err(_recv_error) => panic!("verifier was dropped without flushing"), } }) } BatchControl::Flush => { - tracing::trace!("got flush command"); - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng(), self.vk)); - Box::pin(async { Ok(()) }) + tracing::trace!("got groth16 flush command"); + + let (batch, vk, tx) = self.take(); + + Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok)) } } } @@ -413,7 +510,9 @@ impl Service> for Verifier { impl Drop for Verifier { fn drop(&mut self) { // We need to flush the current batch in case there are still any pending futures. - let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng(), self.vk)); + // This blocks the current thread and any futures running on it, until the batch is complete. + // + // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately. + self.flush_blocking() } } diff --git a/zebra-consensus/src/primitives/groth16/tests.rs b/zebra-consensus/src/primitives/groth16/tests.rs index 665df39e..9b2b0cca 100644 --- a/zebra-consensus/src/primitives/groth16/tests.rs +++ b/zebra-consensus/src/primitives/groth16/tests.rs @@ -1,8 +1,9 @@ //! Tests for transaction verification -use std::convert::TryInto; - -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::{ + future::ready, + stream::{FuturesUnordered, StreamExt}, +}; use hex::FromHex; use tower::ServiceExt; @@ -67,7 +68,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sapling_groth16() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390) let mut spend_verifier = Fallback::new( @@ -170,7 +171,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn correctly_err_on_invalid_output_proof() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390). // Also, since we expect these to fail, we don't want to slow down the communal verifiers. 
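// A compact sketch of the `spawn_blocking` pattern that the new
// `flush_spawning` and `verify_single_spawning` methods above use:
// CPU-heavy verification runs on tokio's blocking thread pool, and a
// panic inside the closure is re-raised from the `JoinHandle`.
// `expensive_check` is a hypothetical stand-in for real proof or
// signature verification.
use futures::FutureExt;
use std::future::Future;

fn verify_spawning_sketch(input: Vec<u8>) -> impl Future<Output = Result<(), String>> {
    // Correctness: do CPU-intensive work on a dedicated thread,
    // to avoid blocking other futures on this runtime thread.
    tokio::task::spawn_blocking(move || expensive_check(&input))
        .map(|join_result| join_result.expect("panic in verifier"))
}

fn expensive_check(_proof_bytes: &[u8]) -> Result<(), String> {
    Ok(())
}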
@@ -246,7 +247,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sprout_groth16() { let mut verifier = tower::service_fn( (|item: Item| { @@ -309,7 +310,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sprout_groth16_vector() { let mut verifier = tower::service_fn( (|item: Item| { @@ -431,7 +432,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn correctly_err_on_invalid_joinsplit_proof() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390). // Also, since we expect these to fail, we don't want to slow down the communal verifiers. diff --git a/zebra-consensus/src/primitives/halo2.rs b/zebra-consensus/src/primitives/halo2.rs index c1ea271d..2b121006 100644 --- a/zebra-consensus/src/primitives/halo2.rs +++ b/zebra-consensus/src/primitives/halo2.rs @@ -1,7 +1,6 @@ //! Async Halo2 batch verifier service use std::{ - convert::TryFrom, fmt, future::Future, mem, @@ -9,12 +8,12 @@ use std::{ task::{Context, Poll}, }; -use futures::future::{ready, Ready}; +use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use orchard::circuit::VerifyingKey; use rand::{thread_rng, CryptoRng, RngCore}; use thiserror::Error; -use tokio::sync::broadcast::{channel, error::RecvError, Sender}; +use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch::{Batch, BatchControl}; use tower_fallback::Fallback; @@ -22,8 +21,56 @@ use tower_fallback::Fallback; #[cfg(test)] mod tests; +/// Adjusted batch size for halo2 batches. +/// +/// Unlike other batch verifiers, halo2 has aggregate proofs. +/// This means that there can be hundreds of actions verified by some proofs, +/// but just one action in others. +/// +/// To compensate for larger proofs, we decrease the batch size. +/// +/// We also decrease the batch size for these reasons: +/// - the default number of actions in `zcashd` is 2, +/// - halo2 proofs take longer to verify than Sapling proofs, and +/// - transactions with many actions generate very large proofs. +/// +/// # TODO +/// +/// Count each halo2 action as a batch item. +/// We could increase the batch item count by the action count each time a batch request +/// is received, which would reduce batch size, but keep the batch queue size larger. +const HALO2_MAX_BATCH_SIZE: usize = 2; + +/* TODO: implement batch verification + +/// The type of the batch verifier. +type BatchVerifier = plonk::BatchVerifier; + */ + +/// The type of verification results. +type VerifyResult = Result<(), Halo2Error>; + +/// The type of the batch sender channel. +type Sender = watch::Sender>; + +/* TODO: implement batch verification + +/// The type of a raw verifying key. +/// This is the key used to verify batches. +pub type BatchVerifyingKey = VerifyingKey; + */ +/// Temporary substitute type for fake batch verification. +/// +/// TODO: implement batch verification +pub type BatchVerifyingKey = ItemVerifyingKey; + +/// The type of a prepared verifying key. +/// This is the key used to verify individual items. +pub type ItemVerifyingKey = VerifyingKey; + lazy_static::lazy_static! { - pub static ref VERIFYING_KEY: VerifyingKey = VerifyingKey::build(); + /// The halo2 proof verifying key. 
+ pub static ref VERIFYING_KEY: ItemVerifyingKey = ItemVerifyingKey::build(); } // === TEMPORARY BATCH HALO2 SUBSTITUTE === @@ -45,25 +92,28 @@ impl Item { /// /// This is useful (in combination with `Item::clone`) for implementing /// fallback logic when batch verification fails. - pub fn verify_single(&self, vk: &VerifyingKey) -> Result<(), halo2::plonk::Error> { + pub fn verify_single(&self, vk: &ItemVerifyingKey) -> Result<(), halo2::plonk::Error> { self.proof.verify(vk, &self.instances[..]) } } +/// A fake batch verifier that queues and verifies halo2 proofs. #[derive(Default)] pub struct BatchVerifier { queue: Vec, } impl BatchVerifier { + /// Queues an item for fake batch verification. pub fn queue(&mut self, item: Item) { self.queue.push(item); } + /// Verifies the current fake batch. pub fn verify( self, _rng: R, - vk: &VerifyingKey, + vk: &ItemVerifyingKey, ) -> Result<(), halo2::plonk::Error> { for item in self.queue { item.verify_single(vk)?; @@ -121,6 +171,7 @@ impl From<&zebra_chain::orchard::ShieldedData> for Item { // remove this and just wrap `halo2::plonk::Error` as an enum variant of // `crate::transaction::Error`, which does the trait derivation via `thiserror` #[derive(Clone, Debug, Error, Eq, PartialEq)] +#[allow(missing_docs)] pub enum Halo2Error { #[error("the constraint system is not satisfied")] ConstraintSystemFailure, @@ -145,26 +196,26 @@ impl From for Halo2Error { /// Note that making a `Service` call requires mutable access to the service, so /// you should call `.clone()` on the global handle to create a local, mutable /// handle. -#[allow(dead_code)] pub static VERIFIER: Lazy< - Fallback, ServiceFn Ready>>>, + Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, > = Lazy::new(|| { Fallback::new( Batch::new( Verifier::new(&VERIFYING_KEY), - super::MAX_BATCH_SIZE, + HALO2_MAX_BATCH_SIZE, super::MAX_BATCH_LATENCY, ), - // We want to fallback to individual verification if batch verification - // fails, so we need a Service to use. The obvious way to do this would - // be to write a closure that returns an async block. But because we - // have to specify the type of a static, we need to be able to write the - // type of the closure and its return value, and both closures and async - // blocks have eldritch types whose names cannot be written. So instead, - // we use a Ready to avoid an async block and cast the closure to a - // function (which is possible because it doesn't capture any state). + // We want to fallback to individual verification if batch verification fails, + // so we need a Service to use. + // + // Because we have to specify the type of a static, we need to be able to + // write the type of the closure and its return value. But both closures and + // async blocks have unnameable types. So instead we cast the closure to a function + // (which is possible because it doesn't capture any state), and use a BoxFuture + // to erase the result type. + // (We can't use BoxCloneService to erase the service type, because it is !Sync.) tower::service_fn( - (|item: Item| ready(item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from))) + (|item: Item| Verifier::verify_single_spawning(item, &VERIFYING_KEY).boxed()) as fn(_) -> _, ), ) @@ -176,22 +227,88 @@ pub static VERIFIER: Lazy< /// Halo2 verifier. It handles batching incoming requests, driving batches to /// completion, and reporting results. pub struct Verifier { - /// The sync Halo2 batch verifier. + /// The synchronous Halo2 batch verifier. 
@@ -176,22 +227,88 @@ pub static VERIFIER: Lazy<
 
 /// Halo2 verifier. It handles batching incoming requests, driving batches to
 /// completion, and reporting results.
 pub struct Verifier {
-    /// The sync Halo2 batch verifier.
+    /// The synchronous Halo2 batch verifier.
     batch: BatchVerifier,
-    // Making this 'static makes managing lifetimes much easier.
-    vk: &'static VerifyingKey,
-    /// Broadcast sender used to send the result of a batch verification to each
-    /// request source in the batch.
-    tx: Sender<Result<(), Halo2Error>>,
+
+    /// The halo2 proof verification key.
+    ///
+    /// Making this 'static makes managing lifetimes much easier.
+    vk: &'static ItemVerifyingKey,
+
+    /// A channel for broadcasting the result of a batch to the futures for each batch item.
+    ///
+    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
+    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
+    tx: Sender,
 }
 
 impl Verifier {
-    #[allow(dead_code)]
-    fn new(vk: &'static VerifyingKey) -> Self {
+    fn new(vk: &'static ItemVerifyingKey) -> Self {
         let batch = BatchVerifier::default();
-        let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
+        let (tx, _) = watch::channel(None);
         Self { batch, vk, tx }
     }
+
+    /// Returns the batch verifier and channel sender from `self`,
+    /// replacing them with a new empty batch.
+    fn take(&mut self) -> (BatchVerifier, &'static BatchVerifyingKey, Sender) {
+        // Use a new verifier and channel for each batch.
+        let batch = mem::take(&mut self.batch);
+
+        let (tx, _) = watch::channel(None);
+        let tx = mem::replace(&mut self.tx, tx);
+
+        (batch, self.vk, tx)
+    }
+
+    /// Synchronously process the batch, and send the result using the channel sender.
+    /// This function blocks until the batch is completed.
+    fn verify(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
+        let result = batch.verify(thread_rng(), vk).map_err(Halo2Error::from);
+        let _ = tx.send(Some(result));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function blocks until the batch is completed on the thread pool.
+    fn flush_blocking(&mut self) {
+        let (batch, vk, tx) = self.take();
+
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: replace with the rayon thread pool
+        tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function returns a future that becomes ready when the batch is completed.
+    fn flush_spawning(
+        batch: BatchVerifier,
+        vk: &'static BatchVerifyingKey,
+        tx: Sender,
+    ) -> impl Future<Output = ()> {
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
+            .map(|join_result| join_result.expect("panic in halo2 batch verifier"))
+    }
+
+    /// Verify a single item using a thread pool, and return the result.
+    /// This function returns a future that becomes ready when the item is completed.
+    fn verify_single_spawning(
+        item: Item,
+        pvk: &'static ItemVerifyingKey,
+    ) -> impl Future<Output = VerifyResult> {
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(move || item.verify_single(pvk).map_err(Halo2Error::from))
+            .map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
+    }
 }
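
`flush_blocking` and the two `*_spawning` functions above use two different escape hatches, and the distinction matters. A sketch with a hypothetical work function, assuming a multi-threaded tokio runtime:

```rust
/// `spawn_blocking` moves the closure onto tokio's blocking thread pool and
/// returns a `JoinHandle` future, so the calling task keeps its worker thread.
async fn spawned_sum() -> u64 {
    tokio::task::spawn_blocking(|| (0..1_000_000u64).sum())
        .await
        .expect("task should not panic")
}

/// `block_in_place` converts the *current* worker thread into a blocking thread,
/// so it can be called from synchronous code like `Drop::drop`, but it panics
/// on a current-thread runtime.
fn in_place_sum() -> u64 {
    tokio::task::block_in_place(|| (0..1_000_000u64).sum())
}
```

The `JoinHandle` returned by `spawn_blocking` reports a panic in the closure as an `Err(JoinError)`, which is why the verifier code re-raises it with `expect`.
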
 
 impl fmt::Debug for Verifier {
@@ -208,7 +325,7 @@ impl fmt::Debug for Verifier {
 impl Service<BatchControl<Item>> for Verifier {
     type Response = ();
     type Error = Halo2Error;
-    type Future = Pin<Box<dyn Future<Output = Result<(), Halo2Error>> + Send + 'static>>;
+    type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
 
     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
@@ -221,8 +338,16 @@ impl Service<BatchControl<Item>> for Verifier {
                 self.batch.queue(item);
                 let mut rx = self.tx.subscribe();
                 Box::pin(async move {
-                    match rx.recv().await {
-                        Ok(result) => {
+                    match rx.changed().await {
+                        Ok(()) => {
+                            // We use a new channel for each batch,
+                            // so we always get the correct batch result here.
+                            let result = rx
+                                .borrow()
+                                .as_ref()
+                                .expect("completed batch must send a value")
+                                .clone();
+
                             if result.is_ok() {
                                 tracing::trace!(?result, "verified halo2 proof");
                                 metrics::counter!("proofs.halo2.verified", 1);
@@ -233,29 +358,17 @@ impl Service<BatchControl<Item>> for Verifier {
 
                             result
                         }
-                        Err(RecvError::Lagged(_)) => {
-                            tracing::error!(
-                                "missed channel updates, BROADCAST_BUFFER_SIZE is too low!!"
-                            );
-                            // This is the enum variant that
-                            // orchard::circuit::Proof.verify() returns on
-                            // evaluation failure.
-                            Err(Halo2Error::ConstraintSystemFailure)
-                        }
-                        Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
+                        Err(_recv_error) => panic!("verifier was dropped without flushing"),
                     }
                 })
             }
             BatchControl::Flush => {
-                tracing::trace!("got flush command");
-                let batch = mem::take(&mut self.batch);
-                let _ = self.tx.send(
-                    batch
-                        .verify(thread_rng(), self.vk)
-                        .map_err(Halo2Error::from),
-                );
-                Box::pin(async { Ok(()) })
+                tracing::trace!("got halo2 flush command");
+
+                let (batch, vk, tx) = self.take();
+
+                Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
             }
         }
     }
@@ -264,11 +377,9 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        let batch = mem::take(&mut self.batch);
-        let _ = self.tx.send(
-            batch
-                .verify(thread_rng(), self.vk)
-                .map_err(Halo2Error::from),
-        );
+        // This blocks the current thread and any futures running on it, until the batch is complete.
+        //
+        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        self.flush_blocking()
     }
 }
diff --git a/zebra-consensus/src/primitives/halo2/tests.rs b/zebra-consensus/src/primitives/halo2/tests.rs
index 9bdd7e07..163400df 100644
--- a/zebra-consensus/src/primitives/halo2/tests.rs
+++ b/zebra-consensus/src/primitives/halo2/tests.rs
@@ -1,6 +1,6 @@
 //! Tests for verifying simple Halo2 proofs with the async verifier
 
-use std::convert::TryInto;
+use std::future::ready;
 
 use futures::stream::{FuturesUnordered, StreamExt};
 use tower::ServiceExt;
@@ -130,7 +130,7 @@ where
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn verify_generated_halo2_proofs() {
     zebra_test::init();
 
@@ -196,7 +196,7 @@ where
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn correctly_err_on_invalid_halo2_proofs() {
     zebra_test::init();
 
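
All of the batch tests in this patch switch to the multi-threaded runtime flavor for the same reason: the verifiers now run their cryptography on blocking threads, and flushing on drop uses `block_in_place`, which panics on the default current-thread test runtime. A hypothetical test sketch:

```rust
#[tokio::test(flavor = "multi_thread")]
async fn hypothetical_blocking_test() {
    // On a current-thread runtime this call would panic;
    // the multi-threaded flavor makes it safe.
    let sum = tokio::task::block_in_place(|| 2 + 2);
    assert_eq!(sum, 4);
}
```
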
diff --git a/zebra-consensus/src/primitives/redjubjub.rs b/zebra-consensus/src/primitives/redjubjub.rs
index d965ecb8..60b59962 100644
--- a/zebra-consensus/src/primitives/redjubjub.rs
+++ b/zebra-consensus/src/primitives/redjubjub.rs
@@ -1,8 +1,5 @@
 //! Async RedJubjub batch verifier service
 
-#[cfg(test)]
-mod tests;
-
 use std::{
     future::Future,
     mem,
@@ -10,16 +7,33 @@ use std::{
     task::{Context, Poll},
 };
 
-use futures::future::{ready, Ready};
+use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
-use tokio::sync::broadcast::{channel, error::RecvError, Sender};
+use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 use tower_batch::{Batch, BatchControl};
 use tower_fallback::Fallback;
+
 use zebra_chain::primitives::redjubjub::{batch, *};
 
+#[cfg(test)]
+mod tests;
+
+/// The type of the batch verifier.
+type BatchVerifier = batch::Verifier;
+
+/// The type of verification results.
+type VerifyResult = Result<(), Error>;
+
+/// The type of the batch sender channel.
+type Sender = watch::Sender<Option<VerifyResult>>;
+
+/// The type of the batch item.
+/// This is a `RedJubjubItem`.
+pub type Item = batch::Item;
+
 /// Global batch verification context for RedJubjub signatures.
 ///
 /// This service transparently batches contemporaneous signature verifications,
@@ -29,7 +43,7 @@ use zebra_chain::primitives::redjubjub::{batch, *};
 /// you should call `.clone()` on the global handle to create a local, mutable
 /// handle.
 pub static VERIFIER: Lazy<
-    Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
+    Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
 > = Lazy::new(|| {
     Fallback::new(
         Batch::new(
@@ -37,43 +51,101 @@ pub static VERIFIER: Lazy<
             super::MAX_BATCH_SIZE,
             super::MAX_BATCH_LATENCY,
         ),
-        // We want to fallback to individual verification if batch verification
-        // fails, so we need a Service to use. The obvious way to do this would
-        // be to write a closure that returns an async block. But because we
-        // have to specify the type of a static, we need to be able to write the
-        // type of the closure and its return value, and both closures and async
-        // blocks have eldritch types whose names cannot be written. So instead,
-        // we use a Ready to avoid an async block and cast the closure to a
-        // function (which is possible because it doesn't capture any state).
-        tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
+        // We want to fall back to individual verification if batch verification fails,
+        // so we need a Service to use.
+        //
+        // Because we have to specify the type of a static, we need to be able to
+        // write the type of the closure and its return value. But both closures and
+        // async blocks have unnameable types. So instead we cast the closure to a function
+        // (which is possible because it doesn't capture any state), and use a BoxFuture
+        // to erase the result type.
+        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
+        tower::service_fn(
+            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
+        ),
     )
 });
 
 /// RedJubjub signature verifier service
 pub struct Verifier {
-    batch: batch::Verifier,
-    // This uses a "broadcast" channel, which is an mpmc channel. Tokio also
-    // provides a spmc channel, "watch", but it only keeps the latest value, so
-    // using it would require thinking through whether it was possible for
-    // results from one batch to be mixed with another.
-    tx: Sender<Result<(), Error>>,
+    /// A batch verifier for RedJubjub signatures.
+    batch: BatchVerifier,
+
+    /// A channel for broadcasting the result of a batch to the futures for each batch item.
+    ///
+    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
+    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
+    tx: Sender,
 }
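
The watch channel works as the missing oneshot multi-consumer channel because every receiver observes the single `Some(result)` update. A runnable sketch of the pattern, with hypothetical `()` item results:

```rust
use tokio::sync::watch;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // One channel per batch, starting in the `None` state.
    let (tx, _) = watch::channel::<Option<Result<(), ()>>>(None);

    // Each queued batch item subscribes before the batch is flushed.
    let items: Vec<_> = (0..3)
        .map(|_| {
            let mut rx = tx.subscribe();
            tokio::spawn(async move {
                rx.changed().await.expect("sender must flush before dropping");
                // A fresh channel per batch means the first change observed
                // is always this batch's result.
                rx.borrow().expect("completed batch must send a value")
            })
        })
        .collect();

    // Flushing the batch broadcasts one result to every subscriber.
    tx.send(Some(Ok(()))).expect("subscribers are still waiting");

    for item in items {
        assert_eq!(item.await.expect("task should not panic"), Ok(()));
    }
}
```
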
 
 impl Default for Verifier {
     fn default() -> Self {
-        let batch = batch::Verifier::default();
-        let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
+        let batch = BatchVerifier::default();
+        let (tx, _) = watch::channel(None);
         Self { batch, tx }
     }
 }
 
-/// Type alias to clarify that this batch::Item is a RedJubjubItem
-pub type Item = batch::Item;
+impl Verifier {
+    /// Returns the batch verifier and channel sender from `self`,
+    /// replacing them with a new empty batch.
+    fn take(&mut self) -> (BatchVerifier, Sender) {
+        // Use a new verifier and channel for each batch.
+        let batch = mem::take(&mut self.batch);
+
+        let (tx, _) = watch::channel(None);
+        let tx = mem::replace(&mut self.tx, tx);
+
+        (batch, tx)
+    }
+
+    /// Synchronously process the batch, and send the result using the channel sender.
+    /// This function blocks until the batch is completed.
+    fn verify(batch: BatchVerifier, tx: Sender) {
+        let result = batch.verify(thread_rng());
+        let _ = tx.send(Some(result));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function blocks until the batch is completed on the thread pool.
+    fn flush_blocking(&mut self) {
+        let (batch, tx) = self.take();
+
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: replace with the rayon thread pool
+        tokio::task::block_in_place(|| Self::verify(batch, tx));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function returns a future that becomes ready when the batch is completed.
+    fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(|| Self::verify(batch, tx))
+            .map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
+    }
+
+    /// Verify a single item using a thread pool, and return the result.
+    /// This function returns a future that becomes ready when the item is completed.
+    fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(|| item.verify_single())
+            .map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
+    }
+}
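
The `take` method above is the linchpin of batch isolation: it swaps both the queue and the channel out of `&mut self` in one step. A minimal sketch with hypothetical types:

```rust
use std::mem;
use tokio::sync::watch;

struct Batcher {
    queue: Vec<u64>,
    tx: watch::Sender<Option<u64>>,
}

impl Batcher {
    /// Take the current batch and its channel, leaving a fresh empty batch.
    /// Items queued after this call subscribe to the *new* channel, so they
    /// can never observe the old batch's result.
    fn take(&mut self) -> (Vec<u64>, watch::Sender<Option<u64>>) {
        let queue = mem::take(&mut self.queue);

        let (tx, _) = watch::channel(None);
        (queue, mem::replace(&mut self.tx, tx))
    }
}
```
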
 
 impl Service<BatchControl<Item>> for Verifier {
     type Response = ();
     type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
+    type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
 
     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
@@ -85,9 +157,14 @@ impl Service<BatchControl<Item>> for Verifier {
             tracing::trace!("got item");
             self.batch.queue(item);
             let mut rx = self.tx.subscribe();
+
             Box::pin(async move {
-                match rx.recv().await {
-                    Ok(result) => {
+                match rx.changed().await {
+                    Ok(()) => {
+                        // We use a new channel for each batch,
+                        // so we always get the correct batch result here.
+                        let result = rx.borrow().expect("completed batch must send a value");
+
                         if result.is_ok() {
                             tracing::trace!(?result, "validated redjubjub signature");
                             metrics::counter!("signatures.redjubjub.validated", 1);
@@ -98,22 +175,17 @@ impl Service<BatchControl<Item>> for Verifier {
 
                         result
                     }
-                    Err(RecvError::Lagged(_)) => {
-                        tracing::error!(
-                            "batch verification receiver lagged and lost verification results"
-                        );
-                        Err(Error::InvalidSignature)
-                    }
-                    Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
+                    Err(_recv_error) => panic!("verifier was dropped without flushing"),
                 }
             })
         }
         BatchControl::Flush => {
-            tracing::trace!("got flush command");
-            let batch = mem::take(&mut self.batch);
-            let _ = self.tx.send(batch.verify(thread_rng()));
-            Box::pin(async { Ok(()) })
+            tracing::trace!("got redjubjub flush command");
+
+            let (batch, tx) = self.take();
+
+            Box::pin(Self::flush_spawning(batch, tx).map(Ok))
         }
     }
 }
@@ -122,7 +194,9 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        let batch = mem::take(&mut self.batch);
-        let _ = self.tx.send(batch.verify(thread_rng()));
+        // This blocks the current thread and any futures running on it, until the batch is complete.
+        //
+        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        self.flush_blocking();
     }
 }
diff --git a/zebra-consensus/src/primitives/redjubjub/tests.rs b/zebra-consensus/src/primitives/redjubjub/tests.rs
index 0f070009..0314c458 100644
--- a/zebra-consensus/src/primitives/redjubjub/tests.rs
+++ b/zebra-consensus/src/primitives/redjubjub/tests.rs
@@ -45,7 +45,7 @@ where
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn batch_flushes_on_max_items_test() -> Result<()> {
     batch_flushes_on_max_items().await
 }
@@ -64,7 +64,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn batch_flushes_on_max_latency_test() -> Result<()> {
     batch_flushes_on_max_latency().await
 }
diff --git a/zebra-consensus/src/primitives/redpallas.rs b/zebra-consensus/src/primitives/redpallas.rs
index 5ee17cf4..fbf72881 100644
--- a/zebra-consensus/src/primitives/redpallas.rs
+++ b/zebra-consensus/src/primitives/redpallas.rs
@@ -1,8 +1,5 @@
 //! Async RedPallas batch verifier service
 
-#[cfg(test)]
-mod tests;
-
 use std::{
     future::Future,
     mem,
@@ -10,16 +7,32 @@ use std::{
     task::{Context, Poll},
 };
 
-use futures::future::{ready, Ready};
+use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rand::thread_rng;
-use tokio::sync::broadcast::{channel, error::RecvError, Sender};
+use tokio::sync::watch;
 use tower::{util::ServiceFn, Service};
 use tower_batch::{Batch, BatchControl};
 use tower_fallback::Fallback;
 
 use zebra_chain::primitives::redpallas::{batch, *};
 
+#[cfg(test)]
+mod tests;
+
+/// The type of the batch verifier.
+type BatchVerifier = batch::Verifier;
+
+/// The type of verification results.
+type VerifyResult = Result<(), Error>;
+
+/// The type of the batch sender channel.
+type Sender = watch::Sender<Option<VerifyResult>>;
+
+/// The type of the batch item.
+/// This is a `RedPallasItem`.
+pub type Item = batch::Item;
+
 /// Global batch verification context for RedPallas signatures.
 ///
 /// This service transparently batches contemporaneous signature verifications,
@@ -28,9 +41,8 @@ use zebra_chain::primitives::redpallas::{batch, *};
 /// Note that making a `Service` call requires mutable access to the service, so
 /// you should call `.clone()` on the global handle to create a local, mutable
 /// handle.
-#[allow(dead_code)]
 pub static VERIFIER: Lazy<
-    Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
+    Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
 > = Lazy::new(|| {
     Fallback::new(
         Batch::new(
@@ -38,43 +50,101 @@ pub static VERIFIER: Lazy<
             super::MAX_BATCH_SIZE,
             super::MAX_BATCH_LATENCY,
         ),
-        // We want to fallback to individual verification if batch verification
-        // fails, so we need a Service to use. The obvious way to do this would
-        // be to write a closure that returns an async block. But because we
-        // have to specify the type of a static, we need to be able to write the
-        // type of the closure and its return value, and both closures and async
-        // blocks have eldritch types whose names cannot be written. So instead,
-        // we use a Ready to avoid an async block and cast the closure to a
-        // function (which is possible because it doesn't capture any state).
-        tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
+        // We want to fall back to individual verification if batch verification fails,
+        // so we need a Service to use.
+        //
+        // Because we have to specify the type of a static, we need to be able to
+        // write the type of the closure and its return value. But both closures and
+        // async blocks have unnameable types. So instead we cast the closure to a function
+        // (which is possible because it doesn't capture any state), and use a BoxFuture
+        // to erase the result type.
+        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
+        tower::service_fn(
+            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
+        ),
     )
 });
 
 /// RedPallas signature verifier service
 pub struct Verifier {
-    batch: batch::Verifier,
-    // This uses a "broadcast" channel, which is an mpmc channel. Tokio also
-    // provides a spmc channel, "watch", but it only keeps the latest value, so
-    // using it would require thinking through whether it was possible for
-    // results from one batch to be mixed with another.
-    tx: Sender<Result<(), Error>>,
+    /// A batch verifier for RedPallas signatures.
+    batch: BatchVerifier,
+
+    /// A channel for broadcasting the result of a batch to the futures for each batch item.
+    ///
+    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
+    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
+    tx: Sender,
 }
 
 impl Default for Verifier {
     fn default() -> Self {
-        let batch = batch::Verifier::default();
-        let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
+        let batch = BatchVerifier::default();
+        let (tx, _) = watch::channel(None);
         Self { batch, tx }
     }
 }
 
-/// Type alias to clarify that this batch::Item is a RedPallasItem
-pub type Item = batch::Item;
+impl Verifier {
+    /// Returns the batch verifier and channel sender from `self`,
+    /// replacing them with a new empty batch.
+    fn take(&mut self) -> (BatchVerifier, Sender) {
+        // Use a new verifier and channel for each batch.
+        let batch = mem::take(&mut self.batch);
+
+        let (tx, _) = watch::channel(None);
+        let tx = mem::replace(&mut self.tx, tx);
+
+        (batch, tx)
+    }
+
+    /// Synchronously process the batch, and send the result using the channel sender.
+    /// This function blocks until the batch is completed.
+    fn verify(batch: BatchVerifier, tx: Sender) {
+        let result = batch.verify(thread_rng());
+        let _ = tx.send(Some(result));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function blocks until the batch is completed on the thread pool.
+    fn flush_blocking(&mut self) {
+        let (batch, tx) = self.take();
+
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: replace with the rayon thread pool
+        tokio::task::block_in_place(|| Self::verify(batch, tx));
+    }
+
+    /// Flush the batch using a thread pool, and return the result via the channel.
+    /// This function returns a future that becomes ready when the batch is completed.
+    fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
+        // # Correctness
+        //
+        // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(|| Self::verify(batch, tx))
+            .map(|join_result| join_result.expect("panic in redpallas batch verifier"))
+    }
+
+    /// Verify a single item using a thread pool, and return the result.
+    /// This function returns a future that becomes ready when the item is completed.
+    fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
+        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
+        //
+        // TODO: spawn on the rayon thread pool inside spawn_blocking
+        tokio::task::spawn_blocking(|| item.verify_single())
+            .map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
+    }
+}
 
 impl Service<BatchControl<Item>> for Verifier {
     type Response = ();
     type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
+    type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
 
     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
@@ -87,8 +157,12 @@ impl Service<BatchControl<Item>> for Verifier {
             self.batch.queue(item);
             let mut rx = self.tx.subscribe();
             Box::pin(async move {
-                match rx.recv().await {
-                    Ok(result) => {
+                match rx.changed().await {
+                    Ok(()) => {
+                        // We use a new channel for each batch,
+                        // so we always get the correct batch result here.
+                        let result = rx.borrow().expect("completed batch must send a value");
+
                         if result.is_ok() {
                             tracing::trace!(?result, "validated redpallas signature");
                             metrics::counter!("signatures.redpallas.validated", 1);
@@ -99,22 +173,17 @@ impl Service<BatchControl<Item>> for Verifier {
 
                         result
                     }
-                    Err(RecvError::Lagged(_)) => {
-                        tracing::error!(
-                            "batch verification receiver lagged and lost verification results"
-                        );
-                        Err(Error::InvalidSignature)
-                    }
-                    Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
+                    Err(_recv_error) => panic!("verifier was dropped without flushing"),
                 }
             })
         }
         BatchControl::Flush => {
-            tracing::trace!("got flush command");
-            let batch = mem::take(&mut self.batch);
-            let _ = self.tx.send(batch.verify(thread_rng()));
-            Box::pin(async { Ok(()) })
+            tracing::trace!("got redpallas flush command");
+
+            let (batch, tx) = self.take();
+
+            Box::pin(Self::flush_spawning(batch, tx).map(Ok))
         }
     }
 }
@@ -123,7 +192,9 @@ impl Service<BatchControl<Item>> for Verifier {
 impl Drop for Verifier {
     fn drop(&mut self) {
         // We need to flush the current batch in case there are still any pending futures.
-        let batch = mem::take(&mut self.batch);
-        let _ = self.tx.send(batch.verify(thread_rng()));
+        // This blocks the current thread and any futures running on it, until the batch is complete.
+        //
+        // TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
+        self.flush_blocking();
     }
 }
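
`Drop` must flush because every pending item future is parked on the watch channel; if the sender disappeared without sending, those futures would hit the `Err(_recv_error)` arm above and panic. A sketch of that failure mode in isolation:

```rust
use tokio::sync::watch;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (tx, _) = watch::channel::<Option<()>>(None);
    let mut rx = tx.subscribe();

    // Dropping the sender without sending a value...
    drop(tx);

    // ...is observed by receivers as a closed channel, which the verifiers
    // above turn into a "dropped without flushing" panic.
    assert!(rx.changed().await.is_err());
}
```
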
diff --git a/zebra-consensus/src/primitives/redpallas/tests.rs b/zebra-consensus/src/primitives/redpallas/tests.rs
index a5ed4e2a..79e9553a 100644
--- a/zebra-consensus/src/primitives/redpallas/tests.rs
+++ b/zebra-consensus/src/primitives/redpallas/tests.rs
@@ -45,7 +45,7 @@ where
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn batch_flushes_on_max_items() -> Result<()> {
     use tokio::time::timeout;
 
@@ -59,7 +59,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn batch_flushes_on_max_latency() -> Result<()> {
     use tokio::time::timeout;
 
diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs
index c9bf3030..ef95ae28 100644
--- a/zebra-consensus/src/transaction.rs
+++ b/zebra-consensus/src/transaction.rs
@@ -1,8 +1,7 @@
 //! Asynchronous verification of transactions.
-//!
+
 use std::{
     collections::HashMap,
-    convert::TryInto,
     future::Future,
     iter::FromIterator,
     pin::Pin,
diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs
index db7c3eb8..29a51515 100644
--- a/zebra-consensus/src/transaction/tests.rs
+++ b/zebra-consensus/src/transaction/tests.rs
@@ -278,61 +278,64 @@ async fn v5_transaction_is_rejected_before_nu5_activation() {
     }
 }
 
-#[tokio::test]
-async fn v5_transaction_is_accepted_after_nu5_activation_mainnet() {
-    v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Mainnet).await
+#[test]
+fn v5_transaction_is_accepted_after_nu5_activation_mainnet() {
+    v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Mainnet)
 }
 
-#[tokio::test]
-async fn v5_transaction_is_accepted_after_nu5_activation_testnet() {
-    v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Testnet).await
+#[test]
+fn v5_transaction_is_accepted_after_nu5_activation_testnet() {
+    v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Testnet)
 }
 
-async fn v5_transaction_is_accepted_after_nu5_activation_for_network(network: Network) {
-    let nu5 = NetworkUpgrade::Nu5;
-    let nu5_activation_height = nu5
-        .activation_height(network)
-        .expect("NU5 activation height is specified");
+fn v5_transaction_is_accepted_after_nu5_activation_for_network(network: Network) {
+    zebra_test::init();
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
+        let nu5 = NetworkUpgrade::Nu5;
+        let nu5_activation_height = nu5
+            .activation_height(network)
+            .expect("NU5 activation height is specified");
 
-    let blocks = match network {
-        Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
-        Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
-    };
+        let blocks = match network {
+            Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
+            Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
+        };
 
-    let state_service = service_fn(|_| async { unreachable!("Service should not be called") });
-    let verifier = Verifier::new(network, state_service);
+        let state_service = service_fn(|_| async { unreachable!("Service should not be called") });
+        let verifier = Verifier::new(network, state_service);
 
-    let mut transaction = fake_v5_transactions_for_network(network, blocks)
-        .rev()
-        .next()
-        .expect("At least one fake V5 transaction in the test vectors");
-    if transaction
-        .expiry_height()
-        .expect("V5 must have expiry_height")
-        < nu5_activation_height
-    {
-        let expiry_height = transaction.expiry_height_mut();
-        *expiry_height = nu5_activation_height;
-    }
+        let mut transaction = fake_v5_transactions_for_network(network, blocks)
+            .rev()
+            .next()
+            .expect("At least one fake V5 transaction in the test vectors");
+        if transaction
+            .expiry_height()
+            .expect("V5 must have expiry_height")
+            < nu5_activation_height
+        {
+            let expiry_height = transaction.expiry_height_mut();
+            *expiry_height = nu5_activation_height;
+        }
 
-    let expected_hash = transaction.unmined_id();
-    let expiry_height = transaction
-        .expiry_height()
-        .expect("V5 must have expiry_height");
+        let expected_hash = transaction.unmined_id();
+        let expiry_height = transaction
+            .expiry_height()
+            .expect("V5 must have expiry_height");
 
-    let result = verifier
-        .oneshot(Request::Block {
-            transaction: Arc::new(transaction),
-            known_utxos: Arc::new(HashMap::new()),
-            height: expiry_height,
-            time: chrono::MAX_DATETIME,
-        })
-        .await;
+        let result = verifier
+            .oneshot(Request::Block {
+                transaction: Arc::new(transaction),
+                known_utxos: Arc::new(HashMap::new()),
+                height: expiry_height,
+                time: chrono::MAX_DATETIME,
+            })
+            .await;
 
-    assert_eq!(
-        result.expect("unexpected error response").tx_id(),
-        expected_hash
-    );
+        assert_eq!(
+            result.expect("unexpected error response").tx_id(),
+            expected_hash
+        );
+    })
 }
 
 /// Test if V4 transaction with transparent funds is accepted.
@@ -767,7 +770,7 @@ async fn v4_transaction_with_conflicting_transparent_spend_is_rejected() {
 #[test]
 fn v4_transaction_with_conflicting_sprout_nullifier_inside_joinsplit_is_rejected() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
         let network_upgrade = NetworkUpgrade::Canopy;
 
@@ -832,7 +835,7 @@ fn v4_transaction_with_conflicting_sprout_nullifier_inside_joinsplit_is_rejected
 #[test]
 fn v4_transaction_with_conflicting_sprout_nullifier_across_joinsplits_is_rejected() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
         let network_upgrade = NetworkUpgrade::Canopy;
 
@@ -1357,13 +1360,12 @@ async fn v5_transaction_with_conflicting_transparent_spend_is_rejected() {
 }
 
 /// Test if signed V4 transaction with a dummy [`sprout::JoinSplit`] is accepted.
-/// - Test if an unsigned V4 transaction with a dummy [`sprout::JoinSplit`] is rejected.
 ///
 /// This test verifies if the transaction verifier correctly accepts a signed transaction.
 #[test]
 fn v4_with_signed_sprout_transfer_is_accepted() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         let (height, transaction) = test_transactions(network)
@@ -1396,7 +1398,7 @@ fn v4_with_signed_sprout_transfer_is_accepted() {
             result.expect("unexpected error response").tx_id(),
             expected_hash
         );
-    });
+    })
 }
 
 /// Test if an V4 transaction with a modified [`sprout::JoinSplit`] is rejected.
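
The converted tests above all follow one shape: a synchronous `#[test]` that blocks on the shared multi-threaded runtime, so the global verifiers (and their blocking tasks) live on a runtime that outlasts any single test. A hypothetical skeleton:

```rust
#[test]
fn hypothetical_verification_test() {
    zebra_test::init();
    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
        // Drive the verifier under test here; blocking cryptography is fine,
        // because the shared runtime has dedicated blocking threads.
        let result = tokio::task::spawn_blocking(|| "verified").await;
        assert_eq!(result.expect("task should not panic"), "verified");
    })
}
```
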
@@ -1406,7 +1408,7 @@ fn v4_with_signed_sprout_transfer_is_accepted() {
 #[test]
 fn v4_with_modified_joinsplit_is_rejected() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         v4_with_joinsplit_is_rejected_for_modification(
             JoinSplitModification::CorruptSignature,
             // TODO: Fix error downcast
@@ -1417,17 +1419,19 @@ fn v4_with_modified_joinsplit_is_rejected() {
             ),
         )
         .await;
+
         v4_with_joinsplit_is_rejected_for_modification(
             JoinSplitModification::CorruptProof,
             TransactionError::Groth16("proof verification failed".to_string()),
         )
         .await;
+
         v4_with_joinsplit_is_rejected_for_modification(
             JoinSplitModification::ZeroProof,
             TransactionError::MalformedGroth16("invalid G1".to_string()),
         )
         .await;
-    });
+    })
 }
 
 async fn v4_with_joinsplit_is_rejected_for_modification(
@@ -1470,7 +1474,7 @@ async fn v4_with_joinsplit_is_rejected_for_modification(
 #[test]
 fn v4_with_sapling_spends() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         let (height, transaction) = test_transactions(network)
@@ -1510,7 +1514,7 @@ fn v4_with_sapling_spends() {
 #[test]
 fn v4_with_duplicate_sapling_spends() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         let (height, mut transaction) = test_transactions(network)
@@ -1555,7 +1559,7 @@ fn v4_with_duplicate_sapling_spends() {
 #[test]
 fn v4_with_sapling_outputs_and_no_spends() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         let (height, transaction) = test_transactions(network)
@@ -1591,22 +1595,27 @@ fn v4_with_sapling_outputs_and_no_spends() {
             result.expect("unexpected error response").tx_id(),
             expected_hash
         );
-    });
+    })
 }
 
 /// Test if a V5 transaction with Sapling spends is accepted by the verifier.
 #[test]
-// TODO: Remove `should_panic` once V5 transaction verification is complete.
+// TODO: add NU5 mainnet test vectors with Sapling spends, then remove should_panic
 #[should_panic]
 fn v5_with_sapling_spends() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
+        let nu5_activation = NetworkUpgrade::Nu5.activation_height(network);
 
         let transaction =
             fake_v5_transactions_for_network(network, zebra_test::vectors::MAINNET_BLOCKS.iter())
                 .rev()
-                .filter(|transaction| !transaction.is_coinbase() && transaction.inputs().is_empty())
+                .filter(|transaction| {
+                    !transaction.is_coinbase()
+                        && transaction.inputs().is_empty()
+                        && transaction.expiry_height() >= nu5_activation
+                })
                 .find(|transaction| transaction.sapling_spends_per_anchor().next().is_some())
                 .expect("No transaction found with Sapling spends");
 
@@ -1642,7 +1651,7 @@ fn v5_with_sapling_spends() {
 #[test]
 fn v5_with_duplicate_sapling_spends() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         let mut transaction =
@@ -1688,7 +1697,7 @@ fn v5_with_duplicate_sapling_spends() {
 #[test]
 fn v5_with_duplicate_orchard_action() {
     zebra_test::init();
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         let network = Network::Mainnet;
 
         // Find a transaction with no inputs or outputs to use as base
diff --git a/zebra-consensus/src/transaction/tests/prop.rs b/zebra-consensus/src/transaction/tests/prop.rs
index 49ddfd22..226b28fc 100644
--- a/zebra-consensus/src/transaction/tests/prop.rs
+++ b/zebra-consensus/src/transaction/tests/prop.rs
@@ -1,3 +1,5 @@
+//! Randomised property tests for transaction verification.
+
 use std::{collections::HashMap, convert::TryInto, sync::Arc};
 
 use chrono::{DateTime, Duration, Utc};
@@ -438,7 +440,7 @@ fn validate(
     known_utxos: HashMap,
     network: Network,
 ) -> Result {
-    zebra_test::RUNTIME.block_on(async {
+    zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
         // Initialize the verifier
         let state_service =
             tower::service_fn(|_| async { unreachable!("State service should not be called") });
diff --git a/zebra-test/src/lib.rs b/zebra-test/src/lib.rs
index 9cd2aa07..ca23a912 100644
--- a/zebra-test/src/lib.rs
+++ b/zebra-test/src/lib.rs
@@ -26,6 +26,7 @@ pub mod zip0243;
 pub mod zip0244;
 
 /// A single-threaded Tokio runtime that can be shared between tests.
+/// This runtime should be used for tests that need a single thread for consistent timings.
 ///
 /// This shared runtime should be used in tests that use shared background tasks. An example is
 /// with shared global `Lazy` types, because they spawn a background task when they
@@ -40,13 +41,24 @@ pub mod zip0244;
 /// at a time, there's a risk of a test finishing while the timer is paused (due to a test failure,
 /// for example) and that means that the next test will already start with an incorrect timer
 /// state.
-pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
+pub static SINGLE_THREADED_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
     tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
         .expect("Failed to create Tokio runtime")
 });
 
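
The timer caveat above is why the single-threaded runtime is kept at all: tokio can only pause and auto-advance time on a current-thread runtime. A sketch, assuming tokio's `test-util` feature is enabled:

```rust
use std::time::Duration;

#[tokio::test(start_paused = true)] // uses the current-thread flavor
async fn paused_time_is_deterministic() {
    let start = tokio::time::Instant::now();

    // With paused time, this sleep auto-advances the clock instead of waiting.
    tokio::time::sleep(Duration::from_secs(60)).await;

    assert!(start.elapsed() >= Duration::from_secs(60));
}
```
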
+/// A multi-threaded Tokio runtime that can be shared between tests.
+/// This runtime should be used for tests that spawn blocking threads.
+///
+/// See [`SINGLE_THREADED_RUNTIME`] for details.
+pub static MULTI_THREADED_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .expect("Failed to create Tokio runtime")
+});
+
 static INIT: Once = Once::new();
 
 /// Initialize global and thread-specific settings for tests,
@@ -134,7 +146,8 @@ pub fn init() {
 ///
 /// This is generally used in proptests, which don't support the `#[tokio::test]` attribute.
 ///
-/// If a runtime needs to be shared between tests, use the [`RUNTIME`] instance instead.
+/// If a runtime needs to be shared between tests, use the [`SINGLE_THREADED_RUNTIME`] or
+/// [`MULTI_THREADED_RUNTIME`] instances instead.
 ///
 /// See also the [`init`] function, which is called by this function.
 pub fn init_async() -> tokio::runtime::Runtime {
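
For completeness, a sketch of the per-test runtime path that `init_async` serves (a hypothetical proptest-style caller):

```rust
fn hypothetical_property_check() {
    // Each call builds a fresh runtime, so this path is only for tests that
    // don't need to share background tasks with other tests.
    let runtime = zebra_test::init_async();

    runtime.block_on(async {
        // ... drive the async code under test ...
    });
}
```
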