diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs
index dec2ee3f..c63594df 100644
--- a/zebra-rpc/src/server.rs
+++ b/zebra-rpc/src/server.rs
@@ -7,6 +7,8 @@
 //! See the full list of
 //! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
 
+use std::panic;
+
 use jsonrpc_core::{Compatibility, MetaIoHandler};
 use jsonrpc_http_server::ServerBuilder;
 use tokio::task::JoinHandle;
@@ -26,6 +28,9 @@ use crate::{
 pub mod compatibility;
 mod tracing_middleware;
 
+#[cfg(test)]
+mod tests;
+
 /// Zebra RPC Server
 #[derive(Clone, Debug)]
 pub struct RpcServer;
@@ -68,20 +73,30 @@ impl RpcServer {
                 MetaIoHandler::new(Compatibility::Both, TracingMiddleware);
             io.extend_with(rpc_impl.to_delegate());
 
-            let server = ServerBuilder::new(io)
-                // use the same tokio executor as the rest of Zebra
-                .event_loop_executor(tokio::runtime::Handle::current())
-                .threads(1)
-                // TODO: disable this security check if we see errors from lightwalletd.
-                //.allowed_hosts(DomainsValidation::Disabled)
-                .request_middleware(FixHttpRequestMiddleware)
-                .start_http(&listen_addr)
-                .expect("Unable to start RPC server");
-
-            // The server is a blocking task, so we need to spawn it on a blocking thread.
+            // The server is a blocking task, which blocks on executor shutdown.
+            // So we need to create and spawn it on a std::thread, inside a tokio blocking task.
+            // (Otherwise tokio panics when we shut down the RPC server.)
             let span = Span::current();
             let server = move || {
                 span.in_scope(|| {
+                    // Use a different tokio executor from the rest of Zebra,
+                    // so that large RPCs and any task handling bugs don't impact Zebra.
+                    //
+                    // TODO:
+                    // - return server.close_handle(), which can shut down the RPC server,
+                    //   and add it to the server tests
+                    // - allow multiple RPC threads
+                    //   (when jsonrpc_http_server has multiple threads,
+                    //   it lets any process share its port - do we need to warn users?)
+                    // - make the number of RPC threads configurable
+                    let server = ServerBuilder::new(io)
+                        .threads(1)
+                        // TODO: disable this security check if we see errors from lightwalletd
+                        //.allowed_hosts(DomainsValidation::Disabled)
+                        .request_middleware(FixHttpRequestMiddleware)
+                        .start_http(&listen_addr)
+                        .expect("Unable to start RPC server");
+
                     info!("Opened RPC endpoint at {}", server.address());
 
                     server.wait();
@@ -91,7 +106,15 @@ impl RpcServer {
         };
 
         (
-            tokio::task::spawn_blocking(server),
+            tokio::task::spawn_blocking(|| {
+                let thread_handle = std::thread::spawn(server);
+
+                // Propagate panics from the inner std::thread to the outer tokio blocking task
+                match thread_handle.join() {
+                    Ok(()) => (),
+                    Err(panic_object) => panic::resume_unwind(panic_object),
+                }
+            }),
             rpc_tx_queue_task_handle,
         )
     } else {
diff --git a/zebra-rpc/src/server/tests.rs b/zebra-rpc/src/server/tests.rs
new file mode 100644
index 00000000..85cb8476
--- /dev/null
+++ b/zebra-rpc/src/server/tests.rs
@@ -0,0 +1,3 @@
+//! RPC server tests.
+
+mod vectors;
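
The `spawn_blocking` change above relies on `std::thread::JoinHandle::join` returning the panic payload of the inner thread, which `std::panic::resume_unwind` can re-raise so the outer Tokio blocking task fails with the same payload. A minimal standalone sketch of that pattern, separate from Zebra's code (the runtime setup and panic message here are illustrative only):

    use std::{panic, thread};

    fn main() {
        let rt = tokio::runtime::Runtime::new().expect("failed to build Tokio runtime");

        let result = rt.block_on(async {
            tokio::task::spawn_blocking(|| {
                // Zebra's closure blocks in `server.wait()`; this one just panics
                // to show how the payload crosses the std::thread boundary.
                let thread_handle = thread::spawn(|| panic!("inner thread panic"));

                // `join()` returns `Err(payload)` if the inner thread panicked;
                // re-raising it makes the outer blocking task fail the same way.
                if let Err(panic_object) = thread_handle.join() {
                    panic::resume_unwind(panic_object);
                }
            })
            .await
        });

        // The panic surfaces as a `JoinError`, so callers holding the task's
        // `JoinHandle` (like Zebra's startup code) can observe the failure.
        assert!(result.is_err());
    }
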
diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs
new file mode 100644
index 00000000..465cec54
--- /dev/null
+++ b/zebra-rpc/src/server/tests/vectors.rs
@@ -0,0 +1,201 @@
+//! Fixed test vectors for the RPC server.
+
+use std::{
+    net::{Ipv4Addr, SocketAddrV4},
+    time::Duration,
+};
+
+use futures::FutureExt;
+use tower::buffer::Buffer;
+
+use zebra_chain::{chain_tip::NoChainTip, parameters::Network::*};
+use zebra_node_services::BoxError;
+
+use zebra_test::mock_service::MockService;
+
+use super::super::*;
+
+/// Test if the RPC server will spawn on a randomly generated port.
+#[test]
+fn rpc_server_spawn() {
+    let _init_guard = zebra_test::init();
+
+    let port = zebra_test::net::random_known_port();
+    let config = Config {
+        listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
+    };
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    rt.block_on(async {
+        let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+        let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+
+        info!("spawning RPC server...");
+
+        let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
+            config,
+            "RPC server test",
+            Buffer::new(mempool.clone(), 1),
+            Buffer::new(state.clone(), 1),
+            NoChainTip,
+            Mainnet,
+        );
+
+        info!("spawned RPC server, checking services...");
+
+        mempool.expect_no_requests().await;
+        state.expect_no_requests().await;
+
+        // The server and queue tasks should continue without errors or panics
+        let rpc_server_task_result = rpc_server_task_handle.now_or_never();
+        assert!(matches!(rpc_server_task_result, None));
+
+        let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
+        assert!(matches!(rpc_tx_queue_task_result, None));
+
+        // TODO: when we return server.close_handle(), use it to shut down the server here,
+        // and remove the shutdown timeout
+    });
+
+    info!("waiting for RPC server to shut down...");
+    rt.shutdown_timeout(Duration::from_secs(1));
+}
+
+/// Test if the RPC server will spawn on an OS-assigned unallocated port.
+#[test]
+fn rpc_server_spawn_unallocated_port() {
+    let _init_guard = zebra_test::init();
+
+    let port = zebra_test::net::random_unallocated_port();
+    let config = Config {
+        listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
+    };
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    rt.block_on(async {
+        let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+        let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+
+        info!("spawning RPC server...");
+
+        let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
+            config,
+            "RPC server test",
+            Buffer::new(mempool.clone(), 1),
+            Buffer::new(state.clone(), 1),
+            NoChainTip,
+            Mainnet,
+        );
+
+        info!("spawned RPC server, checking services...");
+
+        mempool.expect_no_requests().await;
+        state.expect_no_requests().await;
+
+        // The server and queue tasks should continue without errors or panics
+        let rpc_server_task_result = rpc_server_task_handle.now_or_never();
+        assert!(matches!(rpc_server_task_result, None));
+
+        let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
+        assert!(matches!(rpc_tx_queue_task_result, None));
+
+        // TODO: when we return server.close_handle(), use it to shut down the server here
+        // and remove the shutdown timeout
+    });
+
+    info!("waiting for RPC server to shut down...");
+    rt.shutdown_timeout(Duration::from_secs(1));
+}
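
For the `close_handle()` TODO in these tests: `jsonrpc_http_server::Server` exposes a `close_handle()` method whose `CloseHandle` can stop the server from another thread. A hypothetical shape for returning it from the spawn path, assuming the simplified `spawn_with_close_handle` helper and plain `IoHandler` below (not Zebra's actual signature), could look like:

    use jsonrpc_core::IoHandler;
    use jsonrpc_http_server::{CloseHandle, ServerBuilder};

    // Hypothetical helper: start the server and hand back a CloseHandle so tests
    // can shut it down deterministically instead of using a shutdown timeout.
    fn spawn_with_close_handle(
        io: IoHandler,
        listen_addr: std::net::SocketAddr,
    ) -> (std::thread::JoinHandle<()>, CloseHandle) {
        let server = ServerBuilder::new(io)
            .threads(1)
            .start_http(&listen_addr)
            .expect("Unable to start RPC server");

        // The CloseHandle is taken before the blocking `wait()` call.
        let close_handle = server.close_handle();

        let join_handle = std::thread::spawn(move || server.wait());

        (join_handle, close_handle)
    }

A test could then call `close_handle.close()` and join the thread, rather than sleeping and relying on `Runtime::shutdown_timeout`.
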
+
+/// Test if the RPC server will panic correctly when there is a port conflict.
+///
+/// TODO: update this test when the number of threads is configurable
+/// (when jsonrpc_http_server has multiple threads, it lets any process share its port!)
+#[test]
+#[should_panic(expected = "Unable to start RPC server")]
+fn rpc_server_spawn_port_conflict() {
+    let _init_guard = zebra_test::init();
+
+    let port = zebra_test::net::random_known_port();
+    let config = Config {
+        listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
+    };
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    let test_task_handle = rt.spawn(async {
+        let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+        let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
+
+        info!("spawning RPC server 1...");
+
+        let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn(
+            config.clone(),
+            "RPC server 1 test",
+            Buffer::new(mempool.clone(), 1),
+            Buffer::new(state.clone(), 1),
+            NoChainTip,
+            Mainnet,
+        );
+
+        tokio::time::sleep(Duration::from_secs(3)).await;
+
+        info!("spawning conflicted RPC server 2...");
+
+        let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn(
+            config,
+            "RPC server 2 conflict test",
+            Buffer::new(mempool.clone(), 1),
+            Buffer::new(state.clone(), 1),
+            NoChainTip,
+            Mainnet,
+        );
+
+        info!("spawned RPC servers, checking services...");
+
+        mempool.expect_no_requests().await;
+        state.expect_no_requests().await;
+
+        // Because there is a panic inside a multi-threaded executor,
+        // we can't depend on the exact behaviour of the other tasks,
+        // particularly across different machines and OSes.
+
+        // The second server should panic, so its task handle should return the panic
+        let rpc_server_2_task_result = rpc_server_2_task_handle.await;
+        match rpc_server_2_task_result {
+            Ok(()) => panic!(
+                "RPC server with conflicting port should exit with an error: \
+                 unexpected Ok result"
+            ),
+            Err(join_error) => match join_error.try_into_panic() {
+                Ok(panic_object) => panic::resume_unwind(panic_object),
+                Err(cancelled_error) => panic!(
+                    "RPC server with conflicting port should exit with an error: \
+                     unexpected JoinError: {cancelled_error:?}"
+                ),
+            },
+        }
+
+        // Ignore the queue task result
+    });
+
+    // Wait until the spawned task finishes
+    std::thread::sleep(Duration::from_secs(10));
+
+    info!("waiting for RPC server to shut down...");
+    rt.shutdown_timeout(Duration::from_secs(3));
+
+    match test_task_handle.now_or_never() {
+        Some(Ok(_never)) => unreachable!("test task always panics"),
+        None => panic!("unexpected test task hang"),
+        Some(Err(join_error)) => match join_error.try_into_panic() {
+            Ok(panic_object) => panic::resume_unwind(panic_object),
+            Err(cancelled_error) => panic!(
+                "test task should exit with a RPC server panic: \
+                 unexpected non-panic JoinError: {cancelled_error:?}"
+            ),
+        },
+    }
+}
diff --git a/zebra-rpc/src/tests/vectors.rs b/zebra-rpc/src/tests/vectors.rs
index 1779334d..45eb4e2d 100644
--- a/zebra-rpc/src/tests/vectors.rs
+++ b/zebra-rpc/src/tests/vectors.rs
@@ -1,3 +1,5 @@
+//! Fixed Zebra RPC serialization test vectors.
+
 use crate::methods::{GetBlock, GetRawTransaction};
 
 #[test]
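
The `#[should_panic(expected = "Unable to start RPC server")]` check in the port conflict test only works because both the server code and the test re-raise the original payload with `panic::resume_unwind`, which preserves the panic message across thread and task boundaries. A tiny standalone illustration of that property (the test name below is made up for this example):

    use std::panic;

    #[test]
    #[should_panic(expected = "Unable to start RPC server")]
    fn resume_unwind_preserves_payload() {
        // Capture a payload, much like `JoinHandle::join` or
        // `JoinError::try_into_panic` hand one back.
        let payload = panic::catch_unwind(|| panic!("Unable to start RPC server")).unwrap_err();

        // Re-raising keeps the original message, so `expected = ...` still matches
        // even though the panic has crossed a catch/resume boundary.
        panic::resume_unwind(payload);
    }
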