diff --git a/Cargo.lock b/Cargo.lock index 95f15369..b0992155 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3203,6 +3203,7 @@ dependencies = [ "tracing-futures", "tracing-subscriber 0.2.13", "zebra-chain", + "zebra-script", "zebra-state", "zebra-test", ] diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 2ab3ccc5..8270752b 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -27,6 +27,7 @@ tower-fallback = { path = "../tower-fallback/" } tower-batch = { path = "../tower-batch/" } zebra-chain = { path = "../zebra-chain" } zebra-state = { path = "../zebra-state" } +zebra-script = { path = "../zebra-script" } displaydoc = "0.1.7" [dev-dependencies] diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index f9bf88a6..02d53fca 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -23,10 +23,10 @@ pub mod config; pub mod error; pub mod mempool; pub mod parameters; +pub mod script; #[allow(dead_code)] // Remove this once transaction verification is implemented mod primitives; -mod script; mod transaction; pub use crate::config::Config; diff --git a/zebra-consensus/src/script.rs b/zebra-consensus/src/script.rs index 80b0ab68..07e4af5a 100644 --- a/zebra-consensus/src/script.rs +++ b/zebra-consensus/src/script.rs @@ -12,11 +12,82 @@ //! This is an internal module. Use `verify::BlockVerifier` for blocks and their //! transactions, or `mempool::MempoolTransactionVerifier` for mempool transactions. +use std::{pin::Pin, sync::Arc}; + +use std::future::Future; +use zebra_chain::{parameters::ConsensusBranchId, transaction::Transaction, transparent}; + +use crate::BoxError; + /// Internal script verification service. /// /// After verification, the script future completes. State changes are handled by /// `BlockVerifier` or `MempoolTransactionVerifier`. -/// -/// `ScriptVerifier` is not yet implemented. -#[derive(Default)] -pub(crate) struct ScriptVerifier {} +pub struct Verifier { + state: ZS, + branch: ConsensusBranchId, +} + +impl Verifier { + pub fn new(state: ZS, branch: ConsensusBranchId) -> Self { + Self { state, branch } + } +} + +#[derive(Debug)] +struct Request { + transaction: Arc, + input_index: usize, +} + +impl tower::Service for Verifier +where + ZS: tower::Service, + ZS::Future: Send + 'static, +{ + type Response = (); + type Error = BoxError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.state.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + use futures_util::FutureExt; + + let input = &req.transaction.inputs()[req.input_index]; + + match input { + transparent::Input::PrevOut { outpoint, .. } => { + let output = self.state.call(zebra_state::Request::AwaitUtxo(*outpoint)); + let transaction = req.transaction; + let branch_id = self.branch; + let input_index = req.input_index; + + async move { + let previous_output = match output.await? { + zebra_state::Response::Utxo(output) => output, + _ => unreachable!("AwaitUtxo always responds with Utxo"), + }; + + zebra_script::is_valid( + transaction, + branch_id, + (input_index as u32, previous_output), + )?; + + Ok(()) + } + .boxed() + } + transparent::Input::Coinbase { .. } => { + async { Err("unexpected coinbase input".into()) }.boxed() + } + } + } +} diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 29fddf86..e02b128f 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use zebra_chain::{ block::{self, Block}, - transaction, + transaction, transparent, }; // Allow *only* this unused import, so that rustdoc link resolution @@ -107,4 +107,7 @@ pub enum Request { /// Note: the [`HashOrHeight`] can be constructed from a [`block::Hash`] or /// [`block::Height`] using `.into()`. Block(HashOrHeight), + + /// Request a UTXO identified by the given Outpoint + AwaitUtxo(transparent::OutPoint), } diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index df7f9f69..b171bfe6 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use zebra_chain::{ block::{self, Block}, transaction::Transaction, + transparent, }; // Allow *only* this unused import, so that rustdoc link resolution @@ -30,4 +31,7 @@ pub enum Response { /// Response to [`Request::Block`] with the specified block. Block(Option>), + + /// The response to a `AwaitUtxo` request + Utxo(transparent::Output), } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 196b9e1e..1b862f01 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -3,6 +3,8 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, + time::Instant, }; use futures::future::{FutureExt, TryFutureExt}; @@ -21,6 +23,7 @@ use crate::{ }; mod memory_state; +mod utxo; // todo: put this somewhere #[derive(Debug)] @@ -39,18 +42,27 @@ struct StateService { mem: NonFinalizedState, /// Blocks awaiting their parent blocks for contextual verification. queued_blocks: QueuedBlocks, + /// The set of outpoints with pending requests for their associated transparent::Output + pending_utxos: utxo::PendingUtxos, + /// Instant tracking the last time `pending_utxos` was pruned + last_prune: Instant, } impl StateService { + const PRUNE_INTERVAL: Duration = Duration::from_secs(30); + pub fn new(config: Config, network: Network) -> Self { let sled = FinalizedState::new(&config, network); let mem = NonFinalizedState::default(); let queued_blocks = QueuedBlocks::default(); + let pending_utxos = utxo::PendingUtxos::default(); Self { sled, mem, queued_blocks, + pending_utxos, + last_prune: Instant::now(), } } @@ -154,6 +166,13 @@ impl Service for StateService { Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + let now = Instant::now(); + + if self.last_prune + Self::PRUNE_INTERVAL < now { + self.pending_utxos.prune(); + self.last_prune = now; + } + Poll::Ready(Ok(())) } @@ -162,6 +181,7 @@ impl Service for StateService { Request::CommitBlock { block } => { let (rsp_tx, mut rsp_rx) = broadcast::channel(1); + self.pending_utxos.check_block(&block); self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx }); async move { @@ -177,6 +197,7 @@ impl Service for StateService { Request::CommitFinalizedBlock { block } => { let (rsp_tx, mut rsp_rx) = broadcast::channel(1); + self.pending_utxos.check_block(&block); self.sled .queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx }); @@ -213,6 +234,17 @@ impl Service for StateService { .map_ok(Response::Block) .boxed() } + Request::AwaitUtxo(outpoint) => { + let fut = self.pending_utxos.queue(outpoint); + + if let Some(finalized_utxo) = self.sled.utxo(&outpoint).unwrap() { + self.pending_utxos.respond(outpoint, finalized_utxo); + } else if let Some(non_finalized_utxo) = self.mem.utxo(&outpoint) { + self.pending_utxos.respond(outpoint, non_finalized_utxo); + } + + fut.boxed() + } } } } diff --git a/zebra-state/src/service/memory_state.rs b/zebra-state/src/service/memory_state.rs index 3d47af30..576c3da8 100644 --- a/zebra-state/src/service/memory_state.rs +++ b/zebra-state/src/service/memory_state.rs @@ -26,7 +26,7 @@ struct Chain { height_by_hash: HashMap, tx_by_hash: HashMap, - created_utxos: HashSet, + created_utxos: HashMap, spent_utxos: HashSet, sprout_anchors: HashSet, sapling_anchors: HashSet, @@ -258,11 +258,14 @@ impl UpdateWith<(transaction::Hash, &Vec)> for Chain { &mut self, (transaction_hash, outputs): &(transaction::Hash, &Vec), ) { - for (utxo_index, _) in outputs.iter().enumerate() { - self.created_utxos.insert(transparent::OutPoint { - hash: *transaction_hash, - index: utxo_index as u32, - }); + for (utxo_index, output) in outputs.iter().enumerate() { + self.created_utxos.insert( + transparent::OutPoint { + hash: *transaction_hash, + index: utxo_index as u32, + }, + output.clone(), + ); } } @@ -272,10 +275,12 @@ impl UpdateWith<(transaction::Hash, &Vec)> for Chain { ) { for (utxo_index, _) in outputs.iter().enumerate() { assert!( - self.created_utxos.remove(&transparent::OutPoint { - hash: *transaction_hash, - index: utxo_index as u32, - }), + self.created_utxos + .remove(&transparent::OutPoint { + hash: *transaction_hash, + index: utxo_index as u32, + }) + .is_some(), "created_utxos must be present if block was" ); } @@ -530,6 +535,18 @@ impl NonFinalizedState { None } + + /// Returns the `transparent::Output` pointed to by the given + /// `transparent::OutPoint` if it is present. + pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option { + for chain in self.chain_set.iter().rev() { + if let Some(output) = chain.created_utxos.get(outpoint) { + return Some(output.clone()); + } + } + + None + } } /// A queue of blocks, awaiting the arrival of parent blocks. diff --git a/zebra-state/src/service/utxo.rs b/zebra-state/src/service/utxo.rs new file mode 100644 index 00000000..8f4736ad --- /dev/null +++ b/zebra-state/src/service/utxo.rs @@ -0,0 +1,69 @@ +#![allow(dead_code)] +use crate::{BoxError, Response}; +use std::collections::HashMap; +use std::future::Future; +use tokio::sync::broadcast; +use zebra_chain::{block::Block, transparent}; + +#[derive(Debug, Default)] +pub struct PendingUtxos(HashMap>); + +impl PendingUtxos { + /// Returns a future that will resolve to the `transparent::Output` pointed + /// to by the given `transparent::OutPoint` when it is available. + pub fn queue( + &mut self, + outpoint: transparent::OutPoint, + ) -> impl Future> { + let mut receiver = self + .0 + .entry(outpoint) + .or_insert_with(|| { + let (sender, _) = broadcast::channel(1); + sender + }) + .subscribe(); + + async move { + receiver + .recv() + .await + .map(Response::Utxo) + .map_err(BoxError::from) + } + } + + /// Notify all utxo requests waiting for the `transparent::Output` pointed to + /// by the given `transparent::OutPoint` that the `Output` has arrived. + pub fn respond(&mut self, outpoint: transparent::OutPoint, output: transparent::Output) { + if let Some(sender) = self.0.remove(&outpoint) { + let _ = sender.send(output); + } + } + + /// For each notifies waiting utxo requests for each `transparent::Output` in + /// `block` that the output has arrived. + pub fn check_block(&mut self, block: &Block) { + if self.0.is_empty() { + return; + } + + for transaction in block.transactions.iter() { + let transaction_hash = transaction.hash(); + for (index, output) in transaction.outputs().iter().enumerate() { + let outpoint = transparent::OutPoint { + hash: transaction_hash, + index: index as _, + }; + + self.respond(outpoint, output.clone()); + } + } + } + + /// Scan the set of waiting utxo requests for channels where all recievers + /// have been dropped and remove the corresponding sender. + pub fn prune(&mut self) { + self.0.retain(|_, chan| chan.receiver_count() > 0); + } +} diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index 8a788e1a..1fe23ed1 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -3,11 +3,14 @@ use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc}; use tracing::trace; -use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; use zebra_chain::{ block::{self, Block}, parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, }; +use zebra_chain::{ + serialization::{ZcashDeserialize, ZcashSerialize}, + transparent, +}; use crate::{BoxError, Config, HashOrHeight, QueuedBlock}; @@ -36,13 +39,83 @@ pub struct FinalizedState { height_by_hash: sled::Tree, block_by_height: sled::Tree, // tx_by_hash: sled::Tree, - // utxo_by_outpoint: sled::Tree, + utxo_by_outpoint: sled::Tree, // sprout_nullifiers: sled::Tree, // sapling_nullifiers: sled::Tree, // sprout_anchors: sled::Tree, // sapling_anchors: sled::Tree, } +/// Helper trait for inserting (Key, Value) pairs into sled when both the key and +/// value implement ZcashSerialize. +trait SledSerialize { + /// Serialize and insert the given key and value into a sled tree. + fn zs_insert( + &self, + key: &K, + value: &V, + ) -> Result<(), sled::transaction::UnabortableTransactionError> + where + K: ZcashSerialize, + V: ZcashSerialize; +} + +/// Helper trait for retrieving values from sled trees when the key and value +/// implement ZcashSerialize/ZcashDeserialize. +trait SledDeserialize { + /// Serialize the given key and use that to get and deserialize the + /// corresponding value from a sled tree, if it is present. + fn zs_get(&self, key: &K) -> Result, BoxError> + where + K: ZcashSerialize, + V: ZcashDeserialize; +} + +impl SledSerialize for sled::transaction::TransactionalTree { + fn zs_insert( + &self, + key: &K, + value: &V, + ) -> Result<(), sled::transaction::UnabortableTransactionError> + where + K: ZcashSerialize, + V: ZcashSerialize, + { + let key_bytes = key + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + let value_bytes = value + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + self.insert(key_bytes, value_bytes)?; + + Ok(()) + } +} + +impl SledDeserialize for sled::Tree { + fn zs_get(&self, key: &K) -> Result, BoxError> + where + K: ZcashSerialize, + V: ZcashDeserialize, + { + let key_bytes = key + .zcash_serialize_to_vec() + .expect("serializing into a vec won't fail"); + + let value_bytes = self.get(&key_bytes)?; + + let value = value_bytes + .as_deref() + .map(ZcashDeserialize::zcash_deserialize) + .transpose()?; + + Ok(value) + } +} + impl FinalizedState { pub fn new(config: &Config, network: Network) -> Self { let db = config.sled_config(network).open().unwrap(); @@ -53,7 +126,7 @@ impl FinalizedState { height_by_hash: db.open_tree(b"height_by_hash").unwrap(), block_by_height: db.open_tree(b"block_by_height").unwrap(), // tx_by_hash: db.open_tree(b"tx_by_hash").unwrap(), - // utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(), + utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(), // sprout_nullifiers: db.open_tree(b"sprout_nullifiers").unwrap(), // sapling_nullifiers: db.open_tree(b"sapling_nullifiers").unwrap(), } @@ -115,23 +188,41 @@ impl FinalizedState { &self.hash_by_height, &self.height_by_hash, &self.block_by_height, + &self.utxo_by_outpoint, ) - .transaction(move |(hash_by_height, height_by_hash, block_by_height)| { - // TODO: do serialization above - // for some reason this wouldn't move into the closure (??) - let block_bytes = block - .zcash_serialize_to_vec() - .expect("zcash_serialize_to_vec has wrong return type"); + .transaction( + move |(hash_by_height, height_by_hash, block_by_height, utxo_by_outpoint)| { + // TODO: do serialization above + // for some reason this wouldn't move into the closure (??) + let block_bytes = block + .zcash_serialize_to_vec() + .expect("zcash_serialize_to_vec has wrong return type"); - // TODO: check highest entry of hash_by_height as in RFC + // TODO: check highest entry of hash_by_height as in RFC - hash_by_height.insert(&height_bytes, &hash.0)?; - height_by_hash.insert(&hash.0, &height_bytes)?; - block_by_height.insert(&height_bytes, block_bytes)?; + hash_by_height.insert(&height_bytes, &hash.0)?; + height_by_hash.insert(&hash.0, &height_bytes)?; + block_by_height.insert(&height_bytes, block_bytes)?; + // tx_by_hash - // for some reason type inference fails here - Ok::<_, sled::transaction::ConflictableTransactionError>(hash) - }) + for transaction in block.transactions.iter() { + let transaction_hash = transaction.hash(); + for (index, output) in transaction.outputs().iter().enumerate() { + let outpoint = transparent::OutPoint { + hash: transaction_hash, + index: index as _, + }; + + utxo_by_outpoint.zs_insert(&outpoint, output)?; + } + } + // sprout_nullifiers + // sapling_nullifiers + + // for some reason type inference fails here + Ok::<_, sled::transaction::ConflictableTransactionError>(hash) + }, + ) .map_err(Into::into) } @@ -222,6 +313,15 @@ impl FinalizedState { } } } + + /// Returns the `transparent::Output` pointed to by the given + /// `transparent::OutPoint` if it is present. + pub fn utxo( + &self, + outpoint: &transparent::OutPoint, + ) -> Result, BoxError> { + self.utxo_by_outpoint.zs_get(outpoint) + } } // Split into a helper function to be called synchronously or asynchronously.