diff --git a/Cargo.lock b/Cargo.lock index e3aeba52..d4f6ed64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6473,6 +6473,7 @@ dependencies = [ "jsonrpc-core", "jsonrpc-derive", "jsonrpc-http-server", + "num_cpus", "proptest", "proptest-derive", "serde", diff --git a/README.md b/README.md index 85470f0a..b187ba45 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ - [Beta Releases](#beta-releases) - [Getting Started](#getting-started) - [Build and Run Instructions](#build-and-run-instructions) + - [Configuring JSON-RPC for lightwalletd](#configuring-json-rpc-for-lightwalletd) - [Optional Features](#optional-features) - [System Requirements](#system-requirements) - [Memory Troubleshooting](#memory-troubleshooting) @@ -91,6 +92,22 @@ for your platform: For more detailed instructions, refer to the [documentation](https://zebra.zfnd.org/user/install.html). +### Configuring JSON-RPC for lightwalletd + +To use `zebrad` as a `lightwalletd` backend, give it this `~/.config/zebrad.toml`: + +```toml +[rpc] +# listen for RPC queries on localhost +listen_addr = '127.0.0.1:8232' + +# automatically use multiple CPU threads +parallel_cpu_threads = 0 +``` + +**WARNING:** This config allows multiple Zebra instances to share the same RPC port. +See the [RPC config documentation](https://doc.zebra.zfnd.org/zebra_rpc/config/struct.Config.html) for details. + ### Optional Features For performance reasons, some debugging and monitoring features are disabled in release builds. 
diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index a8a9e82d..9b4affdd 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -21,6 +21,8 @@ hyper = { version = "0.14.20", features = ["http1", "server"] } jsonrpc-core = "18.0.0" jsonrpc-derive = "18.0.0" jsonrpc-http-server = "18.0.0" +num_cpus = "1.13.1" + # zebra-rpc needs the preserve_order feature in serde_json, which is a dependency of jsonrpc-core serde_json = { version = "1.0.85", features = ["preserve_order"] } indexmap = { version = "1.9.1", features = ["serde"] } diff --git a/zebra-rpc/src/config.rs b/zebra-rpc/src/config.rs index 8f3d6d29..d8cb4127 100644 --- a/zebra-rpc/src/config.rs +++ b/zebra-rpc/src/config.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; /// RPC configuration section. -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields, default)] pub struct Config { /// IP address and port for the RPC server. @@ -27,4 +27,38 @@ pub struct Config { /// anyone on the internet can send transactions via your node. /// They can also query your node's state. pub listen_addr: Option<SocketAddr>, + + /// The number of threads used to process RPC requests and responses. + /// + /// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread. + /// State queries are run concurrently using the shared thread pool controlled by + /// the [`SyncSection.parallel_cpu_threads`](https://doc.zebra.zfnd.org/zebrad/config/struct.SyncSection.html#structfield.parallel_cpu_threads) config. + /// + /// We recommend setting both configs to `0` (automatic scaling) for the best performance. + /// This uses one thread per available CPU core. + /// + /// Set to `1` by default, which runs all RPC queries on a single thread, and detects RPC + /// port conflicts from multiple Zebra or `zcashd` instances. 
+ /// + /// For details, see [the `jsonrpc_http_server` documentation](https://docs.rs/jsonrpc-http-server/latest/jsonrpc_http_server/struct.ServerBuilder.html#method.threads). + /// + /// ## Warning + /// + /// Changing this config disables RPC port conflict detection. + /// This can allow multiple Zebra instances to share the same RPC port. + /// + /// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent. + pub parallel_cpu_threads: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + // Disable RPCs by default. + listen_addr: None, + + // Use a single thread, so we can detect RPC port conflicts. + parallel_cpu_threads: 1, + } + } } diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index c63594df..b8cd316a 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -73,6 +73,12 @@ impl RpcServer { MetaIoHandler::new(Compatibility::Both, TracingMiddleware); io.extend_with(rpc_impl.to_delegate()); + // If zero, automatically scale threads to the number of CPU cores + let mut parallel_cpu_threads = config.parallel_cpu_threads; + if parallel_cpu_threads == 0 { + parallel_cpu_threads = num_cpus::get(); + } + // The server is a blocking task, which blocks on executor shutdown. // So we need to create and spawn it on a std::thread, inside a tokio blocking task. // (Otherwise tokio panics when we shut down the RPC server.) @@ -85,12 +91,8 @@ impl RpcServer { // TODO: // - return server.close_handle(), which can shut down the RPC server, // and add it to the server tests - // - allow multiple RPC threads - // (when jsonrpc_http_server has multiple threads, - // it lets any process share its port - do we need to warn users?) 
- // - make the number of RPC threads configurable let server = ServerBuilder::new(io) - .threads(1) + .threads(parallel_cpu_threads) // TODO: disable this security check if we see errors from lightwalletd //.allowed_hosts(DomainsValidation::Disabled) .request_middleware(FixHttpRequestMiddleware) diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index 465cec54..bb9ce71b 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -15,14 +15,29 @@ use zebra_test::mock_service::MockService; use super::super::*; -/// Test if the RPC server will spawn on a randomly generated port. #[test] -fn rpc_server_spawn() { +fn rpc_server_spawn_single_thread() { + rpc_server_spawn(false) +} + +/// Test that the JSON-RPC server spawns when configured with multiple threads. +#[test] +fn rpc_server_spawn_parallel_threads() { + rpc_server_spawn(true) +} + +/// Test if the RPC server will spawn on a randomly generated port. +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +#[tracing::instrument] +fn rpc_server_spawn(parallel_cpu_threads: bool) { let _init_guard = zebra_test::init(); let port = zebra_test::net::random_known_port(); let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 }, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -62,14 +77,31 @@ fn rpc_server_spawn() { rt.shutdown_timeout(Duration::from_secs(1)); } -/// Test if the RPC server will spawn on an OS-assigned unallocated port. +/// Test that the JSON-RPC server spawns when configured with a single thread, +/// on an OS-assigned unallocated port. 
#[test] -fn rpc_server_spawn_unallocated_port() { +fn rpc_server_spawn_unallocated_port_single_thread() { + rpc_server_spawn_unallocated_port(false) +} + +/// Test that the JSON-RPC server spawns when configured with multiple threads, +/// on an OS-assigned unallocated port. +#[test] +fn rpc_server_spawn_unallocated_port_parallel_threads() { + rpc_server_spawn_unallocated_port(true) +} + +/// Test if the RPC server will spawn on an OS-assigned unallocated port. +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +#[tracing::instrument] +fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) { let _init_guard = zebra_test::init(); let port = zebra_test::net::random_unallocated_port(); let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 }, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -110,9 +142,6 @@ } /// Test if the RPC server will panic correctly when there is a port conflict. -/// -/// TODO: update this test when the number of threads is configurable -/// (when jsonrpc_http_server has multiple threads, it lets any process share its port!) #[test] #[should_panic(expected = "Unable to start RPC server")] fn rpc_server_spawn_port_conflict() { @@ -121,6 +150,7 @@ let port = zebra_test::net::random_known_port(); let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + parallel_cpu_threads: 1, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -199,3 +229,99 @@ fn rpc_server_spawn_port_conflict() { }, } } + +/// Check if the RPC server detects a port conflict when running parallel threads. +/// +/// If this test fails, that's great! +/// We can make parallel the default, and remove the warnings in the config docs. 
+#[test] +fn rpc_server_spawn_port_conflict_parallel_auto() { + let _init_guard = zebra_test::init(); + + let port = zebra_test::net::random_known_port(); + let config = Config { + listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + parallel_cpu_threads: 2, + }; + + let rt = tokio::runtime::Runtime::new().unwrap(); + + let test_task_handle = rt.spawn(async { + let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); + + info!("spawning parallel RPC server 1..."); + + let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn( + config.clone(), + "RPC server 1 test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + tokio::time::sleep(Duration::from_secs(3)).await; + + info!("spawning parallel conflicted RPC server 2..."); + + let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn( + config, + "RPC server 2 conflict test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + NoChainTip, + Mainnet, + ); + + info!("spawned RPC servers, checking services..."); + + mempool.expect_no_requests().await; + state.expect_no_requests().await; + + // Because there might be a panic inside a multi-threaded executor, + // we can't depend on the exact behaviour of the other tasks, + // particularly across different machines and OSes. + + // The second server doesn't panic, but we'd like it to. + // (See the function docs for details.) 
+ let rpc_server_2_task_result = rpc_server_2_task_handle.await; + match rpc_server_2_task_result { + Ok(()) => info!( + "Parallel RPC server with conflicting port should exit with an error: \ + but we're ok with it ignoring the conflict for now" + ), + Err(join_error) => match join_error.try_into_panic() { + Ok(panic_object) => panic::resume_unwind(panic_object), + Err(cancelled_error) => info!( + "Parallel RPC server with conflicting port should exit with an error: \ + but we're ok with it ignoring the conflict for now: \ + unexpected JoinError: {cancelled_error:?}" + ), + }, + } + + // Ignore the queue task result + }); + + // Wait until the spawned task finishes + std::thread::sleep(Duration::from_secs(10)); + + info!("waiting for parallel RPC server to shut down..."); + rt.shutdown_timeout(Duration::from_secs(3)); + + match test_task_handle.now_or_never() { + Some(Ok(())) => { + info!("parallel RPC server task successfully exited"); + } + None => panic!("unexpected test task hang"), + Some(Err(join_error)) => match join_error.try_into_panic() { + Ok(panic_object) => panic::resume_unwind(panic_object), + Err(cancelled_error) => info!( + "Parallel RPC server with conflicting port should exit with an error: \ + but we're ok with it ignoring the conflict for now: \ + unexpected JoinError: {cancelled_error:?}" + ), + }, + } +} diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs index 5b42a0c8..da5c2322 100644 --- a/zebrad/src/config.rs +++ b/zebrad/src/config.rs @@ -213,7 +213,7 @@ pub struct SyncSection { /// The number of threads used to verify signatures, proofs, and other CPU-intensive code. /// /// Set to `0` by default, which uses one thread per available CPU core. - /// For details, see [the rayon documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads). + /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads). 
pub parallel_cpu_threads: usize, } diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index d278b564..47058a2e 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1145,8 +1145,25 @@ async fn tracing_endpoint() -> Result<()> { Ok(()) } +/// Test that the JSON-RPC endpoint responds to a request, +/// when configured with a single thread. #[tokio::test] -async fn rpc_endpoint() -> Result<()> { +async fn rpc_endpoint_single_thread() -> Result<()> { + rpc_endpoint(false).await +} + +/// Test that the JSON-RPC endpoint responds to a request, +/// when configured with multiple threads. +#[tokio::test] +async fn rpc_endpoint_parallel_threads() -> Result<()> { + rpc_endpoint(true).await +} + +/// Test that the JSON-RPC endpoint responds to a request. +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +#[tracing::instrument] +async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { use hyper::{body::to_bytes, Body, Client, Method, Request}; use serde_json::Value; @@ -1157,7 +1174,7 @@ async fn rpc_endpoint() -> Result<()> { // Write a configuration that has RPC listen_addr set // [Note on port conflict](#Note on port conflict) - let mut config = random_known_rpc_port_config()?; + let mut config = random_known_rpc_port_config(parallel_cpu_threads)?; let url = format!("http://{}", config.rpc.listen_addr.unwrap()); let dir = testdir()?.with_config(&mut config)?; @@ -1648,7 +1665,9 @@ fn zebra_rpc_conflict() -> Result<()> { // Write a configuration that has RPC listen_addr set // [Note on port conflict](#Note on port conflict) - let mut config = random_known_rpc_port_config()?; + // + // This is the required setting to detect port conflicts. 
+ let mut config = random_known_rpc_port_config(false)?; let dir1 = testdir()?.with_config(&mut config)?; let regex1 = regex::escape(&format!( diff --git a/zebrad/tests/common/launch.rs b/zebrad/tests/common/launch.rs index 03e9a372..6231200e 100644 --- a/zebrad/tests/common/launch.rs +++ b/zebrad/tests/common/launch.rs @@ -215,7 +215,8 @@ pub fn spawn_zebrad_for_rpc_without_initial_peers Result<(TestChild

, SocketAddr)> { - let mut config = random_known_rpc_port_config() + // This is what we recommend our users configure. + let mut config = random_known_rpc_port_config(true) .expect("Failed to create a config file with a known RPC listener port"); config.state.ephemeral = false; diff --git a/zebrad/tests/common/lightwalletd.rs b/zebrad/tests/common/lightwalletd.rs index 13f79716..d740d397 100644 --- a/zebrad/tests/common/lightwalletd.rs +++ b/zebrad/tests/common/lightwalletd.rs @@ -83,7 +83,9 @@ pub fn zebra_skip_lightwalletd_tests() -> bool { } /// Returns a `zebrad` config with a random known RPC port. -pub fn random_known_rpc_port_config() -> Result { +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +pub fn random_known_rpc_port_config(parallel_cpu_threads: bool) -> Result { // [Note on port conflict](#Note on port conflict) let listen_port = random_known_port(); let listen_ip = "127.0.0.1".parse().expect("hard-coded IP is valid"); @@ -93,6 +95,13 @@ pub fn random_known_rpc_port_config() -> Result { // TODO: split this config into another function? let mut config = default_test_config()?; config.rpc.listen_addr = Some(zebra_rpc_listener); + if parallel_cpu_threads { + // Auto-configure to the number of CPU cores: most users configure this + config.rpc.parallel_cpu_threads = 0; + } else { + // Default config, users who want to detect port conflicts configure this + config.rpc.parallel_cpu_threads = 1; + } Ok(config) } @@ -306,7 +315,8 @@ impl LightwalletdTestType { /// and `Some(Err(_))` if the config could not be created. pub fn zebrad_config(&self, test_name: String) -> Option> { let config = if self.launches_lightwalletd() { - random_known_rpc_port_config() + // This is what we recommend our users configure. + random_known_rpc_port_config(true) } else { default_test_config() };