tower-batch: test fallback verification example.

This commit is contained in:
Henry de Valence 2020-07-15 13:27:39 -07:00
parent 1e787aecb9
commit 04044be6d1
3 changed files with 59 additions and 10 deletions

20
Cargo.lock generated
View File

@ -576,6 +576,20 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "ed25519-zebra"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a045d3ca7d15222d578515dc6b54fea6c3591763b8fe2f67a45bbd56d5f1989b"
dependencies = [
"curve25519-dalek",
"hex",
"rand_core 0.5.1",
"serde",
"sha2",
"thiserror",
]
[[package]] [[package]]
name = "equihash" name = "equihash"
version = "0.1.0" version = "0.1.0"
@ -2212,13 +2226,15 @@ dependencies = [
name = "tower-batch" name = "tower-batch"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ed25519-zebra", "color-eyre",
"ed25519-zebra 2.1.0",
"futures", "futures",
"futures-core", "futures-core",
"pin-project", "pin-project",
"rand 0.7.3", "rand 0.7.3",
"tokio", "tokio",
"tower", "tower",
"tower-fallback",
"tracing", "tracing",
"tracing-futures", "tracing-futures",
"zebra-test", "zebra-test",
@ -2602,7 +2618,7 @@ dependencies = [
"chrono", "chrono",
"color-eyre", "color-eyre",
"displaydoc", "displaydoc",
"ed25519-zebra", "ed25519-zebra 1.0.0",
"equihash", "equihash",
"futures", "futures",
"hex", "hex",

View File

@ -15,8 +15,10 @@ tracing-futures = "0.2.4"
futures = "0.3.5" futures = "0.3.5"
[dev-dependencies] [dev-dependencies]
ed25519-zebra = "1.0" ed25519-zebra = "2.1.0"
rand = "0.7" rand = "0.7"
tokio = { version = "0.2", features = ["full"]} tokio = { version = "0.2", features = ["full"]}
tracing = "0.1.17" tracing = "0.1.17"
zebra-test = { path = "../zebra-test/" } zebra-test = { path = "../zebra-test/" }
tower-fallback = { path = "../tower-fallback/" }
color-eyre = "0.5"

View File

@ -6,12 +6,14 @@ use std::{
time::Duration, time::Duration,
}; };
use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*; use ed25519_zebra::*;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use rand::thread_rng; use rand::thread_rng;
use tokio::sync::broadcast::{channel, RecvError, Sender}; use tokio::sync::broadcast::{channel, RecvError, Sender};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tower_batch::{Batch, BatchControl}; use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;
// ============ service impl ============ // ============ service impl ============
@ -84,24 +86,37 @@ impl Drop for Ed25519Verifier {
// =============== testing code ======== // =============== testing code ========
async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error> async fn sign_and_verify<V>(
mut verifier: V,
n: usize,
bad_index: Option<usize>,
) -> Result<(), V::Error>
where where
V: Service<Ed25519Item, Response = ()>, V: Service<Ed25519Item, Response = ()>,
{ {
let mut results = FuturesUnordered::new(); let results = FuturesUnordered::new();
for i in 0..n { for i in 0..n {
let span = tracing::trace_span!("sig", i); let span = tracing::trace_span!("sig", i);
let sk = SigningKey::new(thread_rng()); let sk = SigningKey::new(thread_rng());
let vk_bytes = VerificationKeyBytes::from(&sk); let vk_bytes = VerificationKeyBytes::from(&sk);
let msg = b"BatchVerifyTest"; let msg = b"BatchVerifyTest";
let sig = sk.sign(&msg[..]); let sig = if Some(i) == bad_index {
sk.sign(b"badmsg")
} else {
sk.sign(&msg[..])
};
verifier.ready_and().await?; verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into()))) results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into())))
} }
while let Some(result) = results.next().await { let mut numbered_results = results.enumerate();
result?; while let Some((i, result)) = numbered_results.next().await {
if Some(i) == bad_index {
assert!(result.is_err());
} else {
result?;
}
} }
Ok(()) Ok(())
@ -116,7 +131,7 @@ async fn batch_flushes_on_max_items() {
// flushing is happening based on hitting max_items. // flushing is happening based on hitting max_items.
let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000)); let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000));
assert!( assert!(
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100)) timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
.await .await
.is_ok() .is_ok()
); );
@ -131,8 +146,24 @@ async fn batch_flushes_on_max_latency() {
// flushing is happening based on hitting max_latency. // flushing is happening based on hitting max_latency.
let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500)); let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500));
assert!( assert!(
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10)) timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
.await .await
.is_ok() .is_ok()
); );
} }
#[tokio::test]
async fn fallback_verification() -> Result<(), Report> {
zebra_test::init();
let verifier = Fallback::new(
Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)),
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
);
sign_and_verify(verifier, 100, Some(39))
.await
.map_err(|e| eyre!(e))?;
Ok(())
}