3. Send notfound when Zebra doesn't have a block or transaction (#3466)

* refactor(network): rename Advertised to Available

```sh
fastmod Advertised Available zebra*
fastmod advertised available zebra*
```

* refactor(network): allow different available and missing types inside an InventoryStatus

And rename it to ResponseStatus.

Split the methods between ResponseStatus and an InventoryStatus alias.

* refactor(network): add a block_hash convenience method to InventoryHash

* test(network): improve failure logs for connection tests

* fix(inbound): move address sanitization into the response future

* feat(network): send notfound when Zebra doesn't have a block or transaction

* doc(network): move module docs to the top of each module

This makes them more likely to get updated when the module changes.

* fix(network): stop sending unsupported missing inventory types to the registry

* test(network): inbound messages are forwarded to the registry

* test(inbound): test Peers requests to the inbound service, directly and via TCP

* test(network): notfound block responses are sent by the inbound service

* test(network): notfound tx responses are sent by the inbound service

* test(network): increase sync test mock service timeout

The code that these tests use hasn't actually changed much,
and they are only failing on some platforms (coverage, macOS).

So it seems like the extra concurrent inbound tests have pushed them
past their time limit.
(Perhaps due to TCP system calls, or extra serialization work.)

* doc(network): fix typo

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* test(network): remove unnecessary multi-threaded runtime from tests

This prevents `MockService<zebra_state>` timeouts
in the `sync_block_too_high_extend_tips` test,
at the cost of reducing coverage of different execution orders.

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
teor 2022-02-14 11:51:34 +10:00 committed by GitHub
parent 2cae880e3e
commit 9f2028feff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1998 additions and 1151 deletions

View File

@ -158,7 +158,7 @@ pub use crate::{
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::RetryLimit,
protocol::internal::{Request, Response},
protocol::internal::{Request, Response, ResponseStatus},
};
/// Types used in the definition of [`Request`] and [`Response`] messages.

View File

@ -365,6 +365,11 @@ impl MetaAddr {
MetaAddr::new_errored(addr, services.into())
}
/// Return the address for this `MetaAddr`.
pub fn addr(&self) -> SocketAddr {
self.addr
}
/// Returns the time of the last successful interaction with this peer.
///
/// Initially set to the unverified "last seen time" gossiped by the remote
@ -612,7 +617,7 @@ impl MetaAddr {
impl MetaAddr {
/// Forcefully change the time this peer last responded.
///
/// This method is for test-purposes only.
/// This method is for testing purposes only.
pub(crate) fn set_last_response(&mut self, last_response: DateTime32) {
self.last_response = Some(last_response);
}

View File

@ -1,30 +1,24 @@
//! Peer handling.
/// Handles outbound requests from our node to the network.
mod client;
/// The per-peer connection state machine.
mod connection;
/// Wrapper around handshake logic that also opens a TCP connection.
mod connector;
/// Peer-related errors.
mod error;
/// Performs peer handshakes.
mod handshake;
/// Tracks the load on a `Client` service.
mod load_tracked_client;
/// Watches for chain tip height updates to determine the minimum support peer protocol version.
mod minimum_peer_version;
#[cfg(any(test, feature = "proptest-impl"))]
pub use client::tests::ClientTestHarness;
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest};
pub(crate) use client::tests::ReceiveRequestAttempt;
#[cfg(test)]
pub(crate) use handshake::register_inventory_status;
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
pub(crate) use client::CancelHeartbeatTask;
pub(crate) use client::{CancelHeartbeatTask, ClientRequest};
pub use client::Client;
pub use connection::Connection;

View File

@ -1,3 +1,5 @@
//! Handles outbound requests from our node to the network.
use std::{
future::Future,
pin::Pin,

View File

@ -1,4 +1,4 @@
//! Zcash peer connection protocol handing for Zebra.
//! Zebra's per-peer connection state machine.
//!
//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
//! protocol.
@ -35,11 +35,13 @@ use crate::{
peer_set::ConnectionTracker,
protocol::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response},
internal::{Request, Response, ResponseStatus},
},
BoxError,
};
use ResponseStatus::*;
mod peer_tx;
#[cfg(test)]
@ -160,18 +162,11 @@ impl Handler {
) => {
// assumptions:
// - the transaction messages are sent in a single continuous batch
// - missing transaction hashes are included in a `NotFound` message
// - missing transactions are silently skipped
// (there is no `NotFound` message at the end of the batch)
if pending_ids.remove(&transaction.id) {
// we are in the middle of the continuous transaction messages
transactions.push(transaction);
if pending_ids.is_empty() {
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
Handler::TransactionsById {
pending_ids,
transactions,
}
}
} else {
// We got a transaction we didn't ask for. If the caller doesn't know any of the
// transactions, they should have sent a `NotFound` with all the hashes, rather
@ -188,18 +183,25 @@ impl Handler {
// connection open, so the inbound service can process transactions from good
// peers (case 2).
ignored_msg = Some(Message::Tx(transaction));
if !transactions.is_empty() {
// if our peers start sending mixed solicited and unsolicited transactions,
// we should update this code to handle those responses
info!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response");
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error?
// Should we fake a NotFound response here? (#1515)
let missing_transaction_ids = pending_ids.iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
}
if ignored_msg.is_some() && transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
} else if pending_ids.is_empty() || ignored_msg.is_some() {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);
Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
)))
} else {
// Keep on waiting for more.
Handler::TransactionsById {
pending_ids,
transactions,
}
}
}
@ -219,7 +221,7 @@ impl Handler {
//
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
// what we got so far.
let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
if missing_transaction_ids != pending_ids {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
@ -231,13 +233,18 @@ impl Handler {
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
}
if !transactions.is_empty() {
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
if transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
} else {
// TODO: is it really an error if we ask for a transaction hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(missing_invs)))
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);
Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
)))
}
}
// `zcashd` returns requested blocks in a single batch of messages.
@ -258,25 +265,19 @@ impl Handler {
if pending_hashes.remove(&block.hash()) {
// we are in the middle of the continuous block messages
blocks.push(block);
if pending_hashes.is_empty() {
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
} else {
// We got a block we didn't ask for.
//
// So either:
// 1. The response is for a previously cancelled block request.
// We should ignore that block, and wait for the actual response.
// We should treat that block as an inbound gossiped block,
// and wait for the actual response.
// 2. The peer doesn't know any of the blocks we asked for.
// We should cancel the request, so we don't hang waiting for blocks that
// will never arrive.
// 3. The peer sent an unsolicited block.
// We should ignore that block, and wait for the actual response.
// We should treat that block as an inbound gossiped block,
// and wait for the actual response.
//
// We ignore the message, so we don't desynchronize with the peer. This happens
// when we cancel a request and send a second different request, but receive a
@ -286,15 +287,21 @@ impl Handler {
//
// Ignoring the message gives us a chance to synchronize back to the correct
// request.
//
// Peers can avoid these cascading errors by sending an explicit `notfound`.
// Zebra sends `notfound`, but `zcashd` doesn't.
ignored_msg = Some(Message::Block(block));
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
} else {
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
if pending_hashes.is_empty() {
// If we got everything we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
Handler::Finished(Ok(Response::Blocks(available.collect())))
} else {
// Keep on waiting for all the blocks we wanted, until we get them or time out.
Handler::BlocksByHash {
pending_hashes,
blocks,
}
}
}
@ -304,7 +311,7 @@ impl Handler {
pending_hashes,
blocks,
},
Message::NotFound(items),
Message::NotFound(missing_invs),
) => {
// assumptions:
// - the peer eventually returns a block or a `NotFound` entry
@ -315,36 +322,31 @@ impl Handler {
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
let missing_blocks: HashSet<_> = items
.iter()
.filter_map(|inv| match &inv {
InventoryHash::Block(b) => Some(b),
_ => None,
})
.cloned()
.collect();
let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
if missing_blocks != pending_hashes {
trace!(?items, ?missing_blocks, ?pending_hashes);
trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
// if these errors are noisy, we should replace them with debugs
info!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
}
if missing_blocks.len() != items.len() {
trace!(?items, ?missing_blocks, ?pending_hashes);
if missing_blocks.len() != missing_invs.len() {
trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
}
if !blocks.is_empty() {
// TODO: does the caller need a list of missing blocks? (#1515)
Handler::Finished(Ok(Response::Blocks(blocks)))
if blocks.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_block_hashes)))
} else {
// TODO: is it really an error if we ask for a block hash, but the peer
// doesn't know it? Should we close the connection on that kind of error? (#1515)
Handler::Finished(Err(PeerError::NotFound(items)))
// If we got some of what we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
let missing = pending_hashes.into_iter().map(ResponseStatus::Missing);
Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
}
}
// TODO:
// - add NotFound cases for blocks, transactions, and headers (#2726)
// - use `any(inv)` rather than `all(inv)`?
(Handler::FindBlocks, Message::Inv(items))
if items
@ -1216,18 +1218,48 @@ where
}
}
Response::Transactions(transactions) => {
// Generate one tx message per transaction.
// Generate one tx message per transaction,
// then a notfound message with all the missing transaction ids.
let mut missing_ids = Vec::new();
for transaction in transactions.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
match transaction {
Available(transaction) => {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e);
return;
}
}
Missing(id) => missing_ids.push(id.into()),
}
}
if !missing_ids.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
self.fail_with(e);
return;
}
}
}
Response::Blocks(blocks) => {
// Generate one block message per block.
// Generate one tx message per block,
// then a notfound% message with all the missing block hashes.
let mut missing_hashes = Vec::new();
for block in blocks.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
match block {
Available(block) => {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e);
return;
}
}
Missing(hash) => missing_hashes.push(hash.into()),
}
}
if !missing_hashes.is_empty() {
if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
self.fail_with(e);
return;
}
@ -1400,11 +1432,5 @@ fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator<Item = UnminedTx
/// Map a list of inventory hashes to the corresponding block hashes.
/// Non-block inventory hashes are skipped.
fn block_hashes(items: &'_ [InventoryHash]) -> impl Iterator<Item = block::Hash> + '_ {
items.iter().filter_map(|item| {
if let InventoryHash::Block(hash) = item {
Some(*hash)
} else {
None
}
})
items.iter().filter_map(InventoryHash::block_hash)
}

View File

@ -1,3 +1,5 @@
//! Randomised property tests for peer connection handling.
use std::{collections::HashSet, env, mem, sync::Arc};
use futures::{
@ -10,6 +12,7 @@ use tracing::Span;
use zebra_chain::{
block::{self, Block},
fmt::DisplayToDebug,
serialization::SerializationError,
};
use zebra_test::mock_service::{MockService, PropTestAssertion};
@ -17,9 +20,12 @@ use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::{
peer::{connection::Connection, ClientRequest, ErrorSlot},
protocol::external::Message,
protocol::internal::ResponseStatus,
Request, Response, SharedPeerError,
};
use ResponseStatus::*;
proptest! {
// The default value of proptest cases (256) causes this test to take more than ten seconds on
// most machines, so this reduces the value a little to reduce the test time.
@ -31,10 +37,13 @@ proptest! {
.unwrap_or(32))
)]
/// This test makes sure that Zebra ignores extra blocks after a block request is cancelled.
///
/// We need this behaviour to avoid cascading errors after a single cancelled block request.
#[test]
fn connection_is_not_desynchronized_when_request_is_cancelled(
first_block in any::<Arc<Block>>(),
second_block in any::<Arc<Block>>(),
first_block in any::<DisplayToDebug<Arc<Block>>>(),
second_block in any::<DisplayToDebug<Arc<Block>>>(),
) {
let runtime = zebra_test::init_async();
@ -72,26 +81,34 @@ proptest! {
// Reply to first request
peer_tx
.send(Ok(Message::Block(first_block)))
.send(Ok(Message::Block(first_block.0)))
.await
.expect("Failed to send response to first block request");
// Reply to second request
peer_tx
.send(Ok(Message::Block(second_block.clone())))
.send(Ok(Message::Block(second_block.0.clone())))
.await
.expect("Failed to send response to second block request");
// Check second response is correctly received
let receive_response_result = response_to_second_request.await;
prop_assert!(receive_response_result.is_ok());
prop_assert!(
receive_response_result.is_ok(),
"unexpected receive result: {:?}",
receive_response_result,
);
let response_result = receive_response_result.unwrap();
prop_assert!(response_result.is_ok());
prop_assert!(
response_result.is_ok(),
"unexpected response result: {:?}",
response_result,
);
let response = response_result.unwrap();
prop_assert_eq!(response, Response::Blocks(vec![second_block]));
prop_assert_eq!(response, Response::Blocks(vec![Available(second_block.0)]));
// Check the state after the response
let error = shared_error_slot.try_get_error();
@ -103,7 +120,11 @@ proptest! {
mem::drop(peer_tx);
let connection_task_result = connection_task.await;
prop_assert!(connection_task_result.is_ok());
prop_assert!(
connection_task_result.is_ok(),
"unexpected task result: {:?}",
connection_task_result,
);
Ok(())
})?;

View File

@ -1,3 +1,5 @@
//! Wrapper around handshake logic that also opens a TCP connection.
use std::{
future::Future,
net::SocketAddr,

View File

@ -1,3 +1,5 @@
//! Peer-related errors.
use std::{borrow::Cow, sync::Arc};
use thiserror::Error;

View File

@ -988,7 +988,7 @@ where
}
/// Register any advertised or missing inventory in `msg` for `connected_addr`.
async fn register_inventory_status(
pub(crate) async fn register_inventory_status(
msg: Result<Message, SerializationError>,
connected_addr: ConnectedAddr,
inv_collector: broadcast::Sender<InventoryChange>,
@ -1022,7 +1022,7 @@ async fn register_inventory_status(
// If all receivers have been dropped, `send` returns an error.
// When that happens, Zebra is shutting down, so we want to ignore this error.
let _ = inv_collector
.send(InventoryChange::new_advertised(*advertised, transient_addr));
.send(InventoryChange::new_available(*advertised, transient_addr));
}
[advertised @ ..] => {
let advertised = advertised
@ -1035,7 +1035,7 @@ async fn register_inventory_status(
);
if let Some(change) =
InventoryChange::new_advertised_multi(advertised, transient_addr)
InventoryChange::new_available_multi(advertised, transient_addr)
{
// Ignore channel errors that should only happen during shutdown.
let _ = inv_collector.send(change);
@ -1045,6 +1045,11 @@ async fn register_inventory_status(
}
(Ok(Message::NotFound(missing)), Some(transient_addr)) => {
// Ignore Errors and the unsupported FilteredBlock type
let missing = missing.iter().filter(|missing| {
missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
});
debug!(?missing, "registering missing inventory for peer");
if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {

View File

@ -1,3 +1,5 @@
//! Watches for chain tip height updates to determine the minimum supported peer protocol version.
use zebra_chain::{chain_tip::ChainTip, parameters::Network};
use crate::protocol::external::types::Version;

View File

@ -1363,7 +1363,7 @@ where
+ 'static,
S::Future: Send + 'static,
{
// Create a test config that listens on any unused port.
// Create a test config that listens on an unused port.
let listen_addr = "127.0.0.1:0".parse().unwrap();
let mut config = Config {
listen_addr,

View File

@ -20,44 +20,37 @@ use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream,
use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne};
use crate::{protocol::external::InventoryHash, BoxError};
use crate::{
protocol::{external::InventoryHash, internal::ResponseStatus},
BoxError,
};
use self::update::Update;
use InventoryStatus::*;
/// Underlying type for the alias InventoryStatus::*
use ResponseStatus::*;
pub mod update;
#[cfg(test)]
mod tests;
/// A peer inventory status, which tracks a hash for both available and missing inventory.
pub type InventoryStatus<T> = ResponseStatus<T, T>;
/// A peer inventory status change, used in the inventory status channel.
///
/// For performance reasons, advertisements should only be tracked
/// for hashes that are rare on the network.
/// So Zebra only tracks single-block inventory messages.
///
/// For security reasons, all `notfound` rejections should be tracked.
/// This also helps with performance, if the hash is rare on the network.
pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, SocketAddr)>;
/// An internal marker used in inventory status hash maps.
type InventoryMarker = InventoryStatus<()>;
/// A generic peer inventory status.
///
/// `Advertised` is used for inventory that peers claim to have,
/// and `Missing` is used for inventory they didn't provide when we requested it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum InventoryStatus<T: Clone> {
/// An advertised inventory hash.
///
/// For performance reasons, advertisements should only be tracked
/// for hashes that are rare on the network.
/// So Zebra only tracks single-block inventory messages.
Advertised(T),
/// An inventory hash rejected by a peer.
///
/// For security reasons, all `notfound` rejections should be tracked.
/// This also helps with performance, if the hash is rare on the network.
#[allow(dead_code)]
Missing(T),
}
/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
///
/// For more details please refer to the [RFC].
@ -90,9 +83,9 @@ impl std::fmt::Debug for InventoryRegistry {
}
impl InventoryChange {
/// Returns a new advertised inventory change from a single hash.
pub fn new_advertised(hash: InventoryHash, peer: SocketAddr) -> Self {
InventoryStatus::Advertised((AtLeastOne::from_one(hash), peer))
/// Returns a new available inventory change from a single hash.
pub fn new_available(hash: InventoryHash, peer: SocketAddr) -> Self {
InventoryStatus::Available((AtLeastOne::from_one(hash), peer))
}
/// Returns a new missing inventory change from a single hash.
@ -101,15 +94,15 @@ impl InventoryChange {
InventoryStatus::Missing((AtLeastOne::from_one(hash), peer))
}
/// Returns a new advertised multiple inventory change, if `hashes` contains at least one change.
pub fn new_advertised_multi<'a>(
/// Returns a new available multiple inventory change, if `hashes` contains at least one change.
pub fn new_available_multi<'a>(
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let hashes = hashes.try_into().ok();
hashes.map(|hashes| InventoryStatus::Advertised((hashes, peer)))
hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
}
/// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
@ -125,64 +118,27 @@ impl InventoryChange {
}
}
impl<T: Clone> InventoryStatus<T> {
/// Returns true if the inventory item was advertised.
#[allow(dead_code)]
pub fn is_advertised(&self) -> bool {
matches!(self, Advertised(_))
}
/// Returns true if the inventory item was missing.
#[allow(dead_code)]
pub fn is_missing(&self) -> bool {
matches!(self, Missing(_))
}
/// Get the advertised inventory item, if present.
pub fn advertised(&self) -> Option<T> {
if let Advertised(item) = self {
Some(item.clone())
} else {
None
}
}
/// Get the rejected inventory item, if present.
#[allow(dead_code)]
pub fn missing(&self) -> Option<T> {
if let Missing(item) = self {
Some(item.clone())
} else {
None
}
}
/// Get the inner item, regardless of status.
pub fn inner(&self) -> T {
match self {
Advertised(item) | Missing(item) => item.clone(),
}
}
impl<T> InventoryStatus<T> {
/// Get a marker for the status, without any associated data.
pub fn marker(&self) -> InventoryMarker {
self.as_ref().map(|_inner| ())
}
/// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
pub fn map<U: Clone, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
// Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#829
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
// Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844
match self {
Advertised(item) => Advertised(f(item)),
Available(item) => Available(f(item)),
Missing(item) => Missing(f(item)),
}
}
}
/// Converts from `&InventoryStatus<T>` to `InventoryStatus<&T>`.
pub fn as_ref(&self) -> InventoryStatus<&T> {
impl<T: Clone> InventoryStatus<T> {
/// Get the inner item, regardless of status.
pub fn inner(&self) -> T {
match self {
Advertised(item) => Advertised(item),
Missing(item) => Missing(item),
Available(item) | Missing(item) => item.clone(),
}
}
}
@ -212,7 +168,7 @@ impl InventoryRegistry {
/// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &SocketAddr> {
self.status_peers(hash)
.filter_map(|addr_status| addr_status.advertised())
.filter_map(|addr_status| addr_status.available())
}
/// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
@ -332,7 +288,7 @@ impl InventoryRegistry {
// Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
// and funnel multiple failing requests to themselves.
if let Some(old_status) = current.get(&addr) {
if old_status.is_missing() && new_status.is_advertised() {
if old_status.is_missing() && new_status.is_available() {
debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
continue;
}

View File

@ -1,3 +1,22 @@
//! Tests for the inventory registry.
use tokio::sync::broadcast;
use crate::peer_set::inventory_registry::{InventoryChange, InventoryRegistry};
mod prop;
mod vectors;
/// The number of changes that can be pending in the inventory channel, before it starts lagging.
///
/// Lagging drops messages, so tests should avoid filling the channel.
pub const MAX_PENDING_CHANGES: usize = 32;
/// Returns a newly initialised inventory registry, and a sender for its inventory channel.
fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender<InventoryChange>) {
let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES);
let inv_registry = InventoryRegistry::new(inv_stream_rx);
(inv_registry, inv_stream_tx)
}

View File

@ -0,0 +1,140 @@
//! Randomised property tests for the inventory registry.
use std::{collections::HashSet, net::SocketAddr};
use proptest::prelude::*;
use crate::{
peer::{register_inventory_status, ConnectedAddr},
peer_set::inventory_registry::{
tests::{new_inv_registry, MAX_PENDING_CHANGES},
InventoryMarker,
},
protocol::external::{InventoryHash, Message},
};
use InventoryHash::*;
/// The maximum number of random hashes we want to use in these tests.
pub const MAX_HASH_COUNT: usize = 5;
proptest! {
/// Check inventory registration works via the inbound peer message channel wrapper.
#[test]
fn inv_registry_inbound_wrapper_ok(
status in any::<InventoryMarker>(),
test_hashes in prop::collection::hash_set(any::<InventoryHash>(), 0..=MAX_HASH_COUNT)
) {
prop_assert!(MAX_HASH_COUNT <= MAX_PENDING_CHANGES, "channel must not block in tests");
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
runtime.block_on(async move {
// Check all combinations of:
//
// Inventory availability:
// - Available
// - Missing
//
// Inventory multiplicity:
// - Empty messages are ignored without errors
// - Single is handled correctly
// - Multiple are handled correctly
//
// And inventory variants:
// - Block (empty and single only)
// - Tx for legacy v4 and earlier transactions
// - Wtx for v5 and later transactions
//
// Using randomised proptest inventory data.
inv_registry_inbound_wrapper_with(status, test_hashes).await;
})
}
}
/// Check inventory registration works via the inbound peer message channel wrapper.
#[tracing::instrument]
async fn inv_registry_inbound_wrapper_with(
status: InventoryMarker,
test_hashes: HashSet<InventoryHash>,
) {
let test_peer: SocketAddr = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_peer = ConnectedAddr::new_inbound_direct(test_peer);
let test_hashes: Vec<InventoryHash> = test_hashes.into_iter().collect();
let test_msg = if status.is_available() {
Message::Inv(test_hashes.clone())
} else {
Message::NotFound(test_hashes.clone())
};
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
let forwarded_msg =
register_inventory_status(Ok(test_msg.clone()), test_peer, inv_stream_tx.clone()).await;
assert_eq!(
test_msg.clone(),
forwarded_msg.expect("unexpected forwarded error result"),
);
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
let test_peer = test_peer
.get_transient_addr()
.expect("unexpected None: expected Some transient peer address");
for &test_hash in test_hashes.iter() {
// The registry should ignore these unsupported types.
// (Currently, it panics if they are registered, but they are ok to query.)
if matches!(test_hash, Error | FilteredBlock(_)) {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
continue;
}
if status.is_available() {
// register_inventory_status doesn't register multi-block available inventory,
// for performance reasons.
if test_hashes.len() > 1 && test_hash.block_hash().is_some() {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
continue;
}
assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
"unexpected None hash: {:?},\n\
in message {:?} \n\
with length {}\n\
should be advertised\n",
test_hash,
test_msg,
test_hashes.len(),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
} else {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
"unexpected None hash: {:?},\n\
in message {:?} \n\
with length {}\n\
should be advertised\n",
test_hash,
test_msg,
test_hashes.len(),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}
}
}

View File

@ -1,22 +1,12 @@
//! Fixed test vectors for the inventory registry.
use tokio::sync::broadcast;
use zebra_chain::block;
use crate::{
peer_set::{
inventory_registry::{InventoryRegistry, InventoryStatus},
InventoryChange,
},
peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus},
protocol::external::InventoryHash,
};
/// The number of changes that can be pending in the inventory channel, before it starts lagging.
///
/// Lagging drops messages, so tests should avoid filling the channel.
pub const MAX_PENDING_CHANGES: usize = 32;
/// Check an empty inventory registry works as expected.
#[tokio::test]
async fn inv_registry_empty_ok() {
@ -40,7 +30,7 @@ async fn inv_registry_one_advertised_ok() {
let test_peer = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_advertised(test_hash, test_peer);
let test_change = InventoryStatus::new_available(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
@ -105,7 +95,7 @@ async fn inv_registry_prefer_missing_order(missing_first: bool) {
.expect("unexpected invalid peer address");
let missing_change = InventoryStatus::new_missing(test_hash, test_peer);
let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer);
let advertised_change = InventoryStatus::new_available(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
@ -150,7 +140,7 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
.expect("unexpected invalid peer address");
let missing_change = InventoryStatus::new_missing(test_hash, test_peer);
let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer);
let advertised_change = InventoryStatus::new_available(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
@ -192,12 +182,3 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}
/// Returns a newly initialised inventory registry, and a sender for its inventory channel.
fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender<InventoryChange>) {
let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES);
let inv_registry = InventoryRegistry::new(inv_stream_rx);
(inv_registry, inv_stream_tx)
}

View File

@ -255,7 +255,7 @@ fn peer_set_route_inv_advertised_registry_order(advertised_first: bool) {
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_advertised(test_inv, test_peer);
let test_change = InventoryStatus::new_available(test_inv, test_peer);
// Use two peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy);

View File

@ -1,4 +1,4 @@
//! Inventory items for the Bitcoin protocol.
//! Inventory items for the Zcash network protocol.
use std::io::{Read, Write};
@ -65,6 +65,19 @@ impl InventoryHash {
InventoryHash::Tx(legacy_tx_id)
}
/// Returns the block hash for this inventory hash,
/// if this inventory hash is a non-filtered block variant.
pub fn block_hash(&self) -> Option<block::Hash> {
match self {
InventoryHash::Error => None,
InventoryHash::Tx(_legacy_tx_id) => None,
InventoryHash::Block(hash) => Some(*hash),
// Zebra does not support filtered blocks
InventoryHash::FilteredBlock(_ignored_hash) => None,
InventoryHash::Wtx(_wtx_id) => None,
}
}
/// Returns the unmined transaction ID for this inventory hash,
/// if this inventory hash is a transaction variant.
pub fn unmined_tx_id(&self) -> Option<UnminedTxId> {

View File

@ -1,5 +1,7 @@
mod request;
mod response;
mod response_status;
pub use request::Request;
pub use response::Response;
pub use response_status::ResponseStatus;

View File

@ -1,28 +1,31 @@
//! Zebra's internal peer message response format.
use std::{fmt, sync::Arc};
use zebra_chain::{
block::{self, Block},
transaction::{UnminedTx, UnminedTxId},
};
use crate::meta_addr::MetaAddr;
use std::{fmt, sync::Arc};
use crate::{meta_addr::MetaAddr, protocol::internal::ResponseStatus};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use ResponseStatus::*;
/// A response to a network request, represented in internal format.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum Response {
/// Do not send any response to this request.
/// The request does not have a response.
///
/// Either:
/// * the request does not need a response, or
/// * we have no useful data to provide in response to the request
/// * we have no useful data to provide in response to the request,
/// and the request was not an inventory request.
///
/// When Zebra doesn't have any useful data, it always sends no response,
/// instead of sending `notfound`. `zcashd` sometimes sends no response,
/// and sometimes sends `notfound`.
/// (Inventory requests provide a list of missing hashes if none of the hashes were available.)
Nil,
/// A list of peers, used to respond to `GetPeers`.
@ -32,35 +35,20 @@ pub enum Response {
// TODO: make this into a HashMap<SocketAddr, MetaAddr> - a unique list of peer addresses (#2244)
Peers(Vec<MetaAddr>),
/// A list of blocks.
///
/// The list contains zero or more blocks.
//
// TODO: split this into found and not found (#2726)
Blocks(Vec<Arc<Block>>),
/// A list of block hashes.
/// An ordered list of block hashes.
///
/// The list contains zero or more block hashes.
//
// TODO: make this into an IndexMap - an ordered unique list of hashes (#2244)
BlockHashes(Vec<block::Hash>),
/// A list of block headers.
/// An ordered list of block headers.
///
/// The list contains zero or more block headers.
//
// TODO: make this into a HashMap<block::Hash, CountedHeader> - a unique list of headers (#2244)
// split this into found and not found (#2726)
// TODO: make this into an IndexMap - an ordered unique list of headers (#2244)
BlockHeaders(Vec<block::CountedHeader>),
/// A list of unmined transactions.
///
/// The list contains zero or more unmined transactions.
//
// TODO: split this into found and not found (#2726)
Transactions(Vec<UnminedTx>),
/// A list of unmined transaction IDs.
///
/// v4 transactions use a legacy transaction ID, and
@ -68,8 +56,25 @@ pub enum Response {
///
/// The list contains zero or more transaction IDs.
//
// TODO: make this into a HashSet - a unique list of transaction IDs (#2244)
// TODO: make this into a HashSet - a unique list (#2244)
TransactionIds(Vec<UnminedTxId>),
/// A list of found blocks, and missing block hashes.
///
/// Each list contains zero or more entries.
///
/// When Zebra doesn't have a block or transaction, it always sends `notfound`.
/// `zcashd` sometimes sends no response, and sometimes sends `notfound`.
//
// TODO: make this into a HashMap<block::Hash, ResponseStatus<Arc<Block>, ()>> - a unique list (#2244)
Blocks(Vec<ResponseStatus<Arc<Block>, block::Hash>>),
/// A list of found unmined transactions, and missing unmined transaction IDs.
///
/// Each list contains zero or more entries.
//
// TODO: make this into a HashMap<UnminedTxId, ResponseStatus<UnminedTx, ()>> - a unique list (#2244)
Transactions(Vec<ResponseStatus<UnminedTx, UnminedTxId>>),
}
impl fmt::Display for Response {
@ -79,30 +84,38 @@ impl fmt::Display for Response {
Response::Peers(peers) => format!("Peers {{ peers: {} }}", peers.len()),
// Display heights for single-block responses (which Zebra requests and expects)
Response::Blocks(blocks) if blocks.len() == 1 => {
let block = blocks.first().expect("len is 1");
format!(
"Block {{ height: {}, hash: {} }}",
block
.coinbase_height()
.as_ref()
.map(|h| h.0.to_string())
.unwrap_or_else(|| "None".into()),
block.hash(),
)
}
Response::Blocks(blocks) => format!("Blocks {{ blocks: {} }}", blocks.len()),
Response::BlockHashes(hashes) => format!("BlockHashes {{ hashes: {} }}", hashes.len()),
Response::BlockHeaders(headers) => {
format!("BlockHeaders {{ headers: {} }}", headers.len())
}
Response::Transactions(transactions) => {
format!("Transactions {{ transactions: {} }}", transactions.len())
}
Response::TransactionIds(ids) => format!("TransactionIds {{ ids: {} }}", ids.len()),
// Display heights for single-block responses (which Zebra requests and expects)
Response::Blocks(blocks) if blocks.len() == 1 => {
match blocks.first().expect("len is 1") {
Available(block) => format!(
"Block {{ height: {}, hash: {} }}",
block
.coinbase_height()
.as_ref()
.map(|h| h.0.to_string())
.unwrap_or_else(|| "None".into()),
block.hash(),
),
Missing(hash) => format!("Block {{ missing: {} }}", hash),
}
}
Response::Blocks(blocks) => format!(
"Blocks {{ blocks: {}, missing: {} }}",
blocks.iter().filter(|r| r.is_available()).count(),
blocks.iter().filter(|r| r.is_missing()).count()
),
Response::Transactions(transactions) => format!(
"Transactions {{ transactions: {}, missing: {} }}",
transactions.iter().filter(|r| r.is_available()).count(),
transactions.iter().filter(|r| r.is_missing()).count()
),
})
}
}
@ -115,13 +128,12 @@ impl Response {
Response::Peers(_) => "Peers",
Response::Blocks(_) => "Blocks",
Response::BlockHashes(_) => "BlockHashes",
Response::BlockHeaders { .. } => "BlockHeaders",
Response::Transactions(_) => "Transactions",
Response::BlockHeaders(_) => "BlockHeaders",
Response::TransactionIds(_) => "TransactionIds",
Response::Blocks(_) => "Blocks",
Response::Transactions(_) => "Transactions",
}
}
}

View File

@ -0,0 +1,101 @@
//! The status of a response to an inventory request.
use std::fmt;
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use ResponseStatus::*;
/// A generic peer inventory response status.
///
/// `Available` is used for inventory that is present in the response,
/// and `Missing` is used for inventory that is missing from the response.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum ResponseStatus<A, M> {
/// An available inventory item.
Available(A),
/// A missing inventory item.
Missing(M),
}
impl<A, M> fmt::Display for ResponseStatus<A, M> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.command())
}
}
impl<A, M> ResponseStatus<A, M> {
/// Returns the response status type as a string.
pub fn command(&self) -> &'static str {
match self {
ResponseStatus::Available(_) => "Available",
ResponseStatus::Missing(_) => "Missing",
}
}
/// Returns true if the inventory item was available.
#[allow(dead_code)]
pub fn is_available(&self) -> bool {
matches!(self, Available(_))
}
/// Returns true if the inventory item was missing.
#[allow(dead_code)]
pub fn is_missing(&self) -> bool {
matches!(self, Missing(_))
}
/// Maps a `ResponseStatus<A, M>` to `ResponseStatus<B, M>` by applying a function to a
/// contained [`Available`] value, leaving the [`Missing`] value untouched.
#[allow(dead_code)]
pub fn map_available<B, F: FnOnce(A) -> B>(self, f: F) -> ResponseStatus<B, M> {
// Based on Result::map from https://doc.rust-lang.org/src/core/result.rs.html#765
match self {
Available(a) => Available(f(a)),
Missing(m) => Missing(m),
}
}
/// Maps a `ResponseStatus<A, M>` to `ResponseStatus<A, N>` by applying a function to a
/// contained [`Missing`] value, leaving the [`Available`] value untouched.
#[allow(dead_code)]
pub fn map_missing<N, F: FnOnce(M) -> N>(self, f: F) -> ResponseStatus<A, N> {
// Based on Result::map_err from https://doc.rust-lang.org/src/core/result.rs.html#850
match self {
Available(a) => Available(a),
Missing(m) => Missing(f(m)),
}
}
/// Converts from `&ResponseStatus<A, M>` to `ResponseStatus<&A, &M>`.
pub fn as_ref(&self) -> ResponseStatus<&A, &M> {
match self {
Available(item) => Available(item),
Missing(item) => Missing(item),
}
}
}
impl<A: Clone, M: Clone> ResponseStatus<A, M> {
/// Get the available inventory item, if present.
pub fn available(&self) -> Option<A> {
if let Available(item) = self {
Some(item.clone())
} else {
None
}
}
/// Get the missing inventory item, if present.
#[allow(dead_code)]
pub fn missing(&self) -> Option<M> {
if let Missing(item) = self {
Some(item.clone())
} else {
None
}
}
}

View File

@ -6,6 +6,7 @@
//! It also responds to peer requests for blocks, transactions, and peer addresses.
use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
@ -23,11 +24,14 @@ use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, Service
use zebra_network as zn;
use zebra_state as zs;
use zebra_chain::block::{self, Block};
use zebra_chain::{
block::{self, Block},
transaction::UnminedTxId,
};
use zebra_consensus::chain::VerifyChainError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook,
AddressBook, ResponseStatus,
};
// Re-use the syncer timeouts for consistency.
@ -36,6 +40,8 @@ use super::{
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};
use ResponseStatus::*;
pub(crate) mod downloads;
#[cfg(test)]
@ -307,33 +313,41 @@ impl Service<zn::Request> for Inbound {
// # Correctness
//
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize after releasing
// the lock.
// cloning the address book. Then sanitize in the future,
// after releasing the lock.
let peers = address_book.lock().unwrap().clone();
// Correctness: get the current time after acquiring the address book lock.
let now = Utc::now();
async move {
// Correctness: get the current time after acquiring the address book lock.
let now = Utc::now();
// Send a sanitized response
let mut peers = peers.sanitized(now);
// Send a sanitized response
let mut peers = peers.sanitized(now);
// Truncate the list
//
// TODO: replace with div_ceil once it stabilises
// https://github.com/rust-lang/rust/issues/88581
let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR;
let address_limit = MAX_ADDRS_IN_MESSAGE
.min(address_limit);
peers.truncate(address_limit);
// Truncate the list
//
// TODO: replace with div_ceil once it stabilises
// https://github.com/rust-lang/rust/issues/88581
let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR;
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);
if !peers.is_empty() {
async { Ok(zn::Response::Peers(peers)) }.boxed()
} else {
debug!("ignoring `Peers` request from remote peer because our address book is empty");
async { Ok(zn::Response::Nil) }.boxed()
}
if peers.is_empty() {
// We don't know if the peer response will be empty until we've sanitized them.
debug!("ignoring `Peers` request from remote peer because our address book is empty");
Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
}
}.boxed()
}
zn::Request::BlocksByHash(hashes) => {
// We return an available or missing response to each inventory request,
// unless the request is empty.
if hashes.is_empty() {
return async { Ok(zn::Response::Nil) }.boxed();
}
// Correctness:
//
// We can't use `call_all` here, because it can hold one buffer slot per concurrent
@ -344,6 +358,7 @@ impl Service<zn::Request> for Inbound {
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
use futures::stream::TryStreamExt;
hashes
.clone()
.into_iter()
.map(|hash| zs::Request::Block(hash.into()))
.map(|request| state.clone().oneshot(request))
@ -360,22 +375,36 @@ impl Service<zn::Request> for Inbound {
})
.try_collect::<Vec<_>>()
.map_ok(|blocks| {
if blocks.is_empty() {
zn::Response::Nil
} else {
zn::Response::Blocks(blocks)
}
// Work out which hashes were missing.
let available_hashes: HashSet<block::Hash> = blocks.iter().map(|block| block.hash()).collect();
let available = blocks.into_iter().map(Available);
let missing = hashes.into_iter().filter(|hash| !available_hashes.contains(hash)).map(Missing);
zn::Response::Blocks(available.chain(missing).collect())
})
.boxed()
}
zn::Request::TransactionsById(transactions) => {
let request = mempool::Request::TransactionsById(transactions);
mempool.clone().oneshot(request).map_ok(|resp| match resp {
mempool::Response::Transactions(transactions) if transactions.is_empty() => zn::Response::Nil,
mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions),
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
})
.boxed()
zn::Request::TransactionsById(req_tx_ids) => {
// We return an available or missing response to each inventory request,
// unless the request is empty.
if req_tx_ids.is_empty() {
return async { Ok(zn::Response::Nil) }.boxed();
}
let request = mempool::Request::TransactionsById(req_tx_ids.clone());
mempool.clone().oneshot(request).map_ok(move |resp| {
let transactions = match resp {
mempool::Response::Transactions(transactions) => transactions,
_ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
};
// Work out which transaction IDs were missing.
let available_tx_ids: HashSet<UnminedTxId> = transactions.iter().map(|tx| tx.id).collect();
let available = transactions.into_iter().map(Available);
let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing);
zn::Response::Transactions(available.chain(missing).collect())
}).boxed()
}
zn::Request::FindBlocks { known_blocks, stop } => {
let request = zs::Request::FindBlockHashes { known_blocks, stop };

View File

@ -236,9 +236,12 @@ where
);
blocks
.into_iter()
.next()
.expect("successful response has the block in it")
.first()
.expect("just checked length")
.available()
.expect(
"unexpected missing block status: single block failures should be errors",
)
} else {
unreachable!("wrong response to block request");
};

View File

@ -1,830 +1,4 @@
//! Inbound service tests.
use std::{
collections::HashSet,
iter::{self, FromIterator},
net::SocketAddr,
str::FromStr,
sync::Arc,
time::Duration,
};
use futures::FutureExt;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt};
use tracing::Span;
use zebra_chain::{
amount::Amount,
block::Block,
parameters::Network,
serialization::ZcashDeserializeInto,
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
};
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, Request, Response};
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::InboundSetupData,
mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
BoxError,
};
/// Maximum time to wait for a network service request.
///
/// The default [`MockService`] value can be too short for some of these tests that take a little
/// longer than expected to actually send the network request.
///
/// Increasing this value causes the tests to take longer to complete, so it can't be too large.
const MAX_PEER_SET_REQUEST_DELAY: Duration = Duration::from_millis(500);
#[tokio::test]
async fn mempool_requests_for_transactions() {
let (
inbound_service,
_mempool_guard,
_committed_blocks,
added_transactions,
_mock_tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(true).await;
let added_transactions: Vec<UnminedTx> = added_transactions
.iter()
.map(|t| t.transaction.clone())
.collect();
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
// Test `Request::MempoolTransactionIds`
let response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match response {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`, got {:?}",
response
),
};
// Test `Request::TransactionsById`
let hash_set = added_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = inbound_service
.clone()
.oneshot(Request::TransactionsById(hash_set))
.await;
match response {
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions),
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
};
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
}
#[tokio::test]
async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// use the first transaction that is not coinbase
let tx = block.transactions[1].clone();
let (
inbound_service,
_mempool_guard,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Test `Request::PushTransaction`
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
let mut hs = HashSet::new();
hs.insert(tx.unmined_id());
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
#[tokio::test]
async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// use the first transaction that is not coinbase
let test_transaction = block
.transactions
.into_iter()
.find(|tx| !tx.has_any_coinbase_inputs())
.expect("at least one non-coinbase transaction");
let test_transaction_id = test_transaction.unmined_id();
let txs = HashSet::from_iter([test_transaction_id]);
let (
inbound_service,
_mempool_guard,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Test `Request::AdvertiseTransactionIds`
let request = inbound_service
.clone()
.oneshot(Request::AdvertiseTransactionIds(txs.clone()));
// Ensure the mocked peer set responds
let peer_set_responder =
peer_set
.expect_request(Request::TransactionsById(txs))
.map(|responder| {
let unmined_transaction = UnminedTx::from(test_transaction.clone());
responder.respond(Response::Transactions(vec![unmined_transaction]))
});
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _, _) = futures::join!(request, peer_set_responder, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![test_transaction_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
let mut hs = HashSet::new();
hs.insert(test_transaction.unmined_id());
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
#[tokio::test]
async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
// Get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// Use the first transaction that is not coinbase to test expiration
let tx1 = &*(block.transactions[1]).clone();
let mut tx1_id = tx1.unmined_id();
// Change the expiration height of the transaction to block 3
let mut tx1 = tx1.clone();
*tx1.expiry_height_mut() = zebra_chain::block::Height(3);
// Use the second transaction that is not coinbase to trigger `remove_expired_transactions()`
let tx2 = block.transactions[2].clone();
let mut tx2_id = tx2.unmined_id();
// Get services
let (
inbound_service,
mempool,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Push test transaction
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx1.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
tx1_id = responder.request().tx_id();
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx1_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Add a new block to the state (make the chain tip advance)
let block_two: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_two.clone().into(),
))
.await
.unwrap();
// Test transaction 1 is gossiped
let mut hs = HashSet::new();
hs.insert(tx1_id);
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
// Block is gossiped then
peer_set
.expect_request(Request::AdvertiseBlock(block_two.hash()))
.await
.respond(Response::Nil);
// Make sure tx1 is still in the mempool as it is not expired yet.
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx1_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// As our test transaction will expire at a block height greater or equal to 3 we need to push block 3.
let block_three: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_three.clone().into(),
))
.await
.unwrap();
// Block is gossiped
peer_set
.expect_request(Request::AdvertiseBlock(block_three.hash()))
.await
.respond(Response::Nil);
// Push a second transaction to trigger `remove_expired_transactions()`
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx2.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
tx2_id = responder.request().tx_id();
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
// Only tx2 will be in the mempool while tx1 was expired
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx2_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Check if tx1 was added to the rejected list as well
let response = mempool
.clone()
.oneshot(mempool::Request::Queue(vec![tx1_id.into()]))
.await
.unwrap();
let queued_responses = match response {
mempool::Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(
queued_responses[0],
Err(mempool::MempoolError::StorageEffectsChain(
mempool::SameEffectsChainRejectionError::Expired
))
);
// Test transaction 2 is gossiped
let mut hs = HashSet::new();
hs.insert(tx2_id);
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
// Add all the rest of the continuous blocks we have to test tx2 will never expire.
let more_blocks: Vec<Arc<Block>> = vec![
zebra_test::vectors::BLOCK_MAINNET_4_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_5_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_6_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_7_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_8_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_9_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap(),
];
for block in more_blocks {
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block.clone().into(),
))
.await
.unwrap();
// Block is gossiped
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
// tx2 is still in the mempool as the blockchain progress because the zero expiration height
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx2_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
}
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Test that the inbound downloader rejects blocks above the lookahead limit.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test]
async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
// Get services
let (
inbound_service,
_mempool,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Get the next block
let block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block_hash = block.hash();
// Push test block hash
let _request = inbound_service
.clone()
.oneshot(Request::AdvertiseBlock(block_hash))
.await?;
// Block is fetched, and committed to the state
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![block]));
// TODO: check that the block is queued in the checkpoint verifier
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
tx_verifier.expect_no_requests().await;
// Get a block that is a long way away from genesis
let block: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
let block_hash = block.hash();
// Push test block hash
let _request = inbound_service
.clone()
.oneshot(Request::AdvertiseBlock(block_hash))
.await?;
// Block is fetched, but the downloader drops it because it is too high
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![block]));
let response = state_service
.clone()
.oneshot(zebra_state::Request::Depth(block_hash))
.await?;
assert_eq!(response, zebra_state::Response::Depth(None));
// TODO: check that the block is not queued in the checkpoint verifier or non-finalized state
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
tx_verifier.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
async fn setup(
add_transactions: bool,
) -> (
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
Vec<Arc<Block>>,
Vec<VerifiedUnminedTx>,
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
MockService<Request, Response, PanicAssertion>,
Buffer<BoxService<zebra_state::Request, zebra_state::Response, BoxError>, zebra_state::Request>,
JoinHandle<Result<(), BlockGossipError>>,
JoinHandle<Result<(), BoxError>>,
) {
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (block_verifier, _transaction_verifier, _groth16_download_handle) =
zebra_consensus::chain::init(
consensus_config.clone(),
network,
state_service.clone(),
true,
)
.await;
let mut peer_set = MockService::build()
.with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY)
.for_unit_tests();
let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10);
let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);
let mut committed_blocks = Vec::new();
// Push the genesis block to the state.
// This must be done before creating the mempool to avoid `chain_tip_change`
// returning "reset" which would clear the mempool.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
committed_blocks.push(genesis_block);
// Also push block 1.
// Block one is a network upgrade and the mempool will be cleared at it,
// let all our tests start after this event.
let block_one: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_one.clone().into(),
))
.await
.unwrap();
committed_blocks.push(block_one);
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool::Config::default(),
buffered_peer_set.clone(),
state_service.clone(),
buffered_tx_verifier.clone(),
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
);
// Enable the mempool
mempool_service.enable(&mut recent_syncs).await;
// Add transactions to the mempool, skipping verification and broadcast
let mut added_transactions = Vec::new();
if add_transactions {
added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network));
}
let mempool_service = BoxService::new(mempool_service);
let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service);
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = ServiceBuilder::new()
.load_shed()
.service(super::Inbound::new(setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
let setup_data = InboundSetupData {
address_book,
block_download_peer_set: buffered_peer_set,
block_verifier,
mempool: mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok(), "unexpected setup channel send failure");
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
));
let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id(
transaction_receiver,
peer_set.clone(),
));
// Make sure there is an additional request broadcasting the
// committed blocks to peers.
//
// (The genesis block gets skipped, because block 1 is committed before the task is spawned.)
for block in committed_blocks.iter().skip(1) {
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
}
(
inbound_service,
mempool_service,
committed_blocks,
added_transactions,
mock_tx_verifier,
peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
)
}
/// Manually add a transaction to the mempool storage.
///
/// Skips some mempool functionality, like transaction verification and peer broadcasts.
fn add_some_stuff_to_mempool(
mempool_service: &mut Mempool,
network: Network,
) -> Vec<VerifiedUnminedTx> {
// get the genesis block coinbase transaction from the Zcash blockchain.
let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network)
.take(1)
.collect();
// Insert the genesis block coinbase transaction into the mempool storage.
mempool_service
.storage()
.insert(genesis_transactions[0].clone())
.unwrap();
genesis_transactions
}
mod fake_peer_set;
mod real_peer_set;

View File

@ -0,0 +1,848 @@
//! Inbound service tests with a fake peer set.
use std::{
collections::HashSet,
iter::{self, FromIterator},
net::SocketAddr,
str::FromStr,
sync::Arc,
time::Duration,
};
use futures::FutureExt;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt};
use tracing::Span;
use zebra_chain::{
amount::Amount,
block::Block,
parameters::Network,
serialization::ZcashDeserializeInto,
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
};
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, Request, Response, ResponseStatus};
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
BoxError,
};
use ResponseStatus::*;
/// Maximum time to wait for a network service request.
///
/// The default [`MockService`] value can be too short for some of these tests that take a little
/// longer than expected to actually send the network request.
///
/// Increasing this value causes the tests to take longer to complete, so it can't be too large.
const MAX_PEER_SET_REQUEST_DELAY: Duration = Duration::from_millis(500);
#[tokio::test]
async fn mempool_requests_for_transactions() {
let (
inbound_service,
_mempool_guard,
_committed_blocks,
added_transactions,
_mock_tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(true).await;
let added_transactions: Vec<UnminedTx> = added_transactions
.iter()
.map(|t| t.transaction.clone())
.collect();
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
// Test `Request::MempoolTransactionIds`
let response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match response {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`, got {:?}",
response
),
};
// Test `Request::TransactionsById`
let hash_set = added_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = inbound_service
.clone()
.oneshot(Request::TransactionsById(hash_set))
.await;
match response {
Ok(Response::Transactions(response)) => {
assert_eq!(
response,
added_transactions
.into_iter()
.map(Available)
.collect::<Vec<_>>(),
)
}
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
};
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
}
#[tokio::test]
async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// use the first transaction that is not coinbase
let tx = block.transactions[1].clone();
let (
inbound_service,
_mempool_guard,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Test `Request::PushTransaction`
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
let mut hs = HashSet::new();
hs.insert(tx.unmined_id());
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
#[tokio::test]
async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// use the first transaction that is not coinbase
let test_transaction = block
.transactions
.into_iter()
.find(|tx| !tx.has_any_coinbase_inputs())
.expect("at least one non-coinbase transaction");
let test_transaction_id = test_transaction.unmined_id();
let txs = HashSet::from_iter([test_transaction_id]);
let (
inbound_service,
_mempool_guard,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
_state_guard,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Test `Request::AdvertiseTransactionIds`
let request = inbound_service
.clone()
.oneshot(Request::AdvertiseTransactionIds(txs.clone()));
// Ensure the mocked peer set responds
let peer_set_responder =
peer_set
.expect_request(Request::TransactionsById(txs))
.map(|responder| {
let unmined_transaction = UnminedTx::from(test_transaction.clone());
responder.respond(Response::Transactions(vec![Available(unmined_transaction)]))
});
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _, _) = futures::join!(request, peer_set_responder, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![test_transaction_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
let mut hs = HashSet::new();
hs.insert(test_transaction.unmined_id());
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
#[tokio::test]
async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
// Get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
// Use the first transaction that is not coinbase to test expiration
let tx1 = &*(block.transactions[1]).clone();
let mut tx1_id = tx1.unmined_id();
// Change the expiration height of the transaction to block 3
let mut tx1 = tx1.clone();
*tx1.expiry_height_mut() = zebra_chain::block::Height(3);
// Use the second transaction that is not coinbase to trigger `remove_expired_transactions()`
let tx2 = block.transactions[2].clone();
let mut tx2_id = tx2.unmined_id();
// Get services
let (
inbound_service,
mempool,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Push test transaction
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx1.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
tx1_id = responder.request().tx_id();
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx1_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Add a new block to the state (make the chain tip advance)
let block_two: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_two.clone().into(),
))
.await
.unwrap();
// Test transaction 1 is gossiped
let mut hs = HashSet::new();
hs.insert(tx1_id);
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
// Block is gossiped then
peer_set
.expect_request(Request::AdvertiseBlock(block_two.hash()))
.await
.respond(Response::Nil);
// Make sure tx1 is still in the mempool as it is not expired yet.
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx1_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// As our test transaction will expire at a block height greater or equal to 3 we need to push block 3.
let block_three: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_three.clone().into(),
))
.await
.unwrap();
// Block is gossiped
peer_set
.expect_request(Request::AdvertiseBlock(block_three.hash()))
.await
.respond(Response::Nil);
// Push a second transaction to trigger `remove_expired_transactions()`
let request = inbound_service
.clone()
.oneshot(Request::PushTransaction(tx2.clone().into()));
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
tx2_id = responder.request().tx_id();
let transaction = responder
.request()
.clone()
.into_mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
// Only tx2 will be in the mempool while tx1 was expired
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx2_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
// Check if tx1 was added to the rejected list as well
let response = mempool
.clone()
.oneshot(mempool::Request::Queue(vec![tx1_id.into()]))
.await
.unwrap();
let queued_responses = match response {
mempool::Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(
queued_responses[0],
Err(mempool::MempoolError::StorageEffectsChain(
mempool::SameEffectsChainRejectionError::Expired
))
);
// Test transaction 2 is gossiped
let mut hs = HashSet::new();
hs.insert(tx2_id);
peer_set
.expect_request(Request::AdvertiseTransactionIds(hs))
.await
.respond(Response::Nil);
// Add all the rest of the continuous blocks we have to test tx2 will never expire.
let more_blocks: Vec<Arc<Block>> = vec![
zebra_test::vectors::BLOCK_MAINNET_4_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_5_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_6_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_7_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_8_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_9_BYTES
.zcash_deserialize_into()
.unwrap(),
zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap(),
];
for block in more_blocks {
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block.clone().into(),
))
.await
.unwrap();
// Block is gossiped
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
let request = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
// tx2 is still in the mempool as the blockchain progress because the zero expiration height
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx2_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
}
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Test that the inbound downloader rejects blocks above the lookahead limit.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test]
async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
// Get services
let (
inbound_service,
_mempool,
_committed_blocks,
_added_transactions,
mut tx_verifier,
mut peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
) = setup(false).await;
// Get the next block
let block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
let block_hash = block.hash();
// Push test block hash
let _request = inbound_service
.clone()
.oneshot(Request::AdvertiseBlock(block_hash))
.await?;
// Block is fetched, and committed to the state
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![Available(block)]));
// TODO: check that the block is queued in the checkpoint verifier
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
tx_verifier.expect_no_requests().await;
// Get a block that is a long way away from genesis
let block: Arc<Block> =
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
let block_hash = block.hash();
// Push test block hash
let _request = inbound_service
.clone()
.oneshot(Request::AdvertiseBlock(block_hash))
.await?;
// Block is fetched, but the downloader drops it because it is too high
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![Available(block)]));
let response = state_service
.clone()
.oneshot(zebra_state::Request::Depth(block_hash))
.await?;
assert_eq!(response, zebra_state::Response::Depth(None));
// TODO: check that the block is not queued in the checkpoint verifier or non-finalized state
// check that nothing unexpected happened
peer_set.expect_no_requests().await;
tx_verifier.expect_no_requests().await;
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
assert!(
matches!(sync_gossip_result, None),
"unexpected error or panic in sync gossip task: {:?}",
sync_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Setup a fake Zebra network stack, with fake peer set.
///
/// Adds some initial state blocks, and mempool transactions if `add_transactions` is true.
///
/// Uses a real block verifier, but a fake transaction verifier.
/// Does not run a block syncer task.
async fn setup(
add_transactions: bool,
) -> (
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
Vec<Arc<Block>>,
Vec<VerifiedUnminedTx>,
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
MockService<Request, Response, PanicAssertion>,
Buffer<BoxService<zebra_state::Request, zebra_state::Response, BoxError>, zebra_state::Request>,
JoinHandle<Result<(), BlockGossipError>>,
JoinHandle<Result<(), BoxError>>,
) {
zebra_test::init();
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (block_verifier, _transaction_verifier, _groth16_download_handle) =
zebra_consensus::chain::init(
consensus_config.clone(),
network,
state_service.clone(),
true,
)
.await;
let mut peer_set = MockService::build()
.with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY)
.for_unit_tests();
let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10);
let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);
let mut committed_blocks = Vec::new();
// Push the genesis block to the state.
// This must be done before creating the mempool to avoid `chain_tip_change`
// returning "reset" which would clear the mempool.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
committed_blocks.push(genesis_block);
// Also push block 1.
// Block one is a network upgrade and the mempool will be cleared at it,
// let all our tests start after this event.
let block_one: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.clone()
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block_one.clone().into(),
))
.await
.unwrap();
committed_blocks.push(block_one);
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool::Config::default(),
buffered_peer_set.clone(),
state_service.clone(),
buffered_tx_verifier.clone(),
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
);
// Enable the mempool
mempool_service.enable(&mut recent_syncs).await;
// Add transactions to the mempool, skipping verification and broadcast
let mut added_transactions = Vec::new();
if add_transactions {
added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network));
}
let mempool_service = BoxService::new(mempool_service);
let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service);
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = ServiceBuilder::new()
.load_shed()
.service(Inbound::new(setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
let setup_data = InboundSetupData {
address_book,
block_download_peer_set: buffered_peer_set,
block_verifier,
mempool: mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok(), "unexpected setup channel send failure");
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
));
let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id(
transaction_receiver,
peer_set.clone(),
));
// Make sure there is an additional request broadcasting the
// committed blocks to peers.
//
// (The genesis block gets skipped, because block 1 is committed before the task is spawned.)
for block in committed_blocks.iter().skip(1) {
peer_set
.expect_request(Request::AdvertiseBlock(block.hash()))
.await
.respond(Response::Nil);
}
(
inbound_service,
mempool_service,
committed_blocks,
added_transactions,
mock_tx_verifier,
peer_set,
state_service,
sync_gossip_task_handle,
tx_gossip_task_handle,
)
}
/// Manually add a transaction to the mempool storage.
///
/// Skips some mempool functionality, like transaction verification and peer broadcasts.
fn add_some_stuff_to_mempool(
mempool_service: &mut Mempool,
network: Network,
) -> Vec<VerifiedUnminedTx> {
// get the genesis block coinbase transaction from the Zcash blockchain.
let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network)
.take(1)
.collect();
// Insert the genesis block coinbase transaction into the mempool storage.
mempool_service
.storage()
.insert(genesis_transactions[0].clone())
.unwrap();
genesis_transactions
}

View File

@ -0,0 +1,498 @@
//! Inbound service tests with a real peer set.
use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc};
use futures::FutureExt;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{
buffer::Buffer,
builder::ServiceBuilder,
util::{BoxCloneService, BoxService},
ServiceExt,
};
use zebra_chain::{
block::{self, Block},
parameters::Network,
transaction::{AuthDigest, Hash as TxHash, UnminedTxId, WtxId},
};
use zebra_consensus::{chain::VerifyChainError, error::TransactionError, transaction};
use zebra_network::{
connect_isolated_tcp_direct, Config as NetworkConfig, Request, Response, ResponseStatus,
SharedPeerError,
};
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
mempool::{self, gossip_mempool_transaction_id, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
BoxError,
};
use ResponseStatus::*;
/// Check that a network stack with an empty address book only contains the local listener port,
/// by querying the inbound service via a local TCP connection.
///
/// Uses a real Zebra network stack with a local listener address,
/// and an isolated Zebra inbound TCP connection.
#[tokio::test]
async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> {
let (
// real services
connected_peer_service,
inbound_service,
_peer_set,
_mempool_service,
_state_service,
// mocked services
_mock_block_verifier,
_mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
listen_addr,
) = setup().await;
// Use inbound directly
let request = inbound_service.clone().oneshot(Request::Peers);
let response = request.await;
match response.as_ref() {
Ok(Response::Peers(single_peer)) if single_peer.len() == 1 => {
assert_eq!(single_peer.first().unwrap().addr(), listen_addr)
}
Ok(Response::Peers(_peer_list)) => unreachable!(
"`Peers` response should contain a single peer, \
actual result: {:?}",
response
),
_ => unreachable!(
"`Peers` requests should always respond `Ok(Response::Peers(_))`, \
actual result: {:?}",
response
),
};
// Use the connected peer via a local TCP connection
let request = connected_peer_service.clone().oneshot(Request::Peers);
let response = request.await;
match response.as_ref() {
Ok(Response::Peers(single_peer)) if single_peer.len() == 1 => {
assert_eq!(single_peer.first().unwrap().addr(), listen_addr)
}
Ok(Response::Peers(_peer_list)) => unreachable!(
"`Peers` response should contain a single peer, \
actual result: {:?}",
response
),
_ => unreachable!(
"`Peers` requests should always respond `Ok(Response::Peers(_))`, \
actual result: {:?}",
response
),
};
let block_gossip_result = block_gossip_task_handle.now_or_never();
assert!(
matches!(block_gossip_result, None),
"unexpected error or panic in block gossip task: {:?}",
block_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Check that a network stack with an empty state responds to block requests with `notfound`.
///
/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection.
#[tokio::test]
async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> {
let (
// real services
connected_peer_service,
inbound_service,
_peer_set,
_mempool_service,
_state_service,
// mocked services
_mock_block_verifier,
_mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup().await;
let test_block = block::Hash([0x11; 32]);
// Use inbound directly
let request = inbound_service
.clone()
.oneshot(Request::BlocksByHash(iter::once(test_block).collect()));
let response = request.await;
match response.as_ref() {
Ok(Response::Blocks(single_block)) if single_block.len() == 1 => {
assert_eq!(single_block.first().unwrap(), &Missing(test_block));
}
Ok(Response::Blocks(_block_list)) => unreachable!(
"`BlocksByHash` response should contain a single block, \
actual result: {:?}",
response
),
_ => unreachable!(
"inbound service should respond to `BlocksByHash` with `Ok(Response::Blocks(_))`, \
actual result: {:?}",
response
),
};
// Use the connected peer via a local TCP connection
let request = connected_peer_service
.clone()
.oneshot(Request::BlocksByHash(iter::once(test_block).collect()));
let response = request.await;
match response.as_ref() {
Err(missing_error) => {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
assert_eq!(
missing_error.inner_debug(),
"NotFound([Block(block::Hash(\"1111111111111111111111111111111111111111111111111111111111111111\"))])"
);
}
_ => unreachable!(
"peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFound(_)))`, \
actual result: {:?}",
response
),
};
let block_gossip_result = block_gossip_task_handle.now_or_never();
assert!(
matches!(block_gossip_result, None),
"unexpected error or panic in block gossip task: {:?}",
block_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Check that a network stack with an empty state responds to single transaction requests with `notfound`.
///
/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection.
///
/// TODO: test a response with some Available and some Missing transactions.
#[tokio::test]
async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
let (
// real services
connected_peer_service,
inbound_service,
_peer_set,
_mempool_service,
_state_service,
// mocked services
_mock_block_verifier,
_mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup().await;
let test_tx = UnminedTxId::from_legacy_id(TxHash([0x22; 32]));
let test_wtx: UnminedTxId = WtxId {
id: TxHash([0x33; 32]),
auth_digest: AuthDigest([0x44; 32]),
}
.into();
// Test both transaction ID variants, separately and together
for txs in [vec![test_tx], vec![test_wtx], vec![test_tx, test_wtx]] {
// Use inbound directly
let request = inbound_service
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()));
let response = request.await;
match response.as_ref() {
Ok(Response::Transactions(response_txs)) => {
// The response order is unstable, because it depends on concurrent inbound futures.
// In #2244 we will fix this by replacing response Vecs with HashSets.
for tx in &txs {
assert!(
response_txs.contains(&Missing(*tx)),
"expected {:?}, but it was not in the response",
tx
);
}
assert_eq!(response_txs.len(), txs.len());
}
_ => unreachable!(
"inbound service should respond to `TransactionsById` with `Ok(Response::Transactions(_))`, \
actual result: {:?}",
response
),
};
// Use the connected peer via a local TCP connection
let request = connected_peer_service
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()));
let response = request.await;
match response.as_ref() {
Err(missing_error) => {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
if txs == vec![test_tx] {
assert_eq!(
missing_error.inner_debug(),
"NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])",
);
} else if txs == vec![test_wtx] {
assert_eq!(
missing_error.inner_debug(),
"NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])",
);
} else if txs == vec![test_tx, test_wtx] {
// The response order is unstable, because it depends on concurrent inbound futures.
// In #2244 we will fix this by replacing response Vecs with HashSets.
assert!(
missing_error.inner_debug() ==
"NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\")), Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])"
||
missing_error.inner_debug() ==
"NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") }), Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])",
"unexpected response: {:?}",
missing_error.inner_debug(),
);
} else {
unreachable!("unexpected test case");
}
}
_ => unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFound(_)))`, \
actual result: {:?}",
response
),
};
}
let block_gossip_result = block_gossip_task_handle.now_or_never();
assert!(
matches!(block_gossip_result, None),
"unexpected error or panic in block gossip task: {:?}",
block_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Setup a real Zebra network stack, with a connected peer using a real isolated network stack.
///
/// Uses fake verifiers, and does not run a block syncer task.
async fn setup() -> (
// real services
// connected peer
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
// inbound service
BoxCloneService<zebra_network::Request, zebra_network::Response, BoxError>,
// outbound peer set (only has the connected peer)
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
Buffer<BoxService<zebra_state::Request, zebra_state::Response, BoxError>, zebra_state::Request>,
// mocked services
MockService<Arc<Block>, block::Hash, PanicAssertion, VerifyChainError>,
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
// real tasks
JoinHandle<Result<(), BlockGossipError>>,
JoinHandle<Result<(), BoxError>>,
// real open socket addresses
SocketAddr,
) {
zebra_test::init();
let network = Network::Mainnet;
// Open a listener on an unused IPv4 localhost port
let config_listen_addr = "127.0.0.1:0".parse().unwrap();
// Inbound
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = Inbound::new(setup_rx);
let inbound_service = ServiceBuilder::new()
.boxed_clone()
.load_shed()
.buffer(10)
.service(inbound_service);
// State
let state_config = StateConfig::ephemeral();
let (state_service, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(10).service(state_service);
// Network
let network_config = NetworkConfig {
network,
listen_addr: config_listen_addr,
// Stop Zebra making outbound connections
initial_mainnet_peers: HashSet::new(),
initial_testnet_peers: HashSet::new(),
..NetworkConfig::default()
};
let (mut peer_set, address_book) = zebra_network::init(
network_config,
inbound_service.clone(),
latest_chain_tip.clone(),
)
.await;
// Inbound listener
let listen_addr = address_book
.lock()
.unwrap()
.local_listener_meta_addr()
.addr();
assert_ne!(
listen_addr.port(),
0,
"dynamic ports are replaced with OS-assigned ports"
);
assert_eq!(
listen_addr.ip(),
config_listen_addr.ip(),
"IP addresses are correctly propagated"
);
// Fake syncer
let (sync_status, mut recent_syncs) = SyncStatus::new();
// Fake verifiers
let mock_block_verifier = MockService::build().for_unit_tests();
let buffered_block_verifier = ServiceBuilder::new()
.buffer(10)
.service(BoxService::new(mock_block_verifier.clone()));
let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = ServiceBuilder::new()
.buffer(10)
.service(BoxService::new(mock_tx_verifier.clone()));
// Mempool
let mempool_config = mempool::Config::default();
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool_config,
peer_set.clone(),
state_service.clone(),
buffered_tx_verifier.clone(),
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
);
// Enable the mempool
mempool_service.enable(&mut recent_syncs).await;
let mempool_service = ServiceBuilder::new()
.buffer(10)
.boxed()
// boxed() needs this extra tiny buffer
.buffer(1)
.service(mempool_service);
// Initialize the inbound service
let setup_data = InboundSetupData {
address_book,
block_download_peer_set: peer_set.clone(),
block_verifier: buffered_block_verifier,
mempool: mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok(), "unexpected setup channel send failure");
let block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
));
let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id(
transaction_receiver,
peer_set.clone(),
));
// Open a fake peer connection to the inbound listener, using the isolated connection API
let connected_peer_service =
connect_isolated_tcp_direct(network, listen_addr, "test".to_string())
.await
.expect("local listener connection succeeds");
let connected_peer_service = ServiceBuilder::new()
.buffer(10)
.service(connected_peer_service);
// Make the peer set find the new peer
let _ = peer_set
.ready()
.await
.expect("peer set becomes ready without errors");
// there is no syncer task, and the verifiers are fake,
// but the network stack is all real
(
// real services
connected_peer_service,
inbound_service,
peer_set,
mempool_service,
state_service,
// mocked services
mock_block_verifier,
mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
listen_addr,
)
}

View File

@ -322,6 +322,10 @@ where
_ => unreachable!("wrong response to transaction request"),
};
let tx = tx.available().expect(
"unexpected missing tx status: single tx failures should be errors",
);
metrics::counter!(
"mempool.downloaded.transactions.total",
1,

View File

@ -1,3 +1,5 @@
//! A download stream for Zebra's block syncer.
use std::{
collections::HashMap,
convert::TryFrom,
@ -259,9 +261,10 @@ where
);
blocks
.into_iter()
.next()
.expect("successful response has the block in it")
.first()
.expect("just checked length")
.available()
.expect("unexpected missing block status: single block failures should be errors")
} else {
unreachable!("wrong response to block request");
};

View File

@ -11,6 +11,7 @@ use zebra_chain::{
serialization::ZcashDeserializeInto,
};
use zebra_consensus::Config as ConsensusConfig;
use zebra_network::ResponseStatus;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
@ -25,18 +26,20 @@ use crate::{
config::ZebradConfig,
};
use ResponseStatus::*;
/// Maximum time to wait for a request to any test service.
///
/// The default [`MockService`] value can be too short for some of these tests that take a little
/// longer than expected to actually send the request.
///
/// Increasing this value causes the tests to take longer to complete, so it can't be too large.
const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(500);
const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(1000);
/// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips.
///
/// This test also makes sure that the syncer downloads blocks in order.
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
// Get services
let (
@ -83,7 +86,7 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
chain_verifier
.expect_request(block0)
@ -157,11 +160,11 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -221,11 +224,11 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -264,7 +267,7 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
/// with duplicate block hashes.
///
/// This test also makes sure that the syncer downloads blocks in order.
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
// Get services
let (
@ -311,7 +314,7 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
chain_verifier
.expect_request(block0)
@ -387,11 +390,11 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -453,11 +456,11 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -527,7 +530,7 @@ async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
// Block is dropped because it has the wrong hash.
// We expect more requests to the state service, because the syncer keeps on running.
@ -547,7 +550,7 @@ async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
/// Test that the sync downloader rejects blocks that are too high in obtain_tips.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
// Get services
let (
@ -593,7 +596,7 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
chain_verifier
.expect_request(block0)
@ -675,15 +678,15 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
// At this point, the following tasks race:
// - The valid chain verifier requests
@ -703,7 +706,7 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
/// Test that the sync downloader rejects blocks that are too high in extend_tips.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
// Get services
let (
@ -755,7 +758,7 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block0.clone()]));
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
chain_verifier
.expect_request(block0)
@ -829,11 +832,11 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block1.clone()]));
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block2.clone()]));
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -895,17 +898,17 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block3.clone()]));
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![block4.clone()]));
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
peer_set
.expect_request(zn::Request::BlocksByHash(
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![block982k.clone()]));
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
// At this point, the following tasks race:
// - The valid chain verifier requests
@ -934,6 +937,8 @@ fn setup() -> (
MockService<zebra_state::Request, zebra_state::Response, PanicAssertion>,
MockChainTipSender,
) {
zebra_test::init();
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let config = ZebradConfig {