diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index 2657b09c..85772e64 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -29,21 +29,21 @@ use zebra_chain::{ }; use zebra_state as zs; -use crate::{error::*, transaction as tx}; -use crate::{script, BoxError}; +use crate::{error::*, transaction as tx, BoxError}; pub mod check; mod subsidy; + #[cfg(test)] mod tests; /// Asynchronous block verification. #[derive(Debug)] -pub struct BlockVerifier { +pub struct BlockVerifier { /// The network to be verified. network: Network, state_service: S, - transaction_verifier: tx::Verifier, + transaction_verifier: V, } // TODO: dedupe with crate::error::BlockError @@ -72,18 +72,17 @@ pub enum VerifyBlockError { Commit(#[source] BoxError), #[error("invalid transaction")] - Transaction(#[source] TransactionError), + Transaction(#[from] TransactionError), } -impl BlockVerifier +impl BlockVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, + V: Service + Send + Clone + 'static, + V::Future: Send + 'static, { - pub fn new(network: Network, state_service: S) -> Self { - let transaction_verifier = - tx::Verifier::new(network, script::Verifier::new(state_service.clone())); - + pub fn new(network: Network, state_service: S, transaction_verifier: V) -> Self { Self { network, state_service, @@ -92,10 +91,12 @@ where } } -impl Service> for BlockVerifier +impl Service> for BlockVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, + V: Service + Send + Clone + 'static, + V::Future: Send + 'static, { type Response = block::Hash; type Error = VerifyBlockError; @@ -195,7 +196,9 @@ where use futures::StreamExt; while let Some(result) = async_checks.next().await { tracing::trace!(?result, remaining = async_checks.len()); - result.map_err(VerifyBlockError::Transaction)?; + result + .map_err(Into::into) + .map_err(VerifyBlockError::Transaction)?; } // Update the metrics after all the validation is finished diff --git a/zebra-consensus/src/block/tests.rs b/zebra-consensus/src/block/tests.rs index 035d1929..601d2f8a 100644 --- a/zebra-consensus/src/block/tests.rs +++ b/zebra-consensus/src/block/tests.rs @@ -1,6 +1,6 @@ //! Tests for block verification -use crate::parameters::SLOW_START_INTERVAL; +use crate::{parameters::SLOW_START_INTERVAL, script}; use super::*; @@ -9,7 +9,7 @@ use std::sync::Arc; use chrono::Utc; use color_eyre::eyre::{eyre, Report}; use once_cell::sync::Lazy; -use tower::buffer::Buffer; +use tower::{buffer::Buffer, util::BoxService}; use zebra_chain::{ block::{self, Block, Height}, @@ -20,6 +20,8 @@ use zebra_chain::{ }; use zebra_test::transcript::{ExpectedTranscriptError, Transcript}; +use crate::transaction; + static VALID_BLOCK_TRANSCRIPT: Lazy< Vec<(Arc, Result)>, > = Lazy::new(|| { @@ -119,7 +121,13 @@ async fn check_transcripts() -> Result<(), Report> { let network = Network::Mainnet; let state_service = zebra_state::init_test(network); - let block_verifier = Buffer::new(BlockVerifier::new(network, state_service.clone()), 1); + let script = script::Verifier::new(state_service.clone()); + let transaction = transaction::Verifier::new(network, script); + let transaction = Buffer::new(BoxService::new(transaction), 1); + let block_verifier = Buffer::new( + BlockVerifier::new(network, state_service.clone(), transaction), + 1, + ); for transcript_data in &[ &VALID_BLOCK_TRANSCRIPT, diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 7daa8c2f..5e0d946a 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -12,17 +12,15 @@ //! Otherwise, verification of out-of-order and invalid blocks and transactions can hang //! indefinitely. -#[cfg(test)] -mod tests; - -use displaydoc::Display; -use futures::{FutureExt, TryFutureExt}; use std::{ future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; + +use displaydoc::Display; +use futures::{FutureExt, TryFutureExt}; use thiserror::Error; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use tracing::instrument; @@ -38,21 +36,26 @@ use crate::{ block::BlockVerifier, block::VerifyBlockError, checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError}, - BoxError, Config, + error::TransactionError, + script, transaction, BoxError, Config, }; -/// The bound for the chain verifier's buffer. +#[cfg(test)] +mod tests; + +/// The bound for the chain verifier and transaction verifier buffers. /// /// We choose the verifier buffer bound based on the maximum number of /// concurrent verifier users, to avoid contention: -/// - the `ChainSync` component -/// - the `Inbound` service -/// - a miner component, which we might add in future, and +/// - the `ChainSync` block download and verify stream +/// - the `Inbound` block download and verify stream +/// - the `Mempool` transaction download and verify stream +/// - a block miner component, which we might add in future, and /// - 1 extra slot to avoid contention. /// /// We deliberately add extra slots, because they only cost a small amount of /// memory, but missing slots can significantly slow down Zebra. -const VERIFIER_BUFFER_BOUND: usize = 4; +const VERIFIER_BUFFER_BOUND: usize = 5; /// The chain verifier routes requests to either the checkpoint verifier or the /// block verifier, depending on the maximum checkpoint height. @@ -62,12 +65,17 @@ const VERIFIER_BUFFER_BOUND: usize = 4; /// Block verification requests should be wrapped in a timeout, so that /// out-of-order and invalid requests do not hang indefinitely. See the [`chain`](`crate::chain`) /// module documentation for details. -struct ChainVerifier +struct ChainVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, + V: Service + + Send + + Clone + + 'static, + V::Future: Send + 'static, { - block: BlockVerifier, + block: BlockVerifier, checkpoint: CheckpointVerifier, max_checkpoint_height: block::Height, } @@ -81,10 +89,15 @@ pub enum VerifyChainError { Block(#[source] VerifyBlockError), } -impl Service> for ChainVerifier +impl Service> for ChainVerifier where S: Service + Send + Clone + 'static, S::Future: Send + 'static, + V: Service + + Send + + Clone + + 'static, + V::Future: Send + 'static, { type Response = block::Hash; type Error = VerifyChainError; @@ -135,7 +148,7 @@ where } } -/// Initialize a block verification service. +/// Initialize block and transaction verification services. /// /// The consensus configuration is specified by `config`, and the Zcash network /// to verify blocks for is specified by `network`. @@ -144,25 +157,42 @@ where /// checks. Blocks that pass semantic verification are submitted to the supplied /// `state_service` for contextual verification before being committed to the chain. /// +/// The transaction verification service asynchronously performs semantic verification +/// checks. Transactions that pass semantic verification return an `Ok` result to the caller. +/// /// This function should only be called once for a particular state service. /// /// Dropped requests are cancelled on a best-effort basis, but may continue to be processed. /// /// # Correctness /// -/// Block verification requests should be wrapped in a timeout, so that -/// out-of-order and invalid requests do not hang indefinitely. See the [`chain`](`crate::chain`) -/// module documentation for details. +/// Block and transaction verification requests should be wrapped in a timeout, +/// so that out-of-order and invalid requests do not hang indefinitely. +/// See the [`chain`](`crate::chain`) module documentation for details. #[instrument(skip(state_service))] pub async fn init( config: Config, network: Network, mut state_service: S, -) -> Buffer, block::Hash, VerifyChainError>, Arc> +) -> ( + Buffer, block::Hash, VerifyChainError>, Arc>, + Buffer< + BoxService, + transaction::Request, + >, +) where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { + // transaction verification + + let script = script::Verifier::new(state_service.clone()); + let transaction = transaction::Verifier::new(network, script); + let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND); + + // block verification + let list = CheckpointList::new(network); let max_checkpoint_height = if config.checkpoint_sync { @@ -185,15 +215,15 @@ where }; tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); - let block = BlockVerifier::new(network, state_service.clone()); + let block = BlockVerifier::new(network, state_service.clone(), transaction.clone()); let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service); + let chain = ChainVerifier { + block, + checkpoint, + max_checkpoint_height, + }; - Buffer::new( - BoxService::new(ChainVerifier { - block, - checkpoint, - max_checkpoint_height, - }), - VERIFIER_BUFFER_BOUND, - ) + let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND); + + (chain, transaction) } diff --git a/zebra-consensus/src/chain/tests.rs b/zebra-consensus/src/chain/tests.rs index 53b168cc..f414baa5 100644 --- a/zebra-consensus/src/chain/tests.rs +++ b/zebra-consensus/src/chain/tests.rs @@ -64,7 +64,7 @@ async fn verifiers_from_network( + 'static, ) { let state_service = zs::init_test(network); - let chain_verifier = + let (chain_verifier, _transaction_verifier) = crate::chain::init(Config::default(), network, state_service.clone()).await; (chain_verifier, state_service) @@ -153,7 +153,8 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> { // Test that the chain::init function works. Most of the other tests use // init_from_verifiers. - let chain_verifier = super::init(config.clone(), network, zs::init_test(network)).await; + let (chain_verifier, _transaction_verifier) = + super::init(config.clone(), network, zs::init_test(network)).await; // Add a timeout layer let chain_verifier = diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index 0e324ec7..50a1ad75 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -90,15 +90,23 @@ pub enum TransactionError { } impl From for TransactionError { - fn from(err: BoxError) -> Self { - // TODO: handle redpallas Error? + fn from(mut err: BoxError) -> Self { + // TODO: handle redpallas::Error, ScriptInvalid, InvalidSignature match err.downcast::() { - Ok(e) => TransactionError::RedJubjub(*e), - Err(e) => TransactionError::InternalDowncastError(format!( - "downcast to redjubjub::Error failed, original error: {:?}", - e - )), + Ok(e) => return TransactionError::RedJubjub(*e), + Err(e) => err = e, } + + // buffered transaction verifier service error + match err.downcast::() { + Ok(e) => return *e, + Err(e) => err = e, + } + + TransactionError::InternalDowncastError(format!( + "downcast to known transaction error type failed, original error: {:?}", + err, + )) } } diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 79143688..626c6da6 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -90,6 +90,10 @@ pub enum Request { }, } +/// The response type for the transaction verifier service. +/// Responses identify the transaction that was verified. +pub type Response = zebra_chain::transaction::Hash; + impl Request { /// The transaction to verify that's in this request. pub fn transaction(&self) -> Arc { @@ -127,7 +131,7 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send + 'static, { - type Response = transaction::Hash; + type Response = Response; type Error = TransactionError; type Future = Pin> + Send + 'static>>; diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index 039f8493..e5aec48d 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -406,7 +406,8 @@ async fn v4_transaction_with_transparent_transfer_is_rejected_by_the_script() { assert_eq!( result, Err(TransactionError::InternalDowncastError( - "downcast to redjubjub::Error failed, original error: ScriptInvalid".to_string() + "downcast to known transaction error type failed, original error: ScriptInvalid" + .to_string() )) ); } @@ -683,7 +684,7 @@ fn v4_with_unsigned_sprout_transfer_is_rejected() { // TODO: Fix error downcast // Err(TransactionError::Ed25519(ed25519::Error::InvalidSignature)) TransactionError::InternalDowncastError( - "downcast to redjubjub::Error failed, original error: InvalidSignature" + "downcast to known transaction error type failed, original error: InvalidSignature" .to_string(), ) ) diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index ed74c45d..5a71dc2a 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -55,7 +55,8 @@ impl StartCmd { let state = ServiceBuilder::new().buffer(20).service(state_service); info!("initializing verifiers"); - let verifier = zebra_consensus::chain::init( + // TODO: use the transaction verifier to verify mempool transactions (#2637, #2606) + let (chain_verifier, _tx_verifier) = zebra_consensus::chain::init( config.consensus.clone(), config.network.network, state.clone(), @@ -70,7 +71,11 @@ impl StartCmd { let inbound = ServiceBuilder::new() .load_shed() .buffer(20) - .service(Inbound::new(setup_rx, state.clone(), verifier.clone())); + .service(Inbound::new( + setup_rx, + state.clone(), + chain_verifier.clone(), + )); let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await; @@ -81,7 +86,7 @@ impl StartCmd { info!("initializing syncer"); // TODO: use sync_length_receiver to activate the mempool (#2592) let (syncer, _sync_length_receiver) = - ChainSync::new(&config, peer_set.clone(), state, verifier); + ChainSync::new(&config, peer_set.clone(), state, chain_verifier); select! { result = syncer.sync().fuse() => result,