5. refactor(state): split database writes into separate functions (#3607)

* fix(state): mark DiskWriteBatch as must_use

* doc(state): add TODOs for moving database reads to blocking threads

* doc(state): minor comment tweaks

* refactor(state): split write batch into block, transactions, chain history

* refactor(state): split out a genesis block write method

* refactor(state): just use the empty note commitment trees directly

* refactor(state): split transaction writes into transparent, nullifiers, trees

And change DiskWriteBatch methods to take `&mut self`.

* refactor(state): split chain value pool writes out of history writes

* refactor(state): combine note commitment trees into an argument struct

* refactor(state): split history and note commitment updates

* refactor(state): calculate current tip height and remove that argument
This commit is contained in:
teor 2022-02-25 18:14:00 +10:00 committed by GitHub
parent 397ba1fef7
commit 4fc10e5257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 331 additions and 102 deletions

View File

@ -336,18 +336,15 @@ impl FinalizedState {
.flat_map(|outpoint| self.utxo(&outpoint).map(|utxo| (outpoint, utxo)))
.collect();
let batch = disk_db::DiskWriteBatch::new();
let mut batch = disk_db::DiskWriteBatch::new();
// In case of errors, propagate and do not write the batch.
let batch = batch.prepare_block_batch(
batch.prepare_block_batch(
&self.db,
finalized,
self.network,
self.finalized_tip_height(),
all_utxos_spent_by_block,
self.sprout_note_commitment_tree(),
self.sapling_note_commitment_tree(),
self.orchard_note_commitment_tree(),
self.note_commitment_trees(),
history_tree,
self.finalized_value_pool(),
)?;

View File

@ -32,6 +32,10 @@ pub struct DiskDb {
}
/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
#[must_use = "batches must be written to the database"]
pub struct DiskWriteBatch {
/// The inner RocksDB write batch.
batch: rocksdb::WriteBatch,
@ -99,6 +103,8 @@ impl ReadDisk for DiskDb {
// We use `get_pinned_cf` to avoid taking ownership of the serialized
// value, because we're going to deserialize it anyways, which avoids an
// extra copy
//
// TODO: move disk reads to a blocking thread (#2188)
let value_bytes = self
.db
.get_pinned_cf(cf, key_bytes)
@ -115,6 +121,8 @@ impl ReadDisk for DiskDb {
// We use `get_pinned_cf` to avoid taking ownership of the serialized
// value, because we don't use the value at all. This avoids an extra copy.
//
// TODO: move disk reads to a blocking thread (#2188)
self.db
.get_pinned_cf(cf, key_bytes)
.expect("expected that disk errors would not occur")
@ -213,11 +221,15 @@ impl DiskDb {
}
/// Returns a forward iterator over the column family `cf_handle`,
/// positioned at its first key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn forward_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
    // Begin at the start of the column family and walk forwards.
    let from_first_key = rocksdb::IteratorMode::Start;

    self.db.iterator_cf(cf_handle, from_first_key)
}
/// Returns a reverse iterator over the column family `cf_handle`,
/// positioned at its last key.
///
/// TODO: add an iterator wrapper struct that does disk reads in a blocking thread (#2188)
pub fn reverse_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator {
    // Begin at the end of the column family and walk backwards.
    let from_last_key = rocksdb::IteratorMode::End;

    self.db.iterator_cf(cf_handle, from_last_key)
}

View File

@ -13,7 +13,7 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc};
use zebra_chain::{
amount::NonNegative,
block::{self, Block, Height},
block::{self, Block},
history_tree::{HistoryTree, NonEmptyHistoryTree},
orchard,
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
@ -25,14 +25,20 @@ use zebra_chain::{
use crate::{
service::finalized_state::{
disk_db::{DiskDb, ReadDisk, WriteDisk},
disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
disk_format::{FromDisk, TransactionLocation},
FinalizedBlock, FinalizedState,
},
BoxError, HashOrHeight,
};
use super::disk_db::DiskWriteBatch;
/// An argument wrapper struct for note commitment trees.
///
/// Groups the Sprout, Sapling, and Orchard note commitment trees,
/// so they can be passed around as a single value.
#[derive(Clone, Debug)]
pub struct NoteCommitmentTrees {
/// The Sprout note commitment tree.
sprout: sprout::tree::NoteCommitmentTree,
/// The Sapling note commitment tree.
sapling: sapling::tree::NoteCommitmentTree,
/// The Orchard note commitment tree.
orchard: orchard::tree::NoteCommitmentTree,
}
impl FinalizedState {
// Read block methods
@ -142,6 +148,8 @@ impl FinalizedState {
self.db.zs_contains(orchard_anchors, &orchard_anchor)
}
// Read chain history methods
/// Returns the Sprout note commitment tree of the finalized tip
/// or the empty tree if the state is empty.
pub fn sprout_note_commitment_tree(&self) -> sprout::tree::NoteCommitmentTree {
@ -201,7 +209,15 @@ impl FinalizedState {
.expect("Orchard note commitment tree must exist if there is a finalized tip")
}
// Read chain methods
/// Returns the Sprout, Sapling, and Orchard note commitment trees of the finalized tip,
/// or the empty trees if the state is empty.
///
/// Each tree is read using the matching per-pool getter on this state,
/// then the three trees are bundled into a [`NoteCommitmentTrees`].
pub fn note_commitment_trees(&self) -> NoteCommitmentTrees {
    let sprout = self.sprout_note_commitment_tree();
    let sapling = self.sapling_note_commitment_tree();
    let orchard = self.orchard_note_commitment_tree();

    NoteCommitmentTrees {
        sprout,
        sapling,
        orchard,
    }
}
/// Returns the ZIP-221 history tree of the finalized tip or `None`
/// if it does not exist yet in the state (pre-Heartwood).
@ -229,7 +245,7 @@ impl FinalizedState {
.unwrap_or_else(ValueBalance::zero)
}
// Metrics methods
// Update metrics methods - used when writing
/// Update metrics before committing a block.
fn block_precommit_metrics(block: &Block, hash: block::Hash, height: block::Height) {
@ -307,71 +323,46 @@ impl FinalizedState {
}
}
// Write methods
impl DiskWriteBatch {
/// Prepare a database batch containing a `finalized` block,
/// Prepare a database batch containing `finalized.block`,
/// and return it (without actually writing anything).
///
/// If this method returns an error, it will be propagated,
/// and the batch will not be written to the database.
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Propagates any errors from writing to the DB
/// - Propagates any errors from updating history and note commitment trees
///
/// TODO: split up this function in the next PR.
/// - Propagates any errors from updating history tree, note commitment trees, or value pools
#[allow(clippy::too_many_arguments)]
pub fn prepare_block_batch(
mut self,
&mut self,
db: &DiskDb,
finalized: FinalizedBlock,
network: Network,
current_tip_height: Option<Height>,
mut all_utxos_spent_by_block: HashMap<transparent::OutPoint, transparent::Utxo>,
mut sprout_note_commitment_tree: sprout::tree::NoteCommitmentTree,
mut sapling_note_commitment_tree: sapling::tree::NoteCommitmentTree,
mut orchard_note_commitment_tree: orchard::tree::NoteCommitmentTree,
mut history_tree: HistoryTree,
current_value_pool: ValueBalance<NonNegative>,
) -> Result<DiskWriteBatch, BoxError> {
all_utxos_spent_by_block: HashMap<transparent::OutPoint, transparent::Utxo>,
// TODO: make an argument struct for all the current note commitment trees & history
mut note_commitment_trees: NoteCommitmentTrees,
history_tree: HistoryTree,
value_pool: ValueBalance<NonNegative>,
) -> Result<(), BoxError> {
let hash_by_height = db.cf_handle("hash_by_height").unwrap();
let height_by_hash = db.cf_handle("height_by_hash").unwrap();
let block_by_height = db.cf_handle("block_by_height").unwrap();
let tx_by_hash = db.cf_handle("tx_by_hash").unwrap();
let utxo_by_outpoint = db.cf_handle("utxo_by_outpoint").unwrap();
let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap();
let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap();
let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap();
let sprout_anchors = db.cf_handle("sprout_anchors").unwrap();
let sapling_anchors = db.cf_handle("sapling_anchors").unwrap();
let orchard_anchors = db.cf_handle("orchard_anchors").unwrap();
let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap();
let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap();
let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap();
let history_tree_cf = db.cf_handle("history_tree").unwrap();
let tip_chain_value_pool = db.cf_handle("tip_chain_value_pool").unwrap();
let FinalizedBlock {
block,
hash,
height,
new_outputs,
transaction_hashes,
} = finalized;
..
} = &finalized;
// The block has passed contextual validation, so update the metrics
FinalizedState::block_precommit_metrics(&block, hash, height);
FinalizedState::block_precommit_metrics(block, *hash, *height);
// Index the block
self.zs_insert(hash_by_height, height, hash);
self.zs_insert(height_by_hash, hash, height);
self.zs_insert(block_by_height, height, &block);
self.zs_insert(block_by_height, height, block);
// # Consensus
//
@ -379,6 +370,45 @@ impl DiskWriteBatch {
// > (There is one such zero-valued output, on each of Testnet and Mainnet.)
//
// https://zips.z.cash/protocol/protocol.pdf#txnconsensus
//
// By returning early, Zebra commits the genesis block and transaction data,
// but it ignores the genesis UTXO and value pool updates.
//
// TODO: commit transaction data but not UTXOs in the next PR.
if self.prepare_genesis_batch(db, &finalized) {
return Ok(());
}
self.prepare_transaction_index_batch(db, &finalized, &mut note_commitment_trees)?;
self.prepare_note_commitment_batch(
db,
&finalized,
network,
note_commitment_trees,
history_tree,
)?;
self.prepare_chain_value_pools_batch(db, finalized, all_utxos_spent_by_block, value_pool)
}
/// If `finalized.block` is a genesis block,
/// prepare a database batch that finishes initializing the database,
/// and return `true` (without actually writing anything).
///
/// Since the genesis block's transactions are skipped,
/// the returned genesis batch should be written to the database immediately.
///
/// If `finalized.block` is not a genesis block, does nothing.
///
/// This method never returns an error.
pub fn prepare_genesis_batch(&mut self, db: &DiskDb, finalized: &FinalizedBlock) -> bool {
let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap();
let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap();
let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap();
let FinalizedBlock { block, height, .. } = finalized;
if block.header.previous_block_hash == GENESIS_PREVIOUS_BLOCK_HASH {
// Insert empty note commitment trees. Note that these can't be
// used too early (e.g. the Orchard tree before Nu5 activates)
@ -387,27 +417,50 @@ impl DiskWriteBatch {
self.zs_insert(
sprout_note_commitment_tree_cf,
height,
sprout_note_commitment_tree,
sprout::tree::NoteCommitmentTree::default(),
);
self.zs_insert(
sapling_note_commitment_tree_cf,
height,
sapling_note_commitment_tree,
sapling::tree::NoteCommitmentTree::default(),
);
self.zs_insert(
orchard_note_commitment_tree_cf,
height,
orchard_note_commitment_tree,
orchard::tree::NoteCommitmentTree::default(),
);
return Ok(self);
return true;
}
// Index all new transparent outputs
for (outpoint, utxo) in new_outputs.borrow().iter() {
self.zs_insert(utxo_by_outpoint, outpoint, utxo);
}
false
}
// Index each transaction, spent inputs, nullifiers
/// Prepare a database batch containing `finalized.block`'s transaction indexes,
/// and return it (without actually writing anything).
///
/// If this method returns an error, it will be propagated,
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Propagates any errors from updating note commitment trees
pub fn prepare_transaction_index_batch(
&mut self,
db: &DiskDb,
finalized: &FinalizedBlock,
note_commitment_trees: &mut NoteCommitmentTrees,
) -> Result<(), BoxError> {
let tx_by_hash = db.cf_handle("tx_by_hash").unwrap();
let FinalizedBlock {
block,
height,
transaction_hashes,
..
} = finalized;
// Index each transaction hash
for (transaction_index, (transaction, transaction_hash)) in block
.transactions
.iter()
@ -415,95 +468,262 @@ impl DiskWriteBatch {
.enumerate()
{
let transaction_location = TransactionLocation {
height,
height: *height,
index: transaction_index
.try_into()
.expect("no more than 4 billion transactions per block"),
};
self.zs_insert(tx_by_hash, transaction_hash, transaction_location);
// Mark all transparent inputs as spent.
//
// Coinbase inputs represent new coins,
// so there are no UTXOs to mark as spent.
for outpoint in transaction
.inputs()
.iter()
.flat_map(|input| input.outpoint())
{
self.zs_delete(utxo_by_outpoint, outpoint);
}
self.prepare_nullifier_batch(db, transaction)?;
// Mark sprout, sapling and orchard nullifiers as spent
for sprout_nullifier in transaction.sprout_nullifiers() {
self.zs_insert(sprout_nullifiers, sprout_nullifier, ());
}
for sapling_nullifier in transaction.sapling_nullifiers() {
self.zs_insert(sapling_nullifiers, sapling_nullifier, ());
}
for orchard_nullifier in transaction.orchard_nullifiers() {
self.zs_insert(orchard_nullifiers, orchard_nullifier, ());
}
for sprout_note_commitment in transaction.sprout_note_commitments() {
sprout_note_commitment_tree.append(*sprout_note_commitment)?;
}
for sapling_note_commitment in transaction.sapling_note_commitments() {
sapling_note_commitment_tree.append(*sapling_note_commitment)?;
}
for orchard_note_commitment in transaction.orchard_note_commitments() {
orchard_note_commitment_tree.append(*orchard_note_commitment)?;
}
DiskWriteBatch::update_note_commitment_trees(transaction, note_commitment_trees)?;
}
let sprout_root = sprout_note_commitment_tree.root();
let sapling_root = sapling_note_commitment_tree.root();
let orchard_root = orchard_note_commitment_tree.root();
self.prepare_transparent_outputs_batch(db, finalized)
}
history_tree.push(network, block.clone(), sapling_root, orchard_root)?;
/// Prepare a database batch containing `finalized.block`'s UTXO changes,
/// and return it (without actually writing anything).
///
/// # Errors
///
/// - This method doesn't currently return any errors, but it might in future
pub fn prepare_transparent_outputs_batch(
&mut self,
db: &DiskDb,
finalized: &FinalizedBlock,
) -> Result<(), BoxError> {
let utxo_by_outpoint = db.cf_handle("utxo_by_outpoint").unwrap();
let FinalizedBlock {
block, new_outputs, ..
} = finalized;
// Index all new transparent outputs, before deleting any we've spent
for (outpoint, utxo) in new_outputs.borrow().iter() {
self.zs_insert(utxo_by_outpoint, outpoint, utxo);
}
// Mark all transparent inputs as spent.
//
// Coinbase inputs represent new coins,
// so there are no UTXOs to mark as spent.
for outpoint in block
.transactions
.iter()
.flat_map(|tx| tx.inputs())
.flat_map(|input| input.outpoint())
{
self.zs_delete(utxo_by_outpoint, outpoint);
}
Ok(())
}
/// Adds the Sprout, Sapling, and Orchard nullifiers revealed by `transaction`
/// to this batch, without actually writing anything to the database.
///
/// Each nullifier is inserted as a key with a unit value,
/// into the nullifier column family for its shielded pool.
///
/// # Errors
///
/// - This method doesn't currently return any errors, but it might in future
///
/// # Panics
///
/// If looking up any of the nullifier column family handles in `db` fails.
pub fn prepare_nullifier_batch(
    &mut self,
    db: &DiskDb,
    transaction: &Transaction,
) -> Result<(), BoxError> {
    let sprout_cf = db.cf_handle("sprout_nullifiers").unwrap();
    let sapling_cf = db.cf_handle("sapling_nullifiers").unwrap();
    let orchard_cf = db.cf_handle("orchard_nullifiers").unwrap();

    // Record every nullifier in the transaction as spent, pool by pool.
    transaction.sprout_nullifiers().for_each(|nullifier| {
        self.zs_insert(sprout_cf, nullifier, ());
    });
    transaction.sapling_nullifiers().for_each(|nullifier| {
        self.zs_insert(sapling_cf, nullifier, ());
    });
    transaction.orchard_nullifiers().for_each(|nullifier| {
        self.zs_insert(orchard_cf, nullifier, ());
    });

    Ok(())
}
/// Updates the supplied note commitment trees.
///
/// If this method returns an error, it will be propagated,
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Propagates any errors from updating note commitment trees
pub fn update_note_commitment_trees(
transaction: &Transaction,
note_commitment_trees: &mut NoteCommitmentTrees,
) -> Result<(), BoxError> {
// Update the note commitment trees
for sprout_note_commitment in transaction.sprout_note_commitments() {
note_commitment_trees
.sprout
.append(*sprout_note_commitment)?;
}
for sapling_note_commitment in transaction.sapling_note_commitments() {
note_commitment_trees
.sapling
.append(*sapling_note_commitment)?;
}
for orchard_note_commitment in transaction.orchard_note_commitments() {
note_commitment_trees
.orchard
.append(*orchard_note_commitment)?;
}
Ok(())
}
/// Prepare a database batch containing the note commitment and history tree updates
/// from `finalized.block`, and return it (without actually writing anything).
///
/// If this method returns an error, it will be propagated,
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Propagates any errors from updating the history tree
pub fn prepare_note_commitment_batch(
&mut self,
db: &DiskDb,
finalized: &FinalizedBlock,
network: Network,
// TODO: make an argument struct for all the note commitment trees & history
note_commitment_trees: NoteCommitmentTrees,
history_tree: HistoryTree,
) -> Result<(), BoxError> {
let sprout_anchors = db.cf_handle("sprout_anchors").unwrap();
let sapling_anchors = db.cf_handle("sapling_anchors").unwrap();
let orchard_anchors = db.cf_handle("orchard_anchors").unwrap();
let sprout_note_commitment_tree_cf = db.cf_handle("sprout_note_commitment_tree").unwrap();
let sapling_note_commitment_tree_cf = db.cf_handle("sapling_note_commitment_tree").unwrap();
let orchard_note_commitment_tree_cf = db.cf_handle("orchard_note_commitment_tree").unwrap();
let FinalizedBlock { height, .. } = finalized;
let sprout_root = note_commitment_trees.sprout.root();
let sapling_root = note_commitment_trees.sapling.root();
let orchard_root = note_commitment_trees.orchard.root();
// Compute the new anchors and index them
// Note: if the root hasn't changed, we write the same value again.
self.zs_insert(sprout_anchors, sprout_root, &sprout_note_commitment_tree);
self.zs_insert(sprout_anchors, sprout_root, &note_commitment_trees.sprout);
self.zs_insert(sapling_anchors, sapling_root, ());
self.zs_insert(orchard_anchors, orchard_root, ());
// Update the trees in state
let current_tip_height = *height - 1;
if let Some(h) = current_tip_height {
self.zs_delete(sprout_note_commitment_tree_cf, h);
self.zs_delete(sapling_note_commitment_tree_cf, h);
self.zs_delete(orchard_note_commitment_tree_cf, h);
self.zs_delete(history_tree_cf, h);
}
self.zs_insert(
sprout_note_commitment_tree_cf,
height,
sprout_note_commitment_tree,
note_commitment_trees.sprout,
);
self.zs_insert(
sapling_note_commitment_tree_cf,
height,
sapling_note_commitment_tree,
note_commitment_trees.sapling,
);
self.zs_insert(
orchard_note_commitment_tree_cf,
height,
orchard_note_commitment_tree,
note_commitment_trees.orchard,
);
self.prepare_history_batch(
db,
finalized,
network,
sapling_root,
orchard_root,
history_tree,
)
}
/// Adds the history tree update for `finalized.block` to this batch,
/// without actually writing anything to the database.
///
/// The block is pushed onto `history_tree` along with the supplied Sapling and Orchard
/// roots. The entry stored under the previous height is then deleted (when there is a
/// previous height), and the updated tree is stored under `finalized.height`
/// (when `history_tree.as_ref()` yields a tree).
///
/// If this method returns an error, it will be propagated,
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Returns any errors from updating the history tree
///
/// # Panics
///
/// If looking up the `history_tree` column family handle in `db` fails.
pub fn prepare_history_batch(
    &mut self,
    db: &DiskDb,
    finalized: &FinalizedBlock,
    network: Network,
    sapling_root: sapling::tree::Root,
    orchard_root: orchard::tree::Root,
    mut history_tree: HistoryTree,
) -> Result<(), BoxError> {
    let history_cf = db.cf_handle("history_tree").unwrap();

    let new_tip_height = &finalized.height;

    history_tree.push(
        network,
        finalized.block.clone(),
        sapling_root,
        orchard_root,
    )?;

    // Remove the tree stored for the previous tip, if the height subtraction yields one.
    if let Some(previous_tip_height) = *new_tip_height - 1 {
        self.zs_delete(history_cf, previous_tip_height);
    }

    // TODO: just store a single history tree, using `()` as the key,
    // and remove the delete (like the chain value pool balances).
    // This requires a database version update.
    if let Some(updated_tree) = history_tree.as_ref() {
        self.zs_insert(history_cf, new_tip_height, updated_tree);
    }

    Ok(())
}
/// Prepare a database batch containing the chain value pool update from `finalized.block`,
/// and return it (without actually writing anything).
///
/// If this method returns an error, it will be propagated,
/// and the batch should not be written to the database.
///
/// # Errors
///
/// - Propagates any errors from updating value pools
pub fn prepare_chain_value_pools_batch(
&mut self,
db: &DiskDb,
finalized: FinalizedBlock,
mut all_utxos_spent_by_block: HashMap<transparent::OutPoint, transparent::Utxo>,
value_pool: ValueBalance<NonNegative>,
) -> Result<(), BoxError> {
let tip_chain_value_pool = db.cf_handle("tip_chain_value_pool").unwrap();
let FinalizedBlock {
block, new_outputs, ..
} = finalized;
// Some utxos are spent in the same block so they will be in `new_outputs`.
all_utxos_spent_by_block.extend(new_outputs);
let new_pool = current_value_pool.add_block(block.borrow(), &all_utxos_spent_by_block)?;
let new_pool = value_pool.add_block(block.borrow(), &all_utxos_spent_by_block)?;
self.zs_insert(tip_chain_value_pool, (), new_pool);
Ok(self)
Ok(())
}
}