From 3b75e912d1a64d3a61f26959f83598e2cda5dd75 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 22 Dec 2021 12:07:52 +1000 Subject: [PATCH] Add a copy-state zebrad command, which copies blocks between two state services (#3175) * Add a copy-state command, which copies blocks between two state services * Check blocks were written correctly * Add extra logging to debug shutdown * Add a block height limit argument * Let the target state start from any height Co-authored-by: Deirdre Connolly --- zebrad/src/commands.rs | 14 +- zebrad/src/commands/copy_state.rs | 437 ++++++++++++++++++++++++++++++ zebrad/src/commands/start.rs | 6 +- zebrad/src/components/tokio.rs | 6 +- 4 files changed, 457 insertions(+), 6 deletions(-) create mode 100644 zebrad/src/commands/copy_state.rs diff --git a/zebrad/src/commands.rs b/zebrad/src/commands.rs index f0d68e8a..a2ac67b7 100644 --- a/zebrad/src/commands.rs +++ b/zebrad/src/commands.rs @@ -1,12 +1,16 @@ //! Zebrad Subcommands +mod copy_state; mod download; mod generate; mod start; mod version; use self::ZebradCmd::*; -use self::{download::DownloadCmd, generate::GenerateCmd, start::StartCmd, version::VersionCmd}; +use self::{ + copy_state::CopyStateCmd, download::DownloadCmd, generate::GenerateCmd, start::StartCmd, + version::VersionCmd, +}; use crate::config::ZebradConfig; @@ -21,6 +25,11 @@ pub const CONFIG_FILE: &str = "zebrad.toml"; /// Zebrad Subcommands #[derive(Command, Debug, Options)] pub enum ZebradCmd { + /// The `copy-state` subcommand, used to debug cached chain state + // TODO: hide this command from users in release builds (#3279) + #[options(help = "copy cached chain state (debug only)")] + CopyState(CopyStateCmd), + /// The `download` subcommand #[options(help = "pre-download required parameter files")] Download(DownloadCmd), @@ -49,7 +58,7 @@ impl ZebradCmd { pub(crate) fn is_server(&self) -> bool { match self { // List all the commands, so new commands have to make a choice here - Start(_) => true, + CopyState(_) | Start(_) => true, Download(_) | Generate(_) | Help(_) | Version(_) => false, } } @@ -58,6 +67,7 @@ impl ZebradCmd { impl Runnable for ZebradCmd { fn run(&self) { match self { + CopyState(cmd) => cmd.run(), Download(cmd) => cmd.run(), Generate(cmd) => cmd.run(), ZebradCmd::Help(cmd) => cmd.run(), diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs new file mode 100644 index 00000000..8a428ebc --- /dev/null +++ b/zebrad/src/commands/copy_state.rs @@ -0,0 +1,437 @@ +//! `copy-state` subcommand - copies state from one directory to another (debug only) +//! +//! Copying state helps Zebra developers modify and debug cached state formats. +//! +//! In order to test a new state format, blocks must be identical when they are: +//! - read from the old format, +//! - written to the new format, and +//! - read from the new format. +//! +//! The "old" and "new" states can also use the same format. +//! This tests the low-level state API's performance. +//! +//! ## Command Structure +//! +//! Copying cached state uses the following services and tasks: +//! +//! Tasks: +//! * Old to New Copy Task +//! * queries the source state for blocks, +//! copies those blocks to the target state, then +//! reads the copied blocks from the target state. +//! +//! Services: +//! * Source Old State Service +//! * fetches blocks from the best finalized chain from permanent storage, +//! in the old format +//! * Target New State Service +//! * writes best finalized chain blocks to permanent storage, +//! in the new format +//! * only performs essential contextual verification of blocks, +//! to make sure that block data hasn't been corrupted by +//! receiving blocks in the new format +//! * fetches blocks from the best finalized chain from permanent storage, +//! in the new format + +use std::{cmp::min, path::PathBuf}; + +use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; +use color_eyre::eyre::{eyre, Report}; +use tokio::time::Instant; +use tower::{Service, ServiceExt}; + +use zebra_chain::{block::Height, parameters::Network}; +use zebra_state as old_zs; +use zebra_state as new_zs; + +use crate::{ + components::tokio::{RuntimeRun, TokioComponent}, + config::ZebradConfig, + prelude::*, + BoxError, +}; + +/// How often we log info-level progress messages +const PROGRESS_HEIGHT_INTERVAL: u32 = 5_000; + +/// `copy-state` subcommand +#[derive(Command, Debug, Options)] +pub struct CopyStateCmd { + /// Source height that the copy finishes at. + #[options(help = "stop copying at this source height")] + max_source_height: Option, + + /// Path to a Zebra config.toml for the target state. + /// Uses an ephemeral config by default. + /// + /// Zebra only uses the state options from this config. + /// All other options are ignored. + #[options(help = "config file path for the target state (default: ephemeral), \ + the source state uses the main zebrad config")] + target_config_path: Option, + + /// Filter strings which override the config file and defaults + #[options(free, help = "tracing filters which override the zebrad.toml config")] + filters: Vec, +} + +impl CopyStateCmd { + /// Configure and launch the copy command + async fn start(&self) -> Result<(), Report> { + let base_config = app_config().clone(); + let source_config = base_config.state.clone(); + + // The default load_config impl doesn't actually modify the app config. + let target_config = self + .target_config_path + .as_ref() + .map(|path| app_writer().load_config(path)) + .transpose()? + .map(|app_config| app_config.state) + .unwrap_or_else(new_zs::Config::ephemeral); + + info!(?base_config, "state copy base config"); + + self.copy(base_config.network.network, source_config, target_config) + .await + .map_err(|e| eyre!(e)) + } + + /// Initialize the source and target states, + /// then copy from the source to the target state. + async fn copy( + &self, + network: Network, + source_config: old_zs::Config, + target_config: new_zs::Config, + ) -> Result<(), BoxError> { + info!( + ?source_config, + "initializing source state service (old format)" + ); + + let source_start_time = Instant::now(); + let (mut source_state, _source_latest_chain_tip, _source_chain_tip_change) = + old_zs::init(source_config.clone(), network); + + let elapsed = source_start_time.elapsed(); + info!(?elapsed, "finished initializing source state service"); + + info!( + ?target_config, target_config_path = ?self.target_config_path, + "initializing target state service (new format)" + ); + + let target_start_time = Instant::now(); + let (mut target_state, _target_latest_chain_tip, _target_chain_tip_change) = + new_zs::init(target_config.clone(), network); + + let elapsed = target_start_time.elapsed(); + info!(?elapsed, "finished initializing target state service"); + + info!("fetching source and target tip heights"); + + let source_tip = source_state + .ready() + .await? + .call(old_zs::Request::Tip) + .await?; + let source_tip = match source_tip { + old_zs::Response::Tip(Some(source_tip)) => source_tip, + old_zs::Response::Tip(None) => Err("empty source state: no blocks to copy")?, + + response => Err(format!( + "unexpected response to Tip request: {:?}", + response, + ))?, + }; + let source_tip_height = source_tip.0 .0; + + let initial_target_tip = target_state + .ready() + .await? + .call(new_zs::Request::Tip) + .await?; + let initial_target_tip = match initial_target_tip { + new_zs::Response::Tip(target_tip) => target_tip, + + response => Err(format!( + "unexpected response to Tip request: {:?}", + response, + ))?, + }; + let min_target_height = initial_target_tip + .map(|target_tip| target_tip.0 .0 + 1) + .unwrap_or(0); + + let max_copy_height = self + .max_source_height + .map(|max_source_height| min(source_tip_height, max_source_height)) + .unwrap_or(source_tip_height); + + if min_target_height >= max_copy_height { + info!( + ?min_target_height, + ?max_copy_height, + max_source_height = ?self.max_source_height, + ?source_tip, + ?initial_target_tip, + "target is already at or after max copy height" + ); + + return Ok(()); + } + + info!( + ?min_target_height, + ?max_copy_height, + max_source_height = ?self.max_source_height, + ?source_tip, + ?initial_target_tip, + "starting copy from source to target" + ); + + let copy_start_time = Instant::now(); + for height in min_target_height..=max_copy_height { + // Read block from source + let source_block = source_state + .ready() + .await? + .call(old_zs::Request::Block(Height(height).into())) + .await?; + let source_block = match source_block { + old_zs::Response::Block(Some(source_block)) => { + trace!(?height, %source_block, "read source block"); + source_block + } + old_zs::Response::Block(None) => Err(format!( + "unexpected missing source block, height: {}", + height, + ))?, + + response => Err(format!( + "unexpected response to Block request, height: {}, \n \ + response: {:?}", + height, response, + ))?, + }; + let source_block_hash = source_block.hash(); + + // Write block to target + let target_block_commit_hash = target_state + .ready() + .await? + .call(new_zs::Request::CommitFinalizedBlock( + source_block.clone().into(), + )) + .await?; + let target_block_commit_hash = match target_block_commit_hash { + new_zs::Response::Committed(target_block_commit_hash) => { + trace!(?target_block_commit_hash, "wrote target block"); + target_block_commit_hash + } + response => Err(format!( + "unexpected response to CommitFinalizedBlock request, height: {}\n \ + response: {:?}", + height, response, + ))?, + }; + + // Read written block from target + let target_block = target_state + .ready() + .await? + .call(new_zs::Request::Block(Height(height).into())) + .await?; + let target_block = match target_block { + new_zs::Response::Block(Some(target_block)) => { + trace!(?height, %target_block, "read target block"); + target_block + } + new_zs::Response::Block(None) => Err(format!( + "unexpected missing target block, height: {}", + height, + ))?, + + response => Err(format!( + "unexpected response to Block request, height: {},\n \ + response: {:?}", + height, response, + ))?, + }; + let target_block_data_hash = target_block.hash(); + + // Check for data errors + // + // These checks make sure that Zebra doesn't corrupt the block data + // when serializing it in the new format. + // Zebra currently serializes `Block` structs into bytes while writing, + // then deserializes bytes into new `Block` structs when reading. + // So these checks are sufficient to detect block data corruption. + // + // If Zebra starts re-using cached `Block` structs after writing them, + // we'll also need to check `Block` structs created from the actual database bytes. + if source_block_hash != target_block_commit_hash + || source_block_hash != target_block_data_hash + || source_block != target_block + { + Err(format!( + "unexpected mismatch between source and target blocks,\n \ + max copy height: {:?},\n \ + source hash: {:?},\n \ + target commit hash: {:?},\n \ + target data hash: {:?},\n \ + source block: {:?},\n \ + target block: {:?}", + max_copy_height, + source_block_hash, + target_block_commit_hash, + target_block_data_hash, + source_block, + target_block, + ))?; + } + + // Log progress + if height % PROGRESS_HEIGHT_INTERVAL == 0 { + let elapsed = copy_start_time.elapsed(); + info!( + ?height, + ?max_copy_height, + ?elapsed, + "copied block from source to target" + ); + } + } + + let elapsed = copy_start_time.elapsed(); + info!(?max_copy_height, ?elapsed, "finished copying blocks"); + + info!(?max_copy_height, "fetching final target tip"); + + let final_target_tip = target_state + .ready() + .await? + .call(new_zs::Request::Tip) + .await?; + let final_target_tip = match final_target_tip { + new_zs::Response::Tip(Some(target_tip)) => target_tip, + new_zs::Response::Tip(None) => Err("empty target state: expected written blocks")?, + + response => Err(format!( + "unexpected response to Tip request: {:?}", + response, + ))?, + }; + let final_target_tip_height = final_target_tip.0 .0; + let final_target_tip_hash = final_target_tip.1; + + let target_tip_source_depth = source_state + .ready() + .await? + .call(old_zs::Request::Depth(final_target_tip_hash)) + .await?; + let target_tip_source_depth = match target_tip_source_depth { + old_zs::Response::Depth(source_depth) => source_depth, + + response => Err(format!( + "unexpected response to Depth request: {:?}", + response, + ))?, + }; + + // Check the tips match + // + // This check works because Zebra doesn't cache tip structs. + // (See details above.) + if max_copy_height == source_tip_height { + let expected_target_depth = Some(0); + if source_tip != final_target_tip || target_tip_source_depth != expected_target_depth { + Err(format!( + "unexpected mismatch between source and target tips,\n \ + max copy height: {:?},\n \ + source tip: {:?},\n \ + target tip: {:?},\n \ + actual target tip depth in source: {:?},\n \ + expect target tip depth in source: {:?}", + max_copy_height, + source_tip, + final_target_tip, + target_tip_source_depth, + expected_target_depth, + ))?; + } else { + info!( + ?max_copy_height, + ?source_tip, + ?final_target_tip, + ?target_tip_source_depth, + "source and target states contain the same blocks" + ); + } + } else { + let expected_target_depth = source_tip_height.checked_sub(final_target_tip_height); + if target_tip_source_depth != expected_target_depth { + Err(format!( + "unexpected mismatch between source and target tips,\n \ + max copy height: {:?},\n \ + source tip: {:?},\n \ + target tip: {:?},\n \ + actual target tip depth in source: {:?},\n \ + expect target tip depth in source: {:?}", + max_copy_height, + source_tip, + final_target_tip, + target_tip_source_depth, + expected_target_depth, + ))?; + } else { + info!( + ?max_copy_height, + ?source_tip, + ?final_target_tip, + ?target_tip_source_depth, + "target state reached the max copy height" + ); + } + } + + Ok(()) + } +} + +impl Runnable for CopyStateCmd { + /// Start the application. + fn run(&self) { + info!( + max_source_height = ?self.max_source_height, + target_config_path = ?self.target_config_path, + "starting cached chain state copy" + ); + let rt = app_writer() + .state_mut() + .components + .get_downcast_mut::() + .expect("TokioComponent should be available") + .rt + .take(); + + rt.expect("runtime should not already be taken") + .run(self.start()); + + info!("finished cached chain state copy"); + } +} + +impl config::Override for CopyStateCmd { + // Process the given command line options, overriding settings from + // a configuration file using explicit flags taken from command-line + // arguments. + fn override_config(&self, mut config: ZebradConfig) -> Result { + if !self.filters.is_empty() { + config.tracing.filter = Some(self.filters.join(",")); + } + + Ok(config) + } +} diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 26672069..91111ae2 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -1,8 +1,8 @@ //! `start` subcommand - entry point for starting a zebra node //! -//! ## Application Structure +//! ## Application Structure //! -//! A zebra node consists of the following services and tasks: +//! A zebra node consists of the following services and tasks: //! //! Peers: //! * Network Service @@ -268,6 +268,8 @@ impl Runnable for StartCmd { rt.expect("runtime should not already be taken") .run(self.start()); + + info!("stopping zebrad"); } } diff --git a/zebrad/src/components/tokio.rs b/zebrad/src/components/tokio.rs index afa0d20c..d36e1440 100644 --- a/zebrad/src/components/tokio.rs +++ b/zebrad/src/components/tokio.rs @@ -59,11 +59,13 @@ impl RuntimeRun for Runtime { match result { Ok(()) => { + info!("shutting down Zebra"); + // Don't wait for the runtime to shut down all the tasks. app_writer().shutdown(Shutdown::Graceful); } - Err(e) => { - eprintln!("Error: {:?}", e); + Err(error) => { + warn!(?error, "shutting down Zebra due to an error"); app_writer().shutdown(Shutdown::Forced); } }