From 074733d183c136ed219eeaf3b4587ccab59e0f58 Mon Sep 17 00:00:00 2001
From: teor
Date: Fri, 11 Nov 2022 00:51:53 +1000
Subject: [PATCH] fix(rpc): Shut down the RPC server properly when Zebra shuts down (#5591)

* Make the queue runner task shut down when the RpcImpl is dropped
* Move RPC server startup into the spawn() tokio future
* Return a shutdown handle from the RPC spawn() method
* Shut down the RPC server properly when Zebra shuts down
* Add a changelog entry for this security fix
* Call RpcServer::shutdown() when it is dropped, and wait
* Block on RPC server shutdown when Zebra's tasks have an error
---
 CHANGELOG.md                          |  18 +++
 zebra-rpc/src/methods.rs              |  12 +-
 zebra-rpc/src/queue.rs                |  47 ++++----
 zebra-rpc/src/queue/tests/prop.rs     |  14 +--
 zebra-rpc/src/server.rs               | 164 +++++++++++++++++++++-----
 zebra-rpc/src/server/tests/vectors.rs | 110 +++++++++++------
 zebrad/src/commands/start.rs          |  10 +-
 7 files changed, 271 insertions(+), 104 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7731a93b..4e404446 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,24 @@ All notable changes to Zebra are documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org).
 
+## [Zebra 1.0.0-rc.2](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.2) - 2022-11-TODO
+
+Zebra's latest release continues work on mining pool RPCs, and fixes a rare RPC crash that could lead to memory corruption.
+
+Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.
+
+### Breaking Changes
+
+This release has the following breaking changes:
+- TODO: search the changelog for breaking changes
+
+### Security
+
+- Fix a rare crash and memory errors when Zebra's RPC server shuts down ([#5591](https://github.com/ZcashFoundation/zebra/pull/5591))
+
+TODO: the rest of the changelog
+
+
 ## [Zebra 1.0.0-rc.1](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.1) - 2022-11-02
 
 This is the second Zebra release candidate. Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.

diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs
index bf8281fb..1577ab8d 100644
--- a/zebra-rpc/src/methods.rs
+++ b/zebra-rpc/src/methods.rs
@@ -14,7 +14,7 @@ use hex::{FromHex, ToHex};
 use indexmap::IndexMap;
 use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
 use jsonrpc_derive::rpc;
-use tokio::{sync::broadcast::Sender, task::JoinHandle};
+use tokio::{sync::broadcast, task::JoinHandle};
 use tower::{buffer::Buffer, Service, ServiceExt};
 use tracing::Instrument;
 
@@ -278,8 +278,8 @@ where
     // Tasks
     //
 
-    /// A sender component of a channel used to send transactions to the queue.
-    queue_sender: Sender<Option<UnminedTx>>,
+    /// A sender component of a channel used to send transactions to the mempool queue.
+    queue_sender: broadcast::Sender<UnminedTx>,
 }
 
 impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
 where
@@ -313,7 +313,7 @@ where
         <Mempool as Service<mempool::Request>>::Future: Send,
         <State as Service<zebra_state::ReadRequest>>::Future: Send,
     {
-        let runner = Queue::start();
+        let (runner, queue_sender) = Queue::start();
 
         let mut app_version = app_version.to_string();
 
@@ -329,7 +329,7 @@ where
             mempool: mempool.clone(),
             state: state.clone(),
             latest_chain_tip: latest_chain_tip.clone(),
-            queue_sender: runner.sender(),
+            queue_sender,
         };
 
         // run the process queue
@@ -517,7 +517,7 @@ where
         // send transaction to the rpc queue, ignore any error.
         let unmined_transaction = UnminedTx::from(raw_transaction.clone());
-        let _ = queue_sender.send(Some(unmined_transaction));
+        let _ = queue_sender.send(unmined_transaction);
 
         let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
         let request = mempool::Request::Queue(vec![transaction_parameter]);
diff --git a/zebra-rpc/src/queue.rs b/zebra-rpc/src/queue.rs
index dd8b607f..4dc1a37a 100644
--- a/zebra-rpc/src/queue.rs
+++ b/zebra-rpc/src/queue.rs
@@ -13,7 +13,7 @@ use std::{collections::HashSet, sync::Arc};
 use chrono::Duration;
 use indexmap::IndexMap;
 use tokio::{
-    sync::broadcast::{channel, Receiver, Sender},
+    sync::broadcast::{self, error::TryRecvError},
     time::Instant,
 };
 
@@ -55,24 +55,26 @@ pub struct Queue {
 /// The runner will make the processing of the transactions in the queue.
 pub struct Runner {
     queue: Queue,
-    sender: Sender<Option<UnminedTx>>,
+    receiver: broadcast::Receiver<UnminedTx>,
     tip_height: Height,
 }
 
 impl Queue {
     /// Start a new queue
-    pub fn start() -> Runner {
-        let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY);
+    pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
+        let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);
 
         let queue = Queue {
             transactions: IndexMap::new(),
         };
 
-        Runner {
+        let runner = Runner {
             queue,
-            sender,
+            receiver,
             tip_height: Height(0),
-        }
+        };
+
+        (runner, sender)
     }
 
     /// Get the transactions in the queue.
@@ -103,16 +105,6 @@ impl Queue {
 }
 
 impl Runner {
-    /// Create a new sender for this runner.
-    pub fn sender(&self) -> Sender<Option<UnminedTx>> {
-        self.sender.clone()
-    }
-
-    /// Create a new receiver.
-    pub fn receiver(&self) -> Receiver<Option<UnminedTx>> {
-        self.sender.subscribe()
-    }
-
     /// Get the queue transactions as a `HashSet` of unmined ids.
     fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
         let transactions = self.queue.transactions();
@@ -157,8 +149,6 @@ impl Runner {
             + 'static,
         Tip: ChainTip + Clone + Send + Sync + 'static,
     {
-        let mut receiver = self.sender.subscribe();
-
         loop {
             // if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
             let tip_height = match tip.best_tip_height() {
@@ -173,8 +163,23 @@ impl Runner {
             tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
 
             // get transactions from the channel
-            while let Ok(Some(tx)) = receiver.try_recv() {
-                let _ = &self.queue.insert(tx.clone());
+            loop {
+                let tx = match self.receiver.try_recv() {
+                    Ok(tx) => tx,
+                    Err(TryRecvError::Empty) => break,
+                    Err(TryRecvError::Lagged(skipped_count)) => {
+                        tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
+                        continue;
+                    }
+                    Err(TryRecvError::Closed) => {
+                        tracing::info!(
+                            "sendrawtransaction queue was closed: is Zebra shutting down?"
+                        );
+                        return;
+                    }
+                };
+
+                self.queue.insert(tx.clone());
             }
 
             // skip some work if stored tip height is the same as the one arriving
diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs
index 577d75f8..b2cdc9dc 100644
--- a/zebra-rpc/src/queue/tests/prop.rs
+++ b/zebra-rpc/src/queue/tests/prop.rs
@@ -34,7 +34,7 @@ proptest! {
     #[test]
     fn insert_remove_to_from_queue(transaction in any::<UnminedTx>()) {
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert transaction
         runner.queue.insert(transaction.clone());
@@ -54,7 +54,7 @@ proptest! {
     #[test]
     fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) {
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert all transactions we have
         transactions.iter().for_each(|t| runner.queue.insert(t.clone()));
@@ -68,7 +68,7 @@ proptest! {
     #[test]
     fn queue_order(transactions in any::<[UnminedTx; 32]>()) {
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // fill the queue and check insertion order
         for i in 0..CHANNEL_AND_QUEUE_CAPACITY {
             let transaction = transactions[i].clone();
@@ -108,7 +108,7 @@ proptest! {
         time::pause();
 
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert a transaction to the queue
         runner.queue.insert(transaction);
@@ -165,7 +165,7 @@ proptest! {
         let mut mempool = MockService::build().for_prop_tests();
 
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert a transaction to the queue
         let unmined_transaction = UnminedTx::from(transaction);
@@ -246,7 +246,7 @@ proptest! {
         let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
 
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert a transaction to the queue
         let unmined_transaction = UnminedTx::from(&transaction);
@@ -320,7 +320,7 @@ proptest! {
         let mut mempool = MockService::build().for_prop_tests();
 
         // create a queue
-        let mut runner = Queue::start();
+        let (mut runner, _sender) = Queue::start();
 
         // insert a transaction to the queue
         let unmined_transaction = UnminedTx::from(transaction.clone());
diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs
index 9dc6c957..080c4159 100644
--- a/zebra-rpc/src/server.rs
+++ b/zebra-rpc/src/server.rs
@@ -7,15 +7,14 @@
 //! See the full list of
 //! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
 
-use std::{panic, sync::Arc};
+use std::{fmt, panic, sync::Arc};
 
 use jsonrpc_core::{Compatibility, MetaIoHandler};
-use jsonrpc_http_server::ServerBuilder;
+use jsonrpc_http_server::{CloseHandle, ServerBuilder};
 use tokio::task::JoinHandle;
 use tower::{buffer::Buffer, Service};
-use tracing::*;
-use tracing_futures::Instrument;
+use tracing::{Instrument, *};
 
 use zebra_chain::{
     block::{self, Block},
@@ -40,11 +39,35 @@ mod tracing_middleware;
 mod tests;
 
 /// Zebra RPC Server
-#[derive(Clone, Debug)]
-pub struct RpcServer;
+#[derive(Clone)]
+pub struct RpcServer {
+    config: Config,
+    network: Network,
+    app_version: String,
+    close_handle: CloseHandle,
+}
+
+impl fmt::Debug for RpcServer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RpcServer")
+            .field("config", &self.config)
+            .field("network", &self.network)
+            .field("app_version", &self.app_version)
+            .field(
+                "close_handle",
+                // TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
+                &"CloseHandle",
+            )
+            .finish()
+    }
+}
 
 impl RpcServer {
-    /// Start a new RPC server endpoint.
+    /// Start a new RPC server endpoint using the supplied configs and services.
+    /// `app_version` is a version string for the application, which is used in RPC responses.
+    ///
+    /// Returns [`JoinHandle`]s for the RPC server and `sendrawtransaction` queue tasks,
+    /// and a [`RpcServer`] handle, which can be used to shut down the RPC server task.
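+    ///
+    /// If no RPC endpoint is configured, the spawned tasks do nothing, and the
+    /// returned [`RpcServer`] handle is `None`.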
     //
     // TODO: put some of the configs or services in their own struct?
     #[allow(clippy::too_many_arguments)]
     pub fn spawn<Version, Mempool, State, Tip, ChainVerifier>(
@@ -62,9 +85,9 @@ impl RpcServer {
         chain_verifier: ChainVerifier,
         latest_chain_tip: Tip,
         network: Network,
-    ) -> (JoinHandle<()>, JoinHandle<()>)
+    ) -> (JoinHandle<()>, JoinHandle<()>, Option<RpcServer>)
     where
-        Version: ToString + Clone,
+        Version: ToString + Clone + Send + 'static,
         Mempool: tower::Service<
             mempool::Request,
             Response = mempool::Response,
@@ -112,7 +135,7 @@ impl RpcServer {
 
         // Initialize the rpc methods with the zebra version
         let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
-            app_version,
+            app_version.clone(),
             network,
             config.debug_force_finished_sync,
             mempool,
@@ -129,18 +152,14 @@ impl RpcServer {
         }
 
         // The server is a blocking task, which blocks on executor shutdown.
-        // So we need to create and spawn it on a std::thread, inside a tokio blocking task.
-        // (Otherwise tokio panics when we shut down the RPC server.)
+        // So we need to start it in a std::thread.
+        // (Otherwise tokio panics on RPC port conflict, which shuts down the RPC server.)
         let span = Span::current();
-        let server = move || {
+        let start_server = move || {
             span.in_scope(|| {
                 // Use a different tokio executor from the rest of Zebra,
                 // so that large RPCs and any task handling bugs don't impact Zebra.
-                //
-                // TODO:
-                // - return server.close_handle(), which can shut down the RPC server,
-                //   and add it to the server tests
-                let server = ServerBuilder::new(io)
+                let server_instance = ServerBuilder::new(io)
                     .threads(parallel_cpu_threads)
                     // TODO: disable this security check if we see errors from lightwalletd
                     //.allowed_hosts(DomainsValidation::Disabled)
                     .request_middleware(FixHttpRequestMiddleware)
                     .start_http(&listen_addr)
                     .expect("Unable to start RPC server");
 
-                info!("Opened RPC endpoint at {}", server.address());
+                info!("Opened RPC endpoint at {}", server_instance.address());
 
-                server.wait();
+                let close_handle = server_instance.close_handle();
 
-                info!("Stopping RPC endpoint");
+                let rpc_server_handle = RpcServer {
+                    config,
+                    network,
+                    app_version: app_version.to_string(),
+                    close_handle,
+                };
+
+                (server_instance, rpc_server_handle)
             })
         };
 
-        (
-            tokio::task::spawn_blocking(|| {
-                let thread_handle = std::thread::spawn(server);
+        // Propagate panics from the std::thread
+        let (server_instance, rpc_server_handle) = match std::thread::spawn(start_server).join()
+        {
+            Ok(rpc_server) => rpc_server,
+            Err(panic_object) => panic::resume_unwind(panic_object),
+        };
 
-                // Propagate panics from the inner std::thread to the outer tokio blocking task
-                match thread_handle.join() {
-                    Ok(()) => (),
-                    Err(panic_object) => panic::resume_unwind(panic_object),
-                }
-            }),
+        // The server is a blocking task, which blocks on executor shutdown.
+        // So we need to wait on it on a std::thread, inside a tokio blocking task.
+        // (Otherwise tokio panics when we shut down the RPC server.)
+ let span = Span::current(); + let wait_on_server = move || { + span.in_scope(|| { + server_instance.wait(); + + info!("Stopped RPC endpoint"); + }) + }; + + let span = Span::current(); + let rpc_server_task_handle = tokio::task::spawn_blocking(move || { + let thread_handle = std::thread::spawn(wait_on_server); + + // Propagate panics from the inner std::thread to the outer tokio blocking task + span.in_scope(|| match thread_handle.join() { + Ok(()) => (), + Err(panic_object) => panic::resume_unwind(panic_object), + }) + }); + + ( + rpc_server_task_handle, rpc_tx_queue_task_handle, + Some(rpc_server_handle), ) } else { // There is no RPC port, so the RPC tasks do nothing. ( tokio::task::spawn(futures::future::pending().in_current_span()), tokio::task::spawn(futures::future::pending().in_current_span()), + None, ) } } + + /// Shut down this RPC server, blocking the current thread. + /// + /// This method can be called from within a tokio executor without panicking. + /// But it is blocking, so `shutdown()` should be used instead. + pub fn shutdown_blocking(&self) { + Self::shutdown_blocking_inner(self.close_handle.clone()) + } + + /// Shut down this RPC server asynchronously. + /// Returns a task that completes when the server is shut down. + pub fn shutdown(&self) -> JoinHandle<()> { + let close_handle = self.close_handle.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(|| Self::shutdown_blocking_inner(close_handle)) + }) + } + + /// Shuts down this RPC server using its `close_handle`. + /// + /// See `shutdown_blocking()` for details. + fn shutdown_blocking_inner(close_handle: CloseHandle) { + // The server is a blocking task, so it can't run inside a tokio thread. + // See the note at wait_on_server. + let span = Span::current(); + let wait_on_shutdown = move || { + span.in_scope(|| { + info!("Stopping RPC server"); + close_handle.clone().close(); + info!("Stopped RPC server"); + }) + }; + + let span = Span::current(); + let thread_handle = std::thread::spawn(wait_on_shutdown); + + // Propagate panics from the inner std::thread to the outer tokio blocking task + span.in_scope(|| match thread_handle.join() { + Ok(()) => (), + Err(panic_object) => panic::resume_unwind(panic_object), + }) + } +} + +impl Drop for RpcServer { + fn drop(&mut self) { + // Block on shutting down, propagating panics. + // This can take around 150 seconds. + // + // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors. 
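+        //
+        // `shutdown_blocking()` does its waiting on a separate `std::thread`, so
+        // this drop can run inside a tokio executor without panicking.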
+        self.shutdown_blocking();
+    }
+}
diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs
index 70325690..1588c8d8 100644
--- a/zebra-rpc/src/server/tests/vectors.rs
+++ b/zebra-rpc/src/server/tests/vectors.rs
@@ -51,7 +51,7 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
 
         info!("spawning RPC server...");
 
-        let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
+        let (rpc_server_task_handle, rpc_tx_queue_task_handle, _rpc_server) = RpcServer::spawn(
             config,
             Default::default(),
             "RPC server test",
@@ -74,9 +74,6 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
 
         let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
         assert!(matches!(rpc_tx_queue_task_result, None));
-
-        // TODO: when we return server.close_handle(), use it to shut down the server here,
-        // and remove the shutdown timeout
     });
 
     info!("waiting for RPC server to shut down...");
@@ -87,21 +84,36 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
 /// on an OS-assigned unallocated port.
 #[test]
 fn rpc_server_spawn_unallocated_port_single_thread() {
-    rpc_server_spawn_unallocated_port(false)
+    rpc_server_spawn_unallocated_port(false, false)
 }
 
-/// Test that the JSON-RPC server spawn when configured with multiple threads,
+/// Test that the JSON-RPC server spawns and shuts down when configured with a single thread,
+/// on an OS-assigned unallocated port.
+#[test]
+fn rpc_server_spawn_unallocated_port_single_thread_shutdown() {
+    rpc_server_spawn_unallocated_port(false, true)
+}
+
+/// Test that the JSON-RPC server spawns when configured with multiple threads,
 /// on an OS-assigned unallocated port.
 #[test]
 fn rpc_sever_spawn_unallocated_port_parallel_threads() {
-    rpc_server_spawn_unallocated_port(true)
+    rpc_server_spawn_unallocated_port(true, false)
+}
+
+/// Test that the JSON-RPC server spawns and shuts down when configured with multiple threads,
+/// on an OS-assigned unallocated port.
+#[test]
+fn rpc_server_spawn_unallocated_port_parallel_threads_shutdown() {
+    rpc_server_spawn_unallocated_port(true, true)
 }
 
 /// Test if the RPC server will spawn on an OS-assigned unallocated port.
 ///
-/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
+/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores,
+/// and `do_shutdown` to true to close the server using the close handle.
#[tracing::instrument] -fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) { +fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bool) { let _init_guard = zebra_test::init(); let port = zebra_test::net::random_unallocated_port(); @@ -123,7 +135,7 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) { info!("spawning RPC server..."); - let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn( + let (rpc_server_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn( config, Default::default(), "RPC server test", @@ -140,15 +152,33 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) { state.expect_no_requests().await; chain_verifier.expect_no_requests().await; - // The server and queue tasks should continue without errors or panics - let rpc_server_task_result = rpc_server_task_handle.now_or_never(); - assert!(matches!(rpc_server_task_result, None)); + if do_shutdown { + rpc_server + .expect("unexpected missing RpcServer for configured RPC port") + .shutdown() + .await + .expect("unexpected panic during RpcServer shutdown"); - let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); - assert!(matches!(rpc_tx_queue_task_result, None)); + // The server and queue tasks should shut down without errors or panics + let rpc_server_task_result = rpc_server_task_handle.await; + assert!( + matches!(rpc_server_task_result, Ok(())), + "unexpected server task panic during shutdown: {rpc_server_task_result:?}" + ); - // TODO: when we return server.close_handle(), use it to shut down the server here - // and remove the shutdown timeout + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.await; + assert!( + matches!(rpc_tx_queue_task_result, Ok(())), + "unexpected queue task panic during shutdown: {rpc_tx_queue_task_result:?}" + ); + } else { + // The server and queue tasks should continue without errors or panics + let rpc_server_task_result = rpc_server_task_handle.now_or_never(); + assert!(matches!(rpc_server_task_result, None)); + + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); + } }); info!("waiting for RPC server to shut down..."); @@ -182,22 +212,23 @@ fn rpc_server_spawn_port_conflict() { info!("spawning RPC server 1..."); - let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn( - config.clone(), - Default::default(), - "RPC server 1 test", - Buffer::new(mempool.clone(), 1), - Buffer::new(state.clone(), 1), - Buffer::new(chain_verifier.clone(), 1), - NoChainTip, - Mainnet, - ); + let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) = + RpcServer::spawn( + config.clone(), + Default::default(), + "RPC server 1 test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + Buffer::new(chain_verifier.clone(), 1), + NoChainTip, + Mainnet, + ); tokio::time::sleep(Duration::from_secs(3)).await; info!("spawning conflicted RPC server 2..."); - let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn( + let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn( config, Default::default(), "RPC server 2 conflict test", @@ -285,22 +316,23 @@ fn rpc_server_spawn_port_conflict_parallel_auto() { info!("spawning parallel RPC server 1..."); - let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn( - config.clone(), - Default::default(), - "RPC server 1 test", - 
Buffer::new(mempool.clone(), 1), - Buffer::new(state.clone(), 1), - Buffer::new(chain_verifier.clone(), 1), - NoChainTip, - Mainnet, - ); + let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) = + RpcServer::spawn( + config.clone(), + Default::default(), + "RPC server 1 test", + Buffer::new(mempool.clone(), 1), + Buffer::new(state.clone(), 1), + Buffer::new(chain_verifier.clone(), 1), + NoChainTip, + Mainnet, + ); tokio::time::sleep(Duration::from_secs(3)).await; info!("spawning parallel conflicted RPC server 2..."); - let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn( + let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn( config, Default::default(), "RPC server 2 conflict test", diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 6ff6a9e8..b4de3e77 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -176,7 +176,7 @@ impl StartCmd { .service(mempool); // Launch RPC server - let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn( + let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn( config.rpc, #[cfg(feature = "getblocktemplate-rpcs")] config.mining, @@ -356,6 +356,14 @@ impl StartCmd { groth16_download_handle.abort(); old_databases_task_handle.abort(); + // Wait until the RPC server shuts down. + // This can take around 150 seconds. + // + // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors. + if let Some(rpc_server) = rpc_server { + rpc_server.shutdown_blocking(); + } + exit_status }
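
The shutdown contract this patch introduces, shown as a minimal caller sketch: it mirrors the test and `StartCmd` call sites above, so `config`, `mempool`, `state`, and `chain_verifier` are assumed to be built by the caller (as in the tests, the services are wrapped in `tower::buffer::Buffer`), and "RPC server example" is a stand-in `app_version` string.

    // `RpcServer::spawn()` now returns an extra Option<RpcServer> close handle,
    // which is Some only when an RPC listen address is configured.
    let (rpc_server_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
        config,                 // zebra_rpc::config::Config with an RPC port set
        Default::default(),     // mining config, passed as in the tests above
        "RPC server example",   // app_version, reported in RPC responses
        Buffer::new(mempool, 1),
        Buffer::new(state, 1),
        Buffer::new(chain_verifier, 1),
        NoChainTip,
        Mainnet,
    );

    // ... serve RPC requests until Zebra starts shutting down ...

    // In async code, run the close on a blocking thread and await it:
    if let Some(rpc_server) = rpc_server {
        rpc_server
            .shutdown()
            .await
            .expect("unexpected panic during RpcServer shutdown");
    }

    // The server task now exits cleanly instead of hanging on executor shutdown.
    rpc_server_task_handle
        .await
        .expect("RPC server task should exit after shutdown");

In synchronous teardown paths like `StartCmd`, `shutdown_blocking()` performs the same wait on the current thread, and because `RpcServer` now implements `Drop`, simply dropping the handle also blocks until the server has stopped — which is what prevents the rare crashes and memory errors this release fixes.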