From ec207cfa95ee096d7b4bca642744fe80e8652752 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Thu, 20 Jan 2022 05:14:16 -0300 Subject: [PATCH] Ignore unexpected block responses to fix error cascade when synchronizing blocks (#3374) * Refactor setup of `Connection` test vectors Add a `new_test_connection` helper function to create a `Connection` instance that's ready for testing. * Check that no inbound requests are sent Return the mock inbound service from `new_test_connection` and assert that no requests were sent to it in any test. * Replace `&mut Vec` with an `mpsc` channel Make it easier to run the connection task in the background, i.e., remove any lifetime constraints from the borrowed buffer so that `Connection` is `'static`. It's now also easier to assert on individual messages sent from the `Connection` instance. * Make `MockServiceBuilder::finish` public Allow test functions to be generic when creating a `MockService`, so that caller functions actually determine if the type of `MockService` assertions. * Move `new_test_connection` to parent module Make it more generic so that it can be used later in property tests as well. * Derive `Eq` and `PartialEq` for network `Response` Allow intercepted `Response` instances to be easily compared in tests. * Test block request cancel causes an error cascade This is the scenario that caused the block synchronizer to reset every few minutes, which made it considerably slower. * Ignore unexpected block responses It's likely that it's just a response for a previously cancelled block request. --- zebra-network/src/peer/connection.rs | 29 +- zebra-network/src/peer/connection/tests.rs | 64 ++++ .../src/peer/connection/tests/prop.rs | 154 +++++++++ .../src/peer/connection/tests/vectors.rs | 293 +++++------------- .../src/protocol/internal/response.rs | 2 +- zebra-test/src/mock_service.rs | 2 +- 6 files changed, 318 insertions(+), 226 deletions(-) create mode 100644 zebra-network/src/peer/connection/tests/prop.rs diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 20c02a66..eb1de91a 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -267,28 +267,31 @@ impl Handler { // We got a block we didn't ask for. // // So either: - // 1. The peer doesn't know any of the blocks we asked for. + // 1. The response is for a previously cancelled block request. + // We should ignore that block, and wait for the actual response. + // 2. The peer doesn't know any of the blocks we asked for. // We should cancel the request, so we don't hang waiting for blocks that // will never arrive. - // 2. The peer sent an unsolicited block. + // 3. The peer sent an unsolicited block. // We should ignore that block, and wait for the actual response. // - // We end the request, so we don't hang on forked or lagging peers (case 1). - // But we keep the connection open, so the inbound service can process blocks - // from good peers (case 2). + // We ignore the message, so we don't desynchronize with the peer. This happens + // when we cancel a request and send a second different request, but receive a + // response for the first request. If we ended the request then, we could send + // a third request to the peer, and end up having to end that request as well + // when the response for the second request arrives. + // + // Ignoring the message gives us a chance to synchronize back to the correct + // request. ignored_msg = Some(Message::Block(block)); if !blocks.is_empty() { // TODO: does the caller need a list of missing blocks? (#1515) Handler::Finished(Ok(Response::Blocks(blocks))) } else { - // TODO: is it really an error if we ask for a block hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? - // Should we fake a NotFound response here? (#1515) - let items = pending_hashes - .iter() - .map(|h| InventoryHash::Block(*h)) - .collect(); - Handler::Finished(Err(PeerError::NotFound(items))) + Handler::BlocksByHash { + pending_hashes, + blocks, + } } } } diff --git a/zebra-network/src/peer/connection/tests.rs b/zebra-network/src/peer/connection/tests.rs index d82199e8..bf843466 100644 --- a/zebra-network/src/peer/connection/tests.rs +++ b/zebra-network/src/peer/connection/tests.rs @@ -1,3 +1,67 @@ //! Tests for peer connections +use std::io; + +use futures::{channel::mpsc, sink::SinkMapErr, SinkExt}; + +use zebra_chain::serialization::SerializationError; +use zebra_test::mock_service::MockService; + +use crate::{ + peer::{ + client::ClientRequestReceiver, connection::State, ClientRequest, Connection, ErrorSlot, + }, + peer_set::ActiveConnectionCounter, + protocol::external::Message, + Request, Response, +}; + +mod prop; mod vectors; + +/// Creates a new [`Connection`] instance for testing. +fn new_test_connection() -> ( + Connection< + MockService, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + >, + mpsc::Sender, + MockService, + mpsc::UnboundedReceiver, + ErrorSlot, +) { + let mock_inbound_service = MockService::build().finish(); + let (client_tx, client_rx) = mpsc::channel(1); + let shared_error_slot = ErrorSlot::default(); + let (peer_outbound_tx, peer_outbound_rx) = mpsc::unbounded(); + + let error_converter: fn(mpsc::SendError) -> SerializationError = |_| { + io::Error::new( + io::ErrorKind::BrokenPipe, + "peer outbound message stream was closed", + ) + .into() + }; + let peer_tx = peer_outbound_tx.sink_map_err(error_converter); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + cached_addrs: Vec::new(), + svc: mock_inbound_service.clone(), + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + ( + connection, + client_tx, + mock_inbound_service, + peer_outbound_rx, + shared_error_slot, + ) +} diff --git a/zebra-network/src/peer/connection/tests/prop.rs b/zebra-network/src/peer/connection/tests/prop.rs new file mode 100644 index 00000000..5c548d26 --- /dev/null +++ b/zebra-network/src/peer/connection/tests/prop.rs @@ -0,0 +1,154 @@ +use std::{collections::HashSet, env, mem, sync::Arc}; + +use futures::{ + channel::{mpsc, oneshot}, + sink::SinkMapErr, + SinkExt, StreamExt, +}; +use proptest::prelude::*; +use tracing::Span; + +use zebra_chain::{ + block::{self, Block}, + serialization::SerializationError, +}; +use zebra_test::mock_service::{MockService, PropTestAssertion}; + +use crate::{ + peer::{connection::Connection, ClientRequest, ErrorSlot}, + protocol::external::Message, + Request, Response, SharedPeerError, +}; + +proptest! { + // The default value of proptest cases (256) causes this test to take more than ten seconds on + // most machines, so this reduces the value a little to reduce the test time. + // Set the PROPTEST_CASES env var to override this default. + #![proptest_config( + proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(32)) + )] + + #[test] + fn connection_is_not_desynchronized_when_request_is_cancelled( + first_block in any::>(), + second_block in any::>(), + ) { + let runtime = zebra_test::init_async(); + + runtime.block_on(async move { + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); + + let connection_task = tokio::spawn(connection.run(peer_inbound_rx)); + + let response_to_first_request = send_block_request( + first_block.hash(), + &mut client_tx, + &mut peer_outbound_messages, + ) + .await; + + // Cancel first request. + mem::drop(response_to_first_request); + + let response_to_second_request = send_block_request( + second_block.hash(), + &mut client_tx, + &mut peer_outbound_messages, + ) + .await; + + // Reply to first request + peer_inbound_tx + .send(Ok(Message::Block(first_block))) + .await + .expect("Failed to send response to first block request"); + + // Reply to second request + peer_inbound_tx + .send(Ok(Message::Block(second_block.clone()))) + .await + .expect("Failed to send response to second block request"); + + // Check second response is correctly received + let receive_response_result = response_to_second_request.await; + + prop_assert!(receive_response_result.is_ok()); + let response_result = receive_response_result.unwrap(); + + prop_assert!(response_result.is_ok()); + let response = response_result.unwrap(); + + prop_assert_eq!(response, Response::Blocks(vec![second_block])); + + // Check the state after the response + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, None)); + + inbound_service.expect_no_requests().await?; + + // Stop the connection thread + mem::drop(peer_inbound_tx); + + let connection_task_result = connection_task.await; + prop_assert!(connection_task_result.is_ok()); + + Ok(()) + })?; + } +} + +/// Creates a new [`Connection`] instance for property tests. +fn new_test_connection() -> ( + Connection< + MockService, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + >, + mpsc::Sender, + MockService, + mpsc::UnboundedReceiver, + ErrorSlot, +) { + super::new_test_connection() +} + +async fn send_block_request( + block: block::Hash, + client_requests: &mut mpsc::Sender, + outbound_messages: &mut mpsc::UnboundedReceiver, +) -> oneshot::Receiver> { + let (response_sender, response_receiver) = oneshot::channel(); + + let request = Request::BlocksByHash(HashSet::from_iter([block])); + let client_request = ClientRequest { + request, + tx: response_sender, + span: Span::none(), + }; + + client_requests + .send(client_request) + .await + .expect("failed to send block request to connection task"); + + let request_message = outbound_messages + .next() + .await + .expect("First block request message not sent"); + + assert_eq!(request_message, Message::GetData(vec![block.into()])); + + response_receiver +} diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs index 1964e912..d4ba0060 100644 --- a/zebra-network/src/peer/connection/tests/vectors.rs +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -4,54 +4,30 @@ //! - connection tests when awaiting requests (#3232) //! - connection tests with closed/dropped peer_outbound_tx (#3233) -use futures::{channel::mpsc, FutureExt}; -use tokio_util::codec::FramedWrite; -use tower::service_fn; -use zebra_chain::parameters::Network; +use futures::{channel::mpsc, sink::SinkMapErr, FutureExt, StreamExt}; + +use zebra_chain::serialization::SerializationError; +use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ - peer::{client::ClientRequestReceiver, connection::State, Connection, ErrorSlot}, - peer_set::ActiveConnectionCounter, - protocol::external::Codec, - PeerError, + peer::{ + connection::{Connection, State}, + ClientRequest, ErrorSlot, + }, + protocol::external::Message, + PeerError, Request, Response, }; #[tokio::test] async fn connection_run_loop_ok() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - // The real stream and sink are from a split TCP connection, // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -76,43 +52,21 @@ async fn connection_run_loop_ok() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_future_drop() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -126,43 +80,26 @@ async fn connection_run_loop_future_drop() { assert!(client_tx.is_closed()); assert!(peer_inbound_tx.is_closed()); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_client_close() { zebra_test::init(); - let (mut client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let ( + connection, + mut client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -183,43 +120,21 @@ async fn connection_run_loop_client_close() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_client_drop() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -239,43 +154,21 @@ async fn connection_run_loop_client_drop() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_inbound_close() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -296,43 +189,21 @@ async fn connection_run_loop_inbound_close() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_inbound_drop() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); - - let connection = Connection { - state: State::AwaitingRequest, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; + let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) = + new_test_connection(); let connection = connection.run(peer_inbound_rx); @@ -352,49 +223,33 @@ async fn connection_run_loop_inbound_drop() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; } #[tokio::test] async fn connection_run_loop_failed() { zebra_test::init(); - let (client_tx, client_rx) = mpsc::channel(1); - + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); - let mut peer_outbound_bytes = Vec::::new(); - let peer_outbound_tx = FramedWrite::new( - &mut peer_outbound_bytes, - Codec::builder() - .for_network(Network::Mainnet) - .with_metrics_addr_label("test".into()) - .finish(), - ); - - let unused_inbound_service = - service_fn(|_| async { unreachable!("inbound service should never be called") }); - - let shared_error_slot = ErrorSlot::default(); + let ( + mut connection, + client_tx, + mut inbound_service, + mut peer_outbound_messages, + shared_error_slot, + ) = new_test_connection(); // Simulate an internal connection error. + connection.state = State::Failed; shared_error_slot .try_update_error(PeerError::ClientRequestTimeout.into()) .expect("unexpected previous error in tests"); - let connection = Connection { - state: State::Failed, - request_timer: None, - cached_addrs: Vec::new(), - svc: unused_inbound_service, - client_rx: ClientRequestReceiver::from(client_rx), - error_slot: shared_error_slot.clone(), - peer_tx: peer_outbound_tx, - connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), - metrics_label: "test".to_string(), - last_metrics_state: None, - }; - let connection = connection.run(peer_inbound_rx); // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. @@ -413,5 +268,21 @@ async fn connection_run_loop_failed() { // We need to drop the future, because it holds a mutable reference to the bytes. std::mem::drop(connection_guard); - assert_eq!(peer_outbound_bytes, Vec::::new()); + assert!(peer_outbound_messages.next().await.is_none()); + + inbound_service.expect_no_requests().await; +} + +/// Creates a new [`Connection`] instance for unit tests. +fn new_test_connection() -> ( + Connection< + MockService, + SinkMapErr, fn(mpsc::SendError) -> SerializationError>, + >, + mpsc::Sender, + MockService, + mpsc::UnboundedReceiver, + ErrorSlot, +) { + super::new_test_connection() } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index df0f5639..fefc4fc2 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -11,7 +11,7 @@ use std::{fmt, sync::Arc}; use proptest_derive::Arbitrary; /// A response to a network request, represented in internal format. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum Response { /// Do not send any response to this request. diff --git a/zebra-test/src/mock_service.rs b/zebra-test/src/mock_service.rs index 078a5922..ba5c70b0 100644 --- a/zebra-test/src/mock_service.rs +++ b/zebra-test/src/mock_service.rs @@ -256,7 +256,7 @@ impl MockServiceBuilder { /// Note that this is used by both [`Self::for_prop_tests`] and [`Self::for_unit_tests`], the /// only difference being the `Assertion` generic type parameter, which Rust infers /// automatically. - fn finish( + pub fn finish( self, ) -> MockService { let proxy_channel_size = self