change(state): Set iterator read bounds where possible in DiskDb (#7731)
* Adds zs_iter_opts method * uses checked_add * uses zs_range_iter for other read methods * fixes bug * Fixes bug in zs_iter_opts * Adds test & updates method docs * updates docs * Update zebra-state/src/service/finalized_state/disk_db/tests.rs * Corrects code comment * adds support for variable-sized keys * adds test case * Updates docs * Applies suggestions from code review * Add extra checks * Fix test code and rustfmt * fixes test * fixes test * adds reverse arg to new zs_range_iter calls --------- Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
ab3ce9a277
commit
56e6305f04
|
|
@ -21,6 +21,7 @@ use std::{
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rlimit::increase_nofile_limit;
|
use rlimit::increase_nofile_limit;
|
||||||
|
|
||||||
|
use rocksdb::ReadOptions;
|
||||||
use zebra_chain::parameters::Network;
|
use zebra_chain::parameters::Network;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
|
@ -194,7 +195,7 @@ pub trait ReadDisk {
|
||||||
fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||||
where
|
where
|
||||||
C: rocksdb::AsColumnFamilyRef,
|
C: rocksdb::AsColumnFamilyRef,
|
||||||
K: FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk;
|
V: FromDisk;
|
||||||
|
|
||||||
/// Returns the highest key in `cf`, and the corresponding value.
|
/// Returns the highest key in `cf`, and the corresponding value.
|
||||||
|
|
@ -203,7 +204,7 @@ pub trait ReadDisk {
|
||||||
fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||||
where
|
where
|
||||||
C: rocksdb::AsColumnFamilyRef,
|
C: rocksdb::AsColumnFamilyRef,
|
||||||
K: FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk;
|
V: FromDisk;
|
||||||
|
|
||||||
/// Returns the first key greater than or equal to `lower_bound` in `cf`,
|
/// Returns the first key greater than or equal to `lower_bound` in `cf`,
|
||||||
|
|
@ -321,34 +322,22 @@ impl ReadDisk for DiskDb {
|
||||||
fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||||
where
|
where
|
||||||
C: rocksdb::AsColumnFamilyRef,
|
C: rocksdb::AsColumnFamilyRef,
|
||||||
K: FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
{
|
{
|
||||||
// Reading individual values from iterators does not seem to cause database hangs.
|
// Reading individual values from iterators does not seem to cause database hangs.
|
||||||
self.db
|
self.zs_range_iter(cf, .., false).next()
|
||||||
.iterator_cf(cf, rocksdb::IteratorMode::Start)
|
|
||||||
.next()?
|
|
||||||
.map(|(key_bytes, value_bytes)| {
|
|
||||||
Some((K::from_bytes(key_bytes), V::from_bytes(value_bytes)))
|
|
||||||
})
|
|
||||||
.expect("unexpected database failure")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::unwrap_in_result)]
|
#[allow(clippy::unwrap_in_result)]
|
||||||
fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
|
||||||
where
|
where
|
||||||
C: rocksdb::AsColumnFamilyRef,
|
C: rocksdb::AsColumnFamilyRef,
|
||||||
K: FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
{
|
{
|
||||||
// Reading individual values from iterators does not seem to cause database hangs.
|
// Reading individual values from iterators does not seem to cause database hangs.
|
||||||
self.db
|
self.zs_range_iter(cf, .., true).next()
|
||||||
.iterator_cf(cf, rocksdb::IteratorMode::End)
|
|
||||||
.next()?
|
|
||||||
.map(|(key_bytes, value_bytes)| {
|
|
||||||
Some((K::from_bytes(key_bytes), V::from_bytes(value_bytes)))
|
|
||||||
})
|
|
||||||
.expect("unexpected database failure")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::unwrap_in_result)]
|
#[allow(clippy::unwrap_in_result)]
|
||||||
|
|
@ -358,17 +347,8 @@ impl ReadDisk for DiskDb {
|
||||||
K: IntoDisk + FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
{
|
{
|
||||||
let lower_bound = lower_bound.as_bytes();
|
|
||||||
let from = rocksdb::IteratorMode::From(lower_bound.as_ref(), rocksdb::Direction::Forward);
|
|
||||||
|
|
||||||
// Reading individual values from iterators does not seem to cause database hangs.
|
// Reading individual values from iterators does not seem to cause database hangs.
|
||||||
self.db
|
self.zs_range_iter(cf, lower_bound.., false).next()
|
||||||
.iterator_cf(cf, from)
|
|
||||||
.next()?
|
|
||||||
.map(|(key_bytes, value_bytes)| {
|
|
||||||
Some((K::from_bytes(key_bytes), V::from_bytes(value_bytes)))
|
|
||||||
})
|
|
||||||
.expect("unexpected database failure")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::unwrap_in_result)]
|
#[allow(clippy::unwrap_in_result)]
|
||||||
|
|
@ -378,17 +358,8 @@ impl ReadDisk for DiskDb {
|
||||||
K: IntoDisk + FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
{
|
{
|
||||||
let upper_bound = upper_bound.as_bytes();
|
|
||||||
let from = rocksdb::IteratorMode::From(upper_bound.as_ref(), rocksdb::Direction::Reverse);
|
|
||||||
|
|
||||||
// Reading individual values from iterators does not seem to cause database hangs.
|
// Reading individual values from iterators does not seem to cause database hangs.
|
||||||
self.db
|
self.zs_range_iter(cf, ..=upper_bound, true).next()
|
||||||
.iterator_cf(cf, from)
|
|
||||||
.next()?
|
|
||||||
.map(|(key_bytes, value_bytes)| {
|
|
||||||
Some((K::from_bytes(key_bytes), V::from_bytes(value_bytes)))
|
|
||||||
})
|
|
||||||
.expect("unexpected database failure")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
|
fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
|
||||||
|
|
@ -398,7 +369,7 @@ impl ReadDisk for DiskDb {
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
R: RangeBounds<K>,
|
R: RangeBounds<K>,
|
||||||
{
|
{
|
||||||
self.zs_range_iter(cf, range).collect()
|
self.zs_range_iter(cf, range, false).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
|
fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
|
||||||
|
|
@ -408,7 +379,7 @@ impl ReadDisk for DiskDb {
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
R: RangeBounds<K>,
|
R: RangeBounds<K>,
|
||||||
{
|
{
|
||||||
self.zs_range_iter(cf, range).collect()
|
self.zs_range_iter(cf, range, false).collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -430,15 +401,24 @@ impl DiskWriteBatch {
|
||||||
impl DiskDb {
|
impl DiskDb {
|
||||||
/// Returns an iterator over the items in `cf` in `range`.
|
/// Returns an iterator over the items in `cf` in `range`.
|
||||||
///
|
///
|
||||||
|
/// Accepts a `reverse` argument. If it is `true`, creates the iterator with an
|
||||||
|
/// [`IteratorMode`](rocksdb::IteratorMode) of [`End`](rocksdb::IteratorMode::End), or
|
||||||
|
/// [`From`](rocksdb::IteratorMode::From) with [`Direction::Reverse`](rocksdb::Direction::Reverse).
|
||||||
|
///
|
||||||
/// Holding this iterator open might delay block commit transactions.
|
/// Holding this iterator open might delay block commit transactions.
|
||||||
pub fn zs_range_iter<C, K, V, R>(&self, cf: &C, range: R) -> impl Iterator<Item = (K, V)> + '_
|
pub fn zs_range_iter<C, K, V, R>(
|
||||||
|
&self,
|
||||||
|
cf: &C,
|
||||||
|
range: R,
|
||||||
|
reverse: bool,
|
||||||
|
) -> impl Iterator<Item = (K, V)> + '_
|
||||||
where
|
where
|
||||||
C: rocksdb::AsColumnFamilyRef,
|
C: rocksdb::AsColumnFamilyRef,
|
||||||
K: IntoDisk + FromDisk,
|
K: IntoDisk + FromDisk,
|
||||||
V: FromDisk,
|
V: FromDisk,
|
||||||
R: RangeBounds<K>,
|
R: RangeBounds<K>,
|
||||||
{
|
{
|
||||||
self.zs_range_iter_with_direction(cf, range, false)
|
self.zs_range_iter_with_direction(cf, range, reverse)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a reverse iterator over the items in `cf` in `range`.
|
/// Returns a reverse iterator over the items in `cf` in `range`.
|
||||||
|
|
@ -495,11 +475,12 @@ impl DiskDb {
|
||||||
let range = (start_bound, end_bound);
|
let range = (start_bound, end_bound);
|
||||||
|
|
||||||
let mode = Self::zs_iter_mode(&range, reverse);
|
let mode = Self::zs_iter_mode(&range, reverse);
|
||||||
|
let opts = Self::zs_iter_opts(&range);
|
||||||
|
|
||||||
// Reading multiple items from iterators has caused database hangs,
|
// Reading multiple items from iterators has caused database hangs,
|
||||||
// in previous RocksDB versions
|
// in previous RocksDB versions
|
||||||
self.db
|
self.db
|
||||||
.iterator_cf(cf, mode)
|
.iterator_cf_opt(cf, opts, mode)
|
||||||
.map(|result| result.expect("unexpected database failure"))
|
.map(|result| result.expect("unexpected database failure"))
|
||||||
.map(|(key, value)| (key.to_vec(), value))
|
.map(|(key, value)| (key.to_vec(), value))
|
||||||
// Skip excluded "from" bound and empty ranges. The `mode` already skips keys
|
// Skip excluded "from" bound and empty ranges. The `mode` already skips keys
|
||||||
|
|
@ -514,6 +495,64 @@ impl DiskDb {
|
||||||
.map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
|
.map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
|
||||||
|
fn zs_iter_opts<R>(range: &R) -> ReadOptions
|
||||||
|
where
|
||||||
|
R: RangeBounds<Vec<u8>>,
|
||||||
|
{
|
||||||
|
let mut opts = ReadOptions::default();
|
||||||
|
let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);
|
||||||
|
|
||||||
|
if let Some(bound) = lower_bound {
|
||||||
|
opts.set_iterate_lower_bound(bound);
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(bound) = upper_bound {
|
||||||
|
opts.set_iterate_upper_bound(bound);
|
||||||
|
};
|
||||||
|
|
||||||
|
opts
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a lower and upper iterate bounds for a range.
|
||||||
|
///
|
||||||
|
/// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
|
||||||
|
/// will increment the upper bound by 1 if the end bound of the provided range
|
||||||
|
/// is inclusive.
|
||||||
|
fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
|
||||||
|
where
|
||||||
|
R: RangeBounds<Vec<u8>>,
|
||||||
|
{
|
||||||
|
use std::ops::Bound::*;
|
||||||
|
|
||||||
|
let lower_bound = match range.start_bound() {
|
||||||
|
Included(bound) | Excluded(bound) => Some(bound.clone()),
|
||||||
|
Unbounded => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let upper_bound = match range.end_bound().cloned() {
|
||||||
|
Included(mut bound) => {
|
||||||
|
// Increment the last byte in the upper bound that is less than u8::MAX, and
|
||||||
|
// clear any bytes after it to increment the next key in lexicographic order
|
||||||
|
// (next big-endian number) this Vec represents to RocksDB.
|
||||||
|
let is_wrapped_overflow = bound.iter_mut().rev().all(|v| {
|
||||||
|
*v = v.wrapping_add(1);
|
||||||
|
v == &0
|
||||||
|
});
|
||||||
|
|
||||||
|
if is_wrapped_overflow {
|
||||||
|
bound.insert(0, 0x01)
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(bound)
|
||||||
|
}
|
||||||
|
Excluded(bound) => Some(bound),
|
||||||
|
Unbounded => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
(lower_bound, upper_bound)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the RocksDB iterator "from" mode for `range`.
|
/// Returns the RocksDB iterator "from" mode for `range`.
|
||||||
///
|
///
|
||||||
/// RocksDB iterators are ordered by increasing key bytes by default.
|
/// RocksDB iterators are ordered by increasing key bytes by default.
|
||||||
|
|
|
||||||
|
|
@ -24,3 +24,50 @@ impl DiskDb {
|
||||||
rocksdb::DB::list_cf(&opts, path)
|
rocksdb::DB::list_cf(&opts, path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check that zs_iter_opts returns an upper bound one greater than provided inclusive end bounds.
|
||||||
|
#[test]
|
||||||
|
fn zs_iter_opts_increments_key_by_one() {
|
||||||
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
|
// TODO: add an empty key (`()` type or `[]` when serialized) test case
|
||||||
|
let keys: [u32; 14] = [
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
200,
|
||||||
|
255,
|
||||||
|
256,
|
||||||
|
257,
|
||||||
|
65535,
|
||||||
|
65536,
|
||||||
|
65537,
|
||||||
|
16777215,
|
||||||
|
16777216,
|
||||||
|
16777217,
|
||||||
|
16777218,
|
||||||
|
u32::MAX,
|
||||||
|
];
|
||||||
|
|
||||||
|
for key in keys {
|
||||||
|
let (_, bytes) = DiskDb::zs_iter_bounds(&..=key.to_be_bytes().to_vec());
|
||||||
|
let mut extra_bytes = bytes.expect("there should be an upper bound");
|
||||||
|
let bytes = extra_bytes.split_off(extra_bytes.len() - 4);
|
||||||
|
let upper_bound = u32::from_be_bytes(bytes.clone().try_into().expect("should be 4 bytes"));
|
||||||
|
let expected_upper_bound = key.wrapping_add(1);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
expected_upper_bound, upper_bound,
|
||||||
|
"the upper bound should be 1 greater than the original key"
|
||||||
|
);
|
||||||
|
|
||||||
|
if expected_upper_bound == 0 {
|
||||||
|
assert_eq!(
|
||||||
|
extra_bytes,
|
||||||
|
vec![1],
|
||||||
|
"there should be an extra byte with a value of 1"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
assert_eq!(extra_bytes.len(), 0, "there should be no extra bytes");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ impl ZebraDb {
|
||||||
) -> impl Iterator<Item = (RawBytes, Arc<HistoryTree>)> + '_ {
|
) -> impl Iterator<Item = (RawBytes, Arc<HistoryTree>)> + '_ {
|
||||||
let history_tree_cf = self.db.cf_handle("history_tree").unwrap();
|
let history_tree_cf = self.db.cf_handle("history_tree").unwrap();
|
||||||
|
|
||||||
self.db.zs_range_iter(&history_tree_cf, ..)
|
self.db.zs_range_iter(&history_tree_cf, .., false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value pool methods
|
// Value pool methods
|
||||||
|
|
|
||||||
|
|
@ -153,7 +153,7 @@ impl ZebraDb {
|
||||||
&self,
|
&self,
|
||||||
) -> impl Iterator<Item = (RawBytes, Arc<sprout::tree::NoteCommitmentTree>)> + '_ {
|
) -> impl Iterator<Item = (RawBytes, Arc<sprout::tree::NoteCommitmentTree>)> + '_ {
|
||||||
let sprout_trees = self.db.cf_handle("sprout_note_commitment_tree").unwrap();
|
let sprout_trees = self.db.cf_handle("sprout_note_commitment_tree").unwrap();
|
||||||
self.db.zs_range_iter(&sprout_trees, ..)
|
self.db.zs_range_iter(&sprout_trees, .., false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// # Sapling trees
|
// # Sapling trees
|
||||||
|
|
@ -207,7 +207,7 @@ impl ZebraDb {
|
||||||
R: std::ops::RangeBounds<Height>,
|
R: std::ops::RangeBounds<Height>,
|
||||||
{
|
{
|
||||||
let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap();
|
let sapling_trees = self.db.cf_handle("sapling_note_commitment_tree").unwrap();
|
||||||
self.db.zs_range_iter(&sapling_trees, range)
|
self.db.zs_range_iter(&sapling_trees, range, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the Sapling note commitment trees in the reversed range, in decreasing height order.
|
/// Returns the Sapling note commitment trees in the reversed range, in decreasing height order.
|
||||||
|
|
@ -278,7 +278,7 @@ impl ZebraDb {
|
||||||
if let Some(exclusive_end_bound) = exclusive_end_bound {
|
if let Some(exclusive_end_bound) = exclusive_end_bound {
|
||||||
list = self
|
list = self
|
||||||
.db
|
.db
|
||||||
.zs_range_iter(&sapling_subtrees, start_index..exclusive_end_bound)
|
.zs_range_iter(&sapling_subtrees, start_index..exclusive_end_bound, false)
|
||||||
.collect();
|
.collect();
|
||||||
} else {
|
} else {
|
||||||
// If there is no end bound, just return all the trees.
|
// If there is no end bound, just return all the trees.
|
||||||
|
|
@ -287,7 +287,7 @@ impl ZebraDb {
|
||||||
// the trees run out.)
|
// the trees run out.)
|
||||||
list = self
|
list = self
|
||||||
.db
|
.db
|
||||||
.zs_range_iter(&sapling_subtrees, start_index..)
|
.zs_range_iter(&sapling_subtrees, start_index.., false)
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -377,7 +377,7 @@ impl ZebraDb {
|
||||||
R: std::ops::RangeBounds<Height>,
|
R: std::ops::RangeBounds<Height>,
|
||||||
{
|
{
|
||||||
let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap();
|
let orchard_trees = self.db.cf_handle("orchard_note_commitment_tree").unwrap();
|
||||||
self.db.zs_range_iter(&orchard_trees, range)
|
self.db.zs_range_iter(&orchard_trees, range, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the Orchard note commitment trees in the reversed range, in decreasing height order.
|
/// Returns the Orchard note commitment trees in the reversed range, in decreasing height order.
|
||||||
|
|
@ -448,7 +448,7 @@ impl ZebraDb {
|
||||||
if let Some(exclusive_end_bound) = exclusive_end_bound {
|
if let Some(exclusive_end_bound) = exclusive_end_bound {
|
||||||
list = self
|
list = self
|
||||||
.db
|
.db
|
||||||
.zs_range_iter(&orchard_subtrees, start_index..exclusive_end_bound)
|
.zs_range_iter(&orchard_subtrees, start_index..exclusive_end_bound, false)
|
||||||
.collect();
|
.collect();
|
||||||
} else {
|
} else {
|
||||||
// If there is no end bound, just return all the trees.
|
// If there is no end bound, just return all the trees.
|
||||||
|
|
@ -457,7 +457,7 @@ impl ZebraDb {
|
||||||
// the trees run out.)
|
// the trees run out.)
|
||||||
list = self
|
list = self
|
||||||
.db
|
.db
|
||||||
.zs_range_iter(&orchard_subtrees, start_index..)
|
.zs_range_iter(&orchard_subtrees, start_index.., false)
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -241,7 +241,11 @@ impl ZebraDb {
|
||||||
AddressTransaction::address_iterator_range(address_location, query_height_range);
|
AddressTransaction::address_iterator_range(address_location, query_height_range);
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
.zs_range_iter(&tx_loc_by_transparent_addr_loc, transaction_location_range)
|
.zs_range_iter(
|
||||||
|
&tx_loc_by_transparent_addr_loc,
|
||||||
|
transaction_location_range,
|
||||||
|
false,
|
||||||
|
)
|
||||||
.map(|(tx_loc, ())| tx_loc)
|
.map(|(tx_loc, ())| tx_loc)
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue