diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index effd176d..c7acae0e 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -158,7 +158,7 @@ pub use crate::{ peer::{HandshakeError, PeerError, SharedPeerError}, peer_set::init, policies::RetryLimit, - protocol::internal::{Request, Response}, + protocol::internal::{Request, Response, ResponseStatus}, }; /// Types used in the definition of [`Request`] and [`Response`] messages. diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 5057f12d..36f362c3 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -365,6 +365,11 @@ impl MetaAddr { MetaAddr::new_errored(addr, services.into()) } + /// Return the address for this `MetaAddr`. + pub fn addr(&self) -> SocketAddr { + self.addr + } + /// Returns the time of the last successful interaction with this peer. /// /// Initially set to the unverified "last seen time" gossiped by the remote @@ -612,7 +617,7 @@ impl MetaAddr { impl MetaAddr { /// Forcefully change the time this peer last responded. /// - /// This method is for test-purposes only. + /// This method is for testing purposes only. pub(crate) fn set_last_response(&mut self, last_response: DateTime32) { self.last_response = Some(last_response); } diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 6218e69e..b434e7ac 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -1,30 +1,24 @@ //! Peer handling. -/// Handles outbound requests from our node to the network. mod client; -/// The per-peer connection state machine. mod connection; -/// Wrapper around handshake logic that also opens a TCP connection. mod connector; -/// Peer-related errors. mod error; -/// Performs peer handshakes. mod handshake; -/// Tracks the load on a `Client` service. mod load_tracked_client; -/// Watches for chain tip height updates to determine the minimum support peer protocol version. mod minimum_peer_version; #[cfg(any(test, feature = "proptest-impl"))] pub use client::tests::ClientTestHarness; -#[cfg(not(test))] -use client::ClientRequest; + #[cfg(test)] -pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest}; +pub(crate) use client::tests::ReceiveRequestAttempt; +#[cfg(test)] +pub(crate) use handshake::register_inventory_status; use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; -pub(crate) use client::CancelHeartbeatTask; +pub(crate) use client::{CancelHeartbeatTask, ClientRequest}; pub use client::Client; pub use connection::Connection; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 5c1f3e58..1630747f 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -1,3 +1,5 @@ +//! Handles outbound requests from our node to the network. + use std::{ future::Future, pin::Pin, diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 4fb9a72d..d8d4787c 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -1,4 +1,4 @@ -//! Zcash peer connection protocol handing for Zebra. +//! Zebra's per-peer connection state machine. //! //! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response //! protocol. @@ -35,11 +35,13 @@ use crate::{ peer_set::ConnectionTracker, protocol::{ external::{types::Nonce, InventoryHash, Message}, - internal::{Request, Response}, + internal::{Request, Response, ResponseStatus}, }, BoxError, }; +use ResponseStatus::*; + mod peer_tx; #[cfg(test)] @@ -160,18 +162,11 @@ impl Handler { ) => { // assumptions: // - the transaction messages are sent in a single continuous batch - // - missing transaction hashes are included in a `NotFound` message + // - missing transactions are silently skipped + // (there is no `NotFound` message at the end of the batch) if pending_ids.remove(&transaction.id) { // we are in the middle of the continuous transaction messages transactions.push(transaction); - if pending_ids.is_empty() { - Handler::Finished(Ok(Response::Transactions(transactions))) - } else { - Handler::TransactionsById { - pending_ids, - transactions, - } - } } else { // We got a transaction we didn't ask for. If the caller doesn't know any of the // transactions, they should have sent a `NotFound` with all the hashes, rather @@ -188,18 +183,25 @@ impl Handler { // connection open, so the inbound service can process transactions from good // peers (case 2). ignored_msg = Some(Message::Tx(transaction)); - if !transactions.is_empty() { - // if our peers start sending mixed solicited and unsolicited transactions, - // we should update this code to handle those responses - info!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response"); - // TODO: does the caller need a list of missing transactions? (#1515) - Handler::Finished(Ok(Response::Transactions(transactions))) - } else { - // TODO: is it really an error if we ask for a transaction hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? - // Should we fake a NotFound response here? (#1515) - let missing_transaction_ids = pending_ids.iter().map(Into::into).collect(); - Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) + } + + if ignored_msg.is_some() && transactions.is_empty() { + // If we didn't get anything we wanted, retry the request. + let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect(); + Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) + } else if pending_ids.is_empty() || ignored_msg.is_some() { + // If we got some of what we wanted, let the internal client know. + let available = transactions.into_iter().map(ResponseStatus::Available); + let missing = pending_ids.into_iter().map(ResponseStatus::Missing); + + Handler::Finished(Ok(Response::Transactions( + available.chain(missing).collect(), + ))) + } else { + // Keep on waiting for more. + Handler::TransactionsById { + pending_ids, + transactions, } } } @@ -219,7 +221,7 @@ impl Handler { // // If we're in sync with the peer, then the `NotFound` should contain the remaining // hashes from the handler. If we're not in sync with the peer, we should return - // what we got so far, and log an error. + // what we got so far. let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect(); if missing_transaction_ids != pending_ids { trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids); @@ -231,13 +233,18 @@ impl Handler { info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response"); } - if !transactions.is_empty() { - // TODO: does the caller need a list of missing transactions? (#1515) - Handler::Finished(Ok(Response::Transactions(transactions))) + if transactions.is_empty() { + // If we didn't get anything we wanted, retry the request. + let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect(); + Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) } else { - // TODO: is it really an error if we ask for a transaction hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? (#1515) - Handler::Finished(Err(PeerError::NotFound(missing_invs))) + // If we got some of what we wanted, let the internal client know. + let available = transactions.into_iter().map(ResponseStatus::Available); + let missing = pending_ids.into_iter().map(ResponseStatus::Missing); + + Handler::Finished(Ok(Response::Transactions( + available.chain(missing).collect(), + ))) } } // `zcashd` returns requested blocks in a single batch of messages. @@ -258,25 +265,19 @@ impl Handler { if pending_hashes.remove(&block.hash()) { // we are in the middle of the continuous block messages blocks.push(block); - if pending_hashes.is_empty() { - Handler::Finished(Ok(Response::Blocks(blocks))) - } else { - Handler::BlocksByHash { - pending_hashes, - blocks, - } - } } else { // We got a block we didn't ask for. // // So either: // 1. The response is for a previously cancelled block request. - // We should ignore that block, and wait for the actual response. + // We should treat that block as an inbound gossiped block, + // and wait for the actual response. // 2. The peer doesn't know any of the blocks we asked for. // We should cancel the request, so we don't hang waiting for blocks that // will never arrive. // 3. The peer sent an unsolicited block. - // We should ignore that block, and wait for the actual response. + // We should treat that block as an inbound gossiped block, + // and wait for the actual response. // // We ignore the message, so we don't desynchronize with the peer. This happens // when we cancel a request and send a second different request, but receive a @@ -286,15 +287,21 @@ impl Handler { // // Ignoring the message gives us a chance to synchronize back to the correct // request. + // + // Peers can avoid these cascading errors by sending an explicit `notfound`. + // Zebra sends `notfound`, but `zcashd` doesn't. ignored_msg = Some(Message::Block(block)); - if !blocks.is_empty() { - // TODO: does the caller need a list of missing blocks? (#1515) - Handler::Finished(Ok(Response::Blocks(blocks))) - } else { - Handler::BlocksByHash { - pending_hashes, - blocks, - } + } + + if pending_hashes.is_empty() { + // If we got everything we wanted, let the internal client know. + let available = blocks.into_iter().map(ResponseStatus::Available); + Handler::Finished(Ok(Response::Blocks(available.collect()))) + } else { + // Keep on waiting for all the blocks we wanted, until we get them or time out. + Handler::BlocksByHash { + pending_hashes, + blocks, } } } @@ -304,7 +311,7 @@ impl Handler { pending_hashes, blocks, }, - Message::NotFound(items), + Message::NotFound(missing_invs), ) => { // assumptions: // - the peer eventually returns a block or a `NotFound` entry @@ -315,36 +322,31 @@ impl Handler { // If we're in sync with the peer, then the `NotFound` should contain the remaining // hashes from the handler. If we're not in sync with the peer, we should return // what we got so far, and log an error. - let missing_blocks: HashSet<_> = items - .iter() - .filter_map(|inv| match &inv { - InventoryHash::Block(b) => Some(b), - _ => None, - }) - .cloned() - .collect(); + let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect(); if missing_blocks != pending_hashes { - trace!(?items, ?missing_blocks, ?pending_hashes); + trace!(?missing_invs, ?missing_blocks, ?pending_hashes); // if these errors are noisy, we should replace them with debugs info!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response"); } - if missing_blocks.len() != items.len() { - trace!(?items, ?missing_blocks, ?pending_hashes); + if missing_blocks.len() != missing_invs.len() { + trace!(?missing_invs, ?missing_blocks, ?pending_hashes); info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response"); } - if !blocks.is_empty() { - // TODO: does the caller need a list of missing blocks? (#1515) - Handler::Finished(Ok(Response::Blocks(blocks))) + if blocks.is_empty() { + // If we didn't get anything we wanted, retry the request. + let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect(); + Handler::Finished(Err(PeerError::NotFound(missing_block_hashes))) } else { - // TODO: is it really an error if we ask for a block hash, but the peer - // doesn't know it? Should we close the connection on that kind of error? (#1515) - Handler::Finished(Err(PeerError::NotFound(items))) + // If we got some of what we wanted, let the internal client know. + let available = blocks.into_iter().map(ResponseStatus::Available); + let missing = pending_hashes.into_iter().map(ResponseStatus::Missing); + + Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect()))) } } // TODO: - // - add NotFound cases for blocks, transactions, and headers (#2726) // - use `any(inv)` rather than `all(inv)`? (Handler::FindBlocks, Message::Inv(items)) if items @@ -1216,18 +1218,48 @@ where } } Response::Transactions(transactions) => { - // Generate one tx message per transaction. + // Generate one tx message per transaction, + // then a notfound message with all the missing transaction ids. + let mut missing_ids = Vec::new(); + for transaction in transactions.into_iter() { - if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await { + match transaction { + Available(transaction) => { + if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await { + self.fail_with(e); + return; + } + } + Missing(id) => missing_ids.push(id.into()), + } + } + + if !missing_ids.is_empty() { + if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await { self.fail_with(e); return; } } } Response::Blocks(blocks) => { - // Generate one block message per block. + // Generate one tx message per block, + // then a notfound% message with all the missing block hashes. + let mut missing_hashes = Vec::new(); + for block in blocks.into_iter() { - if let Err(e) = self.peer_tx.send(Message::Block(block)).await { + match block { + Available(block) => { + if let Err(e) = self.peer_tx.send(Message::Block(block)).await { + self.fail_with(e); + return; + } + } + Missing(hash) => missing_hashes.push(hash.into()), + } + } + + if !missing_hashes.is_empty() { + if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await { self.fail_with(e); return; } @@ -1400,11 +1432,5 @@ fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator impl Iterator + '_ { - items.iter().filter_map(|item| { - if let InventoryHash::Block(hash) = item { - Some(*hash) - } else { - None - } - }) + items.iter().filter_map(InventoryHash::block_hash) } diff --git a/zebra-network/src/peer/connection/tests/prop.rs b/zebra-network/src/peer/connection/tests/prop.rs index 7290c9ad..fe600913 100644 --- a/zebra-network/src/peer/connection/tests/prop.rs +++ b/zebra-network/src/peer/connection/tests/prop.rs @@ -1,3 +1,5 @@ +//! Randomised property tests for peer connection handling. + use std::{collections::HashSet, env, mem, sync::Arc}; use futures::{ @@ -10,6 +12,7 @@ use tracing::Span; use zebra_chain::{ block::{self, Block}, + fmt::DisplayToDebug, serialization::SerializationError, }; use zebra_test::mock_service::{MockService, PropTestAssertion}; @@ -17,9 +20,12 @@ use zebra_test::mock_service::{MockService, PropTestAssertion}; use crate::{ peer::{connection::Connection, ClientRequest, ErrorSlot}, protocol::external::Message, + protocol::internal::ResponseStatus, Request, Response, SharedPeerError, }; +use ResponseStatus::*; + proptest! { // The default value of proptest cases (256) causes this test to take more than ten seconds on // most machines, so this reduces the value a little to reduce the test time. @@ -31,10 +37,13 @@ proptest! { .unwrap_or(32)) )] + /// This test makes sure that Zebra ignores extra blocks after a block request is cancelled. + /// + /// We need this behaviour to avoid cascading errors after a single cancelled block request. #[test] fn connection_is_not_desynchronized_when_request_is_cancelled( - first_block in any::>(), - second_block in any::>(), + first_block in any::>>(), + second_block in any::>>(), ) { let runtime = zebra_test::init_async(); @@ -72,26 +81,34 @@ proptest! { // Reply to first request peer_tx - .send(Ok(Message::Block(first_block))) + .send(Ok(Message::Block(first_block.0))) .await .expect("Failed to send response to first block request"); // Reply to second request peer_tx - .send(Ok(Message::Block(second_block.clone()))) + .send(Ok(Message::Block(second_block.0.clone()))) .await .expect("Failed to send response to second block request"); // Check second response is correctly received let receive_response_result = response_to_second_request.await; - prop_assert!(receive_response_result.is_ok()); + prop_assert!( + receive_response_result.is_ok(), + "unexpected receive result: {:?}", + receive_response_result, + ); let response_result = receive_response_result.unwrap(); - prop_assert!(response_result.is_ok()); + prop_assert!( + response_result.is_ok(), + "unexpected response result: {:?}", + response_result, + ); let response = response_result.unwrap(); - prop_assert_eq!(response, Response::Blocks(vec![second_block])); + prop_assert_eq!(response, Response::Blocks(vec![Available(second_block.0)])); // Check the state after the response let error = shared_error_slot.try_get_error(); @@ -103,7 +120,11 @@ proptest! { mem::drop(peer_tx); let connection_task_result = connection_task.await; - prop_assert!(connection_task_result.is_ok()); + prop_assert!( + connection_task_result.is_ok(), + "unexpected task result: {:?}", + connection_task_result, + ); Ok(()) })?; diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 253ebed5..6e73a3de 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -1,3 +1,5 @@ +//! Wrapper around handshake logic that also opens a TCP connection. + use std::{ future::Future, net::SocketAddr, diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 2122bf21..558836ab 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -1,3 +1,5 @@ +//! Peer-related errors. + use std::{borrow::Cow, sync::Arc}; use thiserror::Error; diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index d4131b12..1f0e9d2a 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -988,7 +988,7 @@ where } /// Register any advertised or missing inventory in `msg` for `connected_addr`. -async fn register_inventory_status( +pub(crate) async fn register_inventory_status( msg: Result, connected_addr: ConnectedAddr, inv_collector: broadcast::Sender, @@ -1022,7 +1022,7 @@ async fn register_inventory_status( // If all receivers have been dropped, `send` returns an error. // When that happens, Zebra is shutting down, so we want to ignore this error. let _ = inv_collector - .send(InventoryChange::new_advertised(*advertised, transient_addr)); + .send(InventoryChange::new_available(*advertised, transient_addr)); } [advertised @ ..] => { let advertised = advertised @@ -1035,7 +1035,7 @@ async fn register_inventory_status( ); if let Some(change) = - InventoryChange::new_advertised_multi(advertised, transient_addr) + InventoryChange::new_available_multi(advertised, transient_addr) { // Ignore channel errors that should only happen during shutdown. let _ = inv_collector.send(change); @@ -1045,6 +1045,11 @@ async fn register_inventory_status( } (Ok(Message::NotFound(missing)), Some(transient_addr)) => { + // Ignore Errors and the unsupported FilteredBlock type + let missing = missing.iter().filter(|missing| { + missing.unmined_tx_id().is_some() || missing.block_hash().is_some() + }); + debug!(?missing, "registering missing inventory for peer"); if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) { diff --git a/zebra-network/src/peer/minimum_peer_version.rs b/zebra-network/src/peer/minimum_peer_version.rs index 5ee2a449..421915d8 100644 --- a/zebra-network/src/peer/minimum_peer_version.rs +++ b/zebra-network/src/peer/minimum_peer_version.rs @@ -1,3 +1,5 @@ +//! Watches for chain tip height updates to determine the minimum supported peer protocol version. + use zebra_chain::{chain_tip::ChainTip, parameters::Network}; use crate::protocol::external::types::Version; diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index f1a1e66f..1af5f7e4 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1363,7 +1363,7 @@ where + 'static, S::Future: Send + 'static, { - // Create a test config that listens on any unused port. + // Create a test config that listens on an unused port. let listen_addr = "127.0.0.1:0".parse().unwrap(); let mut config = Config { listen_addr, diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index c16ae9f2..6d3db30f 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -20,44 +20,37 @@ use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne}; -use crate::{protocol::external::InventoryHash, BoxError}; +use crate::{ + protocol::{external::InventoryHash, internal::ResponseStatus}, + BoxError, +}; use self::update::Update; -use InventoryStatus::*; +/// Underlying type for the alias InventoryStatus::* +use ResponseStatus::*; pub mod update; #[cfg(test)] mod tests; +/// A peer inventory status, which tracks a hash for both available and missing inventory. +pub type InventoryStatus = ResponseStatus; + /// A peer inventory status change, used in the inventory status channel. +/// +/// For performance reasons, advertisements should only be tracked +/// for hashes that are rare on the network. +/// So Zebra only tracks single-block inventory messages. +/// +/// For security reasons, all `notfound` rejections should be tracked. +/// This also helps with performance, if the hash is rare on the network. pub type InventoryChange = InventoryStatus<(AtLeastOne, SocketAddr)>; /// An internal marker used in inventory status hash maps. type InventoryMarker = InventoryStatus<()>; -/// A generic peer inventory status. -/// -/// `Advertised` is used for inventory that peers claim to have, -/// and `Missing` is used for inventory they didn't provide when we requested it. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum InventoryStatus { - /// An advertised inventory hash. - /// - /// For performance reasons, advertisements should only be tracked - /// for hashes that are rare on the network. - /// So Zebra only tracks single-block inventory messages. - Advertised(T), - - /// An inventory hash rejected by a peer. - /// - /// For security reasons, all `notfound` rejections should be tracked. - /// This also helps with performance, if the hash is rare on the network. - #[allow(dead_code)] - Missing(T), -} - /// An Inventory Registry for tracking recent inventory advertisements and missing inventory. /// /// For more details please refer to the [RFC]. @@ -90,9 +83,9 @@ impl std::fmt::Debug for InventoryRegistry { } impl InventoryChange { - /// Returns a new advertised inventory change from a single hash. - pub fn new_advertised(hash: InventoryHash, peer: SocketAddr) -> Self { - InventoryStatus::Advertised((AtLeastOne::from_one(hash), peer)) + /// Returns a new available inventory change from a single hash. + pub fn new_available(hash: InventoryHash, peer: SocketAddr) -> Self { + InventoryStatus::Available((AtLeastOne::from_one(hash), peer)) } /// Returns a new missing inventory change from a single hash. @@ -101,15 +94,15 @@ impl InventoryChange { InventoryStatus::Missing((AtLeastOne::from_one(hash), peer)) } - /// Returns a new advertised multiple inventory change, if `hashes` contains at least one change. - pub fn new_advertised_multi<'a>( + /// Returns a new available multiple inventory change, if `hashes` contains at least one change. + pub fn new_available_multi<'a>( hashes: impl IntoIterator, peer: SocketAddr, ) -> Option { let hashes: Vec = hashes.into_iter().copied().collect(); let hashes = hashes.try_into().ok(); - hashes.map(|hashes| InventoryStatus::Advertised((hashes, peer))) + hashes.map(|hashes| InventoryStatus::Available((hashes, peer))) } /// Returns a new missing multiple inventory change, if `hashes` contains at least one change. @@ -125,64 +118,27 @@ impl InventoryChange { } } -impl InventoryStatus { - /// Returns true if the inventory item was advertised. - #[allow(dead_code)] - pub fn is_advertised(&self) -> bool { - matches!(self, Advertised(_)) - } - - /// Returns true if the inventory item was missing. - #[allow(dead_code)] - pub fn is_missing(&self) -> bool { - matches!(self, Missing(_)) - } - - /// Get the advertised inventory item, if present. - pub fn advertised(&self) -> Option { - if let Advertised(item) = self { - Some(item.clone()) - } else { - None - } - } - - /// Get the rejected inventory item, if present. - #[allow(dead_code)] - pub fn missing(&self) -> Option { - if let Missing(item) = self { - Some(item.clone()) - } else { - None - } - } - - /// Get the inner item, regardless of status. - pub fn inner(&self) -> T { - match self { - Advertised(item) | Missing(item) => item.clone(), - } - } - +impl InventoryStatus { /// Get a marker for the status, without any associated data. pub fn marker(&self) -> InventoryMarker { self.as_ref().map(|_inner| ()) } /// Maps an `InventoryStatus` to `InventoryStatus` by applying a function to a contained value. - pub fn map U>(self, f: F) -> InventoryStatus { - // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#829 + pub fn map U>(self, f: F) -> InventoryStatus { + // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844 match self { - Advertised(item) => Advertised(f(item)), + Available(item) => Available(f(item)), Missing(item) => Missing(f(item)), } } +} - /// Converts from `&InventoryStatus` to `InventoryStatus<&T>`. - pub fn as_ref(&self) -> InventoryStatus<&T> { +impl InventoryStatus { + /// Get the inner item, regardless of status. + pub fn inner(&self) -> T { match self { - Advertised(item) => Advertised(item), - Missing(item) => Missing(item), + Available(item) | Missing(item) => item.clone(), } } } @@ -212,7 +168,7 @@ impl InventoryRegistry { /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory. pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator { self.status_peers(hash) - .filter_map(|addr_status| addr_status.advertised()) + .filter_map(|addr_status| addr_status.available()) } /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory. @@ -332,7 +288,7 @@ impl InventoryRegistry { // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries, // and funnel multiple failing requests to themselves. if let Some(old_status) = current.get(&addr) { - if old_status.is_missing() && new_status.is_advertised() { + if old_status.is_missing() && new_status.is_available() { debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status"); continue; } diff --git a/zebra-network/src/peer_set/inventory_registry/tests.rs b/zebra-network/src/peer_set/inventory_registry/tests.rs index e83d591b..61a86196 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests.rs @@ -1,3 +1,22 @@ //! Tests for the inventory registry. +use tokio::sync::broadcast; + +use crate::peer_set::inventory_registry::{InventoryChange, InventoryRegistry}; + +mod prop; mod vectors; + +/// The number of changes that can be pending in the inventory channel, before it starts lagging. +/// +/// Lagging drops messages, so tests should avoid filling the channel. +pub const MAX_PENDING_CHANGES: usize = 32; + +/// Returns a newly initialised inventory registry, and a sender for its inventory channel. +fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender) { + let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES); + + let inv_registry = InventoryRegistry::new(inv_stream_rx); + + (inv_registry, inv_stream_tx) +} diff --git a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs new file mode 100644 index 00000000..5d880d55 --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs @@ -0,0 +1,140 @@ +//! Randomised property tests for the inventory registry. + +use std::{collections::HashSet, net::SocketAddr}; + +use proptest::prelude::*; + +use crate::{ + peer::{register_inventory_status, ConnectedAddr}, + peer_set::inventory_registry::{ + tests::{new_inv_registry, MAX_PENDING_CHANGES}, + InventoryMarker, + }, + protocol::external::{InventoryHash, Message}, +}; + +use InventoryHash::*; + +/// The maximum number of random hashes we want to use in these tests. +pub const MAX_HASH_COUNT: usize = 5; + +proptest! { + /// Check inventory registration works via the inbound peer message channel wrapper. + #[test] + fn inv_registry_inbound_wrapper_ok( + status in any::(), + test_hashes in prop::collection::hash_set(any::(), 0..=MAX_HASH_COUNT) + ) { + prop_assert!(MAX_HASH_COUNT <= MAX_PENDING_CHANGES, "channel must not block in tests"); + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + runtime.block_on(async move { + // Check all combinations of: + // + // Inventory availability: + // - Available + // - Missing + // + // Inventory multiplicity: + // - Empty messages are ignored without errors + // - Single is handled correctly + // - Multiple are handled correctly + // + // And inventory variants: + // - Block (empty and single only) + // - Tx for legacy v4 and earlier transactions + // - Wtx for v5 and later transactions + // + // Using randomised proptest inventory data. + inv_registry_inbound_wrapper_with(status, test_hashes).await; + }) + } +} + +/// Check inventory registration works via the inbound peer message channel wrapper. +#[tracing::instrument] +async fn inv_registry_inbound_wrapper_with( + status: InventoryMarker, + test_hashes: HashSet, +) { + let test_peer: SocketAddr = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + let test_peer = ConnectedAddr::new_inbound_direct(test_peer); + + let test_hashes: Vec = test_hashes.into_iter().collect(); + let test_msg = if status.is_available() { + Message::Inv(test_hashes.clone()) + } else { + Message::NotFound(test_hashes.clone()) + }; + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let forwarded_msg = + register_inventory_status(Ok(test_msg.clone()), test_peer, inv_stream_tx.clone()).await; + assert_eq!( + test_msg.clone(), + forwarded_msg.expect("unexpected forwarded error result"), + ); + + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + let test_peer = test_peer + .get_transient_addr() + .expect("unexpected None: expected Some transient peer address"); + + for &test_hash in test_hashes.iter() { + // The registry should ignore these unsupported types. + // (Currently, it panics if they are registered, but they are ok to query.) + if matches!(test_hash, Error | FilteredBlock(_)) { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + continue; + } + + if status.is_available() { + // register_inventory_status doesn't register multi-block available inventory, + // for performance reasons. + if test_hashes.len() > 1 && test_hash.block_hash().is_some() { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + continue; + } + + assert_eq!( + inv_registry.advertising_peers(test_hash).next(), + Some(&test_peer), + "unexpected None hash: {:?},\n\ + in message {:?} \n\ + with length {}\n\ + should be advertised\n", + test_hash, + test_msg, + test_hashes.len(), + ); + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + } else { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!( + inv_registry.missing_peers(test_hash).next(), + Some(&test_peer), + "unexpected None hash: {:?},\n\ + in message {:?} \n\ + with length {}\n\ + should be advertised\n", + test_hash, + test_msg, + test_hashes.len(), + ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 1); + } + } +} diff --git a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs index e89814d6..3d918047 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs @@ -1,22 +1,12 @@ //! Fixed test vectors for the inventory registry. -use tokio::sync::broadcast; - use zebra_chain::block; use crate::{ - peer_set::{ - inventory_registry::{InventoryRegistry, InventoryStatus}, - InventoryChange, - }, + peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus}, protocol::external::InventoryHash, }; -/// The number of changes that can be pending in the inventory channel, before it starts lagging. -/// -/// Lagging drops messages, so tests should avoid filling the channel. -pub const MAX_PENDING_CHANGES: usize = 32; - /// Check an empty inventory registry works as expected. #[tokio::test] async fn inv_registry_empty_ok() { @@ -40,7 +30,7 @@ async fn inv_registry_one_advertised_ok() { let test_peer = "1.1.1.1:1" .parse() .expect("unexpected invalid peer address"); - let test_change = InventoryStatus::new_advertised(test_hash, test_peer); + let test_change = InventoryStatus::new_available(test_hash, test_peer); let (mut inv_registry, inv_stream_tx) = new_inv_registry(); @@ -105,7 +95,7 @@ async fn inv_registry_prefer_missing_order(missing_first: bool) { .expect("unexpected invalid peer address"); let missing_change = InventoryStatus::new_missing(test_hash, test_peer); - let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer); + let advertised_change = InventoryStatus::new_available(test_hash, test_peer); let (mut inv_registry, inv_stream_tx) = new_inv_registry(); @@ -150,7 +140,7 @@ async fn inv_registry_prefer_current_order(missing_current: bool) { .expect("unexpected invalid peer address"); let missing_change = InventoryStatus::new_missing(test_hash, test_peer); - let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer); + let advertised_change = InventoryStatus::new_available(test_hash, test_peer); let (mut inv_registry, inv_stream_tx) = new_inv_registry(); @@ -192,12 +182,3 @@ async fn inv_registry_prefer_current_order(missing_current: bool) { assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); } } - -/// Returns a newly initialised inventory registry, and a sender for its inventory channel. -fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender) { - let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES); - - let inv_registry = InventoryRegistry::new(inv_stream_rx); - - (inv_registry, inv_stream_tx) -} diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index 2b02091c..c9cb71ce 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -255,7 +255,7 @@ fn peer_set_route_inv_advertised_registry_order(advertised_first: bool) { .parse() .expect("unexpected invalid peer address"); - let test_change = InventoryStatus::new_advertised(test_inv, test_peer); + let test_change = InventoryStatus::new_available(test_inv, test_peer); // Use two peers with the same version let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy); diff --git a/zebra-network/src/protocol/external/inv.rs b/zebra-network/src/protocol/external/inv.rs index ea483d88..7f371339 100644 --- a/zebra-network/src/protocol/external/inv.rs +++ b/zebra-network/src/protocol/external/inv.rs @@ -1,4 +1,4 @@ -//! Inventory items for the Bitcoin protocol. +//! Inventory items for the Zcash network protocol. use std::io::{Read, Write}; @@ -65,6 +65,19 @@ impl InventoryHash { InventoryHash::Tx(legacy_tx_id) } + /// Returns the block hash for this inventory hash, + /// if this inventory hash is a non-filtered block variant. + pub fn block_hash(&self) -> Option { + match self { + InventoryHash::Error => None, + InventoryHash::Tx(_legacy_tx_id) => None, + InventoryHash::Block(hash) => Some(*hash), + // Zebra does not support filtered blocks + InventoryHash::FilteredBlock(_ignored_hash) => None, + InventoryHash::Wtx(_wtx_id) => None, + } + } + /// Returns the unmined transaction ID for this inventory hash, /// if this inventory hash is a transaction variant. pub fn unmined_tx_id(&self) -> Option { diff --git a/zebra-network/src/protocol/internal.rs b/zebra-network/src/protocol/internal.rs index e97891a1..df603a34 100644 --- a/zebra-network/src/protocol/internal.rs +++ b/zebra-network/src/protocol/internal.rs @@ -1,5 +1,7 @@ mod request; mod response; +mod response_status; pub use request::Request; pub use response::Response; +pub use response_status::ResponseStatus; diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index fefc4fc2..b1d89c62 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -1,28 +1,31 @@ +//! Zebra's internal peer message response format. + +use std::{fmt, sync::Arc}; + use zebra_chain::{ block::{self, Block}, transaction::{UnminedTx, UnminedTxId}, }; -use crate::meta_addr::MetaAddr; - -use std::{fmt, sync::Arc}; +use crate::{meta_addr::MetaAddr, protocol::internal::ResponseStatus}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; +use ResponseStatus::*; + /// A response to a network request, represented in internal format. #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum Response { - /// Do not send any response to this request. + /// The request does not have a response. /// /// Either: /// * the request does not need a response, or - /// * we have no useful data to provide in response to the request + /// * we have no useful data to provide in response to the request, + /// and the request was not an inventory request. /// - /// When Zebra doesn't have any useful data, it always sends no response, - /// instead of sending `notfound`. `zcashd` sometimes sends no response, - /// and sometimes sends `notfound`. + /// (Inventory requests provide a list of missing hashes if none of the hashes were available.) Nil, /// A list of peers, used to respond to `GetPeers`. @@ -32,35 +35,20 @@ pub enum Response { // TODO: make this into a HashMap - a unique list of peer addresses (#2244) Peers(Vec), - /// A list of blocks. - /// - /// The list contains zero or more blocks. - // - // TODO: split this into found and not found (#2726) - Blocks(Vec>), - - /// A list of block hashes. + /// An ordered list of block hashes. /// /// The list contains zero or more block hashes. // // TODO: make this into an IndexMap - an ordered unique list of hashes (#2244) BlockHashes(Vec), - /// A list of block headers. + /// An ordered list of block headers. /// /// The list contains zero or more block headers. // - // TODO: make this into a HashMap - a unique list of headers (#2244) - // split this into found and not found (#2726) + // TODO: make this into an IndexMap - an ordered unique list of headers (#2244) BlockHeaders(Vec), - /// A list of unmined transactions. - /// - /// The list contains zero or more unmined transactions. - // - // TODO: split this into found and not found (#2726) - Transactions(Vec), - /// A list of unmined transaction IDs. /// /// v4 transactions use a legacy transaction ID, and @@ -68,8 +56,25 @@ pub enum Response { /// /// The list contains zero or more transaction IDs. // - // TODO: make this into a HashSet - a unique list of transaction IDs (#2244) + // TODO: make this into a HashSet - a unique list (#2244) TransactionIds(Vec), + + /// A list of found blocks, and missing block hashes. + /// + /// Each list contains zero or more entries. + /// + /// When Zebra doesn't have a block or transaction, it always sends `notfound`. + /// `zcashd` sometimes sends no response, and sometimes sends `notfound`. + // + // TODO: make this into a HashMap, ()>> - a unique list (#2244) + Blocks(Vec, block::Hash>>), + + /// A list of found unmined transactions, and missing unmined transaction IDs. + /// + /// Each list contains zero or more entries. + // + // TODO: make this into a HashMap> - a unique list (#2244) + Transactions(Vec>), } impl fmt::Display for Response { @@ -79,30 +84,38 @@ impl fmt::Display for Response { Response::Peers(peers) => format!("Peers {{ peers: {} }}", peers.len()), - // Display heights for single-block responses (which Zebra requests and expects) - Response::Blocks(blocks) if blocks.len() == 1 => { - let block = blocks.first().expect("len is 1"); - format!( - "Block {{ height: {}, hash: {} }}", - block - .coinbase_height() - .as_ref() - .map(|h| h.0.to_string()) - .unwrap_or_else(|| "None".into()), - block.hash(), - ) - } - Response::Blocks(blocks) => format!("Blocks {{ blocks: {} }}", blocks.len()), - Response::BlockHashes(hashes) => format!("BlockHashes {{ hashes: {} }}", hashes.len()), Response::BlockHeaders(headers) => { format!("BlockHeaders {{ headers: {} }}", headers.len()) } - - Response::Transactions(transactions) => { - format!("Transactions {{ transactions: {} }}", transactions.len()) - } Response::TransactionIds(ids) => format!("TransactionIds {{ ids: {} }}", ids.len()), + + // Display heights for single-block responses (which Zebra requests and expects) + Response::Blocks(blocks) if blocks.len() == 1 => { + match blocks.first().expect("len is 1") { + Available(block) => format!( + "Block {{ height: {}, hash: {} }}", + block + .coinbase_height() + .as_ref() + .map(|h| h.0.to_string()) + .unwrap_or_else(|| "None".into()), + block.hash(), + ), + Missing(hash) => format!("Block {{ missing: {} }}", hash), + } + } + Response::Blocks(blocks) => format!( + "Blocks {{ blocks: {}, missing: {} }}", + blocks.iter().filter(|r| r.is_available()).count(), + blocks.iter().filter(|r| r.is_missing()).count() + ), + + Response::Transactions(transactions) => format!( + "Transactions {{ transactions: {}, missing: {} }}", + transactions.iter().filter(|r| r.is_available()).count(), + transactions.iter().filter(|r| r.is_missing()).count() + ), }) } } @@ -115,13 +128,12 @@ impl Response { Response::Peers(_) => "Peers", - Response::Blocks(_) => "Blocks", Response::BlockHashes(_) => "BlockHashes", - - Response::BlockHeaders { .. } => "BlockHeaders", - - Response::Transactions(_) => "Transactions", + Response::BlockHeaders(_) => "BlockHeaders", Response::TransactionIds(_) => "TransactionIds", + + Response::Blocks(_) => "Blocks", + Response::Transactions(_) => "Transactions", } } } diff --git a/zebra-network/src/protocol/internal/response_status.rs b/zebra-network/src/protocol/internal/response_status.rs new file mode 100644 index 00000000..a62b9576 --- /dev/null +++ b/zebra-network/src/protocol/internal/response_status.rs @@ -0,0 +1,101 @@ +//! The status of a response to an inventory request. + +use std::fmt; + +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + +use ResponseStatus::*; + +/// A generic peer inventory response status. +/// +/// `Available` is used for inventory that is present in the response, +/// and `Missing` is used for inventory that is missing from the response. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +pub enum ResponseStatus { + /// An available inventory item. + Available(A), + + /// A missing inventory item. + Missing(M), +} + +impl fmt::Display for ResponseStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.command()) + } +} + +impl ResponseStatus { + /// Returns the response status type as a string. + pub fn command(&self) -> &'static str { + match self { + ResponseStatus::Available(_) => "Available", + ResponseStatus::Missing(_) => "Missing", + } + } + + /// Returns true if the inventory item was available. + #[allow(dead_code)] + pub fn is_available(&self) -> bool { + matches!(self, Available(_)) + } + + /// Returns true if the inventory item was missing. + #[allow(dead_code)] + pub fn is_missing(&self) -> bool { + matches!(self, Missing(_)) + } + + /// Maps a `ResponseStatus` to `ResponseStatus` by applying a function to a + /// contained [`Available`] value, leaving the [`Missing`] value untouched. + #[allow(dead_code)] + pub fn map_available B>(self, f: F) -> ResponseStatus { + // Based on Result::map from https://doc.rust-lang.org/src/core/result.rs.html#765 + match self { + Available(a) => Available(f(a)), + Missing(m) => Missing(m), + } + } + + /// Maps a `ResponseStatus` to `ResponseStatus` by applying a function to a + /// contained [`Missing`] value, leaving the [`Available`] value untouched. + #[allow(dead_code)] + pub fn map_missing N>(self, f: F) -> ResponseStatus { + // Based on Result::map_err from https://doc.rust-lang.org/src/core/result.rs.html#850 + match self { + Available(a) => Available(a), + Missing(m) => Missing(f(m)), + } + } + + /// Converts from `&ResponseStatus` to `ResponseStatus<&A, &M>`. + pub fn as_ref(&self) -> ResponseStatus<&A, &M> { + match self { + Available(item) => Available(item), + Missing(item) => Missing(item), + } + } +} + +impl ResponseStatus { + /// Get the available inventory item, if present. + pub fn available(&self) -> Option { + if let Available(item) = self { + Some(item.clone()) + } else { + None + } + } + + /// Get the missing inventory item, if present. + #[allow(dead_code)] + pub fn missing(&self) -> Option { + if let Missing(item) = self { + Some(item.clone()) + } else { + None + } + } +} diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 734af935..1fb84591 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -6,6 +6,7 @@ //! It also responds to peer requests for blocks, transactions, and peer addresses. use std::{ + collections::HashSet, future::Future, pin::Pin, sync::Arc, @@ -23,11 +24,14 @@ use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, Service use zebra_network as zn; use zebra_state as zs; -use zebra_chain::block::{self, Block}; +use zebra_chain::{ + block::{self, Block}, + transaction::UnminedTxId, +}; use zebra_consensus::chain::VerifyChainError; use zebra_network::{ constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE}, - AddressBook, + AddressBook, ResponseStatus, }; // Re-use the syncer timeouts for consistency. @@ -36,6 +40,8 @@ use super::{ sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, }; +use ResponseStatus::*; + pub(crate) mod downloads; #[cfg(test)] @@ -307,33 +313,41 @@ impl Service for Inbound { // # Correctness // // Briefly hold the address book threaded mutex while - // cloning the address book. Then sanitize after releasing - // the lock. + // cloning the address book. Then sanitize in the future, + // after releasing the lock. let peers = address_book.lock().unwrap().clone(); - // Correctness: get the current time after acquiring the address book lock. - let now = Utc::now(); + async move { + // Correctness: get the current time after acquiring the address book lock. + let now = Utc::now(); - // Send a sanitized response - let mut peers = peers.sanitized(now); + // Send a sanitized response + let mut peers = peers.sanitized(now); - // Truncate the list - // - // TODO: replace with div_ceil once it stabilises - // https://github.com/rust-lang/rust/issues/88581 - let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR; - let address_limit = MAX_ADDRS_IN_MESSAGE - .min(address_limit); - peers.truncate(address_limit); + // Truncate the list + // + // TODO: replace with div_ceil once it stabilises + // https://github.com/rust-lang/rust/issues/88581 + let address_limit = (peers.len() + ADDR_RESPONSE_LIMIT_DENOMINATOR - 1) / ADDR_RESPONSE_LIMIT_DENOMINATOR; + let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit); + peers.truncate(address_limit); - if !peers.is_empty() { - async { Ok(zn::Response::Peers(peers)) }.boxed() - } else { - debug!("ignoring `Peers` request from remote peer because our address book is empty"); - async { Ok(zn::Response::Nil) }.boxed() - } + if peers.is_empty() { + // We don't know if the peer response will be empty until we've sanitized them. + debug!("ignoring `Peers` request from remote peer because our address book is empty"); + Ok(zn::Response::Nil) + } else { + Ok(zn::Response::Peers(peers)) + } + }.boxed() } zn::Request::BlocksByHash(hashes) => { + // We return an available or missing response to each inventory request, + // unless the request is empty. + if hashes.is_empty() { + return async { Ok(zn::Response::Nil) }.boxed(); + } + // Correctness: // // We can't use `call_all` here, because it can hold one buffer slot per concurrent @@ -344,6 +358,7 @@ impl Service for Inbound { // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 use futures::stream::TryStreamExt; hashes + .clone() .into_iter() .map(|hash| zs::Request::Block(hash.into())) .map(|request| state.clone().oneshot(request)) @@ -360,22 +375,36 @@ impl Service for Inbound { }) .try_collect::>() .map_ok(|blocks| { - if blocks.is_empty() { - zn::Response::Nil - } else { - zn::Response::Blocks(blocks) - } + // Work out which hashes were missing. + let available_hashes: HashSet = blocks.iter().map(|block| block.hash()).collect(); + let available = blocks.into_iter().map(Available); + let missing = hashes.into_iter().filter(|hash| !available_hashes.contains(hash)).map(Missing); + + zn::Response::Blocks(available.chain(missing).collect()) }) .boxed() } - zn::Request::TransactionsById(transactions) => { - let request = mempool::Request::TransactionsById(transactions); - mempool.clone().oneshot(request).map_ok(|resp| match resp { - mempool::Response::Transactions(transactions) if transactions.is_empty() => zn::Response::Nil, - mempool::Response::Transactions(transactions) => zn::Response::Transactions(transactions), - _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"), - }) - .boxed() + zn::Request::TransactionsById(req_tx_ids) => { + // We return an available or missing response to each inventory request, + // unless the request is empty. + if req_tx_ids.is_empty() { + return async { Ok(zn::Response::Nil) }.boxed(); + } + + let request = mempool::Request::TransactionsById(req_tx_ids.clone()); + mempool.clone().oneshot(request).map_ok(move |resp| { + let transactions = match resp { + mempool::Response::Transactions(transactions) => transactions, + _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"), + }; + + // Work out which transaction IDs were missing. + let available_tx_ids: HashSet = transactions.iter().map(|tx| tx.id).collect(); + let available = transactions.into_iter().map(Available); + let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing); + + zn::Response::Transactions(available.chain(missing).collect()) + }).boxed() } zn::Request::FindBlocks { known_blocks, stop } => { let request = zs::Request::FindBlockHashes { known_blocks, stop }; diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 163ff079..92ebec72 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -236,9 +236,12 @@ where ); blocks - .into_iter() - .next() - .expect("successful response has the block in it") + .first() + .expect("just checked length") + .available() + .expect( + "unexpected missing block status: single block failures should be errors", + ) } else { unreachable!("wrong response to block request"); }; diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index c16e73e3..c45b62d8 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -1,830 +1,4 @@ //! Inbound service tests. -use std::{ - collections::HashSet, - iter::{self, FromIterator}, - net::SocketAddr, - str::FromStr, - sync::Arc, - time::Duration, -}; - -use futures::FutureExt; -use tokio::{sync::oneshot, task::JoinHandle}; -use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt}; -use tracing::Span; - -use zebra_chain::{ - amount::Amount, - block::Block, - parameters::Network, - serialization::ZcashDeserializeInto, - transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx}, -}; -use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; -use zebra_network::{AddressBook, Request, Response}; -use zebra_state::Config as StateConfig; -use zebra_test::mock_service::{MockService, PanicAssertion}; - -use crate::{ - components::{ - inbound::InboundSetupData, - mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool}, - sync::{self, BlockGossipError, SyncStatus}, - }, - BoxError, -}; - -/// Maximum time to wait for a network service request. -/// -/// The default [`MockService`] value can be too short for some of these tests that take a little -/// longer than expected to actually send the network request. -/// -/// Increasing this value causes the tests to take longer to complete, so it can't be too large. -const MAX_PEER_SET_REQUEST_DELAY: Duration = Duration::from_millis(500); - -#[tokio::test] -async fn mempool_requests_for_transactions() { - let ( - inbound_service, - _mempool_guard, - _committed_blocks, - added_transactions, - _mock_tx_verifier, - mut peer_set, - _state_guard, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) = setup(true).await; - - let added_transactions: Vec = added_transactions - .iter() - .map(|t| t.transaction.clone()) - .collect(); - let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); - - // Test `Request::MempoolTransactionIds` - let response = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - match response { - Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids), - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`, got {:?}", - response - ), - }; - - // Test `Request::TransactionsById` - let hash_set = added_transaction_ids - .iter() - .copied() - .collect::>(); - - let response = inbound_service - .clone() - .oneshot(Request::TransactionsById(hash_set)) - .await; - - match response { - Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions), - _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec)`"), - }; - - // check that nothing unexpected happened - peer_set.expect_no_requests().await; - - let sync_gossip_result = sync_gossip_task_handle.now_or_never(); - assert!( - matches!(sync_gossip_result, None), - "unexpected error or panic in sync gossip task: {:?}", - sync_gossip_result, - ); - - let tx_gossip_result = tx_gossip_task_handle.now_or_never(); - assert!( - matches!(tx_gossip_result, None), - "unexpected error or panic in transaction gossip task: {:?}", - tx_gossip_result, - ); -} - -#[tokio::test] -async fn mempool_push_transaction() -> Result<(), crate::BoxError> { - // get a block that has at least one non coinbase transaction - let block: Arc = - zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; - - // use the first transaction that is not coinbase - let tx = block.transactions[1].clone(); - - let ( - inbound_service, - _mempool_guard, - _committed_blocks, - _added_transactions, - mut tx_verifier, - mut peer_set, - _state_guard, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) = setup(false).await; - - // Test `Request::PushTransaction` - let request = inbound_service - .clone() - .oneshot(Request::PushTransaction(tx.clone().into())); - // Simulate a successful transaction verification - let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { - let transaction = responder - .request() - .clone() - .into_mempool_transaction() - .expect("unexpected non-mempool request"); - - // Set a dummy fee. - responder.respond(transaction::Response::from(VerifiedUnminedTx::new( - transaction, - Amount::zero(), - ))); - }); - let (response, _) = futures::join!(request, verification); - match response { - Ok(Response::Nil) => (), - _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), - }; - - // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - match request { - Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]), - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - - // Make sure there is an additional request broadcasting the - // inserted transaction to peers. - let mut hs = HashSet::new(); - hs.insert(tx.unmined_id()); - peer_set - .expect_request(Request::AdvertiseTransactionIds(hs)) - .await - .respond(Response::Nil); - - let sync_gossip_result = sync_gossip_task_handle.now_or_never(); - assert!( - matches!(sync_gossip_result, None), - "unexpected error or panic in sync gossip task: {:?}", - sync_gossip_result, - ); - - let tx_gossip_result = tx_gossip_task_handle.now_or_never(); - assert!( - matches!(tx_gossip_result, None), - "unexpected error or panic in transaction gossip task: {:?}", - tx_gossip_result, - ); - - Ok(()) -} - -#[tokio::test] -async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { - // get a block that has at least one non coinbase transaction - let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; - - // use the first transaction that is not coinbase - let test_transaction = block - .transactions - .into_iter() - .find(|tx| !tx.has_any_coinbase_inputs()) - .expect("at least one non-coinbase transaction"); - let test_transaction_id = test_transaction.unmined_id(); - let txs = HashSet::from_iter([test_transaction_id]); - - let ( - inbound_service, - _mempool_guard, - _committed_blocks, - _added_transactions, - mut tx_verifier, - mut peer_set, - _state_guard, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) = setup(false).await; - - // Test `Request::AdvertiseTransactionIds` - let request = inbound_service - .clone() - .oneshot(Request::AdvertiseTransactionIds(txs.clone())); - // Ensure the mocked peer set responds - let peer_set_responder = - peer_set - .expect_request(Request::TransactionsById(txs)) - .map(|responder| { - let unmined_transaction = UnminedTx::from(test_transaction.clone()); - responder.respond(Response::Transactions(vec![unmined_transaction])) - }); - // Simulate a successful transaction verification - let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { - let transaction = responder - .request() - .clone() - .into_mempool_transaction() - .expect("unexpected non-mempool request"); - - // Set a dummy fee. - responder.respond(transaction::Response::from(VerifiedUnminedTx::new( - transaction, - Amount::zero(), - ))); - }); - let (response, _, _) = futures::join!(request, peer_set_responder, verification); - - match response { - Ok(Response::Nil) => (), - _ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"), - }; - - // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - match request { - Ok(Response::TransactionIds(response)) => { - assert_eq!(response, vec![test_transaction_id]) - } - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - - // Make sure there is an additional request broadcasting the - // inserted transaction to peers. - let mut hs = HashSet::new(); - hs.insert(test_transaction.unmined_id()); - peer_set - .expect_request(Request::AdvertiseTransactionIds(hs)) - .await - .respond(Response::Nil); - - let sync_gossip_result = sync_gossip_task_handle.now_or_never(); - assert!( - matches!(sync_gossip_result, None), - "unexpected error or panic in sync gossip task: {:?}", - sync_gossip_result, - ); - - let tx_gossip_result = tx_gossip_task_handle.now_or_never(); - assert!( - matches!(tx_gossip_result, None), - "unexpected error or panic in transaction gossip task: {:?}", - tx_gossip_result, - ); - - Ok(()) -} - -#[tokio::test] -async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { - // Get a block that has at least one non coinbase transaction - let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; - - // Use the first transaction that is not coinbase to test expiration - let tx1 = &*(block.transactions[1]).clone(); - let mut tx1_id = tx1.unmined_id(); - - // Change the expiration height of the transaction to block 3 - let mut tx1 = tx1.clone(); - *tx1.expiry_height_mut() = zebra_chain::block::Height(3); - - // Use the second transaction that is not coinbase to trigger `remove_expired_transactions()` - let tx2 = block.transactions[2].clone(); - let mut tx2_id = tx2.unmined_id(); - - // Get services - let ( - inbound_service, - mempool, - _committed_blocks, - _added_transactions, - mut tx_verifier, - mut peer_set, - state_service, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) = setup(false).await; - - // Push test transaction - let request = inbound_service - .clone() - .oneshot(Request::PushTransaction(tx1.clone().into())); - // Simulate a successful transaction verification - let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { - tx1_id = responder.request().tx_id(); - let transaction = responder - .request() - .clone() - .into_mempool_transaction() - .expect("unexpected non-mempool request"); - - // Set a dummy fee. - responder.respond(transaction::Response::from(VerifiedUnminedTx::new( - transaction, - Amount::zero(), - ))); - }); - let (response, _) = futures::join!(request, verification); - match response { - Ok(Response::Nil) => (), - _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), - }; - - // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - match request { - Ok(Response::TransactionIds(response)) => { - assert_eq!(response, vec![tx1_id]) - } - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - - // Add a new block to the state (make the chain tip advance) - let block_two: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .clone() - .oneshot(zebra_state::Request::CommitFinalizedBlock( - block_two.clone().into(), - )) - .await - .unwrap(); - - // Test transaction 1 is gossiped - let mut hs = HashSet::new(); - hs.insert(tx1_id); - peer_set - .expect_request(Request::AdvertiseTransactionIds(hs)) - .await - .respond(Response::Nil); - - // Block is gossiped then - peer_set - .expect_request(Request::AdvertiseBlock(block_two.hash())) - .await - .respond(Response::Nil); - - // Make sure tx1 is still in the mempool as it is not expired yet. - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - match request { - Ok(Response::TransactionIds(response)) => { - assert_eq!(response, vec![tx1_id]) - } - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - - // As our test transaction will expire at a block height greater or equal to 3 we need to push block 3. - let block_three: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .clone() - .oneshot(zebra_state::Request::CommitFinalizedBlock( - block_three.clone().into(), - )) - .await - .unwrap(); - - // Block is gossiped - peer_set - .expect_request(Request::AdvertiseBlock(block_three.hash())) - .await - .respond(Response::Nil); - - // Push a second transaction to trigger `remove_expired_transactions()` - let request = inbound_service - .clone() - .oneshot(Request::PushTransaction(tx2.clone().into())); - // Simulate a successful transaction verification - let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { - tx2_id = responder.request().tx_id(); - let transaction = responder - .request() - .clone() - .into_mempool_transaction() - .expect("unexpected non-mempool request"); - - // Set a dummy fee. - responder.respond(transaction::Response::from(VerifiedUnminedTx::new( - transaction, - Amount::zero(), - ))); - }); - let (response, _) = futures::join!(request, verification); - match response { - Ok(Response::Nil) => (), - _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), - }; - - // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - // Only tx2 will be in the mempool while tx1 was expired - match request { - Ok(Response::TransactionIds(response)) => { - assert_eq!(response, vec![tx2_id]) - } - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - // Check if tx1 was added to the rejected list as well - let response = mempool - .clone() - .oneshot(mempool::Request::Queue(vec![tx1_id.into()])) - .await - .unwrap(); - - let queued_responses = match response { - mempool::Response::Queued(queue_responses) => queue_responses, - _ => unreachable!("will never happen in this test"), - }; - - assert_eq!(queued_responses.len(), 1); - assert_eq!( - queued_responses[0], - Err(mempool::MempoolError::StorageEffectsChain( - mempool::SameEffectsChainRejectionError::Expired - )) - ); - - // Test transaction 2 is gossiped - let mut hs = HashSet::new(); - hs.insert(tx2_id); - peer_set - .expect_request(Request::AdvertiseTransactionIds(hs)) - .await - .respond(Response::Nil); - - // Add all the rest of the continuous blocks we have to test tx2 will never expire. - let more_blocks: Vec> = vec![ - zebra_test::vectors::BLOCK_MAINNET_4_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_5_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_6_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_7_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_8_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_9_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_10_BYTES - .zcash_deserialize_into() - .unwrap(), - ]; - for block in more_blocks { - state_service - .clone() - .oneshot(zebra_state::Request::CommitFinalizedBlock( - block.clone().into(), - )) - .await - .unwrap(); - - // Block is gossiped - peer_set - .expect_request(Request::AdvertiseBlock(block.hash())) - .await - .respond(Response::Nil); - - let request = inbound_service - .clone() - .oneshot(Request::MempoolTransactionIds) - .await; - - // tx2 is still in the mempool as the blockchain progress because the zero expiration height - match request { - Ok(Response::TransactionIds(response)) => { - assert_eq!(response, vec![tx2_id]) - } - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), - }; - } - - // check that nothing unexpected happened - peer_set.expect_no_requests().await; - - let sync_gossip_result = sync_gossip_task_handle.now_or_never(); - assert!( - matches!(sync_gossip_result, None), - "unexpected error or panic in sync gossip task: {:?}", - sync_gossip_result, - ); - - let tx_gossip_result = tx_gossip_task_handle.now_or_never(); - assert!( - matches!(tx_gossip_result, None), - "unexpected error or panic in transaction gossip task: {:?}", - tx_gossip_result, - ); - - Ok(()) -} - -/// Test that the inbound downloader rejects blocks above the lookahead limit. -/// -/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) -#[tokio::test] -async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> { - // Get services - let ( - inbound_service, - _mempool, - _committed_blocks, - _added_transactions, - mut tx_verifier, - mut peer_set, - state_service, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) = setup(false).await; - - // Get the next block - let block: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; - let block_hash = block.hash(); - - // Push test block hash - let _request = inbound_service - .clone() - .oneshot(Request::AdvertiseBlock(block_hash)) - .await?; - - // Block is fetched, and committed to the state - peer_set - .expect_request(Request::BlocksByHash(iter::once(block_hash).collect())) - .await - .respond(Response::Blocks(vec![block])); - - // TODO: check that the block is queued in the checkpoint verifier - - // check that nothing unexpected happened - peer_set.expect_no_requests().await; - tx_verifier.expect_no_requests().await; - - // Get a block that is a long way away from genesis - let block: Arc = - zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; - let block_hash = block.hash(); - - // Push test block hash - let _request = inbound_service - .clone() - .oneshot(Request::AdvertiseBlock(block_hash)) - .await?; - - // Block is fetched, but the downloader drops it because it is too high - peer_set - .expect_request(Request::BlocksByHash(iter::once(block_hash).collect())) - .await - .respond(Response::Blocks(vec![block])); - - let response = state_service - .clone() - .oneshot(zebra_state::Request::Depth(block_hash)) - .await?; - assert_eq!(response, zebra_state::Response::Depth(None)); - - // TODO: check that the block is not queued in the checkpoint verifier or non-finalized state - - // check that nothing unexpected happened - peer_set.expect_no_requests().await; - tx_verifier.expect_no_requests().await; - - let sync_gossip_result = sync_gossip_task_handle.now_or_never(); - assert!( - matches!(sync_gossip_result, None), - "unexpected error or panic in sync gossip task: {:?}", - sync_gossip_result, - ); - - let tx_gossip_result = tx_gossip_task_handle.now_or_never(); - assert!( - matches!(tx_gossip_result, None), - "unexpected error or panic in transaction gossip task: {:?}", - tx_gossip_result, - ); - - Ok(()) -} - -async fn setup( - add_transactions: bool, -) -> ( - Buffer< - BoxService, - zebra_network::Request, - >, - Buffer, mempool::Request>, - Vec>, - Vec, - MockService, - MockService, - Buffer, zebra_state::Request>, - JoinHandle>, - JoinHandle>, -) { - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); - let address_book = Arc::new(std::sync::Mutex::new(address_book)); - let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - - let mut state_service = ServiceBuilder::new().buffer(1).service(state); - - // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers. - let (block_verifier, _transaction_verifier, _groth16_download_handle) = - zebra_consensus::chain::init( - consensus_config.clone(), - network, - state_service.clone(), - true, - ) - .await; - - let mut peer_set = MockService::build() - .with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY) - .for_unit_tests(); - let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); - - let mock_tx_verifier = MockService::build().for_unit_tests(); - let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); - - let mut committed_blocks = Vec::new(); - - // Push the genesis block to the state. - // This must be done before creating the mempool to avoid `chain_tip_change` - // returning "reset" which would clear the mempool. - let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .ready() - .await - .unwrap() - .call(zebra_state::Request::CommitFinalizedBlock( - genesis_block.clone().into(), - )) - .await - .unwrap(); - committed_blocks.push(genesis_block); - - // Also push block 1. - // Block one is a network upgrade and the mempool will be cleared at it, - // let all our tests start after this event. - let block_one: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .clone() - .oneshot(zebra_state::Request::CommitFinalizedBlock( - block_one.clone().into(), - )) - .await - .unwrap(); - committed_blocks.push(block_one); - - let (mut mempool_service, transaction_receiver) = Mempool::new( - &mempool::Config::default(), - buffered_peer_set.clone(), - state_service.clone(), - buffered_tx_verifier.clone(), - sync_status.clone(), - latest_chain_tip.clone(), - chain_tip_change.clone(), - ); - - // Enable the mempool - mempool_service.enable(&mut recent_syncs).await; - - // Add transactions to the mempool, skipping verification and broadcast - let mut added_transactions = Vec::new(); - if add_transactions { - added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network)); - } - - let mempool_service = BoxService::new(mempool_service); - let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service); - - let (setup_tx, setup_rx) = oneshot::channel(); - - let inbound_service = ServiceBuilder::new() - .load_shed() - .service(super::Inbound::new(setup_rx)); - let inbound_service = BoxService::new(inbound_service); - let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service); - - let setup_data = InboundSetupData { - address_book, - block_download_peer_set: buffered_peer_set, - block_verifier, - mempool: mempool_service.clone(), - state: state_service.clone(), - latest_chain_tip, - }; - let r = setup_tx.send(setup_data); - // We can't expect or unwrap because the returned Result does not implement Debug - assert!(r.is_ok(), "unexpected setup channel send failure"); - - let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( - sync_status.clone(), - chain_tip_change, - peer_set.clone(), - )); - - let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( - transaction_receiver, - peer_set.clone(), - )); - - // Make sure there is an additional request broadcasting the - // committed blocks to peers. - // - // (The genesis block gets skipped, because block 1 is committed before the task is spawned.) - for block in committed_blocks.iter().skip(1) { - peer_set - .expect_request(Request::AdvertiseBlock(block.hash())) - .await - .respond(Response::Nil); - } - - ( - inbound_service, - mempool_service, - committed_blocks, - added_transactions, - mock_tx_verifier, - peer_set, - state_service, - sync_gossip_task_handle, - tx_gossip_task_handle, - ) -} - -/// Manually add a transaction to the mempool storage. -/// -/// Skips some mempool functionality, like transaction verification and peer broadcasts. -fn add_some_stuff_to_mempool( - mempool_service: &mut Mempool, - network: Network, -) -> Vec { - // get the genesis block coinbase transaction from the Zcash blockchain. - let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network) - .take(1) - .collect(); - - // Insert the genesis block coinbase transaction into the mempool storage. - mempool_service - .storage() - .insert(genesis_transactions[0].clone()) - .unwrap(); - - genesis_transactions -} +mod fake_peer_set; +mod real_peer_set; diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs new file mode 100644 index 00000000..d29623ee --- /dev/null +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -0,0 +1,848 @@ +//! Inbound service tests with a fake peer set. + +use std::{ + collections::HashSet, + iter::{self, FromIterator}, + net::SocketAddr, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use futures::FutureExt; +use tokio::{sync::oneshot, task::JoinHandle}; +use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt}; +use tracing::Span; + +use zebra_chain::{ + amount::Amount, + block::Block, + parameters::Network, + serialization::ZcashDeserializeInto, + transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx}, +}; +use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; +use zebra_network::{AddressBook, Request, Response, ResponseStatus}; +use zebra_state::Config as StateConfig; +use zebra_test::mock_service::{MockService, PanicAssertion}; + +use crate::{ + components::{ + inbound::{Inbound, InboundSetupData}, + mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool}, + sync::{self, BlockGossipError, SyncStatus}, + }, + BoxError, +}; + +use ResponseStatus::*; + +/// Maximum time to wait for a network service request. +/// +/// The default [`MockService`] value can be too short for some of these tests that take a little +/// longer than expected to actually send the network request. +/// +/// Increasing this value causes the tests to take longer to complete, so it can't be too large. +const MAX_PEER_SET_REQUEST_DELAY: Duration = Duration::from_millis(500); + +#[tokio::test] +async fn mempool_requests_for_transactions() { + let ( + inbound_service, + _mempool_guard, + _committed_blocks, + added_transactions, + _mock_tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) = setup(true).await; + + let added_transactions: Vec = added_transactions + .iter() + .map(|t| t.transaction.clone()) + .collect(); + let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); + + // Test `Request::MempoolTransactionIds` + let response = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + match response { + Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids), + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`, got {:?}", + response + ), + }; + + // Test `Request::TransactionsById` + let hash_set = added_transaction_ids + .iter() + .copied() + .collect::>(); + + let response = inbound_service + .clone() + .oneshot(Request::TransactionsById(hash_set)) + .await; + + match response { + Ok(Response::Transactions(response)) => { + assert_eq!( + response, + added_transactions + .into_iter() + .map(Available) + .collect::>(), + ) + } + _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec)`"), + }; + + // check that nothing unexpected happened + peer_set.expect_no_requests().await; + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); +} + +#[tokio::test] +async fn mempool_push_transaction() -> Result<(), crate::BoxError> { + // get a block that has at least one non coinbase transaction + let block: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // use the first transaction that is not coinbase + let tx = block.transactions[1].clone(); + + let ( + inbound_service, + _mempool_guard, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) = setup(false).await; + + // Test `Request::PushTransaction` + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + let transaction = responder + .request() + .clone() + .into_mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + ))); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]), + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // Make sure there is an additional request broadcasting the + // inserted transaction to peers. + let mut hs = HashSet::new(); + hs.insert(tx.unmined_id()); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +#[tokio::test] +async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { + // get a block that has at least one non coinbase transaction + let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // use the first transaction that is not coinbase + let test_transaction = block + .transactions + .into_iter() + .find(|tx| !tx.has_any_coinbase_inputs()) + .expect("at least one non-coinbase transaction"); + let test_transaction_id = test_transaction.unmined_id(); + let txs = HashSet::from_iter([test_transaction_id]); + + let ( + inbound_service, + _mempool_guard, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) = setup(false).await; + + // Test `Request::AdvertiseTransactionIds` + let request = inbound_service + .clone() + .oneshot(Request::AdvertiseTransactionIds(txs.clone())); + // Ensure the mocked peer set responds + let peer_set_responder = + peer_set + .expect_request(Request::TransactionsById(txs)) + .map(|responder| { + let unmined_transaction = UnminedTx::from(test_transaction.clone()); + responder.respond(Response::Transactions(vec![Available(unmined_transaction)])) + }); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + let transaction = responder + .request() + .clone() + .into_mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + ))); + }); + let (response, _, _) = futures::join!(request, peer_set_responder, verification); + + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![test_transaction_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // Make sure there is an additional request broadcasting the + // inserted transaction to peers. + let mut hs = HashSet::new(); + hs.insert(test_transaction.unmined_id()); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +#[tokio::test] +async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { + // Get a block that has at least one non coinbase transaction + let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // Use the first transaction that is not coinbase to test expiration + let tx1 = &*(block.transactions[1]).clone(); + let mut tx1_id = tx1.unmined_id(); + + // Change the expiration height of the transaction to block 3 + let mut tx1 = tx1.clone(); + *tx1.expiry_height_mut() = zebra_chain::block::Height(3); + + // Use the second transaction that is not coinbase to trigger `remove_expired_transactions()` + let tx2 = block.transactions[2].clone(); + let mut tx2_id = tx2.unmined_id(); + + // Get services + let ( + inbound_service, + mempool, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + state_service, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) = setup(false).await; + + // Push test transaction + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx1.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + tx1_id = responder.request().tx_id(); + let transaction = responder + .request() + .clone() + .into_mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + ))); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx1_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // Add a new block to the state (make the chain tip advance) + let block_two: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_two.clone().into(), + )) + .await + .unwrap(); + + // Test transaction 1 is gossiped + let mut hs = HashSet::new(); + hs.insert(tx1_id); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + + // Block is gossiped then + peer_set + .expect_request(Request::AdvertiseBlock(block_two.hash())) + .await + .respond(Response::Nil); + + // Make sure tx1 is still in the mempool as it is not expired yet. + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx1_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // As our test transaction will expire at a block height greater or equal to 3 we need to push block 3. + let block_three: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_three.clone().into(), + )) + .await + .unwrap(); + + // Block is gossiped + peer_set + .expect_request(Request::AdvertiseBlock(block_three.hash())) + .await + .respond(Response::Nil); + + // Push a second transaction to trigger `remove_expired_transactions()` + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx2.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + tx2_id = responder.request().tx_id(); + let transaction = responder + .request() + .clone() + .into_mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + ))); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + // Only tx2 will be in the mempool while tx1 was expired + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx2_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + // Check if tx1 was added to the rejected list as well + let response = mempool + .clone() + .oneshot(mempool::Request::Queue(vec![tx1_id.into()])) + .await + .unwrap(); + + let queued_responses = match response { + mempool::Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + + assert_eq!(queued_responses.len(), 1); + assert_eq!( + queued_responses[0], + Err(mempool::MempoolError::StorageEffectsChain( + mempool::SameEffectsChainRejectionError::Expired + )) + ); + + // Test transaction 2 is gossiped + let mut hs = HashSet::new(); + hs.insert(tx2_id); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + + // Add all the rest of the continuous blocks we have to test tx2 will never expire. + let more_blocks: Vec> = vec![ + zebra_test::vectors::BLOCK_MAINNET_4_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_5_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_6_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_7_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_8_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_9_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_10_BYTES + .zcash_deserialize_into() + .unwrap(), + ]; + for block in more_blocks { + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block.clone().into(), + )) + .await + .unwrap(); + + // Block is gossiped + peer_set + .expect_request(Request::AdvertiseBlock(block.hash())) + .await + .respond(Response::Nil); + + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + // tx2 is still in the mempool as the blockchain progress because the zero expiration height + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx2_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + } + + // check that nothing unexpected happened + peer_set.expect_no_requests().await; + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Test that the inbound downloader rejects blocks above the lookahead limit. +/// +/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) +#[tokio::test] +async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> { + // Get services + let ( + inbound_service, + _mempool, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + state_service, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) = setup(false).await; + + // Get the next block + let block: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; + let block_hash = block.hash(); + + // Push test block hash + let _request = inbound_service + .clone() + .oneshot(Request::AdvertiseBlock(block_hash)) + .await?; + + // Block is fetched, and committed to the state + peer_set + .expect_request(Request::BlocksByHash(iter::once(block_hash).collect())) + .await + .respond(Response::Blocks(vec![Available(block)])); + + // TODO: check that the block is queued in the checkpoint verifier + + // check that nothing unexpected happened + peer_set.expect_no_requests().await; + tx_verifier.expect_no_requests().await; + + // Get a block that is a long way away from genesis + let block: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + let block_hash = block.hash(); + + // Push test block hash + let _request = inbound_service + .clone() + .oneshot(Request::AdvertiseBlock(block_hash)) + .await?; + + // Block is fetched, but the downloader drops it because it is too high + peer_set + .expect_request(Request::BlocksByHash(iter::once(block_hash).collect())) + .await + .respond(Response::Blocks(vec![Available(block)])); + + let response = state_service + .clone() + .oneshot(zebra_state::Request::Depth(block_hash)) + .await?; + assert_eq!(response, zebra_state::Response::Depth(None)); + + // TODO: check that the block is not queued in the checkpoint verifier or non-finalized state + + // check that nothing unexpected happened + peer_set.expect_no_requests().await; + tx_verifier.expect_no_requests().await; + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Setup a fake Zebra network stack, with fake peer set. +/// +/// Adds some initial state blocks, and mempool transactions if `add_transactions` is true. +/// +/// Uses a real block verifier, but a fake transaction verifier. +/// Does not run a block syncer task. +async fn setup( + add_transactions: bool, +) -> ( + Buffer< + BoxService, + zebra_network::Request, + >, + Buffer, mempool::Request>, + Vec>, + Vec, + MockService, + MockService, + Buffer, zebra_state::Request>, + JoinHandle>, + JoinHandle>, +) { + zebra_test::init(); + + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); + let address_book = Arc::new(std::sync::Mutex::new(address_book)); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + let (state, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + + // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers. + let (block_verifier, _transaction_verifier, _groth16_download_handle) = + zebra_consensus::chain::init( + consensus_config.clone(), + network, + state_service.clone(), + true, + ) + .await; + + let mut peer_set = MockService::build() + .with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY) + .for_unit_tests(); + let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); + + let mock_tx_verifier = MockService::build().for_unit_tests(); + let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); + + let mut committed_blocks = Vec::new(); + + // Push the genesis block to the state. + // This must be done before creating the mempool to avoid `chain_tip_change` + // returning "reset" which would clear the mempool. + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + committed_blocks.push(genesis_block); + + // Also push block 1. + // Block one is a network upgrade and the mempool will be cleared at it, + // let all our tests start after this event. + let block_one: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_one.clone().into(), + )) + .await + .unwrap(); + committed_blocks.push(block_one); + + let (mut mempool_service, transaction_receiver) = Mempool::new( + &mempool::Config::default(), + buffered_peer_set.clone(), + state_service.clone(), + buffered_tx_verifier.clone(), + sync_status.clone(), + latest_chain_tip.clone(), + chain_tip_change.clone(), + ); + + // Enable the mempool + mempool_service.enable(&mut recent_syncs).await; + + // Add transactions to the mempool, skipping verification and broadcast + let mut added_transactions = Vec::new(); + if add_transactions { + added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network)); + } + + let mempool_service = BoxService::new(mempool_service); + let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service); + + let (setup_tx, setup_rx) = oneshot::channel(); + + let inbound_service = ServiceBuilder::new() + .load_shed() + .service(Inbound::new(setup_rx)); + let inbound_service = BoxService::new(inbound_service); + let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service); + + let setup_data = InboundSetupData { + address_book, + block_download_peer_set: buffered_peer_set, + block_verifier, + mempool: mempool_service.clone(), + state: state_service.clone(), + latest_chain_tip, + }; + let r = setup_tx.send(setup_data); + // We can't expect or unwrap because the returned Result does not implement Debug + assert!(r.is_ok(), "unexpected setup channel send failure"); + + let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change, + peer_set.clone(), + )); + + let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( + transaction_receiver, + peer_set.clone(), + )); + + // Make sure there is an additional request broadcasting the + // committed blocks to peers. + // + // (The genesis block gets skipped, because block 1 is committed before the task is spawned.) + for block in committed_blocks.iter().skip(1) { + peer_set + .expect_request(Request::AdvertiseBlock(block.hash())) + .await + .respond(Response::Nil); + } + + ( + inbound_service, + mempool_service, + committed_blocks, + added_transactions, + mock_tx_verifier, + peer_set, + state_service, + sync_gossip_task_handle, + tx_gossip_task_handle, + ) +} + +/// Manually add a transaction to the mempool storage. +/// +/// Skips some mempool functionality, like transaction verification and peer broadcasts. +fn add_some_stuff_to_mempool( + mempool_service: &mut Mempool, + network: Network, +) -> Vec { + // get the genesis block coinbase transaction from the Zcash blockchain. + let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network) + .take(1) + .collect(); + + // Insert the genesis block coinbase transaction into the mempool storage. + mempool_service + .storage() + .insert(genesis_transactions[0].clone()) + .unwrap(); + + genesis_transactions +} diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs new file mode 100644 index 00000000..c4746a9e --- /dev/null +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -0,0 +1,498 @@ +//! Inbound service tests with a real peer set. + +use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc}; + +use futures::FutureExt; +use tokio::{sync::oneshot, task::JoinHandle}; +use tower::{ + buffer::Buffer, + builder::ServiceBuilder, + util::{BoxCloneService, BoxService}, + ServiceExt, +}; + +use zebra_chain::{ + block::{self, Block}, + parameters::Network, + transaction::{AuthDigest, Hash as TxHash, UnminedTxId, WtxId}, +}; +use zebra_consensus::{chain::VerifyChainError, error::TransactionError, transaction}; +use zebra_network::{ + connect_isolated_tcp_direct, Config as NetworkConfig, Request, Response, ResponseStatus, + SharedPeerError, +}; +use zebra_state::Config as StateConfig; +use zebra_test::mock_service::{MockService, PanicAssertion}; + +use crate::{ + components::{ + inbound::{Inbound, InboundSetupData}, + mempool::{self, gossip_mempool_transaction_id, Mempool}, + sync::{self, BlockGossipError, SyncStatus}, + }, + BoxError, +}; + +use ResponseStatus::*; + +/// Check that a network stack with an empty address book only contains the local listener port, +/// by querying the inbound service via a local TCP connection. +/// +/// Uses a real Zebra network stack with a local listener address, +/// and an isolated Zebra inbound TCP connection. +#[tokio::test] +async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> { + let ( + // real services + connected_peer_service, + inbound_service, + _peer_set, + _mempool_service, + _state_service, + // mocked services + _mock_block_verifier, + _mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + listen_addr, + ) = setup().await; + + // Use inbound directly + let request = inbound_service.clone().oneshot(Request::Peers); + let response = request.await; + match response.as_ref() { + Ok(Response::Peers(single_peer)) if single_peer.len() == 1 => { + assert_eq!(single_peer.first().unwrap().addr(), listen_addr) + } + Ok(Response::Peers(_peer_list)) => unreachable!( + "`Peers` response should contain a single peer, \ + actual result: {:?}", + response + ), + _ => unreachable!( + "`Peers` requests should always respond `Ok(Response::Peers(_))`, \ + actual result: {:?}", + response + ), + }; + + // Use the connected peer via a local TCP connection + let request = connected_peer_service.clone().oneshot(Request::Peers); + let response = request.await; + match response.as_ref() { + Ok(Response::Peers(single_peer)) if single_peer.len() == 1 => { + assert_eq!(single_peer.first().unwrap().addr(), listen_addr) + } + Ok(Response::Peers(_peer_list)) => unreachable!( + "`Peers` response should contain a single peer, \ + actual result: {:?}", + response + ), + _ => unreachable!( + "`Peers` requests should always respond `Ok(Response::Peers(_))`, \ + actual result: {:?}", + response + ), + }; + + let block_gossip_result = block_gossip_task_handle.now_or_never(); + assert!( + matches!(block_gossip_result, None), + "unexpected error or panic in block gossip task: {:?}", + block_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Check that a network stack with an empty state responds to block requests with `notfound`. +/// +/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection. +#[tokio::test] +async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> { + let ( + // real services + connected_peer_service, + inbound_service, + _peer_set, + _mempool_service, + _state_service, + // mocked services + _mock_block_verifier, + _mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + _listen_addr, + ) = setup().await; + + let test_block = block::Hash([0x11; 32]); + + // Use inbound directly + let request = inbound_service + .clone() + .oneshot(Request::BlocksByHash(iter::once(test_block).collect())); + let response = request.await; + match response.as_ref() { + Ok(Response::Blocks(single_block)) if single_block.len() == 1 => { + assert_eq!(single_block.first().unwrap(), &Missing(test_block)); + } + Ok(Response::Blocks(_block_list)) => unreachable!( + "`BlocksByHash` response should contain a single block, \ + actual result: {:?}", + response + ), + _ => unreachable!( + "inbound service should respond to `BlocksByHash` with `Ok(Response::Blocks(_))`, \ + actual result: {:?}", + response + ), + }; + + // Use the connected peer via a local TCP connection + let request = connected_peer_service + .clone() + .oneshot(Request::BlocksByHash(iter::once(test_block).collect())); + let response = request.await; + match response.as_ref() { + Err(missing_error) => { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + assert_eq!( + missing_error.inner_debug(), + "NotFound([Block(block::Hash(\"1111111111111111111111111111111111111111111111111111111111111111\"))])" + ); + } + _ => unreachable!( + "peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFound(_)))`, \ + actual result: {:?}", + response + ), + }; + + let block_gossip_result = block_gossip_task_handle.now_or_never(); + assert!( + matches!(block_gossip_result, None), + "unexpected error or panic in block gossip task: {:?}", + block_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Check that a network stack with an empty state responds to single transaction requests with `notfound`. +/// +/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection. +/// +/// TODO: test a response with some Available and some Missing transactions. +#[tokio::test] +async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { + let ( + // real services + connected_peer_service, + inbound_service, + _peer_set, + _mempool_service, + _state_service, + // mocked services + _mock_block_verifier, + _mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + _listen_addr, + ) = setup().await; + + let test_tx = UnminedTxId::from_legacy_id(TxHash([0x22; 32])); + let test_wtx: UnminedTxId = WtxId { + id: TxHash([0x33; 32]), + auth_digest: AuthDigest([0x44; 32]), + } + .into(); + + // Test both transaction ID variants, separately and together + for txs in [vec![test_tx], vec![test_wtx], vec![test_tx, test_wtx]] { + // Use inbound directly + let request = inbound_service + .clone() + .oneshot(Request::TransactionsById(txs.iter().copied().collect())); + let response = request.await; + match response.as_ref() { + Ok(Response::Transactions(response_txs)) => { + // The response order is unstable, because it depends on concurrent inbound futures. + // In #2244 we will fix this by replacing response Vecs with HashSets. + for tx in &txs { + assert!( + response_txs.contains(&Missing(*tx)), + "expected {:?}, but it was not in the response", + tx + ); + } + assert_eq!(response_txs.len(), txs.len()); + } + _ => unreachable!( + "inbound service should respond to `TransactionsById` with `Ok(Response::Transactions(_))`, \ + actual result: {:?}", + response + ), + }; + + // Use the connected peer via a local TCP connection + let request = connected_peer_service + .clone() + .oneshot(Request::TransactionsById(txs.iter().copied().collect())); + let response = request.await; + match response.as_ref() { + Err(missing_error) => { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. + if txs == vec![test_tx] { + assert_eq!( + missing_error.inner_debug(), + "NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])", + ); + } else if txs == vec![test_wtx] { + assert_eq!( + missing_error.inner_debug(), + "NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])", + ); + } else if txs == vec![test_tx, test_wtx] { + // The response order is unstable, because it depends on concurrent inbound futures. + // In #2244 we will fix this by replacing response Vecs with HashSets. + assert!( + missing_error.inner_debug() == + "NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\")), Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])" + || + missing_error.inner_debug() == + "NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") }), Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])", + "unexpected response: {:?}", + missing_error.inner_debug(), + ); + } else { + unreachable!("unexpected test case"); + } + } + _ => unreachable!( + "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFound(_)))`, \ + actual result: {:?}", + response + ), + }; + } + + let block_gossip_result = block_gossip_task_handle.now_or_never(); + assert!( + matches!(block_gossip_result, None), + "unexpected error or panic in block gossip task: {:?}", + block_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Setup a real Zebra network stack, with a connected peer using a real isolated network stack. +/// +/// Uses fake verifiers, and does not run a block syncer task. +async fn setup() -> ( + // real services + // connected peer + Buffer< + BoxService, + zebra_network::Request, + >, + // inbound service + BoxCloneService, + // outbound peer set (only has the connected peer) + Buffer< + BoxService, + zebra_network::Request, + >, + Buffer, mempool::Request>, + Buffer, zebra_state::Request>, + // mocked services + MockService, block::Hash, PanicAssertion, VerifyChainError>, + MockService, + // real tasks + JoinHandle>, + JoinHandle>, + // real open socket addresses + SocketAddr, +) { + zebra_test::init(); + + let network = Network::Mainnet; + // Open a listener on an unused IPv4 localhost port + let config_listen_addr = "127.0.0.1:0".parse().unwrap(); + + // Inbound + let (setup_tx, setup_rx) = oneshot::channel(); + let inbound_service = Inbound::new(setup_rx); + let inbound_service = ServiceBuilder::new() + .boxed_clone() + .load_shed() + .buffer(10) + .service(inbound_service); + + // State + let state_config = StateConfig::ephemeral(); + let (state_service, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config, network); + let state_service = ServiceBuilder::new().buffer(10).service(state_service); + + // Network + let network_config = NetworkConfig { + network, + listen_addr: config_listen_addr, + + // Stop Zebra making outbound connections + initial_mainnet_peers: HashSet::new(), + initial_testnet_peers: HashSet::new(), + + ..NetworkConfig::default() + }; + let (mut peer_set, address_book) = zebra_network::init( + network_config, + inbound_service.clone(), + latest_chain_tip.clone(), + ) + .await; + + // Inbound listener + let listen_addr = address_book + .lock() + .unwrap() + .local_listener_meta_addr() + .addr(); + assert_ne!( + listen_addr.port(), + 0, + "dynamic ports are replaced with OS-assigned ports" + ); + assert_eq!( + listen_addr.ip(), + config_listen_addr.ip(), + "IP addresses are correctly propagated" + ); + + // Fake syncer + let (sync_status, mut recent_syncs) = SyncStatus::new(); + + // Fake verifiers + let mock_block_verifier = MockService::build().for_unit_tests(); + let buffered_block_verifier = ServiceBuilder::new() + .buffer(10) + .service(BoxService::new(mock_block_verifier.clone())); + let mock_tx_verifier = MockService::build().for_unit_tests(); + let buffered_tx_verifier = ServiceBuilder::new() + .buffer(10) + .service(BoxService::new(mock_tx_verifier.clone())); + + // Mempool + let mempool_config = mempool::Config::default(); + let (mut mempool_service, transaction_receiver) = Mempool::new( + &mempool_config, + peer_set.clone(), + state_service.clone(), + buffered_tx_verifier.clone(), + sync_status.clone(), + latest_chain_tip.clone(), + chain_tip_change.clone(), + ); + + // Enable the mempool + mempool_service.enable(&mut recent_syncs).await; + let mempool_service = ServiceBuilder::new() + .buffer(10) + .boxed() + // boxed() needs this extra tiny buffer + .buffer(1) + .service(mempool_service); + + // Initialize the inbound service + let setup_data = InboundSetupData { + address_book, + block_download_peer_set: peer_set.clone(), + block_verifier: buffered_block_verifier, + mempool: mempool_service.clone(), + state: state_service.clone(), + latest_chain_tip, + }; + let r = setup_tx.send(setup_data); + // We can't expect or unwrap because the returned Result does not implement Debug + assert!(r.is_ok(), "unexpected setup channel send failure"); + + let block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change, + peer_set.clone(), + )); + + let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( + transaction_receiver, + peer_set.clone(), + )); + + // Open a fake peer connection to the inbound listener, using the isolated connection API + let connected_peer_service = + connect_isolated_tcp_direct(network, listen_addr, "test".to_string()) + .await + .expect("local listener connection succeeds"); + let connected_peer_service = ServiceBuilder::new() + .buffer(10) + .service(connected_peer_service); + + // Make the peer set find the new peer + let _ = peer_set + .ready() + .await + .expect("peer set becomes ready without errors"); + + // there is no syncer task, and the verifiers are fake, + // but the network stack is all real + ( + // real services + connected_peer_service, + inbound_service, + peer_set, + mempool_service, + state_service, + // mocked services + mock_block_verifier, + mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + listen_addr, + ) +} diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index eedab09c..0720fd3f 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -322,6 +322,10 @@ where _ => unreachable!("wrong response to transaction request"), }; + let tx = tx.available().expect( + "unexpected missing tx status: single tx failures should be errors", + ); + metrics::counter!( "mempool.downloaded.transactions.total", 1, diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 57c8c9e4..1ba2ded0 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -1,3 +1,5 @@ +//! A download stream for Zebra's block syncer. + use std::{ collections::HashMap, convert::TryFrom, @@ -259,9 +261,10 @@ where ); blocks - .into_iter() - .next() - .expect("successful response has the block in it") + .first() + .expect("just checked length") + .available() + .expect("unexpected missing block status: single block failures should be errors") } else { unreachable!("wrong response to block request"); }; diff --git a/zebrad/src/components/sync/tests/vectors.rs b/zebrad/src/components/sync/tests/vectors.rs index 0427b8a5..b0ad3257 100644 --- a/zebrad/src/components/sync/tests/vectors.rs +++ b/zebrad/src/components/sync/tests/vectors.rs @@ -11,6 +11,7 @@ use zebra_chain::{ serialization::ZcashDeserializeInto, }; use zebra_consensus::Config as ConsensusConfig; +use zebra_network::ResponseStatus; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -25,18 +26,20 @@ use crate::{ config::ZebradConfig, }; +use ResponseStatus::*; + /// Maximum time to wait for a request to any test service. /// /// The default [`MockService`] value can be too short for some of these tests that take a little /// longer than expected to actually send the request. /// /// Increasing this value causes the tests to take longer to complete, so it can't be too large. -const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(500); +const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(1000); /// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips. /// /// This test also makes sure that the syncer downloads blocks in order. -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn sync_blocks_ok() -> Result<(), crate::BoxError> { // Get services let ( @@ -83,7 +86,7 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block0.clone()])); + .respond(zn::Response::Blocks(vec![Available(block0.clone())])); chain_verifier .expect_request(block0) @@ -157,11 +160,11 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block1.clone()])); + .respond(zn::Response::Blocks(vec![Available(block1.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block2.clone()])); + .respond(zn::Response::Blocks(vec![Available(block2.clone())])); // We can't guarantee the verification request order let mut remaining_blocks: HashMap> = @@ -221,11 +224,11 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block3.clone()])); + .respond(zn::Response::Blocks(vec![Available(block3.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block4.clone()])); + .respond(zn::Response::Blocks(vec![Available(block4.clone())])); // We can't guarantee the verification request order let mut remaining_blocks: HashMap> = @@ -264,7 +267,7 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> { /// with duplicate block hashes. /// /// This test also makes sure that the syncer downloads blocks in order. -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> { // Get services let ( @@ -311,7 +314,7 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block0.clone()])); + .respond(zn::Response::Blocks(vec![Available(block0.clone())])); chain_verifier .expect_request(block0) @@ -387,11 +390,11 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block1.clone()])); + .respond(zn::Response::Blocks(vec![Available(block1.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block2.clone()])); + .respond(zn::Response::Blocks(vec![Available(block2.clone())])); // We can't guarantee the verification request order let mut remaining_blocks: HashMap> = @@ -453,11 +456,11 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block3.clone()])); + .respond(zn::Response::Blocks(vec![Available(block3.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block4.clone()])); + .respond(zn::Response::Blocks(vec![Available(block4.clone())])); // We can't guarantee the verification request order let mut remaining_blocks: HashMap> = @@ -527,7 +530,7 @@ async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block982k.clone()])); + .respond(zn::Response::Blocks(vec![Available(block982k.clone())])); // Block is dropped because it has the wrong hash. // We expect more requests to the state service, because the syncer keeps on running. @@ -547,7 +550,7 @@ async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> { /// Test that the sync downloader rejects blocks that are too high in obtain_tips. /// /// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> { // Get services let ( @@ -593,7 +596,7 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block0.clone()])); + .respond(zn::Response::Blocks(vec![Available(block0.clone())])); chain_verifier .expect_request(block0) @@ -675,15 +678,15 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> { iter::once(block982k_hash).collect(), )) .await - .respond(zn::Response::Blocks(vec![block982k.clone()])); + .respond(zn::Response::Blocks(vec![Available(block982k.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block1.clone()])); + .respond(zn::Response::Blocks(vec![Available(block1.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block2.clone()])); + .respond(zn::Response::Blocks(vec![Available(block2.clone())])); // At this point, the following tasks race: // - The valid chain verifier requests @@ -703,7 +706,7 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> { /// Test that the sync downloader rejects blocks that are too high in extend_tips. /// /// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> { // Get services let ( @@ -755,7 +758,7 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block0.clone()])); + .respond(zn::Response::Blocks(vec![Available(block0.clone())])); chain_verifier .expect_request(block0) @@ -829,11 +832,11 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block1.clone()])); + .respond(zn::Response::Blocks(vec![Available(block1.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block2.clone()])); + .respond(zn::Response::Blocks(vec![Available(block2.clone())])); // We can't guarantee the verification request order let mut remaining_blocks: HashMap> = @@ -895,17 +898,17 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> { peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block3.clone()])); + .respond(zn::Response::Blocks(vec![Available(block3.clone())])); peer_set .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) .await - .respond(zn::Response::Blocks(vec![block4.clone()])); + .respond(zn::Response::Blocks(vec![Available(block4.clone())])); peer_set .expect_request(zn::Request::BlocksByHash( iter::once(block982k_hash).collect(), )) .await - .respond(zn::Response::Blocks(vec![block982k.clone()])); + .respond(zn::Response::Blocks(vec![Available(block982k.clone())])); // At this point, the following tasks race: // - The valid chain verifier requests @@ -934,6 +937,8 @@ fn setup() -> ( MockService, MockChainTipSender, ) { + zebra_test::init(); + let consensus_config = ConsensusConfig::default(); let state_config = StateConfig::ephemeral(); let config = ZebradConfig {