From 4fc10e52570721c41f2a9f1bced2338449dd6450 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 25 Feb 2022 18:14:00 +1000 Subject: [PATCH] 5. refactor(state): split database writes into separate functions (#3607) * fix(state): mark DiskWriteBatch as must_use * doc(state): add TODOs for moving database reads to blocking threads * doc(state): minor comment tweaks * refactor(state): split write batch into block, transactions, chain history * refactor(state): split out a genesis block write method * refactor(state): just use the empty note commitment trees directly * refactor(state): split transaction writes into transparent, nullifiers, trees And change DiskWriteBatch methods to take `&mut self`. * refactor(state): split chain value pool writes out of history writes * refactor(state): combine note commitment trees into an agrument struct * refactor(state): split history and note commitment updates * refactor(state): calculate current tip height and remove that argument --- zebra-state/src/service/finalized_state.rs | 9 +- .../src/service/finalized_state/disk_db.rs | 12 + .../src/service/finalized_state/zebra_db.rs | 412 ++++++++++++++---- 3 files changed, 331 insertions(+), 102 deletions(-) diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 184af047..3d08e991 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -336,18 +336,15 @@ impl FinalizedState { .flat_map(|outpoint| self.utxo(&outpoint).map(|utxo| (outpoint, utxo))) .collect(); - let batch = disk_db::DiskWriteBatch::new(); + let mut batch = disk_db::DiskWriteBatch::new(); // In case of errors, propagate and do not write the batch. - let batch = batch.prepare_block_batch( + batch.prepare_block_batch( &self.db, finalized, self.network, - self.finalized_tip_height(), all_utxos_spent_by_block, - self.sprout_note_commitment_tree(), - self.sapling_note_commitment_tree(), - self.orchard_note_commitment_tree(), + self.note_commitment_trees(), history_tree, self.finalized_value_pool(), )?; diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 85f3e87f..e682d457 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -32,6 +32,10 @@ pub struct DiskDb { } /// Wrapper struct to ensure low-level database writes go through the correct API. +/// +/// [`rocksdb::WriteBatch`] is a batched set of database updates, +/// which must be written to the database using `DiskDb::write(batch)`. +#[must_use = "batches must be written to the database"] pub struct DiskWriteBatch { /// The inner RocksDB write batch. batch: rocksdb::WriteBatch, @@ -99,6 +103,8 @@ impl ReadDisk for DiskDb { // We use `get_pinned_cf` to avoid taking ownership of the serialized // value, because we're going to deserialize it anyways, which avoids an // extra copy + // + // TODO: move disk reads to a blocking thread (#2188) let value_bytes = self .db .get_pinned_cf(cf, key_bytes) @@ -115,6 +121,8 @@ impl ReadDisk for DiskDb { // We use `get_pinned_cf` to avoid taking ownership of the serialized // value, because we don't use the value at all. This avoids an extra copy. + // + // TODO: move disk reads to a blocking thread (#2188) self.db .get_pinned_cf(cf, key_bytes) .expect("expected that disk errors would not occur") @@ -213,11 +221,15 @@ impl DiskDb { } /// Returns an iterator over the keys in `cf_name`, starting from the first key. + /// + /// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188) pub fn forward_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator { self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::Start) } /// Returns a reverse iterator over the keys in `cf_name`, starting from the last key. + /// + /// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188) pub fn reverse_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator { self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::End) } diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index b03ec0ea..7df4552c 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -13,7 +13,7 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use zebra_chain::{ amount::NonNegative, - block::{self, Block, Height}, + block::{self, Block}, history_tree::{HistoryTree, NonEmptyHistoryTree}, orchard, parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, @@ -25,14 +25,20 @@ use zebra_chain::{ use crate::{ service::finalized_state::{ - disk_db::{DiskDb, ReadDisk, WriteDisk}, + disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}, disk_format::{FromDisk, TransactionLocation}, FinalizedBlock, FinalizedState, }, BoxError, HashOrHeight, }; -use super::disk_db::DiskWriteBatch; +/// An argument wrapper struct for note commitment trees. +#[derive(Clone, Debug)] +pub struct NoteCommitmentTrees { + sprout: sprout::tree::NoteCommitmentTree, + sapling: sapling::tree::NoteCommitmentTree, + orchard: orchard::tree::NoteCommitmentTree, +} impl FinalizedState { // Read block methods @@ -142,6 +148,8 @@ impl FinalizedState { self.db.zs_contains(orchard_anchors, &orchard_anchor) } + // Read chain history methods + /// Returns the Sprout note commitment tree of the finalized tip /// or the empty tree if the state is empty. pub fn sprout_note_commitment_tree(&self) -> sprout::tree::NoteCommitmentTree { @@ -201,7 +209,15 @@ impl FinalizedState { .expect("Orchard note commitment tree must exist if there is a finalized tip") } - // Read chain methods + /// Returns the shielded note commitment trees of the finalized tip + /// or the empty trees if the state is empty. + pub fn note_commitment_trees(&self) -> NoteCommitmentTrees { + NoteCommitmentTrees { + sprout: self.sprout_note_commitment_tree(), + sapling: self.sapling_note_commitment_tree(), + orchard: self.orchard_note_commitment_tree(), + } + } /// Returns the ZIP-221 history tree of the finalized tip or `None` /// if it does not exist yet in the state (pre-Heartwood). @@ -229,7 +245,7 @@ impl FinalizedState { .unwrap_or_else(ValueBalance::zero) } - // Metrics methods + // Update metrics methods - used when writing /// Update metrics before committing a block. fn block_precommit_metrics(block: &Block, hash: block::Hash, height: block::Height) { @@ -307,71 +323,46 @@ impl FinalizedState { } } -// Write methods - impl DiskWriteBatch { - /// Prepare a database batch containing a `finalized` block, + /// Prepare a database batch containing `finalized.block`, /// and return it (without actually writing anything). /// /// If this method returns an error, it will be propagated, - /// and the batch will not be written to the database. + /// and the batch should not be written to the database. /// /// # Errors /// - /// - Propagates any errors from writing to the DB - /// - Propagates any errors from updating history and note commitment trees - /// - /// TODO: split up this function in the next PR. + /// - Propagates any errors from updating history tree, note commitment trees, or value pools #[allow(clippy::too_many_arguments)] pub fn prepare_block_batch( - mut self, + &mut self, db: &DiskDb, finalized: FinalizedBlock, network: Network, - current_tip_height: Option, - mut all_utxos_spent_by_block: HashMap, - mut sprout_note_commitment_tree: sprout::tree::NoteCommitmentTree, - mut sapling_note_commitment_tree: sapling::tree::NoteCommitmentTree, - mut orchard_note_commitment_tree: orchard::tree::NoteCommitmentTree, - mut history_tree: HistoryTree, - current_value_pool: ValueBalance, - ) -> Result { + all_utxos_spent_by_block: HashMap, + // TODO: make an argument struct for all the current note commitment trees & history + mut note_commitment_trees: NoteCommitmentTrees, + history_tree: HistoryTree, + value_pool: ValueBalance, + ) -> Result<(), BoxError> { let hash_by_height = db.cf_handle("hash_by_height").unwrap(); let height_by_hash = db.cf_handle("height_by_hash").unwrap(); let block_by_height = db.cf_handle("block_by_height").unwrap(); - let tx_by_hash = db.cf_handle("tx_by_hash").unwrap(); - let utxo_by_outpoint = db.cf_handle("utxo_by_outpoint").unwrap(); - - let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap(); - let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap(); - let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap(); - - let sprout_anchors = db.cf_handle("sprout_anchors").unwrap(); - let sapling_anchors = db.cf_handle("sapling_anchors").unwrap(); - let orchard_anchors = db.cf_handle("orchard_anchors").unwrap(); - - let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap(); - let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap(); - let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap(); - let history_tree_cf = db.cf_handle("history_tree").unwrap(); - - let tip_chain_value_pool = db.cf_handle("tip_chain_value_pool").unwrap(); let FinalizedBlock { block, hash, height, - new_outputs, - transaction_hashes, - } = finalized; + .. + } = &finalized; // The block has passed contextual validation, so update the metrics - FinalizedState::block_precommit_metrics(&block, hash, height); + FinalizedState::block_precommit_metrics(block, *hash, *height); // Index the block self.zs_insert(hash_by_height, height, hash); self.zs_insert(height_by_hash, hash, height); - self.zs_insert(block_by_height, height, &block); + self.zs_insert(block_by_height, height, block); // # Consensus // @@ -379,6 +370,45 @@ impl DiskWriteBatch { // > (There is one such zero-valued output, on each of Testnet and Mainnet.) // // https://zips.z.cash/protocol/protocol.pdf#txnconsensus + // + // By returning early, Zebra commits the genesis block and transaction data, + // but it ignores the genesis UTXO and value pool updates. + // + // TODO: commit transaction data but not UTXOs in the next PR. + if self.prepare_genesis_batch(db, &finalized) { + return Ok(()); + } + + self.prepare_transaction_index_batch(db, &finalized, &mut note_commitment_trees)?; + + self.prepare_note_commitment_batch( + db, + &finalized, + network, + note_commitment_trees, + history_tree, + )?; + + self.prepare_chain_value_pools_batch(db, finalized, all_utxos_spent_by_block, value_pool) + } + + /// If `finalized.block` is a genesis block, + /// prepare a database batch that finishes intializing the database, + /// and return `true` (without actually writing anything). + /// + /// Since the genesis block's transactions are skipped, + /// the returned genesis batch should be written to the database immediately. + /// + /// If `finalized.block` is not a genesis block, does nothing. + /// + /// This method never returns an error. + pub fn prepare_genesis_batch(&mut self, db: &DiskDb, finalized: &FinalizedBlock) -> bool { + let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap(); + let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap(); + let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap(); + + let FinalizedBlock { block, height, .. } = finalized; + if block.header.previous_block_hash == GENESIS_PREVIOUS_BLOCK_HASH { // Insert empty note commitment trees. Note that these can't be // used too early (e.g. the Orchard tree before Nu5 activates) @@ -387,27 +417,50 @@ impl DiskWriteBatch { self.zs_insert( sprout_note_commitment_tree_cf, height, - sprout_note_commitment_tree, + sprout::tree::NoteCommitmentTree::default(), ); self.zs_insert( sapling_note_commitment_tree_cf, height, - sapling_note_commitment_tree, + sapling::tree::NoteCommitmentTree::default(), ); self.zs_insert( orchard_note_commitment_tree_cf, height, - orchard_note_commitment_tree, + orchard::tree::NoteCommitmentTree::default(), ); - return Ok(self); + + return true; } - // Index all new transparent outputs - for (outpoint, utxo) in new_outputs.borrow().iter() { - self.zs_insert(utxo_by_outpoint, outpoint, utxo); - } + false + } - // Index each transaction, spent inputs, nullifiers + /// Prepare a database batch containing `finalized.block`'s transaction indexes, + /// and return it (without actually writing anything). + /// + /// If this method returns an error, it will be propagated, + /// and the batch should not be written to the database. + /// + /// # Errors + /// + /// - Propagates any errors from updating note commitment trees + pub fn prepare_transaction_index_batch( + &mut self, + db: &DiskDb, + finalized: &FinalizedBlock, + note_commitment_trees: &mut NoteCommitmentTrees, + ) -> Result<(), BoxError> { + let tx_by_hash = db.cf_handle("tx_by_hash").unwrap(); + + let FinalizedBlock { + block, + height, + transaction_hashes, + .. + } = finalized; + + // Index each transaction hash for (transaction_index, (transaction, transaction_hash)) in block .transactions .iter() @@ -415,95 +468,262 @@ impl DiskWriteBatch { .enumerate() { let transaction_location = TransactionLocation { - height, + height: *height, index: transaction_index .try_into() .expect("no more than 4 billion transactions per block"), }; self.zs_insert(tx_by_hash, transaction_hash, transaction_location); - // Mark all transparent inputs as spent. - // - // Coinbase inputs represent new coins, - // so there are no UTXOs to mark as spent. - for outpoint in transaction - .inputs() - .iter() - .flat_map(|input| input.outpoint()) - { - self.zs_delete(utxo_by_outpoint, outpoint); - } + self.prepare_nullifier_batch(db, transaction)?; - // Mark sprout, sapling and orchard nullifiers as spent - for sprout_nullifier in transaction.sprout_nullifiers() { - self.zs_insert(sprout_nullifiers, sprout_nullifier, ()); - } - for sapling_nullifier in transaction.sapling_nullifiers() { - self.zs_insert(sapling_nullifiers, sapling_nullifier, ()); - } - for orchard_nullifier in transaction.orchard_nullifiers() { - self.zs_insert(orchard_nullifiers, orchard_nullifier, ()); - } - - for sprout_note_commitment in transaction.sprout_note_commitments() { - sprout_note_commitment_tree.append(*sprout_note_commitment)?; - } - for sapling_note_commitment in transaction.sapling_note_commitments() { - sapling_note_commitment_tree.append(*sapling_note_commitment)?; - } - for orchard_note_commitment in transaction.orchard_note_commitments() { - orchard_note_commitment_tree.append(*orchard_note_commitment)?; - } + DiskWriteBatch::update_note_commitment_trees(transaction, note_commitment_trees)?; } - let sprout_root = sprout_note_commitment_tree.root(); - let sapling_root = sapling_note_commitment_tree.root(); - let orchard_root = orchard_note_commitment_tree.root(); + self.prepare_transparent_outputs_batch(db, finalized) + } - history_tree.push(network, block.clone(), sapling_root, orchard_root)?; + /// Prepare a database batch containing `finalized.block`'s UTXO changes, + /// and return it (without actually writing anything). + /// + /// # Errors + /// + /// - This method doesn't currently return any errors, but it might in future + pub fn prepare_transparent_outputs_batch( + &mut self, + db: &DiskDb, + finalized: &FinalizedBlock, + ) -> Result<(), BoxError> { + let utxo_by_outpoint = db.cf_handle("utxo_by_outpoint").unwrap(); + + let FinalizedBlock { + block, new_outputs, .. + } = finalized; + + // Index all new transparent outputs, before deleting any we've spent + for (outpoint, utxo) in new_outputs.borrow().iter() { + self.zs_insert(utxo_by_outpoint, outpoint, utxo); + } + + // Mark all transparent inputs as spent. + // + // Coinbase inputs represent new coins, + // so there are no UTXOs to mark as spent. + for outpoint in block + .transactions + .iter() + .flat_map(|tx| tx.inputs()) + .flat_map(|input| input.outpoint()) + { + self.zs_delete(utxo_by_outpoint, outpoint); + } + + Ok(()) + } + + /// Prepare a database batch containing `finalized.block`'s nullifiers, + /// and return it (without actually writing anything). + /// + /// # Errors + /// + /// - This method doesn't currently return any errors, but it might in future + pub fn prepare_nullifier_batch( + &mut self, + db: &DiskDb, + transaction: &Transaction, + ) -> Result<(), BoxError> { + let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap(); + let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap(); + let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap(); + + // Mark sprout, sapling and orchard nullifiers as spent + for sprout_nullifier in transaction.sprout_nullifiers() { + self.zs_insert(sprout_nullifiers, sprout_nullifier, ()); + } + for sapling_nullifier in transaction.sapling_nullifiers() { + self.zs_insert(sapling_nullifiers, sapling_nullifier, ()); + } + for orchard_nullifier in transaction.orchard_nullifiers() { + self.zs_insert(orchard_nullifiers, orchard_nullifier, ()); + } + + Ok(()) + } + + /// Updates the supplied note commitment trees. + /// + /// If this method returns an error, it will be propagated, + /// and the batch should not be written to the database. + /// + /// # Errors + /// + /// - Propagates any errors from updating note commitment trees + pub fn update_note_commitment_trees( + transaction: &Transaction, + note_commitment_trees: &mut NoteCommitmentTrees, + ) -> Result<(), BoxError> { + // Update the note commitment trees + for sprout_note_commitment in transaction.sprout_note_commitments() { + note_commitment_trees + .sprout + .append(*sprout_note_commitment)?; + } + for sapling_note_commitment in transaction.sapling_note_commitments() { + note_commitment_trees + .sapling + .append(*sapling_note_commitment)?; + } + for orchard_note_commitment in transaction.orchard_note_commitments() { + note_commitment_trees + .orchard + .append(*orchard_note_commitment)?; + } + + Ok(()) + } + + /// Prepare a database batch containing the note commitment and history tree updates + /// from `finalized.block`, and return it (without actually writing anything). + /// + /// If this method returns an error, it will be propagated, + /// and the batch should not be written to the database. + /// + /// # Errors + /// + /// - Propagates any errors from updating the history tree + pub fn prepare_note_commitment_batch( + &mut self, + db: &DiskDb, + finalized: &FinalizedBlock, + network: Network, + // TODO: make an argument struct for all the note commitment trees & history + note_commitment_trees: NoteCommitmentTrees, + history_tree: HistoryTree, + ) -> Result<(), BoxError> { + let sprout_anchors = db.cf_handle("sprout_anchors").unwrap(); + let sapling_anchors = db.cf_handle("sapling_anchors").unwrap(); + let orchard_anchors = db.cf_handle("orchard_anchors").unwrap(); + + let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap(); + let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap(); + let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap(); + + let FinalizedBlock { height, .. } = finalized; + + let sprout_root = note_commitment_trees.sprout.root(); + let sapling_root = note_commitment_trees.sapling.root(); + let orchard_root = note_commitment_trees.orchard.root(); // Compute the new anchors and index them // Note: if the root hasn't changed, we write the same value again. - self.zs_insert(sprout_anchors, sprout_root, &sprout_note_commitment_tree); + self.zs_insert(sprout_anchors, sprout_root, ¬e_commitment_trees.sprout); self.zs_insert(sapling_anchors, sapling_root, ()); self.zs_insert(orchard_anchors, orchard_root, ()); // Update the trees in state + let current_tip_height = *height - 1; if let Some(h) = current_tip_height { self.zs_delete(sprout_note_commitment_tree_cf, h); self.zs_delete(sapling_note_commitment_tree_cf, h); self.zs_delete(orchard_note_commitment_tree_cf, h); - self.zs_delete(history_tree_cf, h); } self.zs_insert( sprout_note_commitment_tree_cf, height, - sprout_note_commitment_tree, + note_commitment_trees.sprout, ); self.zs_insert( sapling_note_commitment_tree_cf, height, - sapling_note_commitment_tree, + note_commitment_trees.sapling, ); self.zs_insert( orchard_note_commitment_tree_cf, height, - orchard_note_commitment_tree, + note_commitment_trees.orchard, ); + self.prepare_history_batch( + db, + finalized, + network, + sapling_root, + orchard_root, + history_tree, + ) + } + + /// Prepare a database batch containing the history tree updates + /// from `finalized.block`, and return it (without actually writing anything). + /// + /// If this method returns an error, it will be propagated, + /// and the batch should not be written to the database. + /// + /// # Errors + /// + /// - Returns any errors from updating the history tree + pub fn prepare_history_batch( + &mut self, + db: &DiskDb, + finalized: &FinalizedBlock, + network: Network, + sapling_root: sapling::tree::Root, + orchard_root: orchard::tree::Root, + mut history_tree: HistoryTree, + ) -> Result<(), BoxError> { + let history_tree_cf = db.cf_handle("history_tree").unwrap(); + + let FinalizedBlock { block, height, .. } = finalized; + + history_tree.push(network, block.clone(), sapling_root, orchard_root)?; + + // Update the tree in state + let current_tip_height = *height - 1; + if let Some(h) = current_tip_height { + self.zs_delete(history_tree_cf, h); + } + + // TODO: just store a single history tree, using `()` as the key, + // and remove the delete (like the chain value pool balances). + // This requires a database version update. if let Some(history_tree) = history_tree.as_ref() { self.zs_insert(history_tree_cf, height, history_tree); } + Ok(()) + } + + /// Prepare a database batch containing the chain value pool update from `finalized.block`, + /// and return it (without actually writing anything). + /// + /// If this method returns an error, it will be propagated, + /// and the batch should not be written to the database. + /// + /// # Errors + /// + /// - Propagates any errors from updating value pools + pub fn prepare_chain_value_pools_batch( + &mut self, + db: &DiskDb, + finalized: FinalizedBlock, + mut all_utxos_spent_by_block: HashMap, + value_pool: ValueBalance, + ) -> Result<(), BoxError> { + let tip_chain_value_pool = db.cf_handle("tip_chain_value_pool").unwrap(); + + let FinalizedBlock { + block, new_outputs, .. + } = finalized; + // Some utxos are spent in the same block so they will be in `new_outputs`. all_utxos_spent_by_block.extend(new_outputs); - let new_pool = current_value_pool.add_block(block.borrow(), &all_utxos_spent_by_block)?; + let new_pool = value_pool.add_block(block.borrow(), &all_utxos_spent_by_block)?; self.zs_insert(tip_chain_value_pool, (), new_pool); - Ok(self) + Ok(()) } }