From ac1a1c76c61a86a125b90e0dd0353371f0f95c47 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Fri, 2 Jul 2021 04:01:26 -0300 Subject: [PATCH] Refactor asynchronous checks in `transaction::Verifier` (#2432) * Refactor `AsyncChecks` into a proper type Add some helper methods to it so that checks can be added by daisy-chaining calls. Also move the code to wait for the checks to finish into the new type. * Refactor inclusion of individual Sapling checks Use `oneshot` instead of `ready_and` so that the method becomes synchronous. * Make V4 and V5 verification methods synchronous There is no longer a need to wait for any internal service to be ready, since now that's always done as part of an asynchronous check included in the returned set of checks. --- zebra-consensus/src/transaction.rs | 174 ++++++++++++++++------------- 1 file changed, 95 insertions(+), 79 deletions(-) diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index c6a91355..f421c704 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, future::Future, + iter::FromIterator, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -31,9 +32,6 @@ mod check; #[cfg(test)] mod tests; -/// An alias for a set of asynchronous checks that should succeed. -type AsyncChecks = FuturesUnordered> + Send>>>; - /// Asynchronous transaction verification. /// /// # Correctness @@ -200,23 +198,20 @@ where joinsplit_data, sapling_shielded_data, .. - } => { - Self::verify_v4_transaction( - req, - network, - script_verifier, - inputs, - joinsplit_data, - sapling_shielded_data, - ) - .await? - } + } => Self::verify_v4_transaction( + req, + network, + script_verifier, + inputs, + joinsplit_data, + sapling_shielded_data, + )?, Transaction::V5 { inputs, .. } => { - Self::verify_v5_transaction(req, network, script_verifier, inputs).await? + Self::verify_v5_transaction(req, network, script_verifier, inputs)? } }; - Self::wait_for_checks(async_checks).await?; + async_checks.check().await?; Ok(tx.hash()) } @@ -248,7 +243,7 @@ where /// - the transparent `inputs` in the transaction /// - the Sprout `joinsplit_data` shielded data in the transaction /// - the `sapling_shielded_data` in the transaction - async fn verify_v4_transaction( + fn verify_v4_transaction( request: Request, network: Network, script_verifier: script::Verifier, @@ -256,33 +251,26 @@ where joinsplit_data: &Option>, sapling_shielded_data: &Option>, ) -> Result { - // A set of asynchronous checks which must all succeed. - // We finish by waiting on these below. - let mut async_checks = AsyncChecks::new(); - let tx = request.transaction(); let upgrade = request.upgrade(network); - - // Add asynchronous checks of the transparent inputs and outputs - async_checks.extend(Self::verify_transparent_inputs_and_outputs( - &request, - network, - inputs, - script_verifier, - )?); - let shielded_sighash = tx.sighash(upgrade, HashType::ALL, None); - async_checks.extend(Self::verify_sprout_shielded_data( - joinsplit_data, - &shielded_sighash, - )); - - async_checks.extend( - Self::verify_sapling_shielded_data(sapling_shielded_data, &shielded_sighash).await?, - ); - - Ok(async_checks) + Ok( + Self::verify_transparent_inputs_and_outputs( + &request, + network, + inputs, + script_verifier, + )? + .and(Self::verify_sprout_shielded_data( + joinsplit_data, + &shielded_sighash, + )) + .and(Self::verify_sapling_shielded_data( + sapling_shielded_data, + &shielded_sighash, + )?), + ) } /// Verify a V5 transaction. @@ -302,7 +290,7 @@ where /// - the `network` to consider when verifying /// - the `script_verifier` to use for verifying the transparent transfers /// - the transparent `inputs` in the transaction - async fn verify_v5_transaction( + fn verify_v5_transaction( request: Request, network: Network, script_verifier: script::Verifier, @@ -383,7 +371,7 @@ where input_index, }; - script_verifier.clone().oneshot(request).boxed() + script_verifier.clone().oneshot(request) }) .collect(); @@ -396,7 +384,7 @@ where joinsplit_data: &Option>, shielded_sighash: &blake2b_simd::Hash, ) -> AsyncChecks { - let checks = AsyncChecks::new(); + let mut checks = AsyncChecks::new(); if let Some(joinsplit_data) = joinsplit_data { // XXX create a method on JoinSplitData @@ -421,24 +409,20 @@ where let ed25519_item = (joinsplit_data.pub_key, joinsplit_data.sig, shielded_sighash).into(); - checks.push(ed25519_verifier.oneshot(ed25519_item).boxed()); + checks.push(ed25519_verifier.oneshot(ed25519_item)); } checks } /// Verifies a transaction's Sapling shielded data. - async fn verify_sapling_shielded_data( + fn verify_sapling_shielded_data( sapling_shielded_data: &Option>, shielded_sighash: &blake2b_simd::Hash, ) -> Result { - let async_checks = AsyncChecks::new(); + let mut async_checks = AsyncChecks::new(); if let Some(sapling_shielded_data) = sapling_shielded_data { - let mut spend_verifier = primitives::groth16::SPEND_VERIFIER.clone(); - let mut output_verifier = primitives::groth16::OUTPUT_VERIFIER.clone(); - let mut redjubjub_verifier = primitives::redjubjub::VERIFIER.clone(); - for spend in sapling_shielded_data.spends_per_anchor() { // Consensus rule: cv and rk MUST NOT be of small // order, i.e. [h_J]cv MUST NOT be 𝒪_J and [h_J]rk @@ -456,12 +440,11 @@ where // resulting future to our collection of async // checks that (at a minimum) must pass for the // transaction to verify. - let spend_rsp = spend_verifier - .ready_and() - .await? - .call(primitives::groth16::ItemWrapper::from(&spend).into()); - - async_checks.push(spend_rsp.boxed()); + async_checks.push( + primitives::groth16::SPEND_VERIFIER + .clone() + .oneshot(primitives::groth16::ItemWrapper::from(&spend).into()), + ); // Consensus rule: The spend authorization signature // MUST be a valid SpendAuthSig signature over @@ -472,12 +455,11 @@ where // description while adding the resulting future to // our collection of async checks that (at a // minimum) must pass for the transaction to verify. - let rsp = redjubjub_verifier - .ready_and() - .await? - .call((spend.rk, spend.spend_auth_sig, &shielded_sighash).into()); - - async_checks.push(rsp.boxed()); + async_checks.push( + primitives::redjubjub::VERIFIER + .clone() + .oneshot((spend.rk, spend.spend_auth_sig, shielded_sighash).into()), + ); } for output in sapling_shielded_data.outputs() { @@ -497,12 +479,11 @@ where // the resulting future to our collection of async // checks that (at a minimum) must pass for the // transaction to verify. - let output_rsp = output_verifier - .ready_and() - .await? - .call(primitives::groth16::ItemWrapper::from(output).into()); - - async_checks.push(output_rsp.boxed()); + async_checks.push( + primitives::groth16::OUTPUT_VERIFIER + .clone() + .oneshot(primitives::groth16::ItemWrapper::from(output).into()), + ); } let bvk = sapling_shielded_data.binding_verification_key(); @@ -522,31 +503,66 @@ where //.map_err(|e| BoxError::from(Box::new(e)))?; } - let _rsp = redjubjub_verifier - .ready_and() - .await? - .call((bvk, sapling_shielded_data.binding_sig, &shielded_sighash).into()) - .boxed(); - // TODO: stop ignoring binding signature errors - #1939 - // async_checks.push(rsp); + // async_checks.push( + // primitives::redjubjub::VERIFIER + // .clone() + // .oneshot((bvk, sapling_shielded_data.binding_sig, &shielded_sighash).into()), + // ); } Ok(async_checks) } +} - /// Await a set of checks that should all succeed. +/// A set of unordered asynchronous checks that should succeed. +/// +/// A wrapper around [`FuturesUnordered`] with some auxiliary methods. +struct AsyncChecks(FuturesUnordered> + Send>>>); + +impl AsyncChecks { + /// Create an empty set of unordered asynchronous checks. + pub fn new() -> Self { + AsyncChecks(FuturesUnordered::new()) + } + + /// Push a check into the set. + pub fn push(&mut self, check: impl Future> + Send + 'static) { + self.0.push(check.boxed()); + } + + /// Push a set of checks into the set. + /// + /// This method can be daisy-chained. + pub fn and(mut self, checks: AsyncChecks) -> Self { + self.0.extend(checks.0); + self + } + + /// Wait until all checks in the set finish. /// /// If any of the checks fail, this method immediately returns the error and cancels all other /// checks by dropping them. - async fn wait_for_checks(mut checks: AsyncChecks) -> Result<(), TransactionError> { + async fn check(mut self) -> Result<(), BoxError> { // Wait for all asynchronous checks to complete // successfully, or fail verification if they error. - while let Some(check) = checks.next().await { - tracing::trace!(?check, remaining = checks.len()); + while let Some(check) = self.0.next().await { + tracing::trace!(?check, remaining = self.0.len()); check?; } Ok(()) } } + +impl FromIterator for AsyncChecks +where + F: Future> + Send + 'static, +{ + fn from_iter(iterator: I) -> Self + where + I: IntoIterator, + { + AsyncChecks(iterator.into_iter().map(FutureExt::boxed).collect()) + } +}