From 2c0bc3ac9214e98186270aa7977dcdb51691338c Mon Sep 17 00:00:00 2001 From: Arya Date: Wed, 7 Feb 2024 17:36:01 -0500 Subject: [PATCH] add(scan): Start scanner gRPC server with `zebrad` (#8241) * adds clear_results RPC method for zebra-scan * adds delete_keys rpc method * adds docs * Update zebra-grpc/proto/scanner.proto Co-authored-by: Alfredo Garcia * Apply suggestions from code review * start zebra-scan gRPC server from zebrad start command * adds a test that the scanner starts with zebrad * adds a `listen_addr` field to the shielded scan config * updates test to use a random port and set the listen_addr config field * fixes test * Update zebra-scan/src/config.rs Co-authored-by: Alfredo Garcia * fixes panic when trying to open multiple mutable storage instances. * open db in blocking task * fixes test --------- Co-authored-by: Alfredo Garcia --- Cargo.lock | 1 + zebra-grpc/src/lib.rs | 5 ++ zebra-grpc/src/server.rs | 23 ++++---- zebra-scan/src/bin/rpc_server.rs | 2 +- zebra-scan/src/config.rs | 19 ++++++- zebra-scan/src/init.rs | 55 +++++++++++++++---- zebra-scan/src/lib.rs | 2 +- zebra-scan/src/service.rs | 13 +++-- zebra-scan/src/service/scan_task.rs | 18 +----- zebra-scan/src/service/scan_task/scan.rs | 27 +-------- zebrad/Cargo.toml | 1 + zebrad/src/commands/start.rs | 29 ++++------ zebrad/tests/acceptance.rs | 47 ++++++++++++++++ zebrad/tests/common/config.rs | 3 +- .../common/shielded_scan/scans_for_new_key.rs | 10 ++-- 15 files changed, 161 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d3043c4..e80fd818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6065,6 +6065,7 @@ dependencies = [ "vergen", "zebra-chain", "zebra-consensus", + "zebra-grpc", "zebra-network", "zebra-node-services", "zebra-rpc", diff --git a/zebra-grpc/src/lib.rs b/zebra-grpc/src/lib.rs index dea37a1d..89198aed 100644 --- a/zebra-grpc/src/lib.rs +++ b/zebra-grpc/src/lib.rs @@ -5,3 +5,8 @@ #![doc(html_root_url = "https://docs.rs/zebra_grpc")] pub mod server; + +/// The generated scanner proto +pub mod scanner { + tonic::include_proto!("scanner"); +} diff --git a/zebra-grpc/src/server.rs b/zebra-grpc/src/server.rs index 39f2d02f..c1d3e0bb 100644 --- a/zebra-grpc/src/server.rs +++ b/zebra-grpc/src/server.rs @@ -1,20 +1,19 @@ //! The gRPC server implementation +use std::net::SocketAddr; + use futures_util::future::TryFutureExt; use tonic::{transport::Server, Response, Status}; use tower::ServiceExt; -use scanner::scanner_server::{Scanner, ScannerServer}; -use scanner::{ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply}; - use zebra_node_services::scan_service::{ request::Request as ScanServiceRequest, response::Response as ScanServiceResponse, }; -/// The generated scanner proto -pub mod scanner { - tonic::include_proto!("scanner"); -} +use crate::scanner::{ + scanner_server::{Scanner, ScannerServer}, + ClearResultsRequest, DeleteKeysRequest, Empty, InfoReply, +}; type BoxError = Box; @@ -61,7 +60,7 @@ where )); }; - let reply = scanner::InfoReply { + let reply = InfoReply { min_sapling_birthday_height: min_sapling_birthday_height.0, }; @@ -124,7 +123,10 @@ where } /// Initializes the zebra-scan gRPC server -pub async fn init(scan_service: ScanService) -> Result<(), color_eyre::Report> +pub async fn init( + listen_addr: SocketAddr, + scan_service: ScanService, +) -> Result<(), color_eyre::Report> where ScanService: tower::Service + Clone @@ -133,12 +135,11 @@ where + 'static, >::Future: Send, { - let addr = "[::1]:50051".parse()?; let service = ScannerRPC { scan_service }; Server::builder() .add_service(ScannerServer::new(service)) - .serve(addr) + .serve(listen_addr) .await?; Ok(()) diff --git a/zebra-scan/src/bin/rpc_server.rs b/zebra-scan/src/bin/rpc_server.rs index 9ebe2b00..1463df3f 100644 --- a/zebra-scan/src/bin/rpc_server.rs +++ b/zebra-scan/src/bin/rpc_server.rs @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box> { let scan_service = ServiceBuilder::new().buffer(10).service(scan_service); // Start the gRPC server. - zebra_grpc::server::init(scan_service).await?; + zebra_grpc::server::init("127.0.0.1:8231".parse()?, scan_service).await?; Ok(()) } diff --git a/zebra-scan/src/config.rs b/zebra-scan/src/config.rs index aba8a75a..915a5e32 100644 --- a/zebra-scan/src/config.rs +++ b/zebra-scan/src/config.rs @@ -1,6 +1,6 @@ //! Configuration for blockchain scanning tasks. -use std::fmt::Debug; +use std::{fmt::Debug, net::SocketAddr}; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; @@ -20,6 +20,20 @@ pub struct Config { // TODO: allow keys without birthdays pub sapling_keys_to_scan: IndexMap, + /// IP address and port for the zebra-scan gRPC server. + /// + /// Note: The gRPC server is disabled by default. + /// To enable the gRPC server, set a listen address in the config: + /// ```toml + /// [shielded-scan] + /// listen_addr = '127.0.0.1:8231' + /// ``` + /// + /// The recommended ports for the gRPC server are: + /// - Mainnet: 127.0.0.1:8231 + /// - Testnet: 127.0.0.1:18231 + pub listen_addr: Option, + /// The scanner results database config. // // TODO: Remove fields that are only used by the state, and create a common database config. @@ -41,6 +55,9 @@ impl Default for Config { fn default() -> Self { Self { sapling_keys_to_scan: IndexMap::new(), + listen_addr: None, + + // TODO: Add a const generic for specifying the default cache_dir path, like 'zebra' or 'zebra-scan'? db_config: DbConfig::default(), } } diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index 36001434..5344acc5 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -1,31 +1,66 @@ //! Initializing the scanner and gRPC server. +use std::net::SocketAddr; + use color_eyre::Report; +use tokio::task::JoinHandle; use tower::ServiceBuilder; -use zebra_chain::parameters::Network; +use tracing::Instrument; +use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network}; use zebra_state::ChainTipChange; -use crate::{scan, service::ScanService, Config}; +use crate::{scan, service::ScanService, storage::Storage, Config}; /// Initialize [`ScanService`] based on its config. /// /// TODO: add a test for this function. -pub async fn init( +pub async fn init_with_server( + listen_addr: SocketAddr, config: Config, network: Network, state: scan::State, chain_tip_change: ChainTipChange, ) -> Result<(), Report> { - let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new( - &config, - network, - state, - chain_tip_change, - )); + info!(?config, "starting scan service"); + let scan_service = ServiceBuilder::new() + .buffer(10) + .service(ScanService::new(&config, network, state, chain_tip_change).await); + + // TODO: move this to zebra-grpc init() function and include addr + info!("starting scan gRPC server"); // Start the gRPC server. - zebra_grpc::server::init(scan_service).await?; + zebra_grpc::server::init(listen_addr, scan_service).await?; Ok(()) } + +/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it. +pub fn spawn_init( + config: Config, + network: Network, + state: scan::State, + chain_tip_change: ChainTipChange, +) -> JoinHandle> { + if let Some(listen_addr) = config.listen_addr { + // TODO: spawn an entirely new executor here, to avoid timing attacks. + tokio::spawn( + init_with_server(listen_addr, config, network, state, chain_tip_change) + .in_current_span(), + ) + } else { + // TODO: spawn an entirely new executor here, to avoid timing attacks. + tokio::spawn( + async move { + let storage = + tokio::task::spawn_blocking(move || Storage::new(&config, network, false)) + .wait_for_panics() + .await; + let (_cmd_sender, cmd_receiver) = std::sync::mpsc::channel(); + scan::start(state, chain_tip_change, storage, cmd_receiver).await + } + .in_current_span(), + ) + } +} diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index 59594c8b..e1cf2d93 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -21,6 +21,6 @@ pub use service::scan_task::scan; pub mod tests; pub use config::Config; -pub use init::init; +pub use init::{init_with_server, spawn_init}; pub use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey}; diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs index 3dee20f7..22d7bb16 100644 --- a/zebra-scan/src/service.rs +++ b/zebra-scan/src/service.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Dur use futures::future::FutureExt; use tower::Service; -use zebra_chain::{parameters::Network, transaction::Hash}; +use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash}; use zebra_state::ChainTipChange; @@ -36,15 +36,20 @@ const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15); impl ScanService { /// Create a new [`ScanService`]. - pub fn new( + pub async fn new( config: &Config, network: Network, state: scan::State, chain_tip_change: ChainTipChange, ) -> Self { + let config = config.clone(); + let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false)) + .wait_for_panics() + .await; + Self { - db: Storage::new(config, network, false), - scan_task: ScanTask::spawn(config, network, state, chain_tip_change), + scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change), + db: storage, } } diff --git a/zebra-scan/src/service/scan_task.rs b/zebra-scan/src/service/scan_task.rs index f149b383..d7bd4a04 100644 --- a/zebra-scan/src/service/scan_task.rs +++ b/zebra-scan/src/service/scan_task.rs @@ -5,10 +5,9 @@ use std::sync::{mpsc, Arc}; use color_eyre::Report; use tokio::task::JoinHandle; -use zebra_chain::parameters::Network; use zebra_state::ChainTipChange; -use crate::Config; +use crate::storage::Storage; mod commands; mod executor; @@ -31,23 +30,12 @@ pub struct ScanTask { impl ScanTask { /// Spawns a new [`ScanTask`]. - pub fn spawn( - config: &Config, - network: Network, - state: scan::State, - chain_tip_change: ChainTipChange, - ) -> Self { + pub fn spawn(db: Storage, state: scan::State, chain_tip_change: ChainTipChange) -> Self { // TODO: Use a bounded channel or move this logic to the scan service or another service. let (cmd_sender, cmd_receiver) = mpsc::channel(); Self { - handle: Arc::new(scan::spawn_init( - config, - network, - state, - chain_tip_change, - cmd_receiver, - )), + handle: Arc::new(scan::spawn_init(db, state, chain_tip_change, cmd_receiver)), cmd_sender, } } diff --git a/zebra-scan/src/service/scan_task/scan.rs b/zebra-scan/src/service/scan_task/scan.rs index 9af1e3ef..e9afb36c 100644 --- a/zebra-scan/src/service/scan_task/scan.rs +++ b/zebra-scan/src/service/scan_task/scan.rs @@ -39,7 +39,6 @@ use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex}; use crate::{ service::{ScanTask, ScanTaskCommand}, storage::{SaplingScanningKey, Storage}, - Config, }; use super::executor; @@ -489,32 +488,10 @@ async fn tip_height(mut state: State) -> Result { /// /// TODO: add a test for this function. pub fn spawn_init( - config: &Config, - network: Network, + storage: Storage, state: State, chain_tip_change: ChainTipChange, cmd_receiver: Receiver, ) -> JoinHandle> { - let config = config.clone(); - - // TODO: spawn an entirely new executor here, to avoid timing attacks. - tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span()) -} - -/// Initialize the scanner based on its config. -/// -/// TODO: add a test for this function. -pub async fn init( - config: Config, - network: Network, - state: State, - chain_tip_change: ChainTipChange, - cmd_receiver: Receiver, -) -> Result<(), Report> { - let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false)) - .wait_for_panics() - .await; - - // TODO: add more tasks here? - start(state, chain_tip_change, storage, cmd_receiver).await + tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span()) } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index c6e51792..89e57aa8 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -289,6 +289,7 @@ zebra-scan = { path = "../zebra-scan", features = ["proptest-impl"] } zebra-node-services = { path = "../zebra-node-services", features = ["rpc-client"] } zebra-test = { path = "../zebra-test" } +zebra-grpc = { path = "../zebra-grpc" } # Used by the checkpoint generation tests via the zebra-checkpoints feature # (the binaries in this crate won't be built unless their features are enabled). diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 78af9b15..0e113e97 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -301,25 +301,16 @@ impl StartCmd { #[cfg(feature = "shielded-scan")] // Spawn never ending scan task only if we have keys to scan for. - let (scan_task_handle, _cmd_sender) = - if !config.shielded_scan.sapling_keys_to_scan.is_empty() { - // TODO: log the number of keys and update the scan_task_starts() test - info!("spawning shielded scanner with configured viewing keys"); - let scan_task = zebra_scan::service::scan_task::ScanTask::spawn( - &config.shielded_scan, - config.network.network, - state, - chain_tip_change, - ); - - ( - std::sync::Arc::into_inner(scan_task.handle) - .expect("should only have one reference here"), - Some(scan_task.cmd_sender), - ) - } else { - (tokio::spawn(std::future::pending().in_current_span()), None) - }; + let scan_task_handle = { + // TODO: log the number of keys and update the scan_task_starts() test + info!("spawning shielded scanner with configured viewing keys"); + zebra_scan::spawn_init( + config.shielded_scan.clone(), + config.network.network, + state, + chain_tip_change, + ) + }; #[cfg(not(feature = "shielded-scan"))] // Spawn a dummy scan task which doesn't do anything and never finishes. diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 91ea1c8a..505f6c54 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2866,6 +2866,53 @@ fn scan_task_starts() -> Result<()> { Ok(()) } +/// Test that the scanner gRPC server starts when the node starts. +#[tokio::test] +#[cfg(feature = "shielded-scan")] +async fn scan_rpc_server_starts() -> Result<()> { + use zebra_grpc::scanner::{scanner_client::ScannerClient, Empty}; + + let _init_guard = zebra_test::init(); + + let test_type = TestType::LaunchWithEmptyState { + launches_lightwalletd: false, + }; + + let port = random_known_port(); + let listen_addr = format!("127.0.0.1:{port}"); + let mut config = default_test_config(Mainnet)?; + config.shielded_scan.listen_addr = Some(listen_addr.parse()?); + + // Start zebra with the config. + let mut zebrad = testdir()? + .with_exact_config(&config)? + .spawn_child(args!["start"])? + .with_timeout(test_type.zebrad_timeout()); + + // Wait until gRPC server is starting. + tokio::time::sleep(LAUNCH_DELAY).await; + zebrad.expect_stdout_line_matches("starting scan gRPC server")?; + tokio::time::sleep(Duration::from_secs(1)).await; + + let mut client = ScannerClient::connect(format!("http://{listen_addr}")).await?; + + let request = tonic::Request::new(Empty {}); + + client.get_info(request).await?; + + // Kill the node. + zebrad.kill(false)?; + + // Check that scan task started and the first scanning is done. + let output = zebrad.wait_with_output()?; + + // Make sure the command was killed + output.assert_was_killed()?; + output.assert_failure()?; + + Ok(()) +} + /// Test that the scanner can continue scanning where it was left when zebrad restarts. /// /// Needs a cache state close to the tip. A possible way to run it locally is: diff --git a/zebrad/tests/common/config.rs b/zebrad/tests/common/config.rs index b75113e8..a67f1d48 100644 --- a/zebrad/tests/common/config.rs +++ b/zebrad/tests/common/config.rs @@ -82,7 +82,8 @@ pub fn default_test_config(net: Network) -> Result { #[cfg(feature = "shielded-scan")] { - let shielded_scan = zebra_scan::Config::ephemeral(); + let mut shielded_scan = zebra_scan::Config::ephemeral(); + shielded_scan.db_config_mut().cache_dir = "zebra-scan".into(); let config = ZebradConfig { network, diff --git a/zebrad/tests/common/shielded_scan/scans_for_new_key.rs b/zebrad/tests/common/shielded_scan/scans_for_new_key.rs index 35f0c320..20075b7a 100644 --- a/zebrad/tests/common/shielded_scan/scans_for_new_key.rs +++ b/zebrad/tests/common/shielded_scan/scans_for_new_key.rs @@ -82,15 +82,13 @@ pub(crate) async fn run() -> Result<()> { tracing::info!("opened state service with valid chain tip height, deleting any past keys in db and starting scan task",); - { - // Before spawning `ScanTask`, delete past results for the zecpages key, if any. - let mut storage = Storage::new(&shielded_scan_config, network, false); - storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]); - } + // Before spawning `ScanTask`, delete past results for the zecpages key, if any. + let mut storage = Storage::new(&shielded_scan_config, network, false); + storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]); let state = ServiceBuilder::new().buffer(10).service(state_service); - let mut scan_task = ScanTask::spawn(&shielded_scan_config, network, state, chain_tip_change); + let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change); let (zecpages_dfvks, zecpages_ivks) = sapling_key_to_scan_block_keys(&ZECPAGES_SAPLING_VIEWING_KEY.to_string(), network)?;