change(state): Deduplicate note commitment trees stored in the finalized state (#7312)
* Add support for deleting the trees

* Prune the trees

* Remove `Network` from `DiskWriteBatch`

  Removing the `Network` from `DiskWriteBatch` makes it easy to instantiate `DiskWriteBatch`es in `ZebraDb` that remove individual note commitment trees. The `Network` in `DiskWriteBatch` was only used for transparent addresses, so the refactor isn't large. After removing it from `DiskWriteBatch`, I passed it as a function argument instead. However, we should simplify the parameter lists, because at least two functions now have more than seven parameters.

* Support individual tree removal in `ZebraDb`

* Refactor the tree removal task

* Prune old comments

* Remove redundant code

* Batch the removals

* Delete ranges before the relevant network upgrades

* Move `prev_tree` inits

* Add iterator methods for reading note commitment trees

* Set up the skeleton of the Sapling pipeline

* Replace `.filter` with `.take_while`

  Fills in the pipeline, reuses `zs_range_iter` instead of repeating that code, and updates the logic to stop at the initial tip height.

* Use std threads

* `delete_range` excludes the end key

* Fix off-by-one bugs

* Log a warning when a send fails

* Remove progress logs

* Log join errors instead of panicking

* Revert: Make the `db` field of `ZebraDb` private

* Move `delete_range_sapling_tree`

* Remove a redundant `else if` branch

  Rationale: the condition `n == 1` in the removed branch is true for a subset of the values of `n` covered by the preceding condition `n >= 1`.

* Use more specific error messages

* Revert: Remove redundant methods for tree removal

* Suggestions for "Deduplicate note commitment trees stored in the finalized state" (#7330)

* Add TODOs to some `Height` methods

* Add methods for deleting individual trees

* Refactor the tasks for deleting trees

---------

Co-authored-by: arya2 <aryasolhi@gmail.com>
parent 5eaaeae08c
commit d8f5d6b6f1
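The deduplication described in the commit message relies on one property: a pool's note commitment tree only changes at heights whose blocks add new notes, so long runs of consecutive heights store identical trees, and only the first height of each run needs to stay on disk. Below is a minimal, self-contained sketch of that run-compression idea; the `u32` heights and string "trees" are simplified stand-ins, not Zebra's real types.

```rust
/// Keeps only the first height of each run of identical trees.
/// `trees` must be ordered by height, as a database range iterator is.
fn unique_tree_heights<T: PartialEq>(
    trees: impl IntoIterator<Item = (u32, T)>,
    mut prev_tree: Option<T>,
) -> Vec<u32> {
    let mut unique_heights = Vec::new();

    for (height, tree) in trees {
        let tree = Some(tree);
        if prev_tree != tree {
            // The tree changed at this height, so this copy must be kept;
            // every following height with the same tree can be deleted.
            unique_heights.push(height);
            prev_tree = tree;
        }
    }

    unique_heights
}

fn main() {
    // Heights 11 and 12 repeat the tree stored at height 10, so only 10 and 13 are kept.
    let trees = [(10, "tree-a"), (11, "tree-a"), (12, "tree-a"), (13, "tree-b")];
    assert_eq!(unique_tree_heights(trees, None), vec![10, 13]);
    println!("dedup sketch OK");
}
```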
@@ -71,6 +71,7 @@ impl Height {
     /// # Panics
     ///
     /// - If the current height is at its maximum.
+    // TODO Return an error instead of panicking #7263.
     pub fn next(self) -> Self {
         (self + 1).expect("Height should not be at its maximum.")
     }
@@ -80,6 +81,7 @@ impl Height {
     /// # Panics
     ///
     /// - If the current height is at its minimum.
+    // TODO Return an error instead of panicking #7263.
     pub fn previous(self) -> Self {
         (self - 1).expect("Height should not be at its minimum.")
     }
@@ -42,6 +42,8 @@ pub use disk_format::{OutputIndex, OutputLocation, TransactionLocation, MAX_ON_D

 pub(super) use zebra_db::ZebraDb;

+#[cfg(not(any(test, feature = "proptest-impl")))]
+pub(super) use disk_db::DiskWriteBatch;
 #[cfg(any(test, feature = "proptest-impl"))]
 pub use disk_db::{DiskWriteBatch, WriteDisk};

@@ -94,12 +94,10 @@ pub struct DiskDb {
 // (DiskDb can be cloned),
 // and make them accessible via read-only methods
 #[must_use = "batches must be written to the database"]
+#[derive(Default)]
 pub struct DiskWriteBatch {
     /// The inner RocksDB write batch.
     batch: rocksdb::WriteBatch,
-
-    /// The configured network.
-    network: Network,
 }

 /// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
@@ -115,11 +113,17 @@ pub trait WriteDisk {
         K: IntoDisk + Debug,
         V: IntoDisk;

-    /// Remove the given key form rocksdb column family if it exists.
+    /// Remove the given key from rocksdb column family if it exists.
     fn zs_delete<C, K>(&mut self, cf: &C, key: K)
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + Debug;
+
+    /// Remove the given key range from rocksdb column family if it exists.
+    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
+    where
+        C: rocksdb::AsColumnFamilyRef,
+        K: IntoDisk + Debug;
 }

 impl WriteDisk for DiskWriteBatch {
@@ -142,6 +146,16 @@ impl WriteDisk for DiskWriteBatch {
         let key_bytes = key.as_bytes();
         self.batch.delete_cf(cf, key_bytes);
     }
+
+    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, to: K)
+    where
+        C: rocksdb::AsColumnFamilyRef,
+        K: IntoDisk + Debug,
+    {
+        let from_bytes = from.as_bytes();
+        let to_bytes = to.as_bytes();
+        self.batch.delete_range_cf(cf, from_bytes, to_bytes);
+    }
 }

 /// Helper trait for retrieving values from rocksdb column familys with a consistently
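`zs_delete_range` above forwards to RocksDB's `delete_range_cf`, which covers `[from, to)` and leaves the end key in place; that is why the commit notes that `delete_range` excludes the end key, and why separate single-key delete helpers exist. A standalone sketch of that behaviour using the `rocksdb` crate's default column family; the path and keys are invented for illustration and are not Zebra's real on-disk format.

```rust
use rocksdb::{WriteBatch, DB};

fn main() -> Result<(), rocksdb::Error> {
    // Illustrative database path and keys only.
    let db = DB::open_default("/tmp/delete-range-example")?;

    for key in ["height-10", "height-11", "height-12", "height-20"] {
        db.put(key, "tree bytes")?;
    }

    let mut batch = WriteBatch::default();
    // RocksDB range deletes cover [from, to): "height-20" is NOT removed here.
    batch.delete_range("height-10", "height-20");
    // Delete the end key with a separate point delete if it should go too.
    batch.delete("height-20");
    db.write(batch)?;

    assert!(db.get("height-10")?.is_none());
    assert!(db.get("height-20")?.is_none());
    Ok(())
}
```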
@@ -395,24 +409,18 @@ impl DiskWriteBatch {
     /// Each block must be written to the state inside a batch, so that:
     /// - concurrent `ReadStateService` queries don't see half-written blocks, and
     /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
-    pub fn new(network: Network) -> Self {
+    pub fn new() -> Self {
         DiskWriteBatch {
             batch: rocksdb::WriteBatch::default(),
-            network,
         }
     }
-
-    /// Returns the configured network for this write batch.
-    pub fn network(&self) -> Network {
-        self.network
-    }
 }

 impl DiskDb {
     /// Returns an iterator over the items in `cf` in `range`.
     ///
     /// Holding this iterator open might delay block commit transactions.
-    fn zs_range_iter<C, K, V, R>(&self, cf: &C, range: R) -> impl Iterator<Item = (K, V)> + '_
+    pub fn zs_range_iter<C, K, V, R>(&self, cf: &C, range: R) -> impl Iterator<Item = (K, V)> + '_
     where
         C: rocksdb::AsColumnFamilyRef,
         K: IntoDisk + FromDisk,
@@ -457,7 +465,7 @@ impl DiskDb {
             .map(|result| result.expect("unexpected database failure"))
             .map(|(key, value)| (key.to_vec(), value))
             // Handle Excluded start and the end bound
-            .filter(move |(key, _value)| range.contains(key))
+            .take_while(move |(key, _value)| range.contains(key))
             .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
     }
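Because a forward RocksDB iterator yields keys in order, once one key falls outside the requested range every later key does too, so `.take_while` can stop the scan at that point, while `.filter` would keep walking (and decoding) the rest of the column family. A simplified illustration with ordered integers standing in for the database iterator, not the real `zs_range_iter`:

```rust
fn main() {
    // Ordered keys, like a forward RocksDB iterator over heights.
    let keys = vec![5u32, 6, 7, 8, 100, 101, 102];
    let range = 5..8;

    // `filter` inspects every key, even after the range has ended.
    let filtered: Vec<u32> = keys.iter().copied().filter(|k| range.contains(k)).collect();

    // `take_while` stops at the first key outside the range,
    // which is correct for ordered keys and avoids scanning the rest.
    let taken: Vec<u32> = keys.iter().copied().take_while(|k| range.contains(k)).collect();

    assert_eq!(filtered, taken);
    assert_eq!(taken, vec![5, 6, 7]);
    println!("take_while sketch OK");
}
```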
@@ -2,7 +2,10 @@

 use std::{
     cmp::Ordering,
-    sync::{mpsc, Arc},
+    sync::{
+        atomic::{self, AtomicBool},
+        mpsc, Arc,
+    },
     thread::{self, JoinHandle},
 };

@@ -12,14 +15,16 @@ use tracing::Span;
 use zebra_chain::{
     block::Height,
     diagnostic::task::{CheckForPanics, WaitForPanics},
-    parameters::Network,
+    parameters::{Network, NetworkUpgrade},
 };

 use DbFormatChange::*;

 use crate::{
-    config::write_database_format_version_to_disk, database_format_version_in_code,
-    database_format_version_on_disk, service::finalized_state::ZebraDb, Config,
+    config::write_database_format_version_to_disk,
+    database_format_version_in_code, database_format_version_on_disk,
+    service::finalized_state::{DiskWriteBatch, ZebraDb},
+    Config,
 };

 /// The kind of database format change we're performing.
@@ -66,7 +71,7 @@ pub struct DbFormatChangeThreadHandle {
     update_task: Option<Arc<JoinHandle<()>>>,

     /// A channel that tells the running format thread to finish early.
-    cancel_handle: mpsc::SyncSender<CancelFormatChange>,
+    should_cancel_format_change: Arc<AtomicBool>,
 }

 /// Marker for cancelling a format upgrade.
@@ -144,24 +149,27 @@ impl DbFormatChange {
         //
         // Cancel handles must use try_send() to avoid blocking waiting for the format change
         // thread to shut down.
-        let (cancel_handle, cancel_receiver) = mpsc::sync_channel(1);
+        let should_cancel_format_change = Arc::new(AtomicBool::new(false));

         let span = Span::current();
-        let update_task = thread::spawn(move || {
-            span.in_scope(move || {
-                self.apply_format_change(
-                    config,
-                    network,
-                    initial_tip_height,
-                    upgrade_db,
-                    cancel_receiver,
-                );
-            })
-        });
+        let update_task = {
+            let should_cancel_format_change = should_cancel_format_change.clone();
+            thread::spawn(move || {
+                span.in_scope(move || {
+                    self.apply_format_change(
+                        config,
+                        network,
+                        initial_tip_height,
+                        upgrade_db,
+                        should_cancel_format_change,
+                    );
+                })
+            })
+        };

         let mut handle = DbFormatChangeThreadHandle {
             update_task: Some(Arc::new(update_task)),
-            cancel_handle,
+            should_cancel_format_change,
         };

         handle.check_for_panics();
@@ -181,7 +189,7 @@ impl DbFormatChange {
         network: Network,
         initial_tip_height: Option<Height>,
         upgrade_db: ZebraDb,
-        cancel_receiver: mpsc::Receiver<CancelFormatChange>,
+        should_cancel_format_change: Arc<AtomicBool>,
     ) {
         match self {
             // Handled in the rest of this function.
@@ -190,7 +198,7 @@ impl DbFormatChange {
                 network,
                 initial_tip_height,
                 upgrade_db,
-                cancel_receiver,
+                should_cancel_format_change,
             ),

             NewlyCreated { .. } => {
@@ -229,7 +237,7 @@ impl DbFormatChange {
         network: Network,
         initial_tip_height: Option<Height>,
         upgrade_db: ZebraDb,
-        cancel_receiver: mpsc::Receiver<CancelFormatChange>,
+        should_cancel_format_change: Arc<AtomicBool>,
     ) {
         let Upgrade {
             newer_running_version,
@@ -261,46 +269,187 @@ impl DbFormatChange {
             return;
         };

-        // Example format change.
+        // Start of a database upgrade task.

-        // Check if we need to do this upgrade.
-        let database_format_add_format_change_task =
-            Version::parse("25.0.2").expect("version constant is valid");
+        let version_for_pruning_trees =
+            Version::parse("25.1.0").expect("Hardcoded version string should be valid.");

-        if older_disk_version < database_format_add_format_change_task {
-            let mut upgrade_height = Height(0);
-
-            // Go through every height from genesis to the tip of the old version.
-            // If the state was downgraded, some heights might already be upgraded.
-            // (Since the upgraded format is added to the tip, the database can switch between
-            // lower and higher versions at any block.)
-            //
-            // Keep upgrading until the initial database has been upgraded,
-            // or this task is cancelled by a shutdown.
-            while upgrade_height <= initial_tip_height
-                && matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty))
-            {
-                // TODO: Do one format upgrade step here
-                //
-                // This fake step just shows how to access the database.
-                let _replace_me_ = upgrade_db.tip();
-
-                upgrade_height = (upgrade_height + 1).expect("task exits before maximum height");
-            }
-
-            // At the end of each format upgrade, the database is marked as upgraded to that version.
-            // Upgrades can be run more than once if Zebra is restarted, so this is just a performance
-            // optimisation.
-            info!(
-                ?initial_tip_height,
-                ?newer_running_version,
-                ?older_disk_version,
-                "marking database as upgraded"
-            );
-            Self::mark_as_upgraded_to(&database_format_add_format_change_task, &config, network);
+        // Check if we need to prune the note commitment trees in the database.
+        if older_disk_version < version_for_pruning_trees {
+            // Get network upgrade heights
+            let (&sapling_height, _) = NetworkUpgrade::activation_list(network)
+                .iter()
+                .find(|(_, upgrade)| **upgrade == NetworkUpgrade::Sapling)
+                .expect("there should be sapling upgrade");
+            let (&orchard_height, _) = NetworkUpgrade::activation_list(network)
+                .iter()
+                .find(|(_, upgrade)| **upgrade == NetworkUpgrade::Nu5)
+                .expect("there should be Nu5 upgrade");
+
+            // Delete duplicates before sapling and orchard heights
+            let (mut prev_sapling_tree, mut prev_orchard_tree) = {
+                let height = Height(1);
+                let mut batch = DiskWriteBatch::new();
+
+                batch.delete_range_sapling_tree(&upgrade_db, &height, &sapling_height);
+                batch.delete_range_orchard_tree(&upgrade_db, &height, &orchard_height);
+                upgrade_db
+                    .write_batch(batch)
+                    .expect("Deleting note commitment trees should always succeed.");
+
+                (
+                    upgrade_db.sapling_tree_by_height(&Height(0)),
+                    upgrade_db.orchard_tree_by_height(&Height(0)),
+                )
+            };
+
+            // Create an unbounded channel for reading note commitment trees
+            let (sapling_tree_tx, sapling_tree_rx) = mpsc::channel();
+
+            // Set up task for reading sapling note commitment trees
+            let db = upgrade_db.clone();
+            let should_cancel_flag = should_cancel_format_change.clone();
+            let sapling_read_task = std::thread::spawn(move || {
+                for (height, tree) in
+                    db.sapling_tree_by_height_range(sapling_height..initial_tip_height)
+                {
+                    // Breaking from this loop and dropping the sapling_tree channel
+                    // will cause the sapling compare and delete tasks to finish.
+                    if should_cancel_flag.load(atomic::Ordering::Relaxed) {
+                        break;
+                    }
+
+                    if let Err(error) = sapling_tree_tx.send((height, tree)) {
+                        warn!(?error, "unexpected send error")
+                    }
+                }
+            });
+
+            // Create an unbounded channel for duplicate sapling note commitment tree heights
+            let (unique_sapling_tree_height_tx, unique_sapling_tree_height_rx) = mpsc::channel();
+
+            // Set up task for reading sapling note commitment trees
+            let sapling_compare_task = std::thread::spawn(move || {
+                while let Ok((height, tree)) = sapling_tree_rx.recv() {
+                    let tree = Some(tree);
+                    if prev_sapling_tree != tree {
+                        if let Err(error) = unique_sapling_tree_height_tx.send(height) {
+                            warn!(?error, "unexpected send error")
+                        }
+                        prev_sapling_tree = tree;
+                    }
+                }
+            });
+
+            // Set up task for deleting sapling note commitment trees
+            let db = upgrade_db.clone();
+            let sapling_delete_task = std::thread::spawn(move || {
+                let mut delete_from = sapling_height.next();
+                while let Ok(delete_to) = unique_sapling_tree_height_rx.recv() {
+                    let num_entries = delete_to - delete_from;
+                    if num_entries > 0 {
+                        let mut batch = DiskWriteBatch::new();
+
+                        if num_entries == 1 {
+                            batch.delete_sapling_tree(&db, &delete_from);
+                        } else {
+                            batch.delete_range_sapling_tree(&db, &delete_from, &delete_to);
+                        }
+
+                        db.write_batch(batch).expect(
+                            "Deleting Sapling note commitment trees should always succeed.",
+                        );
+                    }
+
+                    delete_from = delete_to.next();
+                }
+            });
+
+            // Create an unbounded channel for reading note commitment trees
+            let (orchard_tree_tx, orchard_tree_rx) = mpsc::channel();
+
+            // Set up task for reading orchard note commitment trees
+            let db = upgrade_db.clone();
+            let should_cancel_flag = should_cancel_format_change;
+            let orchard_read_task = std::thread::spawn(move || {
+                for (height, tree) in
+                    db.orchard_tree_by_height_range(orchard_height..initial_tip_height)
+                {
+                    // Breaking from this loop and dropping the orchard_tree channel
+                    // will cause the orchard compare and delete tasks to finish.
+                    if should_cancel_flag.load(atomic::Ordering::Relaxed) {
+                        break;
+                    }
+
+                    if let Err(error) = orchard_tree_tx.send((height, tree)) {
+                        warn!(?error, "unexpected send error")
+                    }
+                }
+            });
+
+            // Create an unbounded channel for duplicate orchard note commitment tree heights
+            let (unique_orchard_tree_height_tx, unique_orchard_tree_height_rx) = mpsc::channel();
+
+            // Set up task for reading orchard note commitment trees
+            let orchard_compare_task = std::thread::spawn(move || {
+                while let Ok((height, tree)) = orchard_tree_rx.recv() {
+                    let tree = Some(tree);
+                    if prev_orchard_tree != tree {
+                        if let Err(error) = unique_orchard_tree_height_tx.send(height) {
+                            warn!(?error, "unexpected send error")
+                        }
+
+                        prev_orchard_tree = tree;
+                    }
+                }
+            });
+
+            // Set up task for deleting orchard note commitment trees
+            let db = upgrade_db;
+            let orchard_delete_task = std::thread::spawn(move || {
+                let mut delete_from = orchard_height.next();
+                while let Ok(delete_to) = unique_orchard_tree_height_rx.recv() {
+                    let num_entries = delete_to - delete_from;
+                    if num_entries > 0 {
+                        let mut batch = DiskWriteBatch::new();
+
+                        if num_entries == 1 {
+                            batch.delete_orchard_tree(&db, &delete_from);
+                        } else {
+                            batch.delete_range_orchard_tree(&db, &delete_from, &delete_to);
+                        }
+
+                        db.write_batch(batch).expect(
+                            "Deleting Orchard note commitment trees should always succeed.",
+                        );
+                    }
+
+                    delete_from = delete_to.next();
+                }
+            });
+
+            for task in [
+                sapling_read_task,
+                sapling_compare_task,
+                sapling_delete_task,
+                orchard_read_task,
+                orchard_compare_task,
+                orchard_delete_task,
+            ] {
+                if let Err(error) = task.join() {
+                    warn!(?error, "unexpected join error")
+                }
+            }
+
+            // At the end of each format upgrade, we mark the database as upgraded to that version.
+            // We don't mark the database if `height` didn't reach the `initial_tip_height` because
+            // Zebra wouldn't run the upgrade anymore, and the part of the database above `height`
+            // wouldn't be upgraded.
+            info!(?newer_running_version, "Database has been upgraded to:");
+            Self::mark_as_upgraded_to(&version_for_pruning_trees, &config, network);
         }

-        // End of example format change.
+        // End of a database upgrade task.

         // # New Upgrades Usually Go Here
         //
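The upgrade above runs as a three-stage pipeline per shielded pool: a read thread streams `(height, tree)` pairs from the database, a compare thread forwards only the heights where the tree actually changed, and a delete thread turns the gap between two kept heights into a range delete. A self-contained sketch of that flow using `std::sync::mpsc` channels and toy data in place of the database; names and the starting height are illustrative only.

```rust
use std::{sync::mpsc, thread};

fn main() {
    // Simplified stand-ins: heights are u32, "trees" are strings.
    let trees = vec![(10u32, "a"), (11, "a"), (12, "b"), (13, "b"), (14, "c")];

    let (tree_tx, tree_rx) = mpsc::channel();
    let (unique_height_tx, unique_height_rx) = mpsc::channel();

    // Read stage: stream (height, tree) pairs in height order.
    let read_task = thread::spawn(move || {
        for entry in trees {
            if tree_tx.send(entry).is_err() {
                break;
            }
        }
    });

    // Compare stage: forward only heights where the tree differs from the previous one.
    let compare_task = thread::spawn(move || {
        let mut prev_tree = None;
        while let Ok((height, tree)) = tree_rx.recv() {
            let tree = Some(tree);
            if prev_tree != tree {
                let _ = unique_height_tx.send(height);
                prev_tree = tree;
            }
        }
    });

    // Delete stage: everything between two kept heights is a duplicate range.
    let delete_task = thread::spawn(move || {
        let mut delete_from = 10u32 + 1;
        while let Ok(delete_to) = unique_height_rx.recv() {
            if delete_to > delete_from {
                println!("would delete heights {delete_from}..{delete_to}");
            }
            delete_from = delete_to + 1;
        }
    });

    for task in [read_task, compare_task, delete_task] {
        task.join().expect("pipeline thread panicked");
    }
}
```

Dropping each sender when its stage finishes is what lets the downstream `recv()` loops exit, which mirrors how cancelling the read tasks shuts down the whole upgrade pipeline.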
@@ -309,12 +458,6 @@ impl DbFormatChange {
         // Run the latest format upgrade code after the other upgrades are complete,
         // then mark the format as upgraded. The code should check `cancel_receiver`
         // every time it runs its inner update loop.
-        info!(
-            ?initial_tip_height,
-            ?newer_running_version,
-            ?older_disk_version,
-            "database is fully upgraded"
-        );
     }

     /// Mark a newly created database with the current format version.
@@ -478,7 +621,8 @@ impl DbFormatChangeThreadHandle {
         // There's nothing we can do about errors here.
         // If the channel is disconnected, the task has exited.
         // If it's full, it's already been cancelled.
-        let _ = self.cancel_handle.try_send(CancelFormatChange);
+        self.should_cancel_format_change
+            .store(true, atomic::Ordering::Relaxed);
     }

     /// Check for panics in the code running in the spawned thread.
@@ -2,9 +2,7 @@

 use std::ops::Deref;

-use zebra_chain::{
-    amount::NonNegative, block::Block, parameters::Network::*, sprout, value_balance::ValueBalance,
-};
+use zebra_chain::{amount::NonNegative, block::Block, sprout, value_balance::ValueBalance};

 use crate::service::finalized_state::{
     disk_db::{DiskDb, DiskWriteBatch, WriteDisk},
@@ -31,7 +29,7 @@ impl ZebraDb {

     /// Allow to set up a fake value pool in the database for testing purposes.
     pub fn set_finalized_value_pool(&self, fake_value_pool: ValueBalance<NonNegative>) {
-        let mut batch = DiskWriteBatch::new(Mainnet);
+        let mut batch = DiskWriteBatch::new();
         let value_pool_cf = self.db().cf_handle("tip_chain_value_pool").unwrap();

         batch.zs_insert(&value_pool_cf, (), fake_value_pool);
@@ -41,7 +39,7 @@ impl ZebraDb {
     /// Artificially prime the note commitment tree anchor sets with anchors
     /// referenced in a block, for testing purposes _only_.
     pub fn populate_with_anchors(&self, block: &Block) {
-        let mut batch = DiskWriteBatch::new(Mainnet);
+        let mut batch = DiskWriteBatch::new();

         let sprout_anchors = self.db().cf_handle("sprout_anchors").unwrap();
         let sapling_anchors = self.db().cf_handle("sapling_anchors").unwrap();
@@ -368,11 +368,12 @@ impl ZebraDb {
             .filter_map(|address| Some((address, self.address_balance_location(&address)?)))
             .collect();

-        let mut batch = DiskWriteBatch::new(network);
+        let mut batch = DiskWriteBatch::new();

         // In case of errors, propagate and do not write the batch.
         batch.prepare_block_batch(
             self,
+            network,
             &finalized,
             new_outputs_by_out_loc,
             spent_utxos_by_outpoint,
@@ -388,6 +389,11 @@ impl ZebraDb {

         Ok(finalized.verified.hash)
     }
+
+    /// Writes the given batch to the database.
+    pub(crate) fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
+        self.db.write(batch)
+    }
 }

 /// Lookup the output location for an outpoint.
@@ -419,12 +425,11 @@ impl DiskWriteBatch {
     /// # Errors
     ///
     /// - Propagates any errors from updating history tree, note commitment trees, or value pools
-    //
-    // TODO: move db, finalized, and maybe other arguments into DiskWriteBatch
     #[allow(clippy::too_many_arguments)]
     pub fn prepare_block_batch(
         &mut self,
         zebra_db: &ZebraDb,
+        network: Network,
         finalized: &SemanticallyVerifiedBlockWithTrees,
         new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
         spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo>,
@@ -454,6 +459,7 @@ impl DiskWriteBatch {
         // Commit transaction indexes
         self.prepare_transparent_transaction_batch(
             db,
+            network,
             &finalized.verified,
             &new_outputs_by_out_loc,
             &spent_utxos_by_outpoint,
@@ -120,7 +120,7 @@ fn test_block_db_round_trip_with(
     };

     // Skip validation by writing the block directly to the database
-    let mut batch = DiskWriteBatch::new(Mainnet);
+    let mut batch = DiskWriteBatch::new();
     batch
         .prepare_block_header_and_transaction_data_batch(&state.db, &finalized)
         .expect("block is valid for batch");
@@ -112,8 +112,7 @@ impl ZebraDb {
             .zs_items_in_range_unordered(&sprout_anchors_handle, ..)
     }

-    /// Returns the Sapling note commitment tree of the finalized tip
-    /// or the empty tree if the state is empty.
+    /// Returns the Sapling note commitment trees starting from the given block height up to the chain tip
     pub fn sapling_tree(&self) -> Arc<sapling::tree::NoteCommitmentTree> {
         let height = match self.finalized_tip_height() {
             Some(h) => h,
@@ -124,6 +123,33 @@ impl ZebraDb {
             .expect("Sapling note commitment tree must exist if there is a finalized tip")
     }

+    /// Returns the Orchard note commitment trees starting from the given block height up to the chain tip
+    #[allow(clippy::unwrap_in_result)]
+    pub fn orchard_tree_by_height_range<R>(
+        &self,
+        range: R,
+    ) -> impl Iterator<Item = (Height, Arc<orchard::tree::NoteCommitmentTree>)> + '_
+    where
+        R: std::ops::RangeBounds<Height>,
+    {
+        let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap();
+        self.db.zs_range_iter(&orchard_trees, range)
+    }
+
+    /// Returns the Sapling note commitment tree matching the given block height,
+    /// or `None` if the height is above the finalized tip.
+    #[allow(clippy::unwrap_in_result)]
+    pub fn sapling_tree_by_height_range<R>(
+        &self,
+        range: R,
+    ) -> impl Iterator<Item = (Height, Arc<sapling::tree::NoteCommitmentTree>)> + '_
+    where
+        R: std::ops::RangeBounds<Height>,
+    {
+        let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap();
+        self.db.zs_range_iter(&sapling_trees, range)
+    }
+
     /// Returns the Sapling note commitment tree matching the given block height,
     /// or `None` if the height is above the finalized tip.
     #[allow(clippy::unwrap_in_result)]
@@ -142,11 +168,6 @@ impl ZebraDb {
         let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap();

         // If we know there must be a tree, search backwards for it.
-        //
-        // # Compatibility
-        //
-        // Allow older Zebra versions to read future database formats, after note commitment trees
-        // have been deduplicated. See ticket #6642 for details.
         let (_first_duplicate_height, tree) = self
             .db
             .zs_prev_key_value_back_from(&sapling_trees, height)
@@ -186,9 +207,7 @@ impl ZebraDb {

         let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap();

-        // # Compatibility
-        //
-        // Allow older Zebra versions to read future database formats. See ticket #6642 for details.
+        // If we know there must be a tree, search backwards for it.
         let (_first_duplicate_height, tree) = self
             .db
             .zs_prev_key_value_back_from(&orchard_trees, height)
@@ -338,4 +357,40 @@ impl DiskWriteBatch {

         self.prepare_history_batch(db, finalized)
     }
+
+    /// Deletes the Sapling note commitment tree at the given [`Height`].
+    pub fn delete_sapling_tree(&mut self, zebra_db: &ZebraDb, height: &Height) {
+        let sapling_tree_cf = zebra_db
+            .db
+            .cf_handle("sapling_note_commitment_tree")
+            .unwrap();
+        self.zs_delete(&sapling_tree_cf, height);
+    }
+
+    /// Deletes the range of Sapling note commitment trees at the given [`Height`]s. Doesn't delete the upper bound.
+    pub fn delete_range_sapling_tree(&mut self, zebra_db: &ZebraDb, from: &Height, to: &Height) {
+        let sapling_tree_cf = zebra_db
+            .db
+            .cf_handle("sapling_note_commitment_tree")
+            .unwrap();
+        self.zs_delete_range(&sapling_tree_cf, from, to);
+    }
+
+    /// Deletes the Orchard note commitment tree at the given [`Height`].
+    pub fn delete_orchard_tree(&mut self, zebra_db: &ZebraDb, height: &Height) {
+        let orchard_tree_cf = zebra_db
+            .db
+            .cf_handle("orchard_note_commitment_tree")
+            .unwrap();
+        self.zs_delete(&orchard_tree_cf, height);
+    }
+
+    /// Deletes the range of Orchard note commitment trees at the given [`Height`]s. Doesn't delete the upper bound.
+    pub fn delete_range_orchard_tree(&mut self, zebra_db: &ZebraDb, from: &Height, to: &Height) {
+        let orchard_tree_cf = zebra_db
+            .db
+            .cf_handle("orchard_note_commitment_tree")
+            .unwrap();
+        self.zs_delete_range(&orchard_tree_cf, from, to);
+    }
 }
@@ -19,6 +19,7 @@ use std::{
 use zebra_chain::{
     amount::{self, Amount, NonNegative},
     block::Height,
+    parameters::Network,
     transaction::{self, Transaction},
     transparent::{self, Input},
 };
@@ -366,9 +367,11 @@ impl DiskWriteBatch {
     /// # Errors
     ///
     /// - Propagates any errors from updating note commitment trees
+    #[allow(clippy::too_many_arguments)]
     pub fn prepare_transparent_transaction_batch(
         &mut self,
         db: &DiskDb,
+        network: Network,
         finalized: &SemanticallyVerifiedBlock,
         new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
         spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
@@ -380,11 +383,13 @@ impl DiskWriteBatch {
         // Update created and spent transparent outputs
         self.prepare_new_transparent_outputs_batch(
             db,
+            network,
             new_outputs_by_out_loc,
             &mut address_balances,
         )?;
         self.prepare_spent_transparent_outputs_batch(
             db,
+            network,
             spent_utxos_by_out_loc,
             &mut address_balances,
         )?;
@@ -395,6 +400,7 @@ impl DiskWriteBatch {

             self.prepare_spending_transparent_tx_ids_batch(
                 db,
+                network,
                 spending_tx_location,
                 transaction,
                 spent_utxos_by_outpoint,
@@ -422,6 +428,7 @@ impl DiskWriteBatch {
     pub fn prepare_new_transparent_outputs_batch(
         &mut self,
         db: &DiskDb,
+        network: Network,
         new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
         address_balances: &mut HashMap<transparent::Address, AddressBalanceLocation>,
     ) -> Result<(), BoxError> {
@@ -434,7 +441,7 @@ impl DiskWriteBatch {
         // Index all new transparent outputs
         for (new_output_location, utxo) in new_outputs_by_out_loc {
             let unspent_output = &utxo.output;
-            let receiving_address = unspent_output.address(self.network());
+            let receiving_address = unspent_output.address(network);

             // Update the address balance by adding this UTXO's value
             if let Some(receiving_address) = receiving_address {
@@ -498,6 +505,7 @@ impl DiskWriteBatch {
     pub fn prepare_spent_transparent_outputs_batch(
         &mut self,
         db: &DiskDb,
+        network: Network,
         spent_utxos_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
         address_balances: &mut HashMap<transparent::Address, AddressBalanceLocation>,
     ) -> Result<(), BoxError> {
@@ -510,7 +518,7 @@ impl DiskWriteBatch {
         // Coinbase inputs represent new coins, so there are no UTXOs to mark as spent.
         for (spent_output_location, utxo) in spent_utxos_by_out_loc {
             let spent_output = &utxo.output;
-            let sending_address = spent_output.address(self.network());
+            let sending_address = spent_output.address(network);

             // Fetch the balance, and the link from the address to the AddressLocation, from memory.
             if let Some(sending_address) = sending_address {
@@ -552,6 +560,7 @@ impl DiskWriteBatch {
     pub fn prepare_spending_transparent_tx_ids_batch(
         &mut self,
         db: &DiskDb,
+        network: Network,
         spending_tx_location: TransactionLocation,
         transaction: &Transaction,
         spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
@@ -567,7 +576,7 @@ impl DiskWriteBatch {
             let spent_utxo = spent_utxos_by_outpoint
                 .get(&spent_outpoint)
                 .expect("unexpected missing spent output");
-            let sending_address = spent_utxo.output.address(self.network());
+            let sending_address = spent_utxo.output.address(network);

             // Fetch the balance, and the link from the address to the AddressLocation, from memory.
             if let Some(sending_address) = sending_address {