Fix Ed25519Verifier using a patched batch API.

Changing the batch API to have an explicit item type removes the lifetime
problems described in the previous commit.
This commit is contained in:
Henry de Valence 2020-06-16 01:34:31 -07:00
parent 5bcef64514
commit 34ca9a7ed2
3 changed files with 77 additions and 28 deletions

19
Cargo.lock generated
View File

@ -426,6 +426,19 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3"
[[package]]
name = "ed25519-zebra"
version = "0.3.0"
source = "git+https://github.com/zcashfoundation/ed25519-zebra?branch=batch2#e12a4cb32230512856bb7d1ccac51248ce2b180c"
dependencies = [
"curve25519-dalek",
"hex",
"rand_core 0.5.1",
"serde",
"sha2",
"thiserror",
]
[[package]] [[package]]
name = "ed25519-zebra" name = "ed25519-zebra"
version = "0.3.0" version = "0.3.0"
@ -1927,7 +1940,7 @@ dependencies = [
name = "tower-batch" name = "tower-batch"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ed25519-zebra", "ed25519-zebra 0.3.0 (git+https://github.com/zcashfoundation/ed25519-zebra?branch=batch2)",
"futures", "futures",
"futures-core", "futures-core",
"pin-project", "pin-project",
@ -1935,7 +1948,9 @@ dependencies = [
"tokio", "tokio",
"tower", "tower",
"tracing", "tracing",
"tracing-error",
"tracing-futures", "tracing-futures",
"tracing-subscriber",
] ]
[[package]] [[package]]
@ -2278,7 +2293,7 @@ dependencies = [
"bs58", "bs58",
"byteorder", "byteorder",
"chrono", "chrono",
"ed25519-zebra", "ed25519-zebra 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures", "futures",
"hex", "hex",
"jubjub", "jubjub",

View File

@ -15,6 +15,9 @@ tracing-futures = "0.2.4"
futures = "0.3.5" futures = "0.3.5"
[dev-dependencies] [dev-dependencies]
ed25519-zebra = "0.3" ed25519-zebra = { git = "https://github.com/zcashfoundation/ed25519-zebra", branch = "batch2" }
rand = "0.7" rand = "0.7"
tokio = { version = "0.2", features = ["full"]} tokio = { version = "0.2", features = ["full"]}
tracing-error = "0.1.2"
tracing-subscriber = "0.2.5"
tracing = "0.1.15"

View File

@ -2,6 +2,7 @@ use std::{
convert::TryFrom, convert::TryFrom,
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::Once,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
}; };
@ -16,7 +17,7 @@ use tower_batch::{Batch, BatchControl};
// ============ service impl ============ // ============ service impl ============
pub struct Ed25519Verifier { pub struct Ed25519Verifier {
batch: BatchVerifier, batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also // This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so // provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for // using it would require thinking through whether it was possible for
@ -26,15 +27,16 @@ pub struct Ed25519Verifier {
impl Ed25519Verifier { impl Ed25519Verifier {
pub fn new() -> Self { pub fn new() -> Self {
let batch = BatchVerifier::default(); let batch = batch::Verifier::default();
let (tx, _) = channel(1); // XXX(hdevalence) what's a reasonable choice here?
let (tx, _) = channel(10);
Self { tx, batch } Self { tx, batch }
} }
} }
type Request<'msg> = (VerificationKeyBytes, Signature, &'msg [u8]); pub type Ed25519Item = batch::Item;
impl<'msg> Service<BatchControl<Request<'msg>>> for Ed25519Verifier { impl<'msg> Service<BatchControl<Ed25519Item>> for Ed25519Verifier {
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
@ -43,22 +45,28 @@ impl<'msg> Service<BatchControl<Request<'msg>>> for Ed25519Verifier {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: BatchControl<Request<'msg>>) -> Self::Future { fn call(&mut self, req: BatchControl<Ed25519Item>) -> Self::Future {
match req { match req {
BatchControl::Item((vk_bytes, sig, msg)) => { BatchControl::Item(item) => {
self.batch.queue(vk_bytes, sig, msg); tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe(); let mut rx = self.tx.subscribe();
Box::pin(async move { Box::pin(async move {
match rx.recv().await { match rx.recv().await {
Ok(result) => result, Ok(result) => result,
// this would be bad Err(RecvError::Lagged(_)) => {
Err(RecvError::Lagged(_)) => Err(Error::InvalidSignature), tracing::warn!(
"missed channel updates for the correct signature batch!"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
} }
}) })
} }
BatchControl::Flush => { BatchControl::Flush => {
let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); tracing::trace!("got flush command");
let batch = std::mem::replace(&mut self.batch, batch::Verifier::default());
let _ = self.tx.send(batch.verify(thread_rng())); let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) }) Box::pin(async { Ok(()) })
} }
@ -69,53 +77,75 @@ impl<'msg> Service<BatchControl<Request<'msg>>> for Ed25519Verifier {
impl Drop for Ed25519Verifier { impl Drop for Ed25519Verifier {
fn drop(&mut self) { fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures. // We need to flush the current batch in case there are still any pending futures.
let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); let batch = std::mem::replace(&mut self.batch, batch::Verifier::default());
let _ = self.tx.send(batch.verify(thread_rng())); let _ = self.tx.send(batch.verify(thread_rng()));
} }
} }
// =============== testing code ======== // =============== testing code ========
static LOGGER_INIT: Once = Once::new();
fn install_tracing() {
use tracing_error::ErrorLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};
LOGGER_INIT.call_once(|| {
let fmt_layer = fmt::layer().with_target(false);
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.with(ErrorLayer::default())
.init();
})
}
async fn sign_and_verify<V>(mut verifier: V, n: usize) async fn sign_and_verify<V>(mut verifier: V, n: usize)
where where
for<'msg> V: Service<Request<'msg>>, V: Service<Ed25519Item, Response = ()>,
for<'msg> <V as Service<Request<'msg>>>::Error: <V as Service<Ed25519Item>>::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{ {
let mut results = FuturesUnordered::new(); let mut results = FuturesUnordered::new();
for _ in 0..n { for i in 0..n {
let span = tracing::trace_span!("sig", i);
let sk = SigningKey::new(thread_rng()); let sk = SigningKey::new(thread_rng());
let vk_bytes = VerificationKeyBytes::from(&sk); let vk_bytes = VerificationKeyBytes::from(&sk);
let msg = b"BatchVerifyTest"; let msg = b"BatchVerifyTest";
let sig = sk.sign(&msg[..]); let sig = sk.sign(&msg[..]);
results.push(
verifier verifier.ready_and().await.map_err(|e| e.into()).unwrap();
.ready_and() results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into())))
.await
.map_err(|e| e.into())
.unwrap()
.call((vk_bytes, sig, &msg[..])),
)
} }
while let Some(result) = results.next().await { while let Some(result) = results.next().await {
let result = result.map_err(|e| e.into());
tracing::trace!(?result);
assert!(result.is_ok()); assert!(result.is_ok());
} }
} }
/*
#[tokio::test] #[tokio::test]
async fn individual_verification_with_service_fn() { async fn individual_verification_with_service_fn() {
let verifier = tower::service_fn(|(vk_bytes, sig, msg): Request| { let verifier = tower::service_fn(|item: Ed25519Item| {
// now this is actually impossible to write, oops
let result = VerificationKey::try_from(vk_bytes).and_then(|vk| vk.verify(&sig, msg)); let result = VerificationKey::try_from(vk_bytes).and_then(|vk| vk.verify(&sig, msg));
async move { result } async move { result }
}); });
sign_and_verify(verifier, 100).await; sign_and_verify(verifier, 100).await;
} }
*/
#[tokio::test] #[tokio::test]
async fn batch_flushes_on_max_items() { async fn batch_flushes_on_max_items() {
use tokio::time::timeout; use tokio::time::timeout;
install_tracing();
// Use a very long max_latency and a short timeout to check that // Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items. // flushing is happening based on hitting max_items.
@ -130,6 +160,7 @@ async fn batch_flushes_on_max_items() {
#[tokio::test] #[tokio::test]
async fn batch_flushes_on_max_latency() { async fn batch_flushes_on_max_latency() {
use tokio::time::timeout; use tokio::time::timeout;
install_tracing();
// Use a very high max_items and a short timeout to check that // Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency. // flushing is happening based on hitting max_latency.