change(state): Prepare for in-place database format upgrades, but don't make any format changes yet (#7031)

* Move format upgrades to their own module and enum

* Launch a format change thread if needed, and shut it down during shutdown

* Add some TODOs and remove a redundant timer

* Regularly check for panics in the state upgrade task

* Only run example upgrade once, change version field names

* Increment database format to 25.0.2: add format change task

* Log the running and initial disk database format versions on startup

* Add initial disk and running state versions to cached state images in CI

* Fix missing imports

* Fix typo in logs workflow command

* Add a force_save_to_disk argument to the CI workflow

* Move use_internet_connection into zebrad_config()

* fastmod can_spawn_zebrad_for_rpc can_spawn_zebrad_for_test_type zebra*

* Add a spawn_zebrad_without_rpc() function

* Remove unused copy_state() test code

* Assert that upgrades and downgrades happen with the correct versions

* Add a kill_and_return_output() method for tests

* Add a test for new_state_format() versions (no upgrades or downgrades)

* Add use_internet_connection to can_spawn_zebrad_for_test_type()

* Fix workflow parameter passing

* Check that reopening a new database doesn't upgrade (or downgrade) the format

* Allow ephemeral to be set to false even if we don't have a cached state

* Add a test type that will accept any kind of state

* When re-using a directory, configure the state test config with that path

* Actually mark newly created databases with their format versions

* Wait for the state to be opened before testing the format

* Run state format tests on mainnet and testnet configs (no network access)

* run multiple reopens in tests

* Test upgrades run correctly

* Test that version downgrades work as expected (best effort)

* Add a TODO for testing partial updates

* Fix missing test arguments

* clippy if chain

* Fix typo

* another typo

* Pass a database instance to the format upgrade task

* Fix a timing issue in the tests

* Fix version matching in CI

* Use correct env var reference

* Use correct github env file

* Wait for the database to be written before killing Zebra

* Use correct workflow syntax

* Version changes aren't always upgrades

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2023-07-14 07:36:15 +10:00 committed by GitHub
parent 3b34e48bf2
commit be5cfad07f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1260 additions and 250 deletions

View File

@ -35,6 +35,11 @@ on:
default: false
description: 'Just run a lightwalletd full sync and update tip disks'
required: true
force_save_to_disk:
required: false
type: boolean
default: false
description: 'Force tests to always create a cached state disk, if they already create disks'
no_cache:
description: 'Disable the Docker cache for this build'
required: false
@ -337,6 +342,7 @@ jobs:
network: ${{ inputs.network || vars.ZCASH_NETWORK }}
needs_zebra_state: false
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_suffix: checkpoint
height_grep_text: 'flushing database to disk .*height.*=.*Height.*\('
secrets: inherit
@ -399,6 +405,7 @@ jobs:
is_long_test: true
needs_zebra_state: false
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_suffix: tip
height_grep_text: 'current_height.*=.*Height.*\('
secrets: inherit
@ -439,6 +446,7 @@ jobs:
needs_zebra_state: true
# update the disk on every PR, to increase CI speed
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_suffix: tip
root_state_path: '/var/cache'
zebra_state_dir: 'zebrad-cache'
@ -511,6 +519,7 @@ jobs:
is_long_test: true
needs_zebra_state: false
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_suffix: tip
height_grep_text: 'current_height.*=.*Height.*\('
secrets: inherit
@ -554,6 +563,7 @@ jobs:
# update the disk on every PR, to increase CI speed
# we don't have a test-update-sync-testnet job, so we need to update the disk here
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_suffix: tip
root_state_path: '/var/cache'
zebra_state_dir: 'zebrad-cache'
@ -587,6 +597,7 @@ jobs:
needs_zebra_state: true
needs_lwd_state: false
saves_to_disk: true
force_save_to_disk: ${{ inputs.force_save_to_disk || false }}
disk_prefix: lwd-cache
disk_suffix: tip
root_state_path: '/var/cache'

View File

@ -79,7 +79,12 @@ on:
saves_to_disk:
required: true
type: boolean
description: 'Does the test create a new cached state disk?'
description: 'Can this test create new or updated cached state disks?'
force_save_to_disk:
required: false
type: boolean
default: false
description: 'Force this test to create a new or updated cached state disk'
app_name:
required: false
type: string
@ -1702,7 +1707,7 @@ jobs:
# We run exactly one of without-cached-state or with-cached-state, and we always skip the other one.
# Normally, if a job is skipped, all the jobs that depend on it are also skipped.
# So we need to override the default success() check to make this job run.
if: ${{ !cancelled() && !failure() && inputs.saves_to_disk }}
if: ${{ !cancelled() && !failure() && (inputs.saves_to_disk || inputs.force_save_to_disk) }}
permissions:
contents: 'read'
id-token: 'write'
@ -1791,6 +1796,96 @@ jobs:
echo "UPDATE_SUFFIX=$UPDATE_SUFFIX" >> "$GITHUB_ENV"
echo "TIME_SUFFIX=$TIME_SUFFIX" >> "$GITHUB_ENV"
# Get the full initial and running database versions from the test logs.
# These versions are used as part of the disk description and labels.
#
# If these versions are missing from the logs, the job fails.
#
# Typically, the database versions are around line 20 in the logs..
# But we check the first 1000 log lines, just in case the test harness recompiles all the
# dependencies before running the test. (This can happen if the cache is invalid.)
#
# Passes the versions to subsequent steps using the $INITIAL_DISK_DB_VERSION,
# $RUNNING_DB_VERSION, and $DB_VERSION_SUMMARY env variables.
- name: Get database versions from logs
run: |
INITIAL_DISK_DB_VERSION=""
RUNNING_DB_VERSION=""
DB_VERSION_SUMMARY=""
DOCKER_LOGS=$( \
gcloud compute ssh ${{ inputs.test_id }}-${{ env.GITHUB_REF_SLUG_URL }}-${{ env.GITHUB_SHA_SHORT }} \
--zone ${{ vars.GCP_ZONE }} \
--ssh-flag="-o ServerAliveInterval=5" \
--ssh-flag="-o ConnectionAttempts=20" \
--ssh-flag="-o ConnectTimeout=5" \
--command=" \
sudo docker logs ${{ inputs.test_id }} | head -1000 \
")
# either a semantic version or "creating new database"
INITIAL_DISK_DB_VERSION=$( \
echo "$DOCKER_LOGS" | \
grep --extended-regexp --only-matching 'initial disk state version: [0-9a-z\.]+' | \
grep --extended-regexp --only-matching '[0-9a-z\.]+' | \
tail -1 || \
[[ $? == 1 ]] \
)
if [[ -z "$INITIAL_DISK_DB_VERSION" ]]; then
echo "Checked logs:"
echo ""
echo "$DOCKER_LOGS"
echo ""
echo "Missing initial disk database version in logs: $INITIAL_DISK_DB_VERSION"
# Fail the tests, because Zebra didn't log the initial disk database version,
# or the regex in this step is wrong.
false
fi
if [[ "$INITIAL_DISK_DB_VERSION" = "creating.new.database" ]]; then
INITIAL_DISK_DB_VERSION="new"
else
INITIAL_DISK_DB_VERSION="v${INITIAL_DISK_DB_VERSION//./-}"
fi
echo "Found initial disk database version in logs: $INITIAL_DISK_DB_VERSION"
echo "INITIAL_DISK_DB_VERSION=$INITIAL_DISK_DB_VERSION" >> "$GITHUB_ENV"
RUNNING_DB_VERSION=$( \
echo "$DOCKER_LOGS" | \
grep --extended-regexp --only-matching 'running state version: [0-9\.]+' | \
grep --extended-regexp --only-matching '[0-9\.]+' | \
tail -1 || \
[[ $? == 1 ]] \
)
if [[ -z "$RUNNING_DB_VERSION" ]]; then
echo "Checked logs:"
echo ""
echo "$DOCKER_LOGS"
echo ""
echo "Missing running database version in logs: $RUNNING_DB_VERSION"
# Fail the tests, because Zebra didn't log the running database version,
# or the regex in this step is wrong.
false
fi
RUNNING_DB_VERSION="v${RUNNING_DB_VERSION//./-}"
echo "Found running database version in logs: $RUNNING_DB_VERSION"
echo "RUNNING_DB_VERSION=$RUNNING_DB_VERSION" >> "$GITHUB_ENV"
if [[ "$INITIAL_DISK_DB_VERSION" = "$RUNNING_DB_VERSION" ]]; then
DB_VERSION_SUMMARY="$RUNNING_DB_VERSION"
elif [[ "$INITIAL_DISK_DB_VERSION" = "new" ]]; then
DB_VERSION_SUMMARY="$RUNNING_DB_VERSION in new database"
else
DB_VERSION_SUMMARY="$INITIAL_DISK_DB_VERSION changing to $RUNNING_DB_VERSION"
fi
echo "Summarised database versions from logs: $DB_VERSION_SUMMARY"
echo "DB_VERSION_SUMMARY=$DB_VERSION_SUMMARY" >> "$GITHUB_ENV"
# Get the sync height from the test logs, which is later used as part of the
# disk description and labels.
#
@ -1800,7 +1895,7 @@ jobs:
#
# If the sync height is missing from the logs, the job fails.
#
# Passes the sync height to subsequent steps using $SYNC_HEIGHT env variable.
# Passes the sync height to subsequent steps using the $SYNC_HEIGHT env variable.
- name: Get sync height from logs
run: |
SYNC_HEIGHT=""
@ -1818,12 +1913,16 @@ jobs:
SYNC_HEIGHT=$( \
echo "$DOCKER_LOGS" | \
grep --extended-regexp --only-matching '${{ inputs.height_grep_text }}[0-9]+' | \
grep --extended-regexp --only-matching '[0-9]+' | \
grep --extended-regexp --only-matching '[0-9]+' | \
tail -1 || \
[[ $? == 1 ]] \
)
if [[ -z "$SYNC_HEIGHT" ]]; then
echo "Checked logs:"
echo ""
echo "$DOCKER_LOGS"
echo ""
echo "Missing sync height in logs: $SYNC_HEIGHT"
# Fail the tests, because Zebra and lightwalletd didn't log their sync heights,
# or the CI workflow sync height regex is wrong.
@ -1885,15 +1984,15 @@ jobs:
- name: Create image from state disk
run: |
MINIMUM_UPDATE_HEIGHT=$((ORIGINAL_HEIGHT+CACHED_STATE_UPDATE_LIMIT))
if [[ -z "$UPDATE_SUFFIX" ]] || [[ "$SYNC_HEIGHT" -gt "$MINIMUM_UPDATE_HEIGHT" ]]; then
if [[ -z "$UPDATE_SUFFIX" ]] || [[ "$SYNC_HEIGHT" -gt "$MINIMUM_UPDATE_HEIGHT" ]] || [[ "${{ inputs.force_save_to_disk }}" == "true" ]]; then
gcloud compute images create \
"${{ inputs.disk_prefix }}-${SHORT_GITHUB_REF}-${{ env.GITHUB_SHA_SHORT }}-v${{ env.STATE_VERSION }}-${NETWORK}-${{ inputs.disk_suffix }}${UPDATE_SUFFIX}-${TIME_SUFFIX}" \
--force \
--source-disk=${{ inputs.test_id }}-${{ env.GITHUB_SHA_SHORT }} \
--source-disk-zone=${{ vars.GCP_ZONE }} \
--storage-location=us \
--description="Created from commit ${{ env.GITHUB_SHA_SHORT }} with height ${{ env.SYNC_HEIGHT }}" \
--labels="height=${{ env.SYNC_HEIGHT }},purpose=${{ inputs.disk_prefix }},commit=${{ env.GITHUB_SHA_SHORT }},state-version=${{ env.STATE_VERSION }},network=${NETWORK},target-height-kind=${{ inputs.disk_suffix }},update-flag=${UPDATE_SUFFIX},updated-from-height=${ORIGINAL_HEIGHT},test-id=${{ inputs.test_id }},app-name=${{ inputs.app_name }}"
--description="Created from commit ${{ env.GITHUB_SHA_SHORT }} with height ${{ env.SYNC_HEIGHT }} and database format ${{ env.DB_VERSION_SUMMARY }}" \
--labels="height=${{ env.SYNC_HEIGHT }},purpose=${{ inputs.disk_prefix }},commit=${{ env.GITHUB_SHA_SHORT }},state-version=${{ env.STATE_VERSION }},state-running-version=${RUNNING_DB_VERSION},initial-state-disk-version=${INITIAL_DISK_DB_VERSION},network=${NETWORK},target-height-kind=${{ inputs.disk_suffix }},update-flag=${UPDATE_SUFFIX},force-save=${{ inputs.force_save_to_disk }},updated-from-height=${ORIGINAL_HEIGHT},test-id=${{ inputs.test_id }},app-name=${{ inputs.app_name }}"
else
echo "Skipped cached state update because the new sync height $SYNC_HEIGHT was less than $CACHED_STATE_UPDATE_LIMIT blocks above the original height $ORIGINAL_HEIGHT"
fi

View File

@ -175,6 +175,7 @@ impl Default for Config {
}
// Cleaning up old database versions
// TODO: put this in a different module?
/// Spawns a task that checks if there are old database folders,
/// and deletes them from the filesystem.
@ -292,6 +293,8 @@ fn parse_version_number(dir_name: &str) -> Option<u64> {
.and_then(|version| version.parse().ok())
}
// TODO: move these to the format upgrade module
/// Returns the full semantic version of the currently running database format code.
///
/// This is the version implemented by the Zebra code that's currently running,
@ -336,29 +339,40 @@ pub fn database_format_version_on_disk(
)))
}
/// Writes the currently running semantic database version to the on-disk database.
/// Writes `changed_version` to the on-disk database after the format is changed.
/// (Or a new database is created.)
///
/// # Correctness
///
/// This should only be called after all running format upgrades are complete.
/// This should only be called:
/// - after each format upgrade is complete,
/// - when creating a new database, or
/// - when an older Zebra version opens a newer database.
///
/// # Concurrency
///
/// This must only be called while RocksDB has an open database for `config`.
/// Otherwise, multiple Zebra processes could write the version at the same time,
/// corrupting the file.
///
/// # Panics
///
/// If the major versions do not match. (The format is incompatible.)
pub fn write_database_format_version_to_disk(
changed_version: &Version,
config: &Config,
network: Network,
) -> Result<(), BoxError> {
let version_path = config.version_file_path(network);
// The major version is already in the directory path.
let version = format!(
"{}.{}",
DATABASE_FORMAT_MINOR_VERSION, DATABASE_FORMAT_PATCH_VERSION
assert_eq!(
changed_version.major, DATABASE_FORMAT_VERSION,
"tried to do in-place database format change to an incompatible version"
);
let version = format!("{}.{}", changed_version.minor, changed_version.patch);
// # Concurrency
//
// The caller handles locking for this file write.

View File

@ -5,7 +5,7 @@ use regex::Regex;
// For doc comment links
#[allow(unused_imports)]
use crate::config::{database_format_version_in_code, database_format_version_on_disk};
use crate::config::{self, Config};
pub use zebra_chain::transparent::MIN_TRANSPARENT_COINBASE_MATURITY;
@ -37,9 +37,9 @@ pub const MAX_BLOCK_REORG_HEIGHT: u32 = MIN_TRANSPARENT_COINBASE_MATURITY - 1;
/// - we previously added compatibility code, and
/// - it's available in all supported Zebra versions.
///
/// Use [`database_format_version_in_code()`] or [`database_format_version_on_disk()`]
/// to get the full semantic format version.
pub const DATABASE_FORMAT_VERSION: u64 = 25;
/// Use [`config::database_format_version_in_code()`] or
/// [`config::database_format_version_on_disk()`] to get the full semantic format version.
pub(crate) const DATABASE_FORMAT_VERSION: u64 = 25;
/// The database format minor version, incremented each time the on-disk database format has a
/// significant data format change.
@ -48,14 +48,16 @@ pub const DATABASE_FORMAT_VERSION: u64 = 25;
/// - adding new column families,
/// - changing the format of a column family in a compatible way, or
/// - breaking changes with compatibility code in all supported Zebra versions.
pub const DATABASE_FORMAT_MINOR_VERSION: u64 = 0;
pub(crate) const DATABASE_FORMAT_MINOR_VERSION: u64 = 0;
/// The database format patch version, incremented each time the on-disk database format has a
/// significant format compatibility fix.
pub const DATABASE_FORMAT_PATCH_VERSION: u64 = 1;
pub(crate) const DATABASE_FORMAT_PATCH_VERSION: u64 = 2;
/// The name of the file containing the minor and patch database versions.
pub const DATABASE_FORMAT_VERSION_FILE_NAME: &str = "version";
///
/// Use [`Config::version_file_path()`] to get the path to this file.
pub(crate) const DATABASE_FORMAT_VERSION_FILE_NAME: &str = "version";
/// The maximum number of blocks to check for NU5 transactions,
/// before we assume we are on a pre-NU5 legacy chain.

View File

@ -29,7 +29,10 @@ mod service;
#[cfg(test)]
mod tests;
pub use config::{check_and_delete_old_databases, Config};
pub use config::{
check_and_delete_old_databases, database_format_version_in_code,
database_format_version_on_disk, Config,
};
pub use constants::MAX_BLOCK_REORG_HEIGHT;
pub use error::{
BoxError, CloneError, CommitSemanticallyVerifiedError, DuplicateNullifierError,
@ -57,4 +60,7 @@ pub use service::{
init_test, init_test_services, ReadStateService,
};
#[cfg(any(test, feature = "proptest-impl"))]
pub use config::write_database_format_version_to_disk;
pub(crate) use request::ContextuallyVerifiedBlock;

View File

@ -263,6 +263,7 @@ impl Drop for ReadStateService {
// The read state service shares the state,
// so dropping it should check if we can shut down.
// TODO: move this into a try_shutdown() method
if let Some(block_write_task) = self.block_write_task.take() {
if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
// We're the last database user, so we can tell it to shut down (blocking):
@ -280,6 +281,7 @@ impl Drop for ReadStateService {
#[cfg(test)]
debug!("waiting for the block write task to finish");
// TODO: move this into a check_for_panics() method
if let Err(thread_panic) = block_write_task_handle.join() {
std::panic::resume_unwind(thread_panic);
} else {
@ -343,9 +345,7 @@ impl StateService {
.tip_block()
.map(CheckpointVerifiedBlock::from)
.map(ChainTipBlock::from);
timer.finish(module_path!(), line!(), "fetching database tip");
let timer = CodeTimer::start();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(initial_tip, network);
@ -1161,6 +1161,8 @@ impl Service<ReadRequest> for ReadStateService {
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Check for panics in the block write task
//
// TODO: move into a check_for_panics() method
let block_write_task = self.block_write_task.take();
if let Some(block_write_task) = block_write_task {
@ -1177,6 +1179,8 @@ impl Service<ReadRequest> for ReadStateService {
}
}
self.db.check_for_panics();
Poll::Ready(Ok(()))
}

View File

@ -7,11 +7,10 @@
//!
//! # Correctness
//!
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must
//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constants must
//! be incremented each time the database format (column, serialization, etc) changes.
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
fmt::Debug,
ops::RangeBounds,
@ -25,10 +24,6 @@ use rlimit::increase_nofile_limit;
use zebra_chain::parameters::Network;
use crate::{
config::{
database_format_version_in_code, database_format_version_on_disk,
write_database_format_version_to_disk,
},
service::finalized_state::disk_format::{FromDisk, IntoDisk},
Config,
};
@ -528,35 +523,6 @@ impl DiskDb {
pub fn new(config: &Config, network: Network) -> DiskDb {
let path = config.db_path(network);
let running_version = database_format_version_in_code();
let disk_version = database_format_version_on_disk(config, network)
.expect("unable to read database format version file");
match disk_version.as_ref().map(|disk| disk.cmp(&running_version)) {
// TODO: if the on-disk format is older, actually run the upgrade task after the
// database has been opened (#6642)
Some(Ordering::Less) => info!(
?running_version,
?disk_version,
"trying to open older database format: launching upgrade task"
),
// TODO: if the on-disk format is newer, downgrade the version after the
// database has been opened (#6642)
Some(Ordering::Greater) => info!(
?running_version,
?disk_version,
"trying to open newer database format: data should be compatible"
),
Some(Ordering::Equal) => info!(
?running_version,
"trying to open compatible database format"
),
None => info!(
?running_version,
"creating new database with the current format"
),
}
let db_options = DiskDb::options();
// When opening the database in read/write mode, all column families must be opened.
@ -590,27 +556,6 @@ impl DiskDb {
db.assert_default_cf_is_empty();
// Now we've checked that the database format is up-to-date,
// mark it as updated on disk.
//
// # Concurrency
//
// The version must only be updated while RocksDB is holding the database
// directory lock. This prevents multiple Zebra instances corrupting the version
// file.
//
// # TODO
//
// - only update the version at the end of the format upgrade task (#6642)
// - add a note to the format upgrade task code to update the version constants
// whenever the format changes
// - add a test that the format upgrade runs exactly once when:
// 1. if an older cached state format is opened, the format is upgraded,
// then if Zebra is launched again the format is not upgraded
// 2. if the current cached state format is opened, the format is not upgraded
write_database_format_version_to_disk(config, network)
.expect("unable to write database format version file to disk");
db
}
@ -809,6 +754,19 @@ impl DiskDb {
// Cleanup methods
/// Returns the number of shared instances of this database.
///
/// # Concurrency
///
/// The actual number of owners can be higher or lower than the returned value,
/// because databases can simultaneously be cloned or dropped in other threads.
///
/// However, if the number of owners is 1, and the caller has exclusive access,
/// the count can't increase unless that caller clones the database.
pub(crate) fn shared_database_owners(&self) -> usize {
Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
}
/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.
@ -829,9 +787,8 @@ impl DiskDb {
// instance. If they do, they must drop it before:
// - shutting down database threads, or
// - deleting database files.
let shared_database_owners = Arc::strong_count(&self.db) + Arc::weak_count(&self.db);
if shared_database_owners > 1 {
if self.shared_database_owners() > 1 {
let path = self.path();
let mut ephemeral_note = "";

View File

@ -11,6 +11,7 @@ pub mod block;
pub mod chain;
pub mod shielded;
pub mod transparent;
pub mod upgrade;
#[cfg(test)]
mod tests;

View File

@ -0,0 +1,533 @@
//! In-place format upgrades for the Zebra state database.
use std::{
cmp::Ordering,
panic,
sync::{mpsc, Arc},
thread::{self, JoinHandle},
};
use semver::Version;
use tracing::Span;
use zebra_chain::{block::Height, parameters::Network};
use DbFormatChange::*;
use crate::{
config::write_database_format_version_to_disk, database_format_version_in_code,
database_format_version_on_disk, service::finalized_state::ZebraDb, Config,
};
/// The kind of database format change we're performing.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DbFormatChange {
/// Marking the format as newly created by `running_version`.
///
/// Newly created databases have no disk version.
NewlyCreated { running_version: Version },
/// Upgrading the format from `older_disk_version` to `newer_running_version`.
///
/// Until this upgrade is complete, the format is a mixture of both versions.
Upgrade {
older_disk_version: Version,
newer_running_version: Version,
},
/// Marking the format as downgraded from `newer_disk_version` to `older_running_version`.
///
/// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
/// version (or greater), the format will be a mixture of both versions.
Downgrade {
newer_disk_version: Version,
older_running_version: Version,
},
}
/// A handle to a spawned format change thread.
///
/// Cloning this struct creates an additional handle to the same thread.
///
/// # Concurrency
///
/// Cancelling the thread on drop has a race condition, because two handles can be dropped at
/// the same time.
///
/// If cancelling the thread is important, the owner of the handle must call force_cancel().
#[derive(Clone, Debug)]
pub struct DbFormatChangeThreadHandle {
/// A handle that can wait for the running format change thread to finish.
///
/// Panics from this thread are propagated into Zebra's state service.
update_task: Option<Arc<JoinHandle<()>>>,
/// A channel that tells the running format thread to finish early.
cancel_handle: mpsc::SyncSender<CancelFormatChange>,
}
/// Marker for cancelling a format upgrade.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct CancelFormatChange;
impl DbFormatChange {
/// Check if loading `disk_version` into `running_version` needs a format change,
/// and if it does, return the required format change.
///
/// Also logs the kind of change at info level.
///
/// If `disk_version` is `None`, Zebra is creating a new database.
pub fn new(running_version: Version, disk_version: Option<Version>) -> Option<Self> {
let Some(disk_version) = disk_version else {
info!(
?running_version,
"creating new database with the current format"
);
return Some(NewlyCreated { running_version });
};
match disk_version.cmp(&running_version) {
Ordering::Less => {
info!(
?running_version,
?disk_version,
"trying to open older database format: launching upgrade task"
);
Some(Upgrade {
older_disk_version: disk_version,
newer_running_version: running_version,
})
}
Ordering::Greater => {
info!(
?running_version,
?disk_version,
"trying to open newer database format: data should be compatible"
);
Some(Downgrade {
newer_disk_version: disk_version,
older_running_version: running_version,
})
}
Ordering::Equal => {
info!(?running_version, "trying to open current database format");
None
}
}
}
/// Returns true if this change is an upgrade.
#[allow(dead_code)]
pub fn is_upgrade(&self) -> bool {
matches!(self, Upgrade { .. })
}
/// Launch a `std::thread` that applies this format change to the database.
///
/// `initial_tip_height` is the database height when it was opened, and `upgrade_db` is the
/// database instance to upgrade.
pub fn spawn_format_change(
self,
config: Config,
network: Network,
initial_tip_height: Option<Height>,
upgrade_db: ZebraDb,
) -> DbFormatChangeThreadHandle {
// # Correctness
//
// Cancel handles must use try_send() to avoid blocking waiting for the format change
// thread to shut down.
let (cancel_handle, cancel_receiver) = mpsc::sync_channel(1);
let span = Span::current();
let update_task = thread::spawn(move || {
span.in_scope(move || {
self.apply_format_change(
config,
network,
initial_tip_height,
upgrade_db,
cancel_receiver,
);
})
});
let mut handle = DbFormatChangeThreadHandle {
update_task: Some(Arc::new(update_task)),
cancel_handle,
};
handle.check_for_panics();
handle
}
/// Apply this format change to the database.
///
/// Format changes should be launched in an independent `std::thread`, which runs until the
/// upgrade is finished.
///
/// See `apply_format_upgrade()` for details.
fn apply_format_change(
self,
config: Config,
network: Network,
initial_tip_height: Option<Height>,
upgrade_db: ZebraDb,
cancel_receiver: mpsc::Receiver<CancelFormatChange>,
) {
match self {
// Handled in the rest of this function.
Upgrade { .. } => self.apply_format_upgrade(
config,
network,
initial_tip_height,
upgrade_db,
cancel_receiver,
),
NewlyCreated { .. } => {
Self::mark_as_newly_created(&config, network);
}
Downgrade { .. } => {
// # Correctness
//
// At the start of a format downgrade, the database must be marked as partially or
// fully downgraded. This lets newer Zebra versions know that some blocks with older
// formats have been added to the database.
Self::mark_as_downgraded(&config, network);
// Older supported versions just assume they can read newer formats,
// because they can't predict all changes a newer Zebra version could make.
//
// The responsibility of staying backwards-compatible is on the newer version.
// We do this on a best-effort basis for versions that are still supported.
}
}
}
/// Apply any required format updates to the database.
/// Format changes should be launched in an independent `std::thread`.
///
/// If `cancel_receiver` gets a message, or its sender is dropped,
/// the format change stops running early.
//
// New format upgrades must be added to the *end* of this method.
fn apply_format_upgrade(
self,
config: Config,
network: Network,
initial_tip_height: Option<Height>,
upgrade_db: ZebraDb,
cancel_receiver: mpsc::Receiver<CancelFormatChange>,
) {
let Upgrade {
newer_running_version,
older_disk_version,
} = self
else {
unreachable!("already checked for Upgrade")
};
// # New Upgrades Sometimes Go Here
//
// If the format change is outside RocksDb, put new code above this comment!
let Some(initial_tip_height) = initial_tip_height else {
// If the database is empty, then the RocksDb format doesn't need any changes.
info!(
?newer_running_version,
?older_disk_version,
"marking empty database as upgraded"
);
Self::mark_as_upgraded_to(&database_format_version_in_code(), &config, network);
info!(
?newer_running_version,
?older_disk_version,
"empty database is fully upgraded"
);
return;
};
// Example format change.
//
// TODO: link to format upgrade instructions doc here
// Check if we need to do this upgrade.
let database_format_add_format_change_task =
Version::parse("25.0.2").expect("version constant is valid");
if older_disk_version < database_format_add_format_change_task {
let mut upgrade_height = Height(0);
// Go through every height from genesis to the tip of the old version.
// If the state was downgraded, some heights might already be upgraded.
// (Since the upgraded format is added to the tip, the database can switch between
// lower and higher versions at any block.)
//
// Keep upgrading until the initial database has been upgraded,
// or this task is cancelled by a shutdown.
while upgrade_height <= initial_tip_height
&& matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty))
{
// TODO: Do one format upgrade step here
//
// This fake step just shows how to access the database.
let _replace_me_ = upgrade_db.tip();
upgrade_height = (upgrade_height + 1).expect("task exits before maximum height");
}
}
// At the end of each format upgrade, the database is marked as upgraded to that version.
// Upgrades can be run more than once if Zebra is restarted, so this is just a performance
// optimisation.
info!(
?initial_tip_height,
?newer_running_version,
?older_disk_version,
"marking database as upgraded"
);
Self::mark_as_upgraded_to(&database_format_add_format_change_task, &config, network);
// End of example format change.
// # New Upgrades Usually Go Here
//
// New code goes above this comment!
//
// Run the latest format upgrade code after the other upgrades are complete,
// then mark the format as upgraded. The code should check `cancel_receiver`
// every time it runs its inner update loop.
info!(
?initial_tip_height,
?newer_running_version,
?older_disk_version,
"database is fully upgraded"
);
}
/// Mark a newly created database with the current format version.
///
/// This should be called when a newly created database is opened.
///
/// # Concurrency
///
/// The version must only be updated while RocksDB is holding the database
/// directory lock. This prevents multiple Zebra instances corrupting the version
/// file.
///
/// # Panics
///
/// If the format should not have been upgraded, because the database is not newly created.
fn mark_as_newly_created(config: &Config, network: Network) {
let disk_version = database_format_version_on_disk(config, network)
.expect("unable to read database format version file path");
let running_version = database_format_version_in_code();
assert_eq!(
disk_version, None,
"can't overwrite the format version in an existing database:\n\
disk: {disk_version:?}\n\
running: {running_version}"
);
write_database_format_version_to_disk(&running_version, config, network)
.expect("unable to write database format version file to disk");
info!(
?running_version,
?disk_version,
"marked database format as newly created"
);
}
/// Mark the database as upgraded to `format_upgrade_version`.
///
/// This should be called when an older database is opened by an older Zebra version,
/// after each version upgrade is complete.
///
/// # Concurrency
///
/// The version must only be updated while RocksDB is holding the database
/// directory lock. This prevents multiple Zebra instances corrupting the version
/// file.
///
/// # Panics
///
/// If the format should not have been upgraded, because the running version is:
/// - older than the disk version (that's a downgrade)
/// - the same as to the disk version (no upgrade needed)
///
/// If the format should not have been upgraded, because the format upgrade version is:
/// - older or the same as the disk version
/// (multiple upgrades to the same version are not allowed)
/// - greater than the running version (that's a logic bug)
fn mark_as_upgraded_to(format_upgrade_version: &Version, config: &Config, network: Network) {
let disk_version = database_format_version_on_disk(config, network)
.expect("unable to read database format version file")
.expect("tried to upgrade a newly created database");
let running_version = database_format_version_in_code();
assert!(
running_version > disk_version,
"can't upgrade a database that is being opened by an older or the same Zebra version:\n\
disk: {disk_version}\n\
upgrade: {format_upgrade_version}\n\
running: {running_version}"
);
assert!(
format_upgrade_version > &disk_version,
"can't upgrade a database that has already been upgraded, or is newer:\n\
disk: {disk_version}\n\
upgrade: {format_upgrade_version}\n\
running: {running_version}"
);
assert!(
format_upgrade_version <= &running_version,
"can't upgrade to a newer version than the running Zebra version:\n\
disk: {disk_version}\n\
upgrade: {format_upgrade_version}\n\
running: {running_version}"
);
write_database_format_version_to_disk(format_upgrade_version, config, network)
.expect("unable to write database format version file to disk");
info!(
?running_version,
?format_upgrade_version,
?disk_version,
"marked database format as upgraded"
);
}
/// Mark the database as downgraded to the running database version.
/// This should be called after a newer database is opened by an older Zebra version.
///
/// # Concurrency
///
/// The version must only be updated while RocksDB is holding the database
/// directory lock. This prevents multiple Zebra instances corrupting the version
/// file.
///
/// # Panics
///
/// If the format should have been upgraded, because the running version is newer.
/// If the state is newly created, because the running version should be the same.
///
/// Multiple downgrades are allowed, because they all downgrade to the same running version.
fn mark_as_downgraded(config: &Config, network: Network) {
let disk_version = database_format_version_on_disk(config, network)
.expect("unable to read database format version file")
.expect("can't downgrade a newly created database");
let running_version = database_format_version_in_code();
assert!(
disk_version >= running_version,
"can't downgrade a database that is being opened by a newer Zebra version:\n\
disk: {disk_version}\n\
running: {running_version}"
);
write_database_format_version_to_disk(&running_version, config, network)
.expect("unable to write database format version file to disk");
info!(
?running_version,
?disk_version,
"marked database format as downgraded"
);
}
}
impl DbFormatChangeThreadHandle {
/// Cancel the running format change thread, if this is the last handle.
/// Returns true if it was actually cancelled.
pub fn cancel_if_needed(&self) -> bool {
// # Correctness
//
// Checking the strong count has a race condition, because two handles can be dropped at
// the same time.
//
// If cancelling the thread is important, the owner of the handle must call force_cancel().
if let Some(update_task) = self.update_task.as_ref() {
if Arc::strong_count(update_task) <= 1 {
self.force_cancel();
return true;
}
}
false
}
/// Force the running format change thread to cancel, even if there are other handles.
pub fn force_cancel(&self) {
// There's nothing we can do about errors here.
// If the channel is disconnected, the task has exited.
// If it's full, it's already been cancelled.
let _ = self.cancel_handle.try_send(CancelFormatChange);
}
/// Check for panics in the code running in the spawned thread.
/// If the thread exited with a panic, resume that panic.
///
/// This method should be called regularly, so that panics are detected as soon as possible.
pub fn check_for_panics(&mut self) {
let update_task = self.update_task.take();
if let Some(update_task) = update_task {
if update_task.is_finished() {
// We use into_inner() because it guarantees that exactly one of the tasks
// gets the JoinHandle. try_unwrap() lets us keep the JoinHandle, but it can also
// miss panics.
if let Some(update_task) = Arc::into_inner(update_task) {
// We are the last handle with a reference to this task,
// so we can propagate any panics
if let Err(thread_panic) = update_task.join() {
panic::resume_unwind(thread_panic);
}
}
} else {
// It hasn't finished, so we need to put it back
self.update_task = Some(update_task);
}
}
}
/// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
///
/// This method should be called during shutdown.
pub fn wait_for_panics(&mut self) {
if let Some(update_task) = self.update_task.take() {
// We use into_inner() because it guarantees that exactly one of the tasks
// gets the JoinHandle. See the comments in check_for_panics().
if let Some(update_task) = Arc::into_inner(update_task) {
// We are the last handle with a reference to this task,
// so we can propagate any panics
if let Err(thread_panic) = update_task.join() {
panic::resume_unwind(thread_panic);
}
}
}
}
}
impl Drop for DbFormatChangeThreadHandle {
fn drop(&mut self) {
// Only cancel the format change if the state service is shutting down.
if self.cancel_if_needed() {
self.wait_for_panics();
} else {
self.check_for_panics();
}
}
}

View File

@ -14,7 +14,14 @@ use std::path::Path;
use zebra_chain::parameters::Network;
use crate::{
service::finalized_state::{disk_db::DiskDb, disk_format::block::MAX_ON_DISK_HEIGHT},
config::{database_format_version_in_code, database_format_version_on_disk},
service::finalized_state::{
disk_db::DiskDb,
disk_format::{
block::MAX_ON_DISK_HEIGHT,
upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
},
},
Config,
};
@ -32,12 +39,20 @@ pub mod arbitrary;
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub struct ZebraDb {
// Owned State
//
// Everything contained in this state must be shared by all clones, or read-only.
//
/// A handle to a running format change task, which cancels the task when dropped.
///
/// # Concurrency
///
/// This field should be dropped before the database field, so the format upgrade task is
/// cancelled before the database is dropped. This helps avoid some kinds of deadlocks.
format_change_handle: Option<DbFormatChangeThreadHandle>,
/// The inner low-level database wrapper for the RocksDB database.
db: DiskDb,
}
@ -46,12 +61,49 @@ impl ZebraDb {
/// Opens or creates the database at `config.path` for `network`,
/// and returns a shared high-level typed database wrapper.
pub fn new(config: &Config, network: Network) -> ZebraDb {
let db = ZebraDb {
let running_version = database_format_version_in_code();
let disk_version = database_format_version_on_disk(config, network)
.expect("unable to read database format version file");
// Log any format changes before opening the database, in case opening fails.
let format_change = DbFormatChange::new(running_version, disk_version);
// Open the database and do initial checks.
let mut db = ZebraDb {
format_change_handle: None,
db: DiskDb::new(config, network),
};
db.check_max_on_disk_tip_height();
// We have to get this height before we spawn the upgrade task, because threads can take
// a while to start, and new blocks can be committed as soon as we return from this method.
let initial_tip_height = db.finalized_tip_height();
// Start any required format changes.
//
// TODO: should debug_stop_at_height wait for these upgrades, or not?
if let Some(format_change) = format_change {
// Launch the format change and install its handle in the database.
//
// `upgrade_db` is a special clone of the database, which can't be used to shut down
// the upgrade task. (Because the task hasn't been launched yet,
// `db.format_change_handle` is always None.)
//
// It can be a FinalizedState if needed, or the FinalizedState methods needed for
// upgrades can be moved to ZebraDb.
let upgrade_db = db.clone();
let format_change_handle = format_change.spawn_format_change(
config.clone(),
network,
initial_tip_height,
upgrade_db,
);
db.format_change_handle = Some(format_change_handle);
}
db
}
@ -60,6 +112,19 @@ impl ZebraDb {
self.db.path()
}
/// Check for panics in code running in spawned threads.
/// If a thread exited with a panic, resume that panic.
///
/// This method should be called regularly, so that panics are detected as soon as possible.
pub fn check_for_panics(&mut self) {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.check_for_panics();
}
// This check doesn't panic, but we want to check it regularly anyway.
self.check_max_on_disk_tip_height();
}
/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.
@ -68,7 +133,19 @@ impl ZebraDb {
///
/// See [`DiskDb::shutdown`] for details.
pub fn shutdown(&mut self, force: bool) {
self.check_max_on_disk_tip_height();
// # Concurrency
//
// The format upgrade task should be cancelled before the database is flushed or shut down.
// This helps avoid some kinds of deadlocks.
//
// See also the correctness note in `DiskDb::shutdown()`.
if force || self.db.shared_database_owners() <= 1 {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.force_cancel();
}
}
self.check_for_panics();
self.db.shutdown(force);
}
@ -93,3 +170,9 @@ impl ZebraDb {
}
}
}
impl Drop for ZebraDb {
fn drop(&mut self) {
self.shutdown(false);
}
}

View File

@ -540,7 +540,9 @@ impl<T> TestChild<T> {
// Read unread child output.
//
// This checks for failure logs, and prevents some test hangs and deadlocks.
if self.child.is_some() || self.stdout.is_some() {
//
// TODO: this could block if stderr is full and stdout is waiting for stderr to be read.
if self.stdout.is_some() {
let wrote_lines =
self.wait_for_stdout_line(format!("\n{} Child Stdout:", self.command_path));
@ -552,7 +554,7 @@ impl<T> TestChild<T> {
}
}
if self.child.is_some() || self.stderr.is_some() {
if self.stderr.is_some() {
let wrote_lines =
self.wait_for_stderr_line(format!("\n{} Child Stderr:", self.command_path));
@ -566,6 +568,56 @@ impl<T> TestChild<T> {
kill_result
}
/// Kill the process, and return all its remaining standard output and standard error output.
///
/// If `ignore_exited` is `true`, log "can't kill an exited process" errors,
/// rather than returning them.
///
/// Returns `Ok(output)`, or an error if the kill failed.
pub fn kill_and_return_output(&mut self, ignore_exited: bool) -> Result<String> {
self.apply_failure_regexes_to_outputs();
// Prevent a hang when consuming output,
// by making sure the child's output actually finishes.
let kill_result = self.kill(ignore_exited);
// Read unread child output.
let mut stdout_buf = String::new();
let mut stderr_buf = String::new();
// This also checks for failure logs, and prevents some test hangs and deadlocks.
loop {
let mut remaining_output = false;
if let Some(stdout) = self.stdout.as_mut() {
if let Some(line) =
Self::wait_and_return_output_line(stdout, self.bypass_test_capture)
{
stdout_buf.push_str(&line);
remaining_output = true;
}
}
if let Some(stderr) = self.stderr.as_mut() {
if let Some(line) =
Self::wait_and_return_output_line(stderr, self.bypass_test_capture)
{
stderr_buf.push_str(&line);
remaining_output = true;
}
}
if !remaining_output {
break;
}
}
let mut output = stdout_buf;
output.push_str(&stderr_buf);
kill_result.map(|()| output)
}
/// Waits until a line of standard output is available, then consumes it.
///
/// If there is a line, and `write_context` is `Some`, writes the context to the test logs.
@ -632,15 +684,40 @@ impl<T> TestChild<T> {
false
}
/// Waits until a line of `output` is available, then returns it.
///
/// If there is a line, and `write_context` is `Some`, writes the context to the test logs.
/// Always writes the line to the test logs.
///
/// Returns `true` if a line was available,
/// or `false` if the standard output has finished.
#[allow(clippy::unwrap_in_result)]
fn wait_and_return_output_line(
mut output: impl Iterator<Item = std::io::Result<String>>,
bypass_test_capture: bool,
) -> Option<String> {
if let Some(line_result) = output.next() {
let line_result = line_result.expect("failure reading test process logs");
Self::write_to_test_logs(&line_result, bypass_test_capture);
return Some(line_result);
}
None
}
/// Waits for the child process to exit, then returns its output.
///
/// # Correctness
///
/// The other test child output methods take one or both outputs,
/// making them unavailable to this method.
///
/// Ignores any configured timeouts.
///
/// Returns an error if the child has already been taken,
/// or both outputs have already been taken.
/// Returns an error if the child has already been taken.
/// TODO: return an error if both outputs have already been taken.
#[spandoc::spandoc]
pub fn wait_with_output(mut self) -> Result<TestOutput<T>> {
let child = match self.child.take() {
@ -708,6 +785,8 @@ impl<T> TestChild<T> {
///
/// Kills the child on error, or after the configured timeout has elapsed.
/// See [`Self::expect_line_matching_regex_set`] for details.
//
// TODO: these methods could block if stderr is full and stdout is waiting for stderr to be read
#[instrument(skip(self))]
#[allow(clippy::unwrap_in_result)]
pub fn expect_stdout_line_matches<R>(&mut self, success_regex: R) -> Result<String>
@ -1293,6 +1372,11 @@ impl<T> TestOutput<T> {
fn was_killed(&self) -> bool {
self.output.status.signal() == Some(9)
}
/// Takes the generic `dir` parameter out of this `TestOutput`.
pub fn take_dir(&mut self) -> Option<T> {
self.dir.take()
}
}
/// Add context to an error report

View File

@ -12,7 +12,9 @@ use abscissa_core::{
use semver::{BuildMetadata, Version};
use zebra_network::constants::PORT_IN_USE_ERROR;
use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR};
use zebra_state::{
constants::LOCK_FILE_ERROR, database_format_version_in_code, database_format_version_on_disk,
};
use crate::{
commands::EntryPoint,
@ -260,13 +262,32 @@ impl Application for ZebradApp {
// collect the common metadata for the issue URL and panic report,
// skipping any env vars that aren't present
// reads state disk version file, doesn't open RocksDB database
let disk_db_version =
match database_format_version_on_disk(&config.state, config.network.network) {
Ok(Some(version)) => version.to_string(),
// This "version" is specially formatted to match a relaxed version regex in CI
Ok(None) => "creating.new.database".to_string(),
Err(error) => {
let mut error = format!("error: {error:?}");
error.truncate(100);
error
}
};
let app_metadata = vec![
// cargo or git tag + short commit
// build-time constant: cargo or git tag + short commit
("version", build_version().to_string()),
// config
("Zcash network", config.network.network.to_string()),
// constants
("state version", DATABASE_FORMAT_VERSION.to_string()),
// code constant
(
"running state version",
database_format_version_in_code().to_string(),
),
// state disk file, doesn't open database
("initial disk state version", disk_db_version),
// build-time constant
("features", env!("VERGEN_CARGO_FEATURES").to_string()),
];

View File

@ -136,6 +136,7 @@
//! ```
use std::{
cmp::Ordering,
collections::HashSet,
env, fs, panic,
path::PathBuf,
@ -146,6 +147,7 @@ use color_eyre::{
eyre::{eyre, Result, WrapErr},
Help,
};
use semver::Version;
use zebra_chain::{
block::{self, Height},
@ -153,7 +155,7 @@ use zebra_chain::{
};
use zebra_network::constants::PORT_IN_USE_ERROR;
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_state::constants::LOCK_FILE_ERROR;
use zebra_state::{constants::LOCK_FILE_ERROR, database_format_version_in_code};
use zebra_test::{args, command::ContextFrom, net::random_known_port, prelude::*};
@ -166,8 +168,8 @@ use common::{
config_file_full_path, configs_dir, default_test_config, persistent_test_config, testdir,
},
launch::{
spawn_zebrad_for_rpc, ZebradTestDirExt, BETWEEN_NODES_DELAY, EXTENDED_LAUNCH_DELAY,
LAUNCH_DELAY,
spawn_zebrad_for_rpc, spawn_zebrad_without_rpc, ZebradTestDirExt, BETWEEN_NODES_DELAY,
EXTENDED_LAUNCH_DELAY, LAUNCH_DELAY,
},
lightwalletd::{can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc},
sync::{
@ -2089,7 +2091,7 @@ fn zebra_state_conflict() -> Result<()> {
dir_conflict_full.push("state");
dir_conflict_full.push(format!(
"v{}",
zebra_state::constants::DATABASE_FORMAT_VERSION
zebra_state::database_format_version_in_code().major,
));
dir_conflict_full.push(config.network.network.to_string().to_lowercase());
format!(
@ -2381,6 +2383,7 @@ fn end_of_support_is_checked_at_start() -> Result<()> {
Ok(())
}
/// Test `zebra-checkpoints` on mainnet.
///
/// If you want to run this test individually, see the module documentation.
@ -2403,3 +2406,199 @@ async fn generate_checkpoints_mainnet() -> Result<()> {
async fn generate_checkpoints_testnet() -> Result<()> {
common::checkpoints::run(Testnet).await
}
/// Check that new states are created with the current state format version,
/// and that restarting `zebrad` doesn't change the format version.
#[tokio::test]
async fn new_state_format() -> Result<()> {
for network in [Mainnet, Testnet] {
state_format_test("new_state_format_test", network, 2, None).await?;
}
Ok(())
}
/// Check that outdated states are updated to the current state format version,
/// and that restarting `zebrad` doesn't change the updated format version.
///
/// TODO: test partial updates, once we have some updates that take a while.
/// (or just add a delay during tests)
#[tokio::test]
async fn update_state_format() -> Result<()> {
let mut fake_version = database_format_version_in_code();
fake_version.minor = 0;
fake_version.patch = 0;
for network in [Mainnet, Testnet] {
state_format_test("update_state_format_test", network, 3, Some(&fake_version)).await?;
}
Ok(())
}
/// Check that newer state formats are downgraded to the current state format version,
/// and that restarting `zebrad` doesn't change the format version.
///
/// Future version compatibility is a best-effort attempt, this test can be disabled if it fails.
#[tokio::test]
async fn downgrade_state_format() -> Result<()> {
let mut fake_version = database_format_version_in_code();
fake_version.minor = u16::MAX.into();
fake_version.patch = 0;
for network in [Mainnet, Testnet] {
state_format_test(
"downgrade_state_format_test",
network,
3,
Some(&fake_version),
)
.await?;
}
Ok(())
}
/// Test state format changes, see calling tests for details.
async fn state_format_test(
base_test_name: &str,
network: Network,
reopen_count: usize,
fake_version: Option<&Version>,
) -> Result<()> {
let _init_guard = zebra_test::init();
let test_name = &format!("{base_test_name}/new");
// # Create a new state and check it has the current version
let zebrad = spawn_zebrad_without_rpc(network, test_name, false, false, None, false)?;
// Skip the test unless it has the required state and environmental variables.
let Some(mut zebrad) = zebrad else {
return Ok(());
};
tracing::info!(?network, "running {test_name} using zebrad");
zebrad.expect_stdout_line_matches("creating new database with the current format")?;
zebrad.expect_stdout_line_matches("loaded Zebra state cache")?;
// Give Zebra enough time to actually write the database to disk.
tokio::time::sleep(Duration::from_secs(1)).await;
let logs = zebrad.kill_and_return_output(false)?;
assert!(
!logs.contains("marked database format as upgraded"),
"unexpected format upgrade in logs:\n\
{logs}"
);
assert!(
!logs.contains("marked database format as downgraded"),
"unexpected format downgrade in logs:\n\
{logs}"
);
let output = zebrad.wait_with_output()?;
let mut output = output.assert_failure()?;
let mut dir = output
.take_dir()
.expect("dir should not already have been taken");
// [Note on port conflict](#Note on port conflict)
output
.assert_was_killed()
.wrap_err("Possible port conflict. Are there other acceptance tests running?")?;
// # Apply the fake version if needed
let mut expect_older_version = false;
let mut expect_newer_version = false;
if let Some(fake_version) = fake_version {
let test_name = &format!("{base_test_name}/apply_fake_version/{fake_version}");
tracing::info!(?network, "running {test_name} using zebra-state");
let mut config = UseAnyState
.zebrad_config(test_name, false, Some(dir.path()))
.expect("already checked config")?;
config.network.network = network;
zebra_state::write_database_format_version_to_disk(fake_version, &config.state, network)
.expect("can't write fake database version to disk");
// Give zebra_state enough time to actually write the database version to disk.
tokio::time::sleep(Duration::from_secs(1)).await;
let running_version = database_format_version_in_code();
match fake_version.cmp(&running_version) {
Ordering::Less => expect_older_version = true,
Ordering::Equal => {}
Ordering::Greater => expect_newer_version = true,
}
}
// # Reopen that state and check the version hasn't changed
for reopened in 0..reopen_count {
let test_name = &format!("{base_test_name}/reopen/{reopened}");
if reopened > 0 {
expect_older_version = false;
expect_newer_version = false;
}
let mut zebrad = spawn_zebrad_without_rpc(network, test_name, false, false, dir, false)?
.expect("unexpectedly missing required state or env vars");
tracing::info!(?network, "running {test_name} using zebrad");
if expect_older_version {
zebrad.expect_stdout_line_matches("trying to open older database format")?;
zebrad.expect_stdout_line_matches("marked database format as upgraded")?;
zebrad.expect_stdout_line_matches("database is fully upgraded")?;
} else if expect_newer_version {
zebrad.expect_stdout_line_matches("trying to open newer database format")?;
zebrad.expect_stdout_line_matches("marked database format as downgraded")?;
} else {
zebrad.expect_stdout_line_matches("trying to open current database format")?;
zebrad.expect_stdout_line_matches("loaded Zebra state cache")?;
}
// Give Zebra enough time to actually write the database to disk.
tokio::time::sleep(Duration::from_secs(1)).await;
let logs = zebrad.kill_and_return_output(false)?;
if !expect_older_version {
assert!(
!logs.contains("marked database format as upgraded"),
"unexpected format upgrade in logs:\n\
{logs}"
);
}
if !expect_newer_version {
assert!(
!logs.contains("marked database format as downgraded"),
"unexpected format downgrade in logs:\n\
{logs}"
);
}
let output = zebrad.wait_with_output()?;
let mut output = output.assert_failure()?;
dir = output
.take_dir()
.expect("dir should not already have been taken");
// [Note on port conflict](#Note on port conflict)
output
.assert_was_killed()
.wrap_err("Possible port conflict. Are there other acceptance tests running?")?;
}
Ok(())
}

View File

@ -11,8 +11,6 @@ use std::{
};
use color_eyre::eyre::{eyre, Result};
use tempfile::TempDir;
use tokio::fs;
use tower::{util::BoxService, Service};
use zebra_chain::{
@ -25,7 +23,6 @@ use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_state::{ChainTipChange, LatestChainTip, MAX_BLOCK_REORG_HEIGHT};
use crate::common::{
config::testdir,
launch::spawn_zebrad_for_rpc,
sync::{check_sync_logs_until, MempoolBehavior, SYNC_FINISHED_REGEX},
test_type::TestType,
@ -78,83 +75,6 @@ pub async fn load_tip_height_from_state_directory(
Ok(chain_tip_height)
}
/// Recursively copy a chain state database directory into a new temporary directory.
pub async fn copy_state_directory(network: Network, source: impl AsRef<Path>) -> Result<TempDir> {
// Copy the database files for this state and network, excluding testnet and other state versions
let source = source.as_ref();
let state_config = zebra_state::Config {
cache_dir: source.into(),
..Default::default()
};
let source_net_dir = state_config.db_path(network);
let source_net_dir = source_net_dir.as_path();
let state_suffix = source_net_dir
.strip_prefix(source)
.expect("db_path() is a subdirectory");
let destination = testdir()?;
let destination_net_dir = destination.path().join(state_suffix);
tracing::info!(
?source,
?source_net_dir,
?state_suffix,
?destination,
?destination_net_dir,
"copying cached state files (this may take some time)...",
);
let mut remaining_directories = vec![PathBuf::from(source_net_dir)];
while let Some(directory) = remaining_directories.pop() {
let sub_directories =
copy_directory(&directory, source_net_dir, destination_net_dir.as_ref()).await?;
remaining_directories.extend(sub_directories);
}
Ok(destination)
}
/// Copy the contents of a directory, and return the sub-directories it contains.
///
/// Copies all files from the `directory` into the destination specified by the concatenation of
/// the `base_destination_path` and `directory` stripped of its `prefix`.
#[tracing::instrument]
async fn copy_directory(
directory: &Path,
prefix: &Path,
base_destination_path: &Path,
) -> Result<Vec<PathBuf>> {
let mut sub_directories = Vec::new();
let mut entries = fs::read_dir(directory).await?;
let destination =
base_destination_path.join(directory.strip_prefix(prefix).expect("Invalid path prefix"));
fs::create_dir_all(&destination).await?;
while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path();
let file_type = entry.file_type().await?;
if file_type.is_file() {
let file_name = entry_path.file_name().expect("Missing file name");
let destination_path = destination.join(file_name);
fs::copy(&entry_path, destination_path).await?;
} else if file_type.is_dir() {
sub_directories.push(entry_path);
} else if file_type.is_symlink() {
unimplemented!("Symbolic link support is currently not necessary");
} else {
panic!("Unknown file type");
}
}
Ok(sub_directories)
}
/// Accepts a network, test_type, test_name, and num_blocks (how many blocks past the finalized tip to try getting)
///
/// Syncs zebra until the tip, gets some blocks near the tip, via getblock rpc calls,

View File

@ -21,7 +21,7 @@ use zebra_rpc::methods::get_block_template_rpcs::{
};
use crate::common::{
launch::{can_spawn_zebrad_for_rpc, spawn_zebrad_for_rpc},
launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc},
sync::{check_sync_logs_until, MempoolBehavior, SYNC_FINISHED_REGEX},
test_type::TestType,
};
@ -66,7 +66,7 @@ pub(crate) async fn run() -> Result<()> {
let network = Network::Mainnet;
// Skip the test unless the user specifically asked for it and there is a zebrad_state_path
if !can_spawn_zebrad_for_rpc(test_name, test_type) {
if !can_spawn_zebrad_for_test_type(test_name, test_type, true) {
return Ok(());
}

View File

@ -7,7 +7,7 @@ use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::methods::get_block_template_rpcs::types::peer_info::PeerInfo;
use crate::common::{
launch::{can_spawn_zebrad_for_rpc, spawn_zebrad_for_rpc},
launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc},
test_type::TestType,
};
@ -21,7 +21,7 @@ pub(crate) async fn run() -> Result<()> {
let network = Network::Mainnet;
// Skip the test unless the user specifically asked for it and there is a zebrad_state_path
if !can_spawn_zebrad_for_rpc(test_name, test_type) {
if !can_spawn_zebrad_for_test_type(test_name, test_type, true) {
return Ok(());
}
@ -29,7 +29,7 @@ pub(crate) async fn run() -> Result<()> {
let (mut zebrad, zebra_rpc_address) =
spawn_zebrad_for_rpc(network, test_name, test_type, true)?
.expect("Already checked zebra state path with can_spawn_zebrad_for_rpc");
.expect("Already checked zebra state path with can_spawn_zebrad_for_test_type");
let rpc_address = zebra_rpc_address.expect("getpeerinfo test must have RPC port");

View File

@ -15,7 +15,7 @@ use zebra_node_services::rpc_client::RpcRequestClient;
use crate::common::{
cached_state::get_raw_future_blocks,
launch::{can_spawn_zebrad_for_rpc, spawn_zebrad_for_rpc},
launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc},
test_type::TestType,
};
@ -31,7 +31,7 @@ pub(crate) async fn run() -> Result<()> {
let network = Network::Mainnet;
// Skip the test unless the user specifically asked for it and there is a zebrad_state_path
if !can_spawn_zebrad_for_rpc(test_name, test_type) {
if !can_spawn_zebrad_for_test_type(test_name, test_type, true) {
return Ok(());
}
@ -50,7 +50,7 @@ pub(crate) async fn run() -> Result<()> {
let should_sync = false;
let (mut zebrad, zebra_rpc_address) =
spawn_zebrad_for_rpc(network, test_name, test_type, should_sync)?
.expect("Already checked zebra state path with can_spawn_zebrad_for_rpc");
.expect("Already checked zebra state path with can_spawn_zebrad_for_test_type");
let rpc_address = zebra_rpc_address.expect("submitblock test must have RPC port");

View File

@ -7,13 +7,13 @@
use std::{
env,
fmt::Debug,
net::SocketAddr,
path::{Path, PathBuf},
time::Duration,
};
use color_eyre::eyre::Result;
use indexmap::IndexSet;
use tempfile::TempDir;
use zebra_chain::parameters::Network;
@ -221,7 +221,7 @@ where
///
/// `zebra_rpc_address` is `None` if the test type doesn't need an RPC port.
#[tracing::instrument]
pub fn spawn_zebrad_for_rpc<S: AsRef<str> + std::fmt::Debug>(
pub fn spawn_zebrad_for_rpc<S: AsRef<str> + Debug>(
network: Network,
test_name: S,
test_type: TestType,
@ -230,25 +230,16 @@ pub fn spawn_zebrad_for_rpc<S: AsRef<str> + std::fmt::Debug>(
let test_name = test_name.as_ref();
// Skip the test unless the user specifically asked for it
if !can_spawn_zebrad_for_rpc(test_name, test_type) {
if !can_spawn_zebrad_for_test_type(test_name, test_type, use_internet_connection) {
return Ok(None);
}
// Get the zebrad config
let mut config = test_type
.zebrad_config(test_name)
.zebrad_config(test_name, use_internet_connection, None)
.expect("already checked config")?;
// TODO: move this into zebrad_config()
config.network.network = network;
if !use_internet_connection {
config.network.initial_mainnet_peers = IndexSet::new();
config.network.initial_testnet_peers = IndexSet::new();
// Avoid re-using cached peers from disk when we're supposed to be a disconnected instance
config.network.cache_dir = CacheDir::disabled();
config.mempool.debug_enable_at_height = Some(0);
}
let (zebrad_failure_messages, zebrad_ignore_messages) = test_type.zebrad_failure_messages();
@ -264,13 +255,90 @@ pub fn spawn_zebrad_for_rpc<S: AsRef<str> + std::fmt::Debug>(
Ok(Some((zebrad, config.rpc.listen_addr)))
}
/// Spawns a zebrad instance on `network` without RPCs or `lightwalletd`.
///
/// If `use_cached_state` is `true`, then update the cached state to the tip.
/// If `ephemeral` is `true`, then use an ephemeral state path.
/// If `reuse_state_path` is `Some(path)`, then use the state at that path, and take ownership of
/// the temporary directory, so it isn't deleted until the test ends.
/// Otherwise, just create an empty state in this test's new temporary directory.
///
/// If `use_internet_connection` is `false` then spawn, but without any peers.
/// This prevents it from downloading blocks. Instead, use the `ZEBRA_CACHED_STATE_DIR`
/// environmental variable to provide an initial state to the zebrad instance.
///
/// Returns:
/// - `Ok(Some(zebrad))` on success,
/// - `Ok(None)` if the test doesn't have the required network or cached state, and
/// - `Err(_)` if spawning zebrad fails.
#[tracing::instrument]
pub fn spawn_zebrad_without_rpc<Str, Dir>(
network: Network,
test_name: Str,
use_cached_state: bool,
ephemeral: bool,
reuse_state_path: Dir,
use_internet_connection: bool,
) -> Result<Option<TestChild<TempDir>>>
where
Str: AsRef<str> + Debug,
Dir: Into<Option<TempDir>> + Debug,
{
use TestType::*;
let test_name = test_name.as_ref();
let reuse_state_path = reuse_state_path.into();
let testdir = reuse_state_path
.unwrap_or_else(|| testdir().expect("failed to create test temporary directory"));
let (test_type, replace_cache_dir) = if use_cached_state {
(UpdateZebraCachedStateNoRpc, None)
} else if ephemeral {
(
LaunchWithEmptyState {
launches_lightwalletd: false,
},
None,
)
} else {
(UseAnyState, Some(testdir.path()))
};
// Skip the test unless the user specifically asked for it
if !can_spawn_zebrad_for_test_type(test_name, test_type, use_internet_connection) {
return Ok(None);
}
// Get the zebrad config
let mut config = test_type
.zebrad_config(test_name, use_internet_connection, replace_cache_dir)
.expect("already checked config")?;
config.network.network = network;
let (zebrad_failure_messages, zebrad_ignore_messages) = test_type.zebrad_failure_messages();
// Writes a configuration that does not have RPC listen_addr set.
// If the state path env var is set, uses it in the config.
let zebrad = testdir
.with_exact_config(&config)?
.spawn_child(args!["start"])?
.bypass_test_capture(true)
.with_timeout(test_type.zebrad_timeout())
.with_failure_regex_iter(zebrad_failure_messages, zebrad_ignore_messages);
Ok(Some(zebrad))
}
/// Returns `true` if a zebrad test for `test_type` has everything it needs to run.
#[tracing::instrument]
pub fn can_spawn_zebrad_for_rpc<S: AsRef<str> + std::fmt::Debug>(
pub fn can_spawn_zebrad_for_test_type<S: AsRef<str> + Debug>(
test_name: S,
test_type: TestType,
use_internet_connection: bool,
) -> bool {
if zebra_test::net::zebra_skip_network_tests() {
if use_internet_connection && zebra_test::net::zebra_skip_network_tests() {
return false;
}
@ -281,8 +349,9 @@ pub fn can_spawn_zebrad_for_rpc<S: AsRef<str> + std::fmt::Debug>(
return false;
}
// Check if we have any necessary cached states for the zebrad config
test_type.zebrad_config(test_name).is_some()
// Check if we have any necessary cached states for the zebrad config.
// The cache_dir value doesn't matter here.
test_type.zebrad_config(test_name, true, None).is_some()
}
/// Panics if `$pred` is false, with an error report containing:

View File

@ -30,7 +30,7 @@ use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;
use crate::common::{
cached_state::get_future_blocks,
launch::{can_spawn_zebrad_for_rpc, spawn_zebrad_for_rpc},
launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc},
lightwalletd::{
can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc,
sync::wait_for_zebrad_and_lightwalletd_sync,
@ -62,7 +62,7 @@ pub async fn run() -> Result<()> {
let network = Mainnet;
// Skip the test unless the user specifically asked for it
if !can_spawn_zebrad_for_rpc(test_name, test_type) {
if !can_spawn_zebrad_for_test_type(test_name, test_type, true) {
return Ok(());
}

View File

@ -5,10 +5,7 @@
//! Test functions in this file will not be run.
//! This file is only for test library code.
use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{path::PathBuf, time::Duration};
use color_eyre::eyre::Result;
use tempfile::TempDir;
@ -19,7 +16,6 @@ use zebrad::{components::sync, config::ZebradConfig};
use zebra_test::{args, prelude::*};
use super::{
cached_state::copy_state_directory,
config::{persistent_test_config, testdir},
launch::ZebradTestDirExt,
};
@ -341,30 +337,6 @@ pub fn check_sync_logs_until(
Ok(zebrad)
}
/// Runs a zebrad instance to synchronize the chain to the network tip.
///
/// The zebrad instance is executed on a copy of the partially synchronized chain state. This copy
/// is returned afterwards, containing the fully synchronized chain state.
#[allow(dead_code)]
#[tracing::instrument]
pub async fn copy_state_and_perform_full_sync(
network: Network,
partial_sync_path: &Path,
) -> Result<TempDir> {
let fully_synced_path = copy_state_directory(network, &partial_sync_path).await?;
sync_until(
Height::MAX,
network,
SYNC_FINISHED_REGEX,
FINISH_PARTIAL_SYNC_TIMEOUT,
fully_synced_path,
MempoolBehavior::ShouldAutomaticallyActivate,
true,
false,
)
}
/// Returns a test config for caching Zebra's state up to the mandatory checkpoint.
pub fn cached_mandatory_checkpoint_test_config() -> Result<ZebradConfig> {
let mut config = persistent_test_config()?;

View File

@ -1,7 +1,14 @@
//! Provides TestType enum with shared code for acceptance tests
use std::{env, path::PathBuf, time::Duration};
use std::{
env,
path::{Path, PathBuf},
time::Duration,
};
use indexmap::IndexSet;
use zebra_network::CacheDir;
use zebra_test::{command::NO_MATCHES_REGEX_ITER, prelude::*};
use zebrad::config::ZebradConfig;
@ -41,6 +48,9 @@ pub enum TestType {
allow_lightwalletd_cached_state: bool,
},
/// Launch with a Zebra and lightwalletd state that might or might not be empty.
UseAnyState,
/// Sync to tip from a lightwalletd cached state.
///
/// This test requires a cached Zebra and lightwalletd state.
@ -69,7 +79,7 @@ impl TestType {
// - FullSyncFromGenesis, UpdateCachedState, UpdateZebraCachedStateNoRpc:
// skip the test if it is not available
match self {
LaunchWithEmptyState { .. } => false,
LaunchWithEmptyState { .. } | UseAnyState => false,
FullSyncFromGenesis { .. }
| UpdateCachedState
| UpdateZebraCachedStateNoRpc
@ -81,16 +91,17 @@ impl TestType {
pub fn needs_zebra_rpc_server(&self) -> bool {
match self {
UpdateZebraCachedStateWithRpc | LaunchWithEmptyState { .. } => true,
UpdateZebraCachedStateNoRpc | FullSyncFromGenesis { .. } | UpdateCachedState => {
self.launches_lightwalletd()
}
UseAnyState
| UpdateZebraCachedStateNoRpc
| FullSyncFromGenesis { .. }
| UpdateCachedState => self.launches_lightwalletd(),
}
}
/// Does this test launch `lightwalletd`?
pub fn launches_lightwalletd(&self) -> bool {
match self {
UpdateZebraCachedStateNoRpc | UpdateZebraCachedStateWithRpc => false,
UseAnyState | UpdateZebraCachedStateNoRpc | UpdateZebraCachedStateWithRpc => false,
FullSyncFromGenesis { .. } | UpdateCachedState => true,
LaunchWithEmptyState {
launches_lightwalletd,
@ -106,6 +117,7 @@ impl TestType {
// - UpdateCachedState: skip the test if it is not available
match self {
LaunchWithEmptyState { .. }
| UseAnyState
| FullSyncFromGenesis { .. }
| UpdateZebraCachedStateNoRpc
| UpdateZebraCachedStateWithRpc => false,
@ -120,14 +132,17 @@ impl TestType {
FullSyncFromGenesis {
allow_lightwalletd_cached_state,
} => *allow_lightwalletd_cached_state,
UpdateCachedState | UpdateZebraCachedStateNoRpc | UpdateZebraCachedStateWithRpc => true,
UseAnyState
| UpdateCachedState
| UpdateZebraCachedStateNoRpc
| UpdateZebraCachedStateWithRpc => true,
}
}
/// Can this test create a new `LIGHTWALLETD_DATA_DIR` cached state?
pub fn can_create_lightwalletd_cached_state(&self) -> bool {
match self {
LaunchWithEmptyState { .. } => false,
LaunchWithEmptyState { .. } | UseAnyState => false,
FullSyncFromGenesis { .. } | UpdateCachedState => true,
UpdateZebraCachedStateNoRpc | UpdateZebraCachedStateWithRpc => false,
}
@ -152,9 +167,16 @@ impl TestType {
/// Returns a Zebra config for this test.
///
/// `replace_cache_dir` replaces any cached or ephemeral state.
///
/// Returns `None` if the test should be skipped,
/// and `Some(Err(_))` if the config could not be created.
pub fn zebrad_config<S: AsRef<str>>(&self, test_name: S) -> Option<Result<ZebradConfig>> {
pub fn zebrad_config<Str: AsRef<str>>(
&self,
test_name: Str,
use_internet_connection: bool,
replace_cache_dir: Option<&Path>,
) -> Option<Result<ZebradConfig>> {
let config = if self.needs_zebra_rpc_server() {
// This is what we recommend our users configure.
random_known_rpc_port_config(true)
@ -177,22 +199,35 @@ impl TestType {
config.rpc.parallel_cpu_threads = 0;
}
if !self.needs_zebra_cached_state() {
return Some(Ok(config));
if !use_internet_connection {
config.network.initial_mainnet_peers = IndexSet::new();
config.network.initial_testnet_peers = IndexSet::new();
// Avoid re-using cached peers from disk when we're supposed to be a disconnected instance
config.network.cache_dir = CacheDir::disabled();
// Activate the mempool immediately by default
config.mempool.debug_enable_at_height = Some(0);
}
// Add a fake miner address for mining RPCs
#[cfg(feature = "getblocktemplate-rpcs")]
let _ = config.mining.miner_address.insert(
zebra_chain::transparent::Address::from_script_hash(config.network.network, [0x7e; 20]),
);
let zebra_state_path = self.zebrad_state_path(test_name)?;
// If we have a cached state, or we don't want to be ephemeral, update the config to use it
if replace_cache_dir.is_some() || self.needs_zebra_cached_state() {
let zebra_state_path = replace_cache_dir
.map(|path| path.to_owned())
.or_else(|| self.zebrad_state_path(test_name))?;
config.sync.checkpoint_verify_concurrency_limit =
zebrad::components::sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
config.state.ephemeral = false;
config.state.cache_dir = zebra_state_path;
config.state.ephemeral = false;
config.state.cache_dir = zebra_state_path;
// And reset the concurrency to the default value
config.sync.checkpoint_verify_concurrency_limit =
zebrad::components::sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
}
Some(Ok(config))
}
@ -237,7 +272,7 @@ impl TestType {
/// Returns the `zebrad` timeout for this test type.
pub fn zebrad_timeout(&self) -> Duration {
match self {
LaunchWithEmptyState { .. } => LIGHTWALLETD_DELAY,
LaunchWithEmptyState { .. } | UseAnyState => LIGHTWALLETD_DELAY,
FullSyncFromGenesis { .. } => LIGHTWALLETD_FULL_SYNC_TIP_DELAY,
UpdateCachedState | UpdateZebraCachedStateNoRpc => LIGHTWALLETD_UPDATE_TIP_DELAY,
UpdateZebraCachedStateWithRpc => FINISH_PARTIAL_SYNC_TIMEOUT,
@ -254,7 +289,7 @@ impl TestType {
// We use the same timeouts for zebrad and lightwalletd,
// because the tests check zebrad and lightwalletd concurrently.
match self {
LaunchWithEmptyState { .. } => LIGHTWALLETD_DELAY,
LaunchWithEmptyState { .. } | UseAnyState => LIGHTWALLETD_DELAY,
FullSyncFromGenesis { .. } => LIGHTWALLETD_FULL_SYNC_TIP_DELAY,
UpdateCachedState | UpdateZebraCachedStateNoRpc | UpdateZebraCachedStateWithRpc => {
LIGHTWALLETD_UPDATE_TIP_DELAY