From e05103323e95f76e2b8325c9239846cb2c3eaf71 Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Wed, 14 Oct 2020 14:06:32 -0700 Subject: [PATCH] Implement Async Script Verification RFC (#961) This commit begins the process of integrating `zcash_script` with the rest of the system for verifying scripts while syncing the block chain. It does so by adding the necessary support for looking up UTXOs from the state service and implements the first parts of the `script::Verifier` for looking up the necessary UTXOs in the state and then generating the necessary call to `zcash_script` to verify the script itself. Co-authored-by: teor --- Cargo.lock | 1 + zebra-consensus/Cargo.toml | 1 + zebra-consensus/src/lib.rs | 2 +- zebra-consensus/src/script.rs | 79 +++++++++++++- zebra-state/src/request.rs | 5 +- zebra-state/src/response.rs | 4 + zebra-state/src/service.rs | 32 ++++++ zebra-state/src/service/memory_state.rs | 37 +++++-- zebra-state/src/service/utxo.rs | 69 +++++++++++++ zebra-state/src/sled_state.rs | 132 +++++++++++++++++++++--- 10 files changed, 330 insertions(+), 32 deletions(-) create mode 100644 zebra-state/src/service/utxo.rs diff --git a/Cargo.lock b/Cargo.lock index 95f15369..b0992155 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3203,6 +3203,7 @@ dependencies = [ "tracing-futures", "tracing-subscriber 0.2.13", "zebra-chain", + "zebra-script", "zebra-state", "zebra-test", ] diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 2ab3ccc5..8270752b 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -27,6 +27,7 @@ tower-fallback = { path = "../tower-fallback/" } tower-batch = { path = "../tower-batch/" } zebra-chain = { path = "../zebra-chain" } zebra-state = { path = "../zebra-state" } +zebra-script = { path = "../zebra-script" } displaydoc = "0.1.7" [dev-dependencies] diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index f9bf88a6..02d53fca 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -23,10 +23,10 @@ pub mod config; pub mod error; pub mod mempool; pub mod parameters; +pub mod script; #[allow(dead_code)] // Remove this once transaction verification is implemented mod primitives; -mod script; mod transaction; pub use crate::config::Config; diff --git a/zebra-consensus/src/script.rs b/zebra-consensus/src/script.rs index 80b0ab68..07e4af5a 100644 --- a/zebra-consensus/src/script.rs +++ b/zebra-consensus/src/script.rs @@ -12,11 +12,82 @@ //! This is an internal module. Use `verify::BlockVerifier` for blocks and their //! transactions, or `mempool::MempoolTransactionVerifier` for mempool transactions. +use std::{pin::Pin, sync::Arc}; + +use std::future::Future; +use zebra_chain::{parameters::ConsensusBranchId, transaction::Transaction, transparent}; + +use crate::BoxError; + /// Internal script verification service. /// /// After verification, the script future completes. State changes are handled by /// `BlockVerifier` or `MempoolTransactionVerifier`. -/// -/// `ScriptVerifier` is not yet implemented. -#[derive(Default)] -pub(crate) struct ScriptVerifier {} +pub struct Verifier { + state: ZS, + branch: ConsensusBranchId, +} + +impl Verifier { + pub fn new(state: ZS, branch: ConsensusBranchId) -> Self { + Self { state, branch } + } +} + +#[derive(Debug)] +struct Request { + transaction: Arc, + input_index: usize, +} + +impl tower::Service for Verifier +where + ZS: tower::Service, + ZS::Future: Send + 'static, +{ + type Response = (); + type Error = BoxError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.state.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + use futures_util::FutureExt; + + let input = &req.transaction.inputs()[req.input_index]; + + match input { + transparent::Input::PrevOut { outpoint, .. } => { + let output = self.state.call(zebra_state::Request::AwaitUtxo(*outpoint)); + let transaction = req.transaction; + let branch_id = self.branch; + let input_index = req.input_index; + + async move { + let previous_output = match output.await? { + zebra_state::Response::Utxo(output) => output, + _ => unreachable!("AwaitUtxo always responds with Utxo"), + }; + + zebra_script::is_valid( + transaction, + branch_id, + (input_index as u32, previous_output), + )?; + + Ok(()) + } + .boxed() + } + transparent::Input::Coinbase { .. } => { + async { Err("unexpected coinbase input".into()) }.boxed() + } + } + } +} diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 29fddf86..e02b128f 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use zebra_chain::{ block::{self, Block}, - transaction, + transaction, transparent, }; // Allow *only* this unused import, so that rustdoc link resolution @@ -107,4 +107,7 @@ pub enum Request { /// Note: the [`HashOrHeight`] can be constructed from a [`block::Hash`] or /// [`block::Height`] using `.into()`. Block(HashOrHeight), + + /// Request a UTXO identified by the given Outpoint + AwaitUtxo(transparent::OutPoint), } diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index df7f9f69..b171bfe6 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use zebra_chain::{ block::{self, Block}, transaction::Transaction, + transparent, }; // Allow *only* this unused import, so that rustdoc link resolution @@ -30,4 +31,7 @@ pub enum Response { /// Response to [`Request::Block`] with the specified block. Block(Option>), + + /// The response to a `AwaitUtxo` request + Utxo(transparent::Output), } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 196b9e1e..1b862f01 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -3,6 +3,8 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, + time::Instant, }; use futures::future::{FutureExt, TryFutureExt}; @@ -21,6 +23,7 @@ use crate::{ }; mod memory_state; +mod utxo; // todo: put this somewhere #[derive(Debug)] @@ -39,18 +42,27 @@ struct StateService { mem: NonFinalizedState, /// Blocks awaiting their parent blocks for contextual verification. queued_blocks: QueuedBlocks, + /// The set of outpoints with pending requests for their associated transparent::Output + pending_utxos: utxo::PendingUtxos, + /// Instant tracking the last time `pending_utxos` was pruned + last_prune: Instant, } impl StateService { + const PRUNE_INTERVAL: Duration = Duration::from_secs(30); + pub fn new(config: Config, network: Network) -> Self { let sled = FinalizedState::new(&config, network); let mem = NonFinalizedState::default(); let queued_blocks = QueuedBlocks::default(); + let pending_utxos = utxo::PendingUtxos::default(); Self { sled, mem, queued_blocks, + pending_utxos, + last_prune: Instant::now(), } } @@ -154,6 +166,13 @@ impl Service for StateService { Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + let now = Instant::now(); + + if self.last_prune + Self::PRUNE_INTERVAL < now { + self.pending_utxos.prune(); + self.last_prune = now; + } + Poll::Ready(Ok(())) } @@ -162,6 +181,7 @@ impl Service for StateService { Request::CommitBlock { block } => { let (rsp_tx, mut rsp_rx) = broadcast::channel(1); + self.pending_utxos.check_block(&block); self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx }); async move { @@ -177,6 +197,7 @@ impl Service for StateService { Request::CommitFinalizedBlock { block } => { let (rsp_tx, mut rsp_rx) = broadcast::channel(1); + self.pending_utxos.check_block(&block); self.sled .queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx }); @@ -213,6 +234,17 @@ impl Service for StateService { .map_ok(Response::Block) .boxed() } + Request::AwaitUtxo(outpoint) => { + let fut = self.pending_utxos.queue(outpoint); + + if let Some(finalized_utxo) = self.sled.utxo(&outpoint).unwrap() { + self.pending_utxos.respond(outpoint, finalized_utxo); + } else if let Some(non_finalized_utxo) = self.mem.utxo(&outpoint) { + self.pending_utxos.respond(outpoint, non_finalized_utxo); + } + + fut.boxed() + } } } } diff --git a/zebra-state/src/service/memory_state.rs b/zebra-state/src/service/memory_state.rs index 3d47af30..576c3da8 100644 --- a/zebra-state/src/service/memory_state.rs +++ b/zebra-state/src/service/memory_state.rs @@ -26,7 +26,7 @@ struct Chain { height_by_hash: HashMap, tx_by_hash: HashMap, - created_utxos: HashSet, + created_utxos: HashMap, spent_utxos: HashSet, sprout_anchors: HashSet, sapling_anchors: HashSet, @@ -258,11 +258,14 @@ impl UpdateWith<(transaction::Hash, &Vec)> for Chain { &mut self, (transaction_hash, outputs): &(transaction::Hash, &Vec), ) { - for (utxo_index, _) in outputs.iter().enumerate() { - self.created_utxos.insert(transparent::OutPoint { - hash: *transaction_hash, - index: utxo_index as u32, - }); + for (utxo_index, output) in outputs.iter().enumerate() { + self.created_utxos.insert( + transparent::OutPoint { + hash: *transaction_hash, + index: utxo_index as u32, + }, + output.clone(), + ); } } @@ -272,10 +275,12 @@ impl UpdateWith<(transaction::Hash, &Vec)> for Chain { ) { for (utxo_index, _) in outputs.iter().enumerate() { assert!( - self.created_utxos.remove(&transparent::OutPoint { - hash: *transaction_hash, - index: utxo_index as u32, - }), + self.created_utxos + .remove(&transparent::OutPoint { + hash: *transaction_hash, + index: utxo_index as u32, + }) + .is_some(), "created_utxos must be present if block was" ); } @@ -530,6 +535,18 @@ impl NonFinalizedState { None } + + /// Returns the `transparent::Output` pointed to by the given + /// `transparent::OutPoint` if it is present. + pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option { + for chain in self.chain_set.iter().rev() { + if let Some(output) = chain.created_utxos.get(outpoint) { + return Some(output.clone()); + } + } + + None + } } /// A queue of blocks, awaiting the arrival of parent blocks. diff --git a/zebra-state/src/service/utxo.rs b/zebra-state/src/service/utxo.rs new file mode 100644 index 00000000..8f4736ad --- /dev/null +++ b/zebra-state/src/service/utxo.rs @@ -0,0 +1,69 @@ +#![allow(dead_code)] +use crate::{BoxError, Response}; +use std::collections::HashMap; +use std::future::Future; +use tokio::sync::broadcast; +use zebra_chain::{block::Block, transparent}; + +#[derive(Debug, Default)] +pub struct PendingUtxos(HashMap>); + +impl PendingUtxos { + /// Returns a future that will resolve to the `transparent::Output` pointed + /// to by the given `transparent::OutPoint` when it is available. + pub fn queue( + &mut self, + outpoint: transparent::OutPoint, + ) -> impl Future> { + let mut receiver = self + .0 + .entry(outpoint) + .or_insert_with(|| { + let (sender, _) = broadcast::channel(1); + sender + }) + .subscribe(); + + async move { + receiver + .recv() + .await + .map(Response::Utxo) + .map_err(BoxError::from) + } + } + + /// Notify all utxo requests waiting for the `transparent::Output` pointed to + /// by the given `transparent::OutPoint` that the `Output` has arrived. + pub fn respond(&mut self, outpoint: transparent::OutPoint, output: transparent::Output) { + if let Some(sender) = self.0.remove(&outpoint) { + let _ = sender.send(output); + } + } + + /// For each notifies waiting utxo requests for each `transparent::Output` in + /// `block` that the output has arrived. + pub fn check_block(&mut self, block: &Block) { + if self.0.is_empty() { + return; + } + + for transaction in block.transactions.iter() { + let transaction_hash = transaction.hash(); + for (index, output) in transaction.outputs().iter().enumerate() { + let outpoint = transparent::OutPoint { + hash: transaction_hash, + index: index as _, + }; + + self.respond(outpoint, output.clone()); + } + } + } + + /// Scan the set of waiting utxo requests for channels where all recievers + /// have been dropped and remove the corresponding sender. + pub fn prune(&mut self) { + self.0.retain(|_, chan| chan.receiver_count() > 0); + } +} diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index 8a788e1a..1fe23ed1 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -3,11 +3,14 @@ use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc}; use tracing::trace; -use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; use zebra_chain::{ block::{self, Block}, parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, }; +use zebra_chain::{ + serialization::{ZcashDeserialize, ZcashSerialize}, + transparent, +}; use crate::{BoxError, Config, HashOrHeight, QueuedBlock}; @@ -36,13 +39,83 @@ pub struct FinalizedState { height_by_hash: sled::Tree, block_by_height: sled::Tree, // tx_by_hash: sled::Tree, - // utxo_by_outpoint: sled::Tree, + utxo_by_outpoint: sled::Tree, // sprout_nullifiers: sled::Tree, // sapling_nullifiers: sled::Tree, // sprout_anchors: sled::Tree, // sapling_anchors: sled::Tree, } +/// Helper trait for inserting (Key, Value) pairs into sled when both the key and +/// value implement ZcashSerialize. +trait SledSerialize { + /// Serialize and insert the given key and value into a sled tree. + fn zs_insert( + &self, + key: &K, + value: &V, + ) -> Result<(), sled::transaction::UnabortableTransactionError> + where + K: ZcashSerialize, + V: ZcashSerialize; +} + +/// Helper trait for retrieving values from sled trees when the key and value +/// implement ZcashSerialize/ZcashDeserialize. +trait SledDeserialize { + /// Serialize the given key and use that to get and deserialize the + /// corresponding value from a sled tree, if it is present. + fn zs_get(&self, key: &K) -> Result, BoxError> + where + K: ZcashSerialize, + V: ZcashDeserialize; +} + +impl SledSerialize for sled::transaction::TransactionalTree { + fn zs_insert( + &self, + key: &K, + value: &V, + ) -> Result<(), sled::transaction::UnabortableTransactionError> + where + K: ZcashSerialize, + V: ZcashSerialize, + { + let key_bytes = key + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + let value_bytes = value + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + self.insert(key_bytes, value_bytes)?; + + Ok(()) + } +} + +impl SledDeserialize for sled::Tree { + fn zs_get(&self, key: &K) -> Result, BoxError> + where + K: ZcashSerialize, + V: ZcashDeserialize, + { + let key_bytes = key + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + let value_bytes = self.get(&key_bytes)?; + + let value = value_bytes + .as_deref() + .map(ZcashDeserialize::zcash_deserialize) + .transpose()?; + + Ok(value) + } +} + impl FinalizedState { pub fn new(config: &Config, network: Network) -> Self { let db = config.sled_config(network).open().unwrap(); @@ -53,7 +126,7 @@ impl FinalizedState { height_by_hash: db.open_tree(b"height_by_hash").unwrap(), block_by_height: db.open_tree(b"block_by_height").unwrap(), // tx_by_hash: db.open_tree(b"tx_by_hash").unwrap(), - // utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(), + utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(), // sprout_nullifiers: db.open_tree(b"sprout_nullifiers").unwrap(), // sapling_nullifiers: db.open_tree(b"sapling_nullifiers").unwrap(), } @@ -115,23 +188,41 @@ impl FinalizedState { &self.hash_by_height, &self.height_by_hash, &self.block_by_height, + &self.utxo_by_outpoint, ) - .transaction(move |(hash_by_height, height_by_hash, block_by_height)| { - // TODO: do serialization above - // for some reason this wouldn't move into the closure (??) - let block_bytes = block - .zcash_serialize_to_vec() - .expect("zcash_serialize_to_vec has wrong return type"); + .transaction( + move |(hash_by_height, height_by_hash, block_by_height, utxo_by_outpoint)| { + // TODO: do serialization above + // for some reason this wouldn't move into the closure (??) + let block_bytes = block + .zcash_serialize_to_vec() + .expect("zcash_serialize_to_vec has wrong return type"); - // TODO: check highest entry of hash_by_height as in RFC + // TODO: check highest entry of hash_by_height as in RFC - hash_by_height.insert(&height_bytes, &hash.0)?; - height_by_hash.insert(&hash.0, &height_bytes)?; - block_by_height.insert(&height_bytes, block_bytes)?; + hash_by_height.insert(&height_bytes, &hash.0)?; + height_by_hash.insert(&hash.0, &height_bytes)?; + block_by_height.insert(&height_bytes, block_bytes)?; + // tx_by_hash - // for some reason type inference fails here - Ok::<_, sled::transaction::ConflictableTransactionError>(hash) - }) + for transaction in block.transactions.iter() { + let transaction_hash = transaction.hash(); + for (index, output) in transaction.outputs().iter().enumerate() { + let outpoint = transparent::OutPoint { + hash: transaction_hash, + index: index as _, + }; + + utxo_by_outpoint.zs_insert(&outpoint, output)?; + } + } + // sprout_nullifiers + // sapling_nullifiers + + // for some reason type inference fails here + Ok::<_, sled::transaction::ConflictableTransactionError>(hash) + }, + ) .map_err(Into::into) } @@ -222,6 +313,15 @@ impl FinalizedState { } } } + + /// Returns the `transparent::Output` pointed to by the given + /// `transparent::OutPoint` if it is present. + pub fn utxo( + &self, + outpoint: &transparent::OutPoint, + ) -> Result, BoxError> { + self.utxo_by_outpoint.zs_get(outpoint) + } } // Split into a helper function to be called synchronously or asynchronously.