diff --git a/Cargo.lock b/Cargo.lock index 311101b6..9f770c58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,6 +426,19 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" +[[package]] +name = "ed25519-zebra" +version = "0.3.0" +source = "git+https://github.com/zcashfoundation/ed25519-zebra?branch=batch2#e12a4cb32230512856bb7d1ccac51248ce2b180c" +dependencies = [ + "curve25519-dalek", + "hex", + "rand_core 0.5.1", + "serde", + "sha2", + "thiserror", +] + [[package]] name = "ed25519-zebra" version = "0.3.0" @@ -1927,7 +1940,7 @@ dependencies = [ name = "tower-batch" version = "0.1.0" dependencies = [ - "ed25519-zebra", + "ed25519-zebra 0.3.0 (git+https://github.com/zcashfoundation/ed25519-zebra?branch=batch2)", "futures", "futures-core", "pin-project", @@ -1935,7 +1948,9 @@ dependencies = [ "tokio", "tower", "tracing", + "tracing-error", "tracing-futures", + "tracing-subscriber", ] [[package]] @@ -2278,7 +2293,7 @@ dependencies = [ "bs58", "byteorder", "chrono", - "ed25519-zebra", + "ed25519-zebra 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures", "hex", "jubjub", diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml index e54a07cf..7d2b08a2 100644 --- a/tower-batch/Cargo.toml +++ b/tower-batch/Cargo.toml @@ -15,6 +15,9 @@ tracing-futures = "0.2.4" futures = "0.3.5" [dev-dependencies] -ed25519-zebra = "0.3" +ed25519-zebra = { git = "https://github.com/zcashfoundation/ed25519-zebra", branch = "batch2" } rand = "0.7" tokio = { version = "0.2", features = ["full"]} +tracing-error = "0.1.2" +tracing-subscriber = "0.2.5" +tracing = "0.1.15" diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs index fc20e6d9..1ec9aee2 100644 --- a/tower-batch/tests/ed25519.rs +++ b/tower-batch/tests/ed25519.rs @@ -2,6 +2,7 @@ use std::{ convert::TryFrom, future::Future, pin::Pin, + sync::Once, task::{Context, Poll}, time::Duration, }; @@ -16,7 +17,7 @@ use tower_batch::{Batch, BatchControl}; // ============ service impl ============ pub struct Ed25519Verifier { - batch: BatchVerifier, + batch: batch::Verifier, // This uses a "broadcast" channel, which is an mpmc channel. Tokio also // provides a spmc channel, "watch", but it only keeps the latest value, so // using it would require thinking through whether it was possible for @@ -26,15 +27,16 @@ pub struct Ed25519Verifier { impl Ed25519Verifier { pub fn new() -> Self { - let batch = BatchVerifier::default(); - let (tx, _) = channel(1); + let batch = batch::Verifier::default(); + // XXX(hdevalence) what's a reasonable choice here? + let (tx, _) = channel(10); Self { tx, batch } } } -type Request<'msg> = (VerificationKeyBytes, Signature, &'msg [u8]); +pub type Ed25519Item = batch::Item; -impl<'msg> Service>> for Ed25519Verifier { +impl<'msg> Service> for Ed25519Verifier { type Response = (); type Error = Error; type Future = Pin> + Send + 'static>>; @@ -43,22 +45,28 @@ impl<'msg> Service>> for Ed25519Verifier { Poll::Ready(Ok(())) } - fn call(&mut self, req: BatchControl>) -> Self::Future { + fn call(&mut self, req: BatchControl) -> Self::Future { match req { - BatchControl::Item((vk_bytes, sig, msg)) => { - self.batch.queue(vk_bytes, sig, msg); + BatchControl::Item(item) => { + tracing::trace!("got item"); + self.batch.queue(item); let mut rx = self.tx.subscribe(); Box::pin(async move { match rx.recv().await { Ok(result) => result, - // this would be bad - Err(RecvError::Lagged(_)) => Err(Error::InvalidSignature), + Err(RecvError::Lagged(_)) => { + tracing::warn!( + "missed channel updates for the correct signature batch!" + ); + Err(Error::InvalidSignature) + } Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), } }) } BatchControl::Flush => { - let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); + tracing::trace!("got flush command"); + let batch = std::mem::replace(&mut self.batch, batch::Verifier::default()); let _ = self.tx.send(batch.verify(thread_rng())); Box::pin(async { Ok(()) }) } @@ -69,53 +77,75 @@ impl<'msg> Service>> for Ed25519Verifier { impl Drop for Ed25519Verifier { fn drop(&mut self) { // We need to flush the current batch in case there are still any pending futures. - let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); + let batch = std::mem::replace(&mut self.batch, batch::Verifier::default()); let _ = self.tx.send(batch.verify(thread_rng())); } } // =============== testing code ======== +static LOGGER_INIT: Once = Once::new(); + +fn install_tracing() { + use tracing_error::ErrorLayer; + use tracing_subscriber::prelude::*; + use tracing_subscriber::{fmt, EnvFilter}; + + LOGGER_INIT.call_once(|| { + let fmt_layer = fmt::layer().with_target(false); + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .with(ErrorLayer::default()) + .init(); + }) +} + async fn sign_and_verify(mut verifier: V, n: usize) where - for<'msg> V: Service>, - for<'msg> >>::Error: - Into>, + V: Service, + >::Error: Into>, { let mut results = FuturesUnordered::new(); - for _ in 0..n { + for i in 0..n { + let span = tracing::trace_span!("sig", i); let sk = SigningKey::new(thread_rng()); let vk_bytes = VerificationKeyBytes::from(&sk); let msg = b"BatchVerifyTest"; let sig = sk.sign(&msg[..]); - results.push( - verifier - .ready_and() - .await - .map_err(|e| e.into()) - .unwrap() - .call((vk_bytes, sig, &msg[..])), - ) + + verifier.ready_and().await.map_err(|e| e.into()).unwrap(); + results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into()))) } while let Some(result) = results.next().await { + let result = result.map_err(|e| e.into()); + tracing::trace!(?result); assert!(result.is_ok()); } } +/* #[tokio::test] async fn individual_verification_with_service_fn() { - let verifier = tower::service_fn(|(vk_bytes, sig, msg): Request| { + let verifier = tower::service_fn(|item: Ed25519Item| { + // now this is actually impossible to write, oops let result = VerificationKey::try_from(vk_bytes).and_then(|vk| vk.verify(&sig, msg)); async move { result } }); sign_and_verify(verifier, 100).await; } +*/ #[tokio::test] async fn batch_flushes_on_max_items() { use tokio::time::timeout; + install_tracing(); // Use a very long max_latency and a short timeout to check that // flushing is happening based on hitting max_items. @@ -130,6 +160,7 @@ async fn batch_flushes_on_max_items() { #[tokio::test] async fn batch_flushes_on_max_latency() { use tokio::time::timeout; + install_tracing(); // Use a very high max_items and a short timeout to check that // flushing is happening based on hitting max_latency.