Ignore unexpected block responses to fix error cascade when synchronizing blocks (#3374)

* Refactor setup of `Connection` test vectors

Add a `new_test_connection` helper function to create a `Connection`
instance that's ready for testing.

* Check that no inbound requests are sent

Return the mock inbound service from `new_test_connection` and assert
that no requests were sent to it in any test.

* Replace `&mut Vec<u8>` with an `mpsc` channel

Make it easier to run the connection task in the background, i.e.,
remove any lifetime constraints from the borrowed buffer so that
`Connection` is `'static`.

It's now also easier to assert on individual messages sent from the
`Connection` instance.

* Make `MockServiceBuilder::finish` public

Allow test functions to be generic when creating a `MockService`, so
that caller functions actually determine the type of `MockService`
assertions.

* Move `new_test_connection` to parent module

Make it more generic so that it can be used later in property tests as
well.

* Derive `Eq` and `PartialEq` for network `Response`

Allow intercepted `Response` instances to be easily compared in tests.

* Test block request cancel causes an error cascade

This is the scenario that caused the block synchronizer to reset every
few minutes, which made it considerably slower.

* Ignore unexpected block responses

It's likely that it's just a response for a previously cancelled block
request.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2022-01-20 05:14:16 -03:00 committed by GitHub
parent fb724d3b24
commit ec207cfa95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 318 additions and 226 deletions

View File

@ -267,28 +267,31 @@ impl Handler {
// We got a block we didn't ask for.
//
// So either:
// 1. The peer doesn't know any of the blocks we asked for.
// 1. The response is for a previously cancelled block request.
// We should ignore that block, and wait for the actual response.
// 2. The peer doesn't know any of the blocks we asked for.
// We should cancel the request, so we don't hang waiting for blocks that
// will never arrive.
// 2. The peer sent an unsolicited block.
// 3. The peer sent an unsolicited block.
// We should ignore that block, and wait for the actual response.
//
// We end the request, so we don't hang on forked or lagging peers (case 1).
// But we keep the connection open, so the inbound service can process blocks
// from good peers (case 2).
// We ignore the message, so we don't desynchronize with the peer. This happens
// when we cancel a request and send a second different request, but receive a
// response for the first request. If we ended the request then, we could send
// a third request to the peer, and end up having to end that request as well
// when the response for the second request arrives.
//
// Ignoring the message gives us a chance to synchronize back to the correct
// request.
ignored_msg = Some(Message::Block(block));
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
// TODO: is it really an error if we ask for a block hash, but the peer
// doesn't know it? Should we close the connection on that kind of error?
// Should we fake a NotFound response here? (#1515)
let items = pending_hashes
.iter()
.map(|h| InventoryHash::Block(*h))
.collect();
Handler::Finished(Err(PeerError::NotFound(items)))
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
}
}

View File

@ -1,3 +1,67 @@
//! Tests for peer connections
use std::io;
use futures::{channel::mpsc, sink::SinkMapErr, SinkExt};
use zebra_chain::serialization::SerializationError;
use zebra_test::mock_service::MockService;
use crate::{
peer::{
client::ClientRequestReceiver, connection::State, ClientRequest, Connection, ErrorSlot,
},
peer_set::ActiveConnectionCounter,
protocol::external::Message,
Request, Response,
};
mod prop;
mod vectors;
/// Builds a [`Connection`] that starts in [`State::AwaitingRequest`] and is wired to in-memory
/// channels and a mock inbound service, so tests can drive and observe it.
///
/// The returned tuple contains, in order:
/// - the [`Connection`] under test;
/// - the sender for the client request channel (capacity 1) read by the connection;
/// - the mock inbound service (the connection's `svc` field holds a clone of it);
/// - the receiver yielding every [`Message`] the connection writes to its peer sink;
/// - the [`ErrorSlot`] (the connection's `error_slot` field holds a clone of it).
///
/// The [`MockService`] assertion type `A` is chosen by the caller.
fn new_test_connection<A>() -> (
    Connection<
        MockService<Request, Response, A>,
        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
    >,
    mpsc::Sender<ClientRequest>,
    MockService<Request, Response, A>,
    mpsc::UnboundedReceiver<Message>,
    ErrorSlot,
) {
    // Converts a failed send on the outbound channel into the error type the connection's
    // peer sink reports.
    fn closed_outbound_stream(_: mpsc::SendError) -> SerializationError {
        io::Error::new(
            io::ErrorKind::BrokenPipe,
            "peer outbound message stream was closed",
        )
        .into()
    }

    let inbound_service = MockService::build().finish();
    let error_slot = ErrorSlot::default();

    let (client_tx, client_rx) = mpsc::channel(1);
    let (outbound_tx, outbound_rx) = mpsc::unbounded();

    // The cast pins the mapper to a plain function pointer, which is the type named in the
    // return signature.
    let peer_tx = outbound_tx
        .sink_map_err(closed_outbound_stream as fn(mpsc::SendError) -> SerializationError);

    let connection = Connection {
        state: State::AwaitingRequest,
        request_timer: None,
        cached_addrs: Vec::new(),
        svc: inbound_service.clone(),
        client_rx: ClientRequestReceiver::from(client_rx),
        error_slot: error_slot.clone(),
        peer_tx,
        connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
        metrics_label: "test".to_string(),
        last_metrics_state: None,
    };

    (
        connection,
        client_tx,
        inbound_service,
        outbound_rx,
        error_slot,
    )
}

View File

@ -0,0 +1,154 @@
use std::{collections::HashSet, env, mem, sync::Arc};
use futures::{
channel::{mpsc, oneshot},
sink::SinkMapErr,
SinkExt, StreamExt,
};
use proptest::prelude::*;
use tracing::Span;
use zebra_chain::{
block::{self, Block},
serialization::SerializationError,
};
use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::{
peer::{connection::Connection, ClientRequest, ErrorSlot},
protocol::external::Message,
Request, Response, SharedPeerError,
};
proptest! {
    // The default value of proptest cases (256) causes this test to take more than ten seconds on
    // most machines, so this reduces the value a little to reduce the test time.
    // Set the PROPTEST_CASES env var to override this default.
    #![proptest_config(
        proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(32))
    )]

    // Checks that a cancelled block request does not desynchronize the connection.
    //
    // Scenario:
    // 1. request `first_block`, then cancel that request by dropping its response receiver;
    // 2. request `second_block`;
    // 3. the peer replies with `first_block` (the stale response), then with `second_block`.
    //
    // The second request must complete with exactly `second_block`, the error slot must stay
    // empty, and the inbound service must not receive any requests.
    #[test]
    fn connection_is_not_desynchronized_when_request_is_cancelled(
        first_block in any::<Arc<Block>>(),
        second_block in any::<Arc<Block>>(),
    ) {
        let runtime = zebra_test::init_async();

        runtime.block_on(async move {
            // The real stream and sink are from a split TCP connection,
            // but that doesn't change how the state machine behaves.
            let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);

            let (
                connection,
                mut client_tx,
                mut inbound_service,
                mut peer_outbound_messages,
                shared_error_slot,
            ) = new_test_connection();

            // Run the connection in the background, so the test can act as both the client
            // and the remote peer.
            let connection_task = tokio::spawn(connection.run(peer_inbound_rx));

            let response_to_first_request = send_block_request(
                first_block.hash(),
                &mut client_tx,
                &mut peer_outbound_messages,
            )
            .await;

            // Cancel first request.
            mem::drop(response_to_first_request);

            let response_to_second_request = send_block_request(
                second_block.hash(),
                &mut client_tx,
                &mut peer_outbound_messages,
            )
            .await;

            // Reply to first request
            peer_inbound_tx
                .send(Ok(Message::Block(first_block)))
                .await
                .expect("Failed to send response to first block request");

            // Reply to second request
            peer_inbound_tx
                .send(Ok(Message::Block(second_block.clone())))
                .await
                .expect("Failed to send response to second block request");

            // Check second response is correctly received
            let receive_response_result = response_to_second_request.await;
            prop_assert!(receive_response_result.is_ok());

            let response_result = receive_response_result.unwrap();
            prop_assert!(response_result.is_ok());

            let response = response_result.unwrap();
            prop_assert_eq!(response, Response::Blocks(vec![second_block]));

            // Check the state after the response.
            //
            // Use `prop_assert!` like every other check in this test, so a failure is
            // returned to proptest as a test case error instead of panicking.
            let error = shared_error_slot.try_get_error();
            prop_assert!(error.is_none());

            inbound_service.expect_no_requests().await?;

            // Stop the connection task by closing the stream of messages from the peer.
            mem::drop(peer_inbound_tx);

            let connection_task_result = connection_task.await;
            prop_assert!(connection_task_result.is_ok());

            Ok(())
        })?;
    }
}
/// Property-test flavour of the parent module's `new_test_connection`.
///
/// Fixes the [`MockService`] assertion type to [`PropTestAssertion`] and otherwise returns the
/// parent helper's tuple unchanged: the [`Connection`], the client request sender, the mock
/// inbound service, the receiver of outbound peer messages and the [`ErrorSlot`].
fn new_test_connection() -> (
    Connection<
        MockService<Request, Response, PropTestAssertion>,
        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
    >,
    mpsc::Sender<ClientRequest>,
    MockService<Request, Response, PropTestAssertion>,
    mpsc::UnboundedReceiver<Message>,
    ErrorSlot,
) {
    super::new_test_connection::<PropTestAssertion>()
}
/// Sends a [`Request::BlocksByHash`] for the single hash `block` through `client_requests`,
/// then waits for the next message on `outbound_messages` and checks that it is the matching
/// [`Message::GetData`].
///
/// Returns the receiver on which the response to this request is delivered. The property test
/// in this file drops that receiver to cancel a request.
///
/// # Panics
///
/// - If the request can't be sent on `client_requests`.
/// - If `outbound_messages` ends before yielding a message.
/// - If the next outbound message is not a `GetData` for `block`.
async fn send_block_request(
    block: block::Hash,
    client_requests: &mut mpsc::Sender<ClientRequest>,
    outbound_messages: &mut mpsc::UnboundedReceiver<Message>,
) -> oneshot::Receiver<Result<Response, SharedPeerError>> {
    let (response_sender, response_receiver) = oneshot::channel();

    let request = Request::BlocksByHash(HashSet::from_iter([block]));
    let client_request = ClientRequest {
        request,
        tx: response_sender,
        span: Span::none(),
    };

    client_requests
        .send(client_request)
        .await
        .expect("failed to send block request to connection task");

    // This helper is used for every request in a test, so the panic message must not claim
    // that it was the first request that went missing.
    let request_message = outbound_messages
        .next()
        .await
        .expect("block request message not sent");

    assert_eq!(request_message, Message::GetData(vec![block.into()]));

    response_receiver
}

View File

@ -4,54 +4,30 @@
//! - connection tests when awaiting requests (#3232)
//! - connection tests with closed/dropped peer_outbound_tx (#3233)
use futures::{channel::mpsc, FutureExt};
use tokio_util::codec::FramedWrite;
use tower::service_fn;
use zebra_chain::parameters::Network;
use futures::{channel::mpsc, sink::SinkMapErr, FutureExt, StreamExt};
use zebra_chain::serialization::SerializationError;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
peer::{client::ClientRequestReceiver, connection::State, Connection, ErrorSlot},
peer_set::ActiveConnectionCounter,
protocol::external::Codec,
PeerError,
peer::{
connection::{Connection, State},
ClientRequest, ErrorSlot,
},
protocol::external::Message,
PeerError, Request, Response,
};
#[tokio::test]
async fn connection_run_loop_ok() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -76,43 +52,21 @@ async fn connection_run_loop_ok() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_future_drop() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -126,43 +80,26 @@ async fn connection_run_loop_future_drop() {
assert!(client_tx.is_closed());
assert!(peer_inbound_tx.is_closed());
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_client_close() {
zebra_test::init();
let (mut client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (
connection,
mut client_tx,
mut inbound_service,
mut peer_outbound_messages,
shared_error_slot,
) = new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -183,43 +120,21 @@ async fn connection_run_loop_client_close() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_client_drop() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -239,43 +154,21 @@ async fn connection_run_loop_client_drop() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_inbound_close() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -296,43 +189,21 @@ async fn connection_run_loop_inbound_close() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_inbound_drop() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
new_test_connection();
let connection = connection.run(peer_inbound_rx);
@ -352,49 +223,33 @@ async fn connection_run_loop_inbound_drop() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
#[tokio::test]
async fn connection_run_loop_failed() {
zebra_test::init();
let (client_tx, client_rx) = mpsc::channel(1);
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let mut peer_outbound_bytes = Vec::<u8>::new();
let peer_outbound_tx = FramedWrite::new(
&mut peer_outbound_bytes,
Codec::builder()
.for_network(Network::Mainnet)
.with_metrics_addr_label("test".into())
.finish(),
);
let unused_inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let shared_error_slot = ErrorSlot::default();
let (
mut connection,
client_tx,
mut inbound_service,
mut peer_outbound_messages,
shared_error_slot,
) = new_test_connection();
// Simulate an internal connection error.
connection.state = State::Failed;
shared_error_slot
.try_update_error(PeerError::ClientRequestTimeout.into())
.expect("unexpected previous error in tests");
let connection = Connection {
state: State::Failed,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
peer_tx: peer_outbound_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
let connection = connection.run(peer_inbound_rx);
// If we drop the future, the connection will close anyway, so we avoid the drop by cloning it.
@ -413,5 +268,21 @@ async fn connection_run_loop_failed() {
// We need to drop the future, because it holds a mutable reference to the bytes.
std::mem::drop(connection_guard);
assert_eq!(peer_outbound_bytes, Vec::<u8>::new());
assert!(peer_outbound_messages.next().await.is_none());
inbound_service.expect_no_requests().await;
}
/// Unit-test flavour of the parent module's `new_test_connection`.
///
/// Fixes the [`MockService`] assertion type to [`PanicAssertion`] and otherwise returns the
/// parent helper's tuple unchanged: the [`Connection`], the client request sender, the mock
/// inbound service, the receiver of outbound peer messages and the [`ErrorSlot`].
fn new_test_connection() -> (
    Connection<
        MockService<Request, Response, PanicAssertion>,
        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
    >,
    mpsc::Sender<ClientRequest>,
    MockService<Request, Response, PanicAssertion>,
    mpsc::UnboundedReceiver<Message>,
    ErrorSlot,
) {
    super::new_test_connection::<PanicAssertion>()
}

View File

@ -11,7 +11,7 @@ use std::{fmt, sync::Arc};
use proptest_derive::Arbitrary;
/// A response to a network request, represented in internal format.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum Response {
/// Do not send any response to this request.

View File

@ -256,7 +256,7 @@ impl MockServiceBuilder {
/// Note that this is used by both [`Self::for_prop_tests`] and [`Self::for_unit_tests`], the
/// only difference being the `Assertion` generic type parameter, which Rust infers
/// automatically.
fn finish<Request, Response, Assertion, Error>(
pub fn finish<Request, Response, Assertion, Error>(
self,
) -> MockService<Request, Response, Assertion, Error> {
let proxy_channel_size = self