change(state): Check database format is valid every 5 minutes, to catch format errors in new block code (#7602)

* Refactor format checks, stop running quick check before upgrade

* Speed up startup by always running the format checks in parallel

* Make detailed format checks cancellable on shutdown

* Prepare for periodic format checks

* Time upgrades and validity checks

* Run a database format check every 5 minutes

* Wait for a cancel signal rather than unconditionally sleeping

* Move check_max_on_disk_height() into the format checks

* Move spawn_format_change() into its own method
This commit is contained in:
teor 2023-09-22 11:33:52 +10:00 committed by GitHub
parent 5b3ecfb913
commit b737ccf570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 435 additions and 176 deletions

View File

@ -1,5 +1,7 @@
//! Constants that impact state behaviour. //! Constants that impact state behaviour.
use std::time::Duration;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use semver::Version; use semver::Version;
@ -67,6 +69,12 @@ pub fn latest_version_for_adding_subtrees() -> Version {
/// Use [`Config::version_file_path()`] to get the path to this file. /// Use [`Config::version_file_path()`] to get the path to this file.
pub(crate) const DATABASE_FORMAT_VERSION_FILE_NAME: &str = "version"; pub(crate) const DATABASE_FORMAT_VERSION_FILE_NAME: &str = "version";
/// The interval between database format checks for newly added blocks.
///
/// This should be short enough that format bugs cause CI test failures,
/// but long enough that it doesn't impact performance.
pub(crate) const DATABASE_FORMAT_CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// The maximum number of blocks to check for NU5 transactions, /// The maximum number of blocks to check for NU5 transactions,
/// before we assume we are on a pre-NU5 legacy chain. /// before we assume we are on a pre-NU5 legacy chain.
/// ///

View File

@ -1,7 +1,8 @@
//! In-place format upgrades for the Zebra state database. //! In-place format upgrades and format validity checks for the Zebra state database.
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
convert::Infallible,
sync::{mpsc, Arc}, sync::{mpsc, Arc},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
@ -11,7 +12,10 @@ use tracing::Span;
use zebra_chain::{ use zebra_chain::{
block::Height, block::Height,
diagnostic::task::{CheckForPanics, WaitForPanics}, diagnostic::{
task::{CheckForPanics, WaitForPanics},
CodeTimer,
},
parameters::Network, parameters::Network,
}; };
@ -19,7 +23,9 @@ use DbFormatChange::*;
use crate::{ use crate::{
config::write_database_format_version_to_disk, config::write_database_format_version_to_disk,
constants::{latest_version_for_adding_subtrees, DATABASE_FORMAT_VERSION}, constants::{
latest_version_for_adding_subtrees, DATABASE_FORMAT_CHECK_INTERVAL, DATABASE_FORMAT_VERSION,
},
database_format_version_in_code, database_format_version_on_disk, database_format_version_in_code, database_format_version_on_disk,
service::finalized_state::{DiskWriteBatch, ZebraDb}, service::finalized_state::{DiskWriteBatch, ZebraDb},
Config, Config,
@ -27,15 +33,12 @@ use crate::{
pub(crate) mod add_subtrees; pub(crate) mod add_subtrees;
/// The kind of database format change we're performing. /// The kind of database format change or validity check we're performing.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum DbFormatChange { pub enum DbFormatChange {
/// Marking the format as newly created by `running_version`. // Data Format Changes
/// //
/// Newly created databases have no disk version. /// Upgrade the format from `older_disk_version` to `newer_running_version`.
NewlyCreated { running_version: Version },
/// Upgrading the format from `older_disk_version` to `newer_running_version`.
/// ///
/// Until this upgrade is complete, the format is a mixture of both versions. /// Until this upgrade is complete, the format is a mixture of both versions.
Upgrade { Upgrade {
@ -43,7 +46,15 @@ pub enum DbFormatChange {
newer_running_version: Version, newer_running_version: Version,
}, },
/// Marking the format as downgraded from `newer_disk_version` to `older_running_version`. // Format Version File Changes
//
/// Mark the format as newly created by `running_version`.
///
/// Newly created databases are opened with no disk version.
/// It is set to the running version by the format change code.
NewlyCreated { running_version: Version },
/// Mark the format as downgraded from `newer_disk_version` to `older_running_version`.
/// ///
/// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state /// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
/// version (or greater), the format will be a mixture of both versions. /// version (or greater), the format will be a mixture of both versions.
@ -51,6 +62,22 @@ pub enum DbFormatChange {
newer_disk_version: Version, newer_disk_version: Version,
older_running_version: Version, older_running_version: Version,
}, },
// Data Format Checks
//
/// Check that the database from a previous instance has the current `running_version` format.
///
/// Current version databases have a disk version that matches the running version.
/// No upgrades are needed, so we just run a format check on the database.
/// The data in that database was created or updated by a previous Zebra instance.
CheckOpenCurrent { running_version: Version },
/// Check that the database from this instance has the current `running_version` format.
///
/// The data in that database was created or updated by the currently running Zebra instance.
/// So we periodically check for data bugs, which can happen if the upgrade and new block
/// code produce different data. (They can also be caused by disk corruption.)
CheckNewBlocksCurrent { running_version: Version },
} }
/// A handle to a spawned format change thread. /// A handle to a spawned format change thread.
@ -62,39 +89,39 @@ pub enum DbFormatChange {
/// Cancelling the thread on drop has a race condition, because two handles can be dropped at /// Cancelling the thread on drop has a race condition, because two handles can be dropped at
/// the same time. /// the same time.
/// ///
/// If cancelling the thread is important, the owner of the handle must call force_cancel(). /// If cancelling the thread is required for correct operation or usability, the owner of the
/// handle must call force_cancel().
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DbFormatChangeThreadHandle { pub struct DbFormatChangeThreadHandle {
/// A handle that can wait for the running format change thread to finish. /// A handle to the format change/check thread.
/// This thread continues running so it can perform periodic format checks.
/// ///
/// Panics from this thread are propagated into Zebra's state service. /// Panics from this thread are propagated into Zebra's state service.
/// The task returns an error if the upgrade was cancelled by a shutdown. /// The task returns an error if the upgrade was cancelled by a shutdown.
update_task: Option<Arc<JoinHandle<Result<(), CancelFormatChange>>>>, update_task: Option<Arc<JoinHandle<Result<Infallible, CancelFormatChange>>>>,
/// A channel that tells the running format thread to finish early.
/// A channel that tells the running format thread to finish early. /// A channel that tells the running format thread to finish early.
cancel_handle: mpsc::SyncSender<CancelFormatChange>, cancel_handle: mpsc::SyncSender<CancelFormatChange>,
} }
/// Marker for cancelling a format upgrade. /// Marker type that is sent to cancel a format upgrade, and returned as a error on cancellation.
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct CancelFormatChange; pub struct CancelFormatChange;
impl DbFormatChange { impl DbFormatChange {
/// Check if loading `disk_version` into `running_version` needs a format change, /// Returns the format change for `running_version` code loading a `disk_version` database.
/// and if it does, return the required format change.
/// ///
/// Also logs the kind of change at info level. /// Also logs that change at info level.
/// ///
/// If `disk_version` is `None`, Zebra is creating a new database. /// If `disk_version` is `None`, Zebra is creating a new database.
pub fn new(running_version: Version, disk_version: Option<Version>) -> Option<Self> { pub fn open_database(running_version: Version, disk_version: Option<Version>) -> Self {
let Some(disk_version) = disk_version else { let Some(disk_version) = disk_version else {
info!( info!(
%running_version, %running_version,
"creating new database with the current format" "creating new database with the current format"
); );
return Some(NewlyCreated { running_version }); return NewlyCreated { running_version };
}; };
match disk_version.cmp(&running_version) { match disk_version.cmp(&running_version) {
@ -105,10 +132,10 @@ impl DbFormatChange {
"trying to open older database format: launching upgrade task" "trying to open older database format: launching upgrade task"
); );
Some(Upgrade { Upgrade {
older_disk_version: disk_version, older_disk_version: disk_version,
newer_running_version: running_version, newer_running_version: running_version,
}) }
} }
Ordering::Greater => { Ordering::Greater => {
info!( info!(
@ -117,29 +144,84 @@ impl DbFormatChange {
"trying to open newer database format: data should be compatible" "trying to open newer database format: data should be compatible"
); );
Some(Downgrade { Downgrade {
newer_disk_version: disk_version, newer_disk_version: disk_version,
older_running_version: running_version, older_running_version: running_version,
}) }
} }
Ordering::Equal => { Ordering::Equal => {
info!(%running_version, "trying to open current database format"); info!(%running_version, "trying to open current database format");
None CheckOpenCurrent { running_version }
} }
} }
} }
/// Returns true if this change is an upgrade. /// Returns a format check for newly added blocks in the currently running Zebra version.
/// This check makes sure the upgrade and new block code produce the same data.
///
/// Also logs the check at info level.
pub fn check_new_blocks() -> Self {
let running_version = database_format_version_in_code();
info!(%running_version, "checking new blocks were written in current database format");
CheckNewBlocksCurrent { running_version }
}
/// Returns true if this format change/check is an upgrade.
#[allow(dead_code)] #[allow(dead_code)]
pub fn is_upgrade(&self) -> bool { pub fn is_upgrade(&self) -> bool {
matches!(self, Upgrade { .. }) matches!(self, Upgrade { .. })
} }
/// Launch a `std::thread` that applies this format change to the database. /// Returns true if this format change/check happens at startup.
#[allow(dead_code)]
pub fn is_run_at_startup(&self) -> bool {
!matches!(self, CheckNewBlocksCurrent { .. })
}
/// Returns the running version in this format change.
pub fn running_version(&self) -> Version {
match self {
Upgrade {
newer_running_version,
..
} => newer_running_version,
Downgrade {
older_running_version,
..
} => older_running_version,
NewlyCreated { running_version }
| CheckOpenCurrent { running_version }
| CheckNewBlocksCurrent { running_version } => running_version,
}
.clone()
}
/// Returns the initial database version before this format change.
///
/// Returns `None` if the database was newly created.
pub fn initial_disk_version(&self) -> Option<Version> {
match self {
Upgrade {
older_disk_version, ..
} => Some(older_disk_version),
Downgrade {
newer_disk_version, ..
} => Some(newer_disk_version),
CheckOpenCurrent { running_version } | CheckNewBlocksCurrent { running_version } => {
Some(running_version)
}
NewlyCreated { .. } => None,
}
.cloned()
}
/// Launch a `std::thread` that applies this format change to the database,
/// then continues running to perform periodic format checks.
/// ///
/// `initial_tip_height` is the database height when it was opened, and `upgrade_db` is the /// `initial_tip_height` is the database height when it was opened, and `upgrade_db` is the
/// database instance to upgrade. /// database instance to upgrade or check.
pub fn spawn_format_change( pub fn spawn_format_change(
self, self,
config: Config, config: Config,
@ -156,7 +238,7 @@ impl DbFormatChange {
let span = Span::current(); let span = Span::current();
let update_task = thread::spawn(move || { let update_task = thread::spawn(move || {
span.in_scope(move || { span.in_scope(move || {
self.apply_format_change( self.format_change_run_loop(
config, config,
network, network,
initial_tip_height, initial_tip_height,
@ -176,36 +258,68 @@ impl DbFormatChange {
handle handle
} }
/// Apply this format change to the database. /// Run the initial format change or check to the database, then periodically check the format
/// of newly added blocks matches the current format.
/// ///
/// Format changes are launched in an independent `std::thread` by `apply_format_upgrade()`. /// This method runs until it is cancelled or panics.
/// This thread runs until the upgrade is finished or cancelled. fn format_change_run_loop(
///
/// See `apply_format_upgrade()` for details.
fn apply_format_change(
self, self,
config: Config, config: Config,
network: Network, network: Network,
initial_tip_height: Option<Height>, initial_tip_height: Option<Height>,
upgrade_db: ZebraDb, upgrade_db: ZebraDb,
cancel_receiver: mpsc::Receiver<CancelFormatChange>, cancel_receiver: mpsc::Receiver<CancelFormatChange>,
) -> Result<(), CancelFormatChange> { ) -> Result<Infallible, CancelFormatChange> {
// These quick checks should pass for all format changes. self.run_format_change_or_check(
// (See the detailed comment at the end of this method.) &config,
add_subtrees::quick_check(&upgrade_db); network,
initial_tip_height,
&upgrade_db,
&cancel_receiver,
)?;
loop {
// We've just run a format check, so sleep first, then run another one.
// But return early if there is a cancel signal.
if !matches!(
cancel_receiver.recv_timeout(DATABASE_FORMAT_CHECK_INTERVAL),
Err(mpsc::RecvTimeoutError::Timeout)
) {
return Err(CancelFormatChange);
}
Self::check_new_blocks().run_format_change_or_check(
&config,
network,
initial_tip_height,
&upgrade_db,
&cancel_receiver,
)?;
}
}
/// Run a format change in the database, or check the format of the database once.
#[allow(clippy::unwrap_in_result)]
fn run_format_change_or_check(
&self,
config: &Config,
network: Network,
initial_tip_height: Option<Height>,
upgrade_db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<(), CancelFormatChange> {
match self { match self {
// Perform any required upgrades, then mark the state as upgraded. // Perform any required upgrades, then mark the state as upgraded.
Upgrade { .. } => self.apply_format_upgrade( Upgrade { .. } => self.apply_format_upgrade(
config, config,
network, network,
initial_tip_height, initial_tip_height,
upgrade_db.clone(), upgrade_db,
&cancel_receiver, cancel_receiver,
)?, )?,
NewlyCreated { .. } => { NewlyCreated { .. } => {
Self::mark_as_newly_created(&config, network); Self::mark_as_newly_created(config, network);
} }
Downgrade { .. } => { Downgrade { .. } => {
@ -214,7 +328,7 @@ impl DbFormatChange {
// At the start of a format downgrade, the database must be marked as partially or // At the start of a format downgrade, the database must be marked as partially or
// fully downgraded. This lets newer Zebra versions know that some blocks with older // fully downgraded. This lets newer Zebra versions know that some blocks with older
// formats have been added to the database. // formats have been added to the database.
Self::mark_as_downgraded(&config, network); Self::mark_as_downgraded(config, network);
// Older supported versions just assume they can read newer formats, // Older supported versions just assume they can read newer formats,
// because they can't predict all changes a newer Zebra version could make. // because they can't predict all changes a newer Zebra version could make.
@ -222,6 +336,29 @@ impl DbFormatChange {
// The responsibility of staying backwards-compatible is on the newer version. // The responsibility of staying backwards-compatible is on the newer version.
// We do this on a best-effort basis for versions that are still supported. // We do this on a best-effort basis for versions that are still supported.
} }
CheckOpenCurrent { running_version } => {
// If we're re-opening a previously upgraded or newly created database,
// the database format should be valid. This check is done below.
info!(
%running_version,
"checking database format produced by a previous zebra instance \
is current and valid"
);
}
CheckNewBlocksCurrent { running_version } => {
// If we've added new blocks using the non-upgrade code,
// the database format should be valid. This check is done below.
//
// TODO: should this check panic or just log an error?
// Currently, we panic to avoid consensus bugs, but this could cause a denial
// of service. We can make errors fail in CI using ZEBRA_FAILURE_MESSAGES.
info!(
%running_version,
"checking database format produced by new blocks in this instance is valid"
);
}
} }
// These checks should pass for all format changes: // These checks should pass for all format changes:
@ -230,8 +367,19 @@ impl DbFormatChange {
// - since the running Zebra code knows how to upgrade the database to this format, // - since the running Zebra code knows how to upgrade the database to this format,
// downgrades using this running code still know how to create a valid database // downgrades using this running code still know how to create a valid database
// (unless a future upgrade breaks these format checks) // (unless a future upgrade breaks these format checks)
Self::check_for_duplicate_trees(upgrade_db.clone()); // - re-opening the current version should be valid, regardless of whether the upgrade
add_subtrees::check(&upgrade_db); // or new block code created the format (or any combination).
Self::format_validity_checks_detailed(upgrade_db, cancel_receiver)?
.expect("new, upgraded, or downgraded database format is valid");
let inital_disk_version = self
.initial_disk_version()
.map_or_else(|| "None".to_string(), |version| version.to_string());
info!(
running_version = %self.running_version(),
%inital_disk_version,
"database format is valid"
);
Ok(()) Ok(())
} }
@ -248,11 +396,11 @@ impl DbFormatChange {
// New format upgrades must be added to the *end* of this method. // New format upgrades must be added to the *end* of this method.
#[allow(clippy::unwrap_in_result)] #[allow(clippy::unwrap_in_result)]
fn apply_format_upgrade( fn apply_format_upgrade(
self, &self,
config: Config, config: &Config,
network: Network, network: Network,
initial_tip_height: Option<Height>, initial_tip_height: Option<Height>,
db: ZebraDb, db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>, cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<(), CancelFormatChange> { ) -> Result<(), CancelFormatChange> {
let Upgrade { let Upgrade {
@ -274,7 +422,7 @@ impl DbFormatChange {
"marking empty database as upgraded" "marking empty database as upgraded"
); );
Self::mark_as_upgraded_to(&database_format_version_in_code(), &config, network); Self::mark_as_upgraded_to(&database_format_version_in_code(), config, network);
info!( info!(
%newer_running_version, %newer_running_version,
@ -291,7 +439,9 @@ impl DbFormatChange {
Version::parse("25.1.1").expect("Hardcoded version string should be valid."); Version::parse("25.1.1").expect("Hardcoded version string should be valid.");
// Check if we need to prune the note commitment trees in the database. // Check if we need to prune the note commitment trees in the database.
if older_disk_version < version_for_pruning_trees { if older_disk_version < &version_for_pruning_trees {
let timer = CodeTimer::start();
// Prune duplicate Sapling note commitment trees. // Prune duplicate Sapling note commitment trees.
// The last tree we checked. // The last tree we checked.
@ -310,7 +460,7 @@ impl DbFormatChange {
// Delete any duplicate trees. // Delete any duplicate trees.
if tree == last_tree { if tree == last_tree {
let mut batch = DiskWriteBatch::new(); let mut batch = DiskWriteBatch::new();
batch.delete_sapling_tree(&db, &height); batch.delete_sapling_tree(db, &height);
db.write_batch(batch) db.write_batch(batch)
.expect("Deleting Sapling note commitment trees should always succeed."); .expect("Deleting Sapling note commitment trees should always succeed.");
} }
@ -337,7 +487,7 @@ impl DbFormatChange {
// Delete any duplicate trees. // Delete any duplicate trees.
if tree == last_tree { if tree == last_tree {
let mut batch = DiskWriteBatch::new(); let mut batch = DiskWriteBatch::new();
batch.delete_orchard_tree(&db, &height); batch.delete_orchard_tree(db, &height);
db.write_batch(batch) db.write_batch(batch)
.expect("Deleting Orchard note commitment trees should always succeed."); .expect("Deleting Orchard note commitment trees should always succeed.");
} }
@ -347,11 +497,14 @@ impl DbFormatChange {
} }
// Before marking the state as upgraded, check that the upgrade completed successfully. // Before marking the state as upgraded, check that the upgrade completed successfully.
Self::check_for_duplicate_trees(db.clone()); Self::check_for_duplicate_trees(db, cancel_receiver)?
.expect("database format is valid after upgrade");
// Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
// database is marked, so the upgrade MUST be complete at this point. // database is marked, so the upgrade MUST be complete at this point.
Self::mark_as_upgraded_to(&version_for_pruning_trees, &config, network); Self::mark_as_upgraded_to(&version_for_pruning_trees, config, network);
timer.finish(module_path!(), line!(), "deduplicate trees upgrade");
} }
// Note commitment subtree creation database upgrade task. // Note commitment subtree creation database upgrade task.
@ -361,20 +514,25 @@ impl DbFormatChange {
Version::parse("25.2.0").expect("Hardcoded version string should be valid."); Version::parse("25.2.0").expect("Hardcoded version string should be valid.");
// Check if we need to add or fix note commitment subtrees in the database. // Check if we need to add or fix note commitment subtrees in the database.
if older_disk_version < latest_version_for_adding_subtrees { if older_disk_version < &latest_version_for_adding_subtrees {
if older_disk_version >= first_version_for_adding_subtrees { let timer = CodeTimer::start();
if older_disk_version >= &first_version_for_adding_subtrees {
// Clear previous upgrade data, because it was incorrect. // Clear previous upgrade data, because it was incorrect.
add_subtrees::reset(initial_tip_height, &db, cancel_receiver)?; add_subtrees::reset(initial_tip_height, db, cancel_receiver)?;
} }
add_subtrees::run(initial_tip_height, &db, cancel_receiver)?; add_subtrees::run(initial_tip_height, db, cancel_receiver)?;
// Before marking the state as upgraded, check that the upgrade completed successfully. // Before marking the state as upgraded, check that the upgrade completed successfully.
add_subtrees::check(&db); add_subtrees::subtree_format_validity_checks_detailed(db, cancel_receiver)?
.expect("database format is valid after upgrade");
// Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
// database is marked, so the upgrade MUST be complete at this point. // database is marked, so the upgrade MUST be complete at this point.
Self::mark_as_upgraded_to(&latest_version_for_adding_subtrees, &config, network); Self::mark_as_upgraded_to(&latest_version_for_adding_subtrees, config, network);
timer.finish(module_path!(), line!(), "add subtrees upgrade");
} }
// # New Upgrades Usually Go Here // # New Upgrades Usually Go Here
@ -393,30 +551,93 @@ impl DbFormatChange {
Ok(()) Ok(())
} }
/// Run quick checks that the current database format is valid.
#[allow(clippy::vec_init_then_push)]
pub fn format_validity_checks_quick(db: &ZebraDb) -> Result<(), String> {
let timer = CodeTimer::start();
let mut results = Vec::new();
// Check the entire format before returning any errors.
results.push(db.check_max_on_disk_tip_height());
// This check can be run before the upgrade, but the upgrade code is finished, so we don't
// run it early any more. (If future code changes accidentally make it depend on the
// upgrade, they would accidentally break compatibility with older Zebra cached states.)
results.push(add_subtrees::subtree_format_calculation_pre_checks(db));
// The work is done in the functions we just called.
timer.finish(module_path!(), line!(), "format_validity_checks_quick()");
if results.iter().any(Result::is_err) {
let err = Err(format!("invalid quick check: {results:?}"));
error!(?err);
return err;
}
Ok(())
}
/// Run detailed checks that the current database format is valid.
#[allow(clippy::vec_init_then_push)]
pub fn format_validity_checks_detailed(
db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<Result<(), String>, CancelFormatChange> {
let timer = CodeTimer::start();
let mut results = Vec::new();
// Check the entire format before returning any errors.
//
// Do the quick checks first, so we don't have to do this in every detailed check.
results.push(Self::format_validity_checks_quick(db));
results.push(Self::check_for_duplicate_trees(db, cancel_receiver)?);
results.push(add_subtrees::subtree_format_validity_checks_detailed(
db,
cancel_receiver,
)?);
// The work is done in the functions we just called.
timer.finish(module_path!(), line!(), "format_validity_checks_detailed()");
if results.iter().any(Result::is_err) {
let err = Err(format!("invalid detailed check: {results:?}"));
error!(?err);
return Ok(err);
}
Ok(Ok(()))
}
/// Check that note commitment trees were correctly de-duplicated. /// Check that note commitment trees were correctly de-duplicated.
/// //
/// # Panics // TODO: move this method into an deduplication upgrade module file,
/// // along with the upgrade code above.
/// If a duplicate tree is found. #[allow(clippy::unwrap_in_result)]
pub fn check_for_duplicate_trees(upgrade_db: ZebraDb) { fn check_for_duplicate_trees(
db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<Result<(), String>, CancelFormatChange> {
// Runtime test: make sure we removed all duplicates. // Runtime test: make sure we removed all duplicates.
// We always run this test, even if the state has supposedly been upgraded. // We always run this test, even if the state has supposedly been upgraded.
let mut duplicate_found = false; let mut result = Ok(());
let mut prev_height = None; let mut prev_height = None;
let mut prev_tree = None; let mut prev_tree = None;
for (height, tree) in upgrade_db.sapling_tree_by_height_range(..) { for (height, tree) in db.sapling_tree_by_height_range(..) {
if prev_tree == Some(tree.clone()) { // Return early if the format check is cancelled.
// TODO: replace this with a panic because it indicates an unrecoverable if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
// bug, which should fail the tests immediately return Err(CancelFormatChange);
error!( }
height = ?height,
prev_height = ?prev_height.unwrap(),
tree_root = ?tree.root(),
"found duplicate sapling trees after running de-duplicate tree upgrade"
);
duplicate_found = true; if prev_tree == Some(tree.clone()) {
result = Err(format!(
"found duplicate sapling trees after running de-duplicate tree upgrade:\
height: {height:?}, previous height: {:?}, tree root: {:?}",
prev_height.unwrap(),
tree.root()
));
error!(?result);
} }
prev_height = Some(height); prev_height = Some(height);
@ -425,30 +646,27 @@ impl DbFormatChange {
let mut prev_height = None; let mut prev_height = None;
let mut prev_tree = None; let mut prev_tree = None;
for (height, tree) in upgrade_db.orchard_tree_by_height_range(..) { for (height, tree) in db.orchard_tree_by_height_range(..) {
if prev_tree == Some(tree.clone()) { // Return early if the format check is cancelled.
// TODO: replace this with a panic because it indicates an unrecoverable if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
// bug, which should fail the tests immediately return Err(CancelFormatChange);
error!( }
height = ?height,
prev_height = ?prev_height.unwrap(),
tree_root = ?tree.root(),
"found duplicate orchard trees after running de-duplicate tree upgrade"
);
duplicate_found = true; if prev_tree == Some(tree.clone()) {
result = Err(format!(
"found duplicate orchard trees after running de-duplicate tree upgrade:\
height: {height:?}, previous height: {:?}, tree root: {:?}",
prev_height.unwrap(),
tree.root()
));
error!(?result);
} }
prev_height = Some(height); prev_height = Some(height);
prev_tree = Some(tree); prev_tree = Some(tree);
} }
if duplicate_found { Ok(result)
panic!(
"found duplicate sapling or orchard trees \
after running de-duplicate tree upgrade"
);
}
} }
/// Mark a newly created database with the current format version. /// Mark a newly created database with the current format version.

View File

@ -145,20 +145,22 @@ pub fn reset(
/// This allows us to fail the upgrade quickly in tests and during development, /// This allows us to fail the upgrade quickly in tests and during development,
/// rather than waiting ~20 minutes to see if it failed. /// rather than waiting ~20 minutes to see if it failed.
/// ///
/// # Panics /// This check runs the first subtree calculation, but it doesn't read the subtree data in the
/// /// database. So it can be run before the upgrade is started.
/// If a note commitment subtree is missing or incorrect. pub fn subtree_format_calculation_pre_checks(db: &ZebraDb) -> Result<(), String> {
pub fn quick_check(db: &ZebraDb) { // Check the entire format before returning any errors.
let sapling_result = quick_check_sapling_subtrees(db); let sapling_result = quick_check_sapling_subtrees(db);
let orchard_result = quick_check_orchard_subtrees(db); let orchard_result = quick_check_orchard_subtrees(db);
if sapling_result.is_err() || orchard_result.is_err() { if sapling_result.is_err() || orchard_result.is_err() {
// TODO: when the check functions are refactored so they are called from a single function, let err = Err(format!(
// move this panic into that function, but still log a detailed message here
panic!(
"missing or bad first subtree: sapling: {sapling_result:?}, orchard: {orchard_result:?}" "missing or bad first subtree: sapling: {sapling_result:?}, orchard: {orchard_result:?}"
); ));
warn!(?err);
return err;
} }
Ok(())
} }
/// A quick test vector that allows us to fail an incorrect upgrade within a few seconds. /// A quick test vector that allows us to fail an incorrect upgrade within a few seconds.
@ -300,34 +302,40 @@ fn quick_check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
} }
/// Check that note commitment subtrees were correctly added. /// Check that note commitment subtrees were correctly added.
/// pub fn subtree_format_validity_checks_detailed(
/// # Panics db: &ZebraDb,
/// cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
/// If a note commitment subtree is missing or incorrect. ) -> Result<Result<(), String>, CancelFormatChange> {
pub fn check(db: &ZebraDb) { // This is redundant in some code paths, but not in others. But it's quick anyway.
// This check is partly redundant, but we want to make sure it's never missed. let quick_result = subtree_format_calculation_pre_checks(db);
quick_check(db);
let sapling_result = check_sapling_subtrees(db); // Check the entire format before returning any errors.
let orchard_result = check_orchard_subtrees(db); let sapling_result = check_sapling_subtrees(db, cancel_receiver)?;
let orchard_result = check_orchard_subtrees(db, cancel_receiver)?;
if sapling_result.is_err() || orchard_result.is_err() { if quick_result.is_err() || sapling_result.is_err() || orchard_result.is_err() {
// TODO: when the check functions are refactored so they are called from a single function, let err = Err(format!(
// move this panic into that function, but still log a detailed message here "missing or invalid subtree(s): \
panic!( quick: {quick_result:?}, sapling: {sapling_result:?}, orchard: {orchard_result:?}"
"missing or bad subtree(s): sapling: {sapling_result:?}, orchard: {orchard_result:?}" ));
); warn!(?err);
return Ok(err);
} }
Ok(Ok(()))
} }
/// Check that Sapling note commitment subtrees were correctly added. /// Check that Sapling note commitment subtrees were correctly added.
/// ///
/// Returns an error if a note commitment subtree is missing or incorrect. /// Returns an error if a note commitment subtree is missing or incorrect.
fn check_sapling_subtrees(db: &ZebraDb) -> Result<(), &'static str> { fn check_sapling_subtrees(
db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<Result<(), &'static str>, CancelFormatChange> {
let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) = let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) =
db.sapling_tree().subtree_index() db.sapling_tree().subtree_index()
else { else {
return Ok(()); return Ok(Ok(()));
}; };
// If there are no incomplete subtrees in the tree, also expect a subtree for the final index. // If there are no incomplete subtrees in the tree, also expect a subtree for the final index.
@ -337,6 +345,11 @@ fn check_sapling_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
let mut result = Ok(()); let mut result = Ok(());
for index in 0..first_incomplete_subtree_index { for index in 0..first_incomplete_subtree_index {
// Return early if the format check is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange);
}
// Check that there's a continuous range of subtrees from index [0, first_incomplete_subtree_index) // Check that there's a continuous range of subtrees from index [0, first_incomplete_subtree_index)
let Some(subtree) = db.sapling_subtree_by_index(index) else { let Some(subtree) = db.sapling_subtree_by_index(index) else {
result = Err("missing subtree"); result = Err("missing subtree");
@ -402,6 +415,11 @@ fn check_sapling_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
} }
}) })
{ {
// Return early if the format check is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange);
}
// Check that there's an entry for every completed sapling subtree root in all sapling trees // Check that there's an entry for every completed sapling subtree root in all sapling trees
let Some(subtree) = db.sapling_subtree_by_index(index) else { let Some(subtree) = db.sapling_subtree_by_index(index) else {
result = Err("missing subtree"); result = Err("missing subtree");
@ -434,17 +452,20 @@ fn check_sapling_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
); );
} }
result Ok(result)
} }
/// Check that Orchard note commitment subtrees were correctly added. /// Check that Orchard note commitment subtrees were correctly added.
/// ///
/// Returns an error if a note commitment subtree is missing or incorrect. /// Returns an error if a note commitment subtree is missing or incorrect.
fn check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> { fn check_orchard_subtrees(
db: &ZebraDb,
cancel_receiver: &mpsc::Receiver<CancelFormatChange>,
) -> Result<Result<(), &'static str>, CancelFormatChange> {
let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) = let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) =
db.orchard_tree().subtree_index() db.orchard_tree().subtree_index()
else { else {
return Ok(()); return Ok(Ok(()));
}; };
// If there are no incomplete subtrees in the tree, also expect a subtree for the final index. // If there are no incomplete subtrees in the tree, also expect a subtree for the final index.
@ -454,6 +475,11 @@ fn check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
let mut result = Ok(()); let mut result = Ok(());
for index in 0..first_incomplete_subtree_index { for index in 0..first_incomplete_subtree_index {
// Return early if the format check is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange);
}
// Check that there's a continuous range of subtrees from index [0, first_incomplete_subtree_index) // Check that there's a continuous range of subtrees from index [0, first_incomplete_subtree_index)
let Some(subtree) = db.orchard_subtree_by_index(index) else { let Some(subtree) = db.orchard_subtree_by_index(index) else {
result = Err("missing subtree"); result = Err("missing subtree");
@ -519,6 +545,11 @@ fn check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
} }
}) })
{ {
// Return early if the format check is cancelled.
if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) {
return Err(CancelFormatChange);
}
// Check that there's an entry for every completed orchard subtree root in all orchard trees // Check that there's an entry for every completed orchard subtree root in all orchard trees
let Some(subtree) = db.orchard_subtree_by_index(index) else { let Some(subtree) = db.orchard_subtree_by_index(index) else {
result = Err("missing subtree"); result = Err("missing subtree");
@ -551,7 +582,7 @@ fn check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> {
); );
} }
result Ok(result)
} }
/// Calculates a note commitment subtree for Sapling, reading blocks from `read_db` if needed. /// Calculates a note commitment subtree for Sapling, reading blocks from `read_db` if needed.

View File

@ -19,7 +19,7 @@ use crate::{
disk_db::DiskDb, disk_db::DiskDb,
disk_format::{ disk_format::{
block::MAX_ON_DISK_HEIGHT, block::MAX_ON_DISK_HEIGHT,
upgrade::{self, DbFormatChange, DbFormatChangeThreadHandle}, upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
}, },
}, },
Config, Config,
@ -41,6 +41,15 @@ pub mod arbitrary;
/// the database is closed. /// the database is closed.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ZebraDb { pub struct ZebraDb {
// Configuration
//
// This configuration cannot be modified after the database is initialized,
// because some clones would have different values.
//
/// Should format upgrades and format checks be skipped for this instance?
/// Only used in test code.
debug_skip_format_upgrades: bool,
// Owned State // Owned State
// //
// Everything contained in this state must be shared by all clones, or read-only. // Everything contained in this state must be shared by all clones, or read-only.
@ -69,10 +78,11 @@ impl ZebraDb {
.expect("unable to read database format version file"); .expect("unable to read database format version file");
// Log any format changes before opening the database, in case opening fails. // Log any format changes before opening the database, in case opening fails.
let format_change = DbFormatChange::new(running_version, disk_version); let format_change = DbFormatChange::open_database(running_version, disk_version);
// Open the database and do initial checks. // Open the database and do initial checks.
let mut db = ZebraDb { let mut db = ZebraDb {
debug_skip_format_upgrades,
format_change_handle: None, format_change_handle: None,
// After the database directory is created, a newly created database temporarily // After the database directory is created, a newly created database temporarily
// changes to the default database version. Then we set the correct version in the // changes to the default database version. Then we set the correct version in the
@ -81,52 +91,44 @@ impl ZebraDb {
db: DiskDb::new(config, network), db: DiskDb::new(config, network),
}; };
db.check_max_on_disk_tip_height(); db.spawn_format_change(config, network, format_change);
db
}
/// Launch any required format changes or format checks, and store their thread handle.
pub fn spawn_format_change(
&mut self,
config: &Config,
network: Network,
format_change: DbFormatChange,
) {
// Always do format upgrades & checks in production code.
if cfg!(test) && self.debug_skip_format_upgrades {
return;
}
// We have to get this height before we spawn the upgrade task, because threads can take // We have to get this height before we spawn the upgrade task, because threads can take
// a while to start, and new blocks can be committed as soon as we return from this method. // a while to start, and new blocks can be committed as soon as we return from this method.
let initial_tip_height = db.finalized_tip_height(); let initial_tip_height = self.finalized_tip_height();
// Always do format upgrades & checks in production code. // `upgrade_db` is a special clone of this database, which can't be used to shut down
if cfg!(test) && debug_skip_format_upgrades { // the upgrade task. (Because the task hasn't been launched yet,
return db; // its `db.format_change_handle` is always None.)
} let upgrade_db = self.clone();
// Start any required format changes, and do format checks. // TODO:
// // - should debug_stop_at_height wait for the upgrade task to finish?
// TODO: should debug_stop_at_height wait for these upgrades, or not? // - if needed, make upgrade_db into a FinalizedState,
if let Some(format_change) = format_change { // or move the FinalizedState methods needed for upgrades to ZebraDb.
// Launch the format change and install its handle in the database. let format_change_handle = format_change.spawn_format_change(
// config.clone(),
// `upgrade_db` is a special clone of the database, which can't be used to shut down network,
// the upgrade task. (Because the task hasn't been launched yet, initial_tip_height,
// `db.format_change_handle` is always None.) upgrade_db,
// );
// It can be a FinalizedState if needed, or the FinalizedState methods needed for
// upgrades can be moved to ZebraDb.
let upgrade_db = db.clone();
let format_change_handle = format_change.spawn_format_change( self.format_change_handle = Some(format_change_handle);
config.clone(),
network,
initial_tip_height,
upgrade_db,
);
db.format_change_handle = Some(format_change_handle);
} else {
// If we're re-opening a previously upgraded or newly created database,
// the database format should be valid.
// (There's no format change here, so the format change checks won't run.)
//
// Do the quick checks first, then the slower checks.
upgrade::add_subtrees::quick_check(&db);
DbFormatChange::check_for_duplicate_trees(db.clone());
upgrade::add_subtrees::check(&db);
}
db
} }
/// Returns the configured network for this database. /// Returns the configured network for this database.
@ -147,9 +149,6 @@ impl ZebraDb {
if let Some(format_change_handle) = self.format_change_handle.as_mut() { if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.check_for_panics(); format_change_handle.check_for_panics();
} }
// This check doesn't panic, but we want to check it regularly anyway.
self.check_max_on_disk_tip_height();
} }
/// Shut down the database, cleaning up background tasks and ephemeral data. /// Shut down the database, cleaning up background tasks and ephemeral data.
@ -184,17 +183,20 @@ impl ZebraDb {
/// # Logs an Error /// # Logs an Error
/// ///
/// If Zebra is storing block heights that are close to [`MAX_ON_DISK_HEIGHT`]. /// If Zebra is storing block heights that are close to [`MAX_ON_DISK_HEIGHT`].
fn check_max_on_disk_tip_height(&self) { pub(crate) fn check_max_on_disk_tip_height(&self) -> Result<(), String> {
if let Some((tip_height, tip_hash)) = self.tip() { if let Some((tip_height, tip_hash)) = self.tip() {
if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 { if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 {
error!( let err = Err(format!(
?tip_height, "unexpectedly large tip height, database format upgrade required: \
?tip_hash, tip height: {tip_height:?}, tip hash: {tip_hash:?}, \
?MAX_ON_DISK_HEIGHT, max height: {MAX_ON_DISK_HEIGHT:?}"
"unexpectedly large tip height, database format upgrade required", ));
); error!(?err);
return err;
} }
} }
Ok(())
} }
} }