Return a transaction verifier from `zebra_consensus::init` (#2665)

* Return a transaction verifier from `zebra_consensus::init`

This verifier is temporarily created separately from the block verifier's
transaction verifier.

* Return the same transaction verifier used by the block verifier

* Clarify that the mempool verifier is the transaction verifier

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
teor 2021-08-26 01:07:26 +10:00 committed by GitHub
parent d7eb01d7f0
commit ace7aec933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 58 deletions

View File

@ -29,21 +29,21 @@ use zebra_chain::{
}; };
use zebra_state as zs; use zebra_state as zs;
use crate::{error::*, transaction as tx}; use crate::{error::*, transaction as tx, BoxError};
use crate::{script, BoxError};
pub mod check; pub mod check;
mod subsidy; mod subsidy;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// Asynchronous block verification. /// Asynchronous block verification.
#[derive(Debug)] #[derive(Debug)]
pub struct BlockVerifier<S> { pub struct BlockVerifier<S, V> {
/// The network to be verified. /// The network to be verified.
network: Network, network: Network,
state_service: S, state_service: S,
transaction_verifier: tx::Verifier<S>, transaction_verifier: V,
} }
// TODO: dedupe with crate::error::BlockError // TODO: dedupe with crate::error::BlockError
@ -72,18 +72,17 @@ pub enum VerifyBlockError {
Commit(#[source] BoxError), Commit(#[source] BoxError),
#[error("invalid transaction")] #[error("invalid transaction")]
Transaction(#[source] TransactionError), Transaction(#[from] TransactionError),
} }
impl<S> BlockVerifier<S> impl<S, V> BlockVerifier<S, V>
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
V: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
V::Future: Send + 'static,
{ {
pub fn new(network: Network, state_service: S) -> Self { pub fn new(network: Network, state_service: S, transaction_verifier: V) -> Self {
let transaction_verifier =
tx::Verifier::new(network, script::Verifier::new(state_service.clone()));
Self { Self {
network, network,
state_service, state_service,
@ -92,10 +91,12 @@ where
} }
} }
impl<S> Service<Arc<Block>> for BlockVerifier<S> impl<S, V> Service<Arc<Block>> for BlockVerifier<S, V>
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
V: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
V::Future: Send + 'static,
{ {
type Response = block::Hash; type Response = block::Hash;
type Error = VerifyBlockError; type Error = VerifyBlockError;
@ -195,7 +196,9 @@ where
use futures::StreamExt; use futures::StreamExt;
while let Some(result) = async_checks.next().await { while let Some(result) = async_checks.next().await {
tracing::trace!(?result, remaining = async_checks.len()); tracing::trace!(?result, remaining = async_checks.len());
result.map_err(VerifyBlockError::Transaction)?; result
.map_err(Into::into)
.map_err(VerifyBlockError::Transaction)?;
} }
// Update the metrics after all the validation is finished // Update the metrics after all the validation is finished

View File

@ -1,6 +1,6 @@
//! Tests for block verification //! Tests for block verification
use crate::parameters::SLOW_START_INTERVAL; use crate::{parameters::SLOW_START_INTERVAL, script};
use super::*; use super::*;
@ -9,7 +9,7 @@ use std::sync::Arc;
use chrono::Utc; use chrono::Utc;
use color_eyre::eyre::{eyre, Report}; use color_eyre::eyre::{eyre, Report};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tower::buffer::Buffer; use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{ use zebra_chain::{
block::{self, Block, Height}, block::{self, Block, Height},
@ -20,6 +20,8 @@ use zebra_chain::{
}; };
use zebra_test::transcript::{ExpectedTranscriptError, Transcript}; use zebra_test::transcript::{ExpectedTranscriptError, Transcript};
use crate::transaction;
static VALID_BLOCK_TRANSCRIPT: Lazy< static VALID_BLOCK_TRANSCRIPT: Lazy<
Vec<(Arc<Block>, Result<block::Hash, ExpectedTranscriptError>)>, Vec<(Arc<Block>, Result<block::Hash, ExpectedTranscriptError>)>,
> = Lazy::new(|| { > = Lazy::new(|| {
@ -119,7 +121,13 @@ async fn check_transcripts() -> Result<(), Report> {
let network = Network::Mainnet; let network = Network::Mainnet;
let state_service = zebra_state::init_test(network); let state_service = zebra_state::init_test(network);
let block_verifier = Buffer::new(BlockVerifier::new(network, state_service.clone()), 1); let script = script::Verifier::new(state_service.clone());
let transaction = transaction::Verifier::new(network, script);
let transaction = Buffer::new(BoxService::new(transaction), 1);
let block_verifier = Buffer::new(
BlockVerifier::new(network, state_service.clone(), transaction),
1,
);
for transcript_data in &[ for transcript_data in &[
&VALID_BLOCK_TRANSCRIPT, &VALID_BLOCK_TRANSCRIPT,

View File

@ -12,17 +12,15 @@
//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang //! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
//! indefinitely. //! indefinitely.
#[cfg(test)]
mod tests;
use displaydoc::Display;
use futures::{FutureExt, TryFutureExt};
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use displaydoc::Display;
use futures::{FutureExt, TryFutureExt};
use thiserror::Error; use thiserror::Error;
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::instrument; use tracing::instrument;
@ -38,21 +36,26 @@ use crate::{
block::BlockVerifier, block::BlockVerifier,
block::VerifyBlockError, block::VerifyBlockError,
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError}, checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
BoxError, Config, error::TransactionError,
script, transaction, BoxError, Config,
}; };
/// The bound for the chain verifier's buffer. #[cfg(test)]
mod tests;
/// The bound for the chain verifier and transaction verifier buffers.
/// ///
/// We choose the verifier buffer bound based on the maximum number of /// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention: /// concurrent verifier users, to avoid contention:
/// - the `ChainSync` component /// - the `ChainSync` block download and verify stream
/// - the `Inbound` service /// - the `Inbound` block download and verify stream
/// - a miner component, which we might add in future, and /// - the `Mempool` transaction download and verify stream
/// - a block miner component, which we might add in future, and
/// - 1 extra slot to avoid contention. /// - 1 extra slot to avoid contention.
/// ///
/// We deliberately add extra slots, because they only cost a small amount of /// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra. /// memory, but missing slots can significantly slow down Zebra.
const VERIFIER_BUFFER_BOUND: usize = 4; const VERIFIER_BUFFER_BOUND: usize = 5;
/// The chain verifier routes requests to either the checkpoint verifier or the /// The chain verifier routes requests to either the checkpoint verifier or the
/// block verifier, depending on the maximum checkpoint height. /// block verifier, depending on the maximum checkpoint height.
@ -62,12 +65,17 @@ const VERIFIER_BUFFER_BOUND: usize = 4;
/// Block verification requests should be wrapped in a timeout, so that /// Block verification requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely. See the [`chain`](`crate::chain`) /// out-of-order and invalid requests do not hang indefinitely. See the [`chain`](`crate::chain`)
/// module documentation for details. /// module documentation for details.
struct ChainVerifier<S> struct ChainVerifier<S, V>
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
+ Send
+ Clone
+ 'static,
V::Future: Send + 'static,
{ {
block: BlockVerifier<S>, block: BlockVerifier<S, V>,
checkpoint: CheckpointVerifier<S>, checkpoint: CheckpointVerifier<S>,
max_checkpoint_height: block::Height, max_checkpoint_height: block::Height,
} }
@ -81,10 +89,15 @@ pub enum VerifyChainError {
Block(#[source] VerifyBlockError), Block(#[source] VerifyBlockError),
} }
impl<S> Service<Arc<Block>> for ChainVerifier<S> impl<S, V> Service<Arc<Block>> for ChainVerifier<S, V>
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
+ Send
+ Clone
+ 'static,
V::Future: Send + 'static,
{ {
type Response = block::Hash; type Response = block::Hash;
type Error = VerifyChainError; type Error = VerifyChainError;
@ -135,7 +148,7 @@ where
} }
} }
/// Initialize a block verification service. /// Initialize block and transaction verification services.
/// ///
/// The consensus configuration is specified by `config`, and the Zcash network /// The consensus configuration is specified by `config`, and the Zcash network
/// to verify blocks for is specified by `network`. /// to verify blocks for is specified by `network`.
@ -144,25 +157,42 @@ where
/// checks. Blocks that pass semantic verification are submitted to the supplied /// checks. Blocks that pass semantic verification are submitted to the supplied
/// `state_service` for contextual verification before being committed to the chain. /// `state_service` for contextual verification before being committed to the chain.
/// ///
/// The transaction verification service asynchronously performs semantic verification
/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
///
/// This function should only be called once for a particular state service. /// This function should only be called once for a particular state service.
/// ///
/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed. /// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
/// ///
/// # Correctness /// # Correctness
/// ///
/// Block verification requests should be wrapped in a timeout, so that /// Block and transaction verification requests should be wrapped in a timeout,
/// out-of-order and invalid requests do not hang indefinitely. See the [`chain`](`crate::chain`) /// so that out-of-order and invalid requests do not hang indefinitely.
/// module documentation for details. /// See the [`chain`](`crate::chain`) module documentation for details.
#[instrument(skip(state_service))] #[instrument(skip(state_service))]
pub async fn init<S>( pub async fn init<S>(
config: Config, config: Config,
network: Network, network: Network,
mut state_service: S, mut state_service: S,
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>> ) -> (
Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>,
Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>,
)
where where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
// transaction verification
let script = script::Verifier::new(state_service.clone());
let transaction = transaction::Verifier::new(network, script);
let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
// block verification
let list = CheckpointList::new(network); let list = CheckpointList::new(network);
let max_checkpoint_height = if config.checkpoint_sync { let max_checkpoint_height = if config.checkpoint_sync {
@ -185,15 +215,15 @@ where
}; };
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");
let block = BlockVerifier::new(network, state_service.clone()); let block = BlockVerifier::new(network, state_service.clone(), transaction.clone());
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service); let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
let chain = ChainVerifier {
Buffer::new(
BoxService::new(ChainVerifier {
block, block,
checkpoint, checkpoint,
max_checkpoint_height, max_checkpoint_height,
}), };
VERIFIER_BUFFER_BOUND,
) let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);
(chain, transaction)
} }

View File

@ -64,7 +64,7 @@ async fn verifiers_from_network(
+ 'static, + 'static,
) { ) {
let state_service = zs::init_test(network); let state_service = zs::init_test(network);
let chain_verifier = let (chain_verifier, _transaction_verifier) =
crate::chain::init(Config::default(), network, state_service.clone()).await; crate::chain::init(Config::default(), network, state_service.clone()).await;
(chain_verifier, state_service) (chain_verifier, state_service)
@ -153,7 +153,8 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
// Test that the chain::init function works. Most of the other tests use // Test that the chain::init function works. Most of the other tests use
// init_from_verifiers. // init_from_verifiers.
let chain_verifier = super::init(config.clone(), network, zs::init_test(network)).await; let (chain_verifier, _transaction_verifier) =
super::init(config.clone(), network, zs::init_test(network)).await;
// Add a timeout layer // Add a timeout layer
let chain_verifier = let chain_verifier =

View File

@ -90,15 +90,23 @@ pub enum TransactionError {
} }
impl From<BoxError> for TransactionError { impl From<BoxError> for TransactionError {
fn from(err: BoxError) -> Self { fn from(mut err: BoxError) -> Self {
// TODO: handle redpallas Error? // TODO: handle redpallas::Error, ScriptInvalid, InvalidSignature
match err.downcast::<zebra_chain::primitives::redjubjub::Error>() { match err.downcast::<zebra_chain::primitives::redjubjub::Error>() {
Ok(e) => TransactionError::RedJubjub(*e), Ok(e) => return TransactionError::RedJubjub(*e),
Err(e) => TransactionError::InternalDowncastError(format!( Err(e) => err = e,
"downcast to redjubjub::Error failed, original error: {:?}",
e
)),
} }
// buffered transaction verifier service error
match err.downcast::<TransactionError>() {
Ok(e) => return *e,
Err(e) => err = e,
}
TransactionError::InternalDowncastError(format!(
"downcast to known transaction error type failed, original error: {:?}",
err,
))
} }
} }

View File

@ -90,6 +90,10 @@ pub enum Request {
}, },
} }
/// The response type for the transaction verifier service.
/// Responses identify the transaction that was verified.
pub type Response = zebra_chain::transaction::Hash;
impl Request { impl Request {
/// The transaction to verify that's in this request. /// The transaction to verify that's in this request.
pub fn transaction(&self) -> Arc<Transaction> { pub fn transaction(&self) -> Arc<Transaction> {
@ -127,7 +131,7 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static, ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send + 'static, ZS::Future: Send + 'static,
{ {
type Response = transaction::Hash; type Response = Response;
type Error = TransactionError; type Error = TransactionError;
type Future = type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

View File

@ -406,7 +406,8 @@ async fn v4_transaction_with_transparent_transfer_is_rejected_by_the_script() {
assert_eq!( assert_eq!(
result, result,
Err(TransactionError::InternalDowncastError( Err(TransactionError::InternalDowncastError(
"downcast to redjubjub::Error failed, original error: ScriptInvalid".to_string() "downcast to known transaction error type failed, original error: ScriptInvalid"
.to_string()
)) ))
); );
} }
@ -683,7 +684,7 @@ fn v4_with_unsigned_sprout_transfer_is_rejected() {
// TODO: Fix error downcast // TODO: Fix error downcast
// Err(TransactionError::Ed25519(ed25519::Error::InvalidSignature)) // Err(TransactionError::Ed25519(ed25519::Error::InvalidSignature))
TransactionError::InternalDowncastError( TransactionError::InternalDowncastError(
"downcast to redjubjub::Error failed, original error: InvalidSignature" "downcast to known transaction error type failed, original error: InvalidSignature"
.to_string(), .to_string(),
) )
) )

View File

@ -55,7 +55,8 @@ impl StartCmd {
let state = ServiceBuilder::new().buffer(20).service(state_service); let state = ServiceBuilder::new().buffer(20).service(state_service);
info!("initializing verifiers"); info!("initializing verifiers");
let verifier = zebra_consensus::chain::init( // TODO: use the transaction verifier to verify mempool transactions (#2637, #2606)
let (chain_verifier, _tx_verifier) = zebra_consensus::chain::init(
config.consensus.clone(), config.consensus.clone(),
config.network.network, config.network.network,
state.clone(), state.clone(),
@ -70,7 +71,11 @@ impl StartCmd {
let inbound = ServiceBuilder::new() let inbound = ServiceBuilder::new()
.load_shed() .load_shed()
.buffer(20) .buffer(20)
.service(Inbound::new(setup_rx, state.clone(), verifier.clone())); .service(Inbound::new(
setup_rx,
state.clone(),
chain_verifier.clone(),
));
let (peer_set, address_book) = let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await; zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await;
@ -81,7 +86,7 @@ impl StartCmd {
info!("initializing syncer"); info!("initializing syncer");
// TODO: use sync_length_receiver to activate the mempool (#2592) // TODO: use sync_length_receiver to activate the mempool (#2592)
let (syncer, _sync_length_receiver) = let (syncer, _sync_length_receiver) =
ChainSync::new(&config, peer_set.clone(), state, verifier); ChainSync::new(&config, peer_set.clone(), state, chain_verifier);
select! { select! {
result = syncer.sync().fuse() => result, result = syncer.sync().fuse() => result,