Merge pull request #2318 from ZcashFoundation/redpallas-verifier
RedPallas async verifier service
This commit is contained in:
parent
14137bfc8e
commit
ff299781c1
|
|
@ -71,13 +71,16 @@ pub enum TransactionError {
|
|||
Groth16,
|
||||
|
||||
#[error(
|
||||
"joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned"
|
||||
"Sprout joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned"
|
||||
)]
|
||||
Ed25519(#[from] zebra_chain::primitives::ed25519::Error),
|
||||
|
||||
#[error("bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
|
||||
#[error("Sapling bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
|
||||
RedJubjub(zebra_chain::primitives::redjubjub::Error),
|
||||
|
||||
#[error("Orchard bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
|
||||
RedPallas(zebra_chain::primitives::redpallas::Error),
|
||||
|
||||
// temporary error type until #1186 is fixed
|
||||
#[error("Downcast from BoxError to redjubjub::Error failed")]
|
||||
InternalDowncastError(String),
|
||||
|
|
@ -88,6 +91,7 @@ pub enum TransactionError {
|
|||
|
||||
impl From<BoxError> for TransactionError {
|
||||
fn from(err: BoxError) -> Self {
|
||||
// TODO: handle redpallas Error?
|
||||
match err.downcast::<zebra_chain::primitives::redjubjub::Error>() {
|
||||
Ok(e) => TransactionError::RedJubjub(*e),
|
||||
Err(e) => TransactionError::InternalDowncastError(format!(
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
pub mod ed25519;
|
||||
pub mod groth16;
|
||||
pub mod redjubjub;
|
||||
pub mod redpallas;
|
||||
|
||||
/// The maximum batch size for any of the batch verifiers.
|
||||
const MAX_BATCH_SIZE: usize = 64;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,129 @@
|
|||
//! Async RedPallas batch verifier service
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::future::{ready, Ready};
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::thread_rng;
|
||||
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
|
||||
use tower::{util::ServiceFn, Service};
|
||||
use tower_batch::{Batch, BatchControl};
|
||||
use tower_fallback::Fallback;
|
||||
|
||||
use zebra_chain::primitives::redpallas::{batch, *};
|
||||
|
||||
/// Global batch verification context for RedPallas signatures.
|
||||
///
|
||||
/// This service transparently batches contemporaneous signature verifications,
|
||||
/// handling batch failures by falling back to individual verification.
|
||||
///
|
||||
/// Note that making a `Service` call requires mutable access to the service, so
|
||||
/// you should call `.clone()` on the global handle to create a local, mutable
|
||||
/// handle.
|
||||
#[allow(dead_code)]
|
||||
pub static VERIFIER: Lazy<
|
||||
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
|
||||
> = Lazy::new(|| {
|
||||
Fallback::new(
|
||||
Batch::new(
|
||||
Verifier::default(),
|
||||
super::MAX_BATCH_SIZE,
|
||||
super::MAX_BATCH_LATENCY,
|
||||
),
|
||||
// We want to fallback to individual verification if batch verification
|
||||
// fails, so we need a Service to use. The obvious way to do this would
|
||||
// be to write a closure that returns an async block. But because we
|
||||
// have to specify the type of a static, we need to be able to write the
|
||||
// type of the closure and its return value, and both closures and async
|
||||
// blocks have eldritch types whose names cannot be written. So instead,
|
||||
// we use a Ready to avoid an async block and cast the closure to a
|
||||
// function (which is possible because it doesn't capture any state).
|
||||
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
|
||||
)
|
||||
});
|
||||
|
||||
/// RedPallas signature verifier service
|
||||
pub struct Verifier {
|
||||
batch: batch::Verifier,
|
||||
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
|
||||
// provides a spmc channel, "watch", but it only keeps the latest value, so
|
||||
// using it would require thinking through whether it was possible for
|
||||
// results from one batch to be mixed with another.
|
||||
tx: Sender<Result<(), Error>>,
|
||||
}
|
||||
|
||||
impl Default for Verifier {
|
||||
fn default() -> Self {
|
||||
let batch = batch::Verifier::default();
|
||||
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
|
||||
Self { batch, tx }
|
||||
}
|
||||
}
|
||||
|
||||
/// Type alias to clarify that this batch::Item is a RedPallasItem
|
||||
pub type Item = batch::Item;
|
||||
|
||||
impl Service<BatchControl<Item>> for Verifier {
|
||||
type Response = ();
|
||||
type Error = Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
|
||||
match req {
|
||||
BatchControl::Item(item) => {
|
||||
tracing::trace!("got item");
|
||||
self.batch.queue(item);
|
||||
let mut rx = self.tx.subscribe();
|
||||
Box::pin(async move {
|
||||
match rx.recv().await {
|
||||
Ok(result) => {
|
||||
if result.is_ok() {
|
||||
tracing::trace!(?result, "validated redpallas signature");
|
||||
metrics::counter!("signatures.redpallas.validated", 1);
|
||||
} else {
|
||||
tracing::trace!(?result, "invalid redpallas signature");
|
||||
metrics::counter!("signatures.redpallas.invalid", 1);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
tracing::error!(
|
||||
"batch verification receiver lagged and lost verification results"
|
||||
);
|
||||
Err(Error::InvalidSignature)
|
||||
}
|
||||
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
BatchControl::Flush => {
|
||||
tracing::trace!("got flush command");
|
||||
let batch = mem::take(&mut self.batch);
|
||||
let _ = self.tx.send(batch.verify(thread_rng()));
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Verifier {
|
||||
fn drop(&mut self) {
|
||||
// We need to flush the current batch in case there are still any pending futures.
|
||||
let batch = mem::take(&mut self.batch);
|
||||
let _ = self.tx.send(batch.verify(thread_rng()));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
//! Tests for redpallas signature verification
|
||||
|
||||
use super::*;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::{eyre, Result};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tower::ServiceExt;
|
||||
use tower_batch::Batch;
|
||||
|
||||
async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error>
|
||||
where
|
||||
V: Service<Item, Response = ()>,
|
||||
{
|
||||
let mut rng = thread_rng();
|
||||
let mut results = FuturesUnordered::new();
|
||||
for i in 0..n {
|
||||
let span = tracing::trace_span!("sig", i);
|
||||
let msg = b"BatchVerifyTest";
|
||||
|
||||
match i % 2 {
|
||||
0 => {
|
||||
let sk = SigningKey::<SpendAuth>::new(&mut rng);
|
||||
let vk = VerificationKey::from(&sk);
|
||||
let sig = sk.sign(&mut rng, &msg[..]);
|
||||
verifier.ready_and().await?;
|
||||
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
|
||||
}
|
||||
1 => {
|
||||
let sk = SigningKey::<Binding>::new(&mut rng);
|
||||
let vk = VerificationKey::from(&sk);
|
||||
let sig = sk.sign(&mut rng, &msg[..]);
|
||||
verifier.ready_and().await?;
|
||||
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
|
||||
}
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(result) = results.next().await {
|
||||
result?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn batch_flushes_on_max_items() -> Result<()> {
|
||||
use tokio::time::timeout;
|
||||
|
||||
// Use a very long max_latency and a short timeout to check that
|
||||
// flushing is happening based on hitting max_items.
|
||||
let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
|
||||
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
|
||||
.await?
|
||||
.map_err(|e| eyre!(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn batch_flushes_on_max_latency() -> Result<()> {
|
||||
use tokio::time::timeout;
|
||||
|
||||
// Use a very high max_items and a short timeout to check that
|
||||
// flushing is happening based on hitting max_latency.
|
||||
let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
|
||||
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
|
||||
.await?
|
||||
.map_err(|e| eyre!(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Reference in New Issue