From a4dd3b7396ef12d1777fd4a731ddd940004e75a9 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 15 Feb 2022 11:44:33 +1000 Subject: [PATCH] 4. Avoid repeated requests to peers after partial responses or errors (#3505) * fix(network): split synthetic NotFoundRegistry from message NotFoundResponse * docs(network): Improve `notfound` message documentation * refactor(network): Rename MustUseOneshotSender to MustUseClientResponseSender ``` fastmod MustUseOneshotSender MustUseClientResponseSender zebra* ``` * docs(network): fix a comment typo * refactor(network): remove generics from MustUseClientResponseSender * refactor(network): add an inventory collector to Client, but don't use it yet * feat(network): register missing peer responses as missing inventory We register this missing inventory based on peer responses, or connection errors or timeouts. Inbound message inventory tracking requires peers to send `notfound` messages. But `zcashd` skips `notfound` for blocks, so we can't rely on peer messages. This missing inventory tracking works regardless of peer `notfound` messages. 
* refactor(network): rename ResponseStatus to InventoryResponse ```sh fastmod ResponseStatus InventoryResponse zebra* ``` * refactor(network): rename InventoryStatus::inner() to to_inner() * fix(network): remove a redundant runtime.enter() in a test * doc(network): the exact time used to filter outbound peers doesn't matter * fix(network): handle block requests slightly more efficiently * doc(network): fix a typo * fmt(network): `cargo fmt` after rename ResponseStatus to InventoryResponse * doc(test): clarify some test comments * test(network): test synthetic notfound from connection errors and peer inventory routing * test(network): improve inbound test diagnostics * feat(network): add a proptest-impl feature to zebra-network * feat(network): add a test-only connect_isolated_with_inbound function * test(network): allow a response on the isolated peer test connection * test(network): fix failures in test synthetic notfound * test(network): Simplify SharedPeerError test assertions * test(network): test synthetic notfound from partially successful requests * test(network): MissingInventoryCollector ignores local NotFoundRegistry errors * fix(network): decrease the inventory rotation interval This stops us waiting 3-4 sync resets (4 minutes) before we retry a missing block. Now we wait 1-2 sync resets (2 minutes), which is still a reasonable rate limit. This should speed up syncing near the tip, and on testnet. 
* fmt(network): cargo fmt --all * cleanup(network): remove unnecessary allow(dead_code) * cleanup(network): stop importing the whole sync module into tests * doc(network): clarify syncer inventory retry constraint * doc(network): add a TODO for a fix to ensure API behaviour remains consistent * doc(network): fix a function doc typo * doc(network): clarify how we handle peers that don't send `notfound` * docs(network): clarify a test comment Co-authored-by: Janito Vaqueiro Ferreira Filho Co-authored-by: Janito Vaqueiro Ferreira Filho Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- zebra-chain/Cargo.toml | 6 +- zebra-consensus/Cargo.toml | 4 +- zebra-network/Cargo.toml | 10 +- zebra-network/src/constants.rs | 23 + zebra-network/src/isolated.rs | 71 ++- zebra-network/src/isolated/tor.rs | 46 +- zebra-network/src/lib.rs | 13 +- zebra-network/src/meta_addr/arbitrary.rs | 2 + zebra-network/src/peer.rs | 2 +- zebra-network/src/peer/client.rs | 215 ++++++++- zebra-network/src/peer/client/tests.rs | 54 ++- .../src/peer/client/tests/vectors.rs | 43 +- zebra-network/src/peer/connection.rs | 70 +-- .../src/peer/connection/tests/prop.rs | 7 +- .../src/peer/connection/tests/vectors.rs | 8 + zebra-network/src/peer/error.rs | 40 +- zebra-network/src/peer/handshake.rs | 13 +- .../src/peer/minimum_peer_version/tests.rs | 2 + .../src/peer_set/inventory_registry.rs | 23 +- .../peer_set/inventory_registry/tests/prop.rs | 1 - zebra-network/src/peer_set/set.rs | 6 +- .../src/peer_set/set/tests/vectors.rs | 2 +- zebra-network/src/protocol/external/addr.rs | 1 + .../src/protocol/external/message.rs | 8 +- zebra-network/src/protocol/external/types.rs | 8 +- zebra-network/src/protocol/internal.rs | 2 +- .../src/protocol/internal/request.rs | 26 + .../src/protocol/internal/response.rs | 17 +- .../src/protocol/internal/response_status.rs | 31 +- zebra-state/Cargo.toml | 4 +- zebrad/Cargo.toml | 5 +- zebrad/src/components/inbound.rs | 14 +- 
.../components/inbound/tests/fake_peer_set.rs | 4 +- .../components/inbound/tests/real_peer_set.rs | 445 +++++++++++++++--- zebrad/src/components/sync.rs | 2 + zebrad/src/components/sync/tests/timing.rs | 30 +- zebrad/src/components/sync/tests/vectors.rs | 4 +- 37 files changed, 1024 insertions(+), 238 deletions(-) diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 0f0089ee..3652998f 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -55,7 +55,7 @@ zcash_note_encryption = { git = "https://github.com/ZcashFoundation/librustzcash zcash_primitives = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" } zcash_history = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" } -proptest = { version = "0.10", optional = true } +proptest = { version = "0.10.1", optional = true } proptest-derive = { version = "0.3.0", optional = true } rand = { version = "0.8", optional = true } @@ -76,8 +76,8 @@ itertools = "0.10.3" spandoc = "0.2" tracing = "0.1.29" -proptest = "0.10" -proptest-derive = "0.3" +proptest = "0.10.1" +proptest-derive = "0.3.0" rand = "0.8" rand_chacha = "0.3" diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 0691684b..574c7384 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -47,13 +47,13 @@ zebra-chain = { path = "../zebra-chain" } zebra-state = { path = "../zebra-state" } zebra-script = { path = "../zebra-script" } -proptest = { version = "0.10", optional = true } +proptest = { version = "0.10.1", optional = true } proptest-derive = { version = "0.3.0", optional = true } [dev-dependencies] color-eyre = "0.5.11" hex = "0.4.3" -proptest = "0.10" +proptest = "0.10.1" proptest-derive = "0.3.0" rand07 = { package = "rand", version = "0.7" } spandoc = "0.2" diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 337411d2..68ae71e5 100644 --- a/zebra-network/Cargo.toml +++ 
b/zebra-network/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" [features] default = [] tor = ["arti-client", "tor-rtcompat"] +proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"] [dependencies] bitflags = "1.2" @@ -40,11 +41,16 @@ tracing-error = { version = "0.1.2", features = ["traced-error"] } arti-client = { version = "0.0.2", optional = true } tor-rtcompat = { version = "0.0.2", optional = true } +# proptest dependencies +proptest = { version = "0.10.1", optional = true } +proptest-derive = { version = "0.3.0", optional = true } + zebra-chain = { path = "../zebra-chain" } [dev-dependencies] -proptest = "0.10" -proptest-derive = "0.3" +proptest = "0.10.1" +proptest-derive = "0.3.0" + static_assertions = "1.1.0" tokio = { version = "1.16.1", features = ["test-util"] } toml = "0.5" diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 449ec468..6c09ee61 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -95,6 +95,11 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4); /// specific manner that matches up with this math. pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20); +/// Zebra rotates its peer inventory registry every time this interval elapses. +/// +/// After 2 of these intervals, Zebra's local available and missing inventory entries expire. +pub const INVENTORY_ROTATION_INTERVAL: Duration = Duration::from_secs(53); + /// The default peer address crawler interval. /// /// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT) @@ -309,6 +314,8 @@ mod tests { use std::convert::TryFrom; + use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING; + use super::*; /// This assures that the `Duration` value we are computing for @@ -394,4 +401,20 @@ mod tests { "the address book limit should actually be used" ); } + + /// Make sure inventory registry rotation is consistent with the target block interval. 
+ #[test] + fn ensure_inventory_rotation_consistent() { + zebra_test::init(); + + assert!( + INVENTORY_ROTATION_INTERVAL + < Duration::from_secs( + POST_BLOSSOM_POW_TARGET_SPACING + .try_into() + .expect("non-negative"), + ), + "we should expire inventory every time 1-2 new blocks get generated" + ); + } } diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index d3d65789..0d4130bb 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -6,7 +6,7 @@ use futures::future::TryFutureExt; use tokio::io::{AsyncRead, AsyncWrite}; use tower::{ util::{BoxService, Oneshot}, - ServiceExt, + Service, ServiceExt, }; use zebra_chain::{chain_tip::NoChainTip, parameters::Network}; @@ -32,11 +32,11 @@ mod tests; /// this low-level API is useful for custom network crawlers or Tor connections. /// /// In addition to being completely isolated from all other node state, this -/// method also aims to be minimally distinguishable from other clients. +/// function also aims to be minimally distinguishable from other clients. /// /// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300) /// -/// Note that this method does not implement any timeout behavior, so callers may +/// Note that this function does not implement any timeout behavior, so callers may /// want to layer it with a timeout as appropriate for their application. /// /// # Inputs @@ -54,6 +54,37 @@ pub fn connect_isolated( ) -> impl Future, BoxError>> where PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let nil_inbound_service = + tower::service_fn(|_req| async move { Ok::(Response::Nil) }); + + connect_isolated_with_inbound(network, data_stream, user_agent, nil_inbound_service) +} + +/// Creates an isolated Zcash peer connection using the provided data stream. +/// This function is for testing purposes only. +/// +/// See [`connect_isolated`] for details. 
+/// +/// # Additional Inputs +/// +/// - `inbound_service`: a [`tower::Service`] that answers inbound requests from the connected peer. +/// +/// # Privacy +/// +/// This function can make the isolated connection send different responses to peers, +/// which makes it stand out from other isolated connections from other peers. +pub fn connect_isolated_with_inbound( + network: Network, + data_stream: PeerTransport, + user_agent: String, + inbound_service: InboundService, +) -> impl Future, BoxError>> +where + PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, + InboundService: + Service + Clone + Send + 'static, + InboundService::Future: Send, { let config = Config { network, @@ -62,9 +93,7 @@ where let handshake = peer::Handshake::builder() .with_config(config) - .with_inbound_service(tower::service_fn(|_req| async move { - Ok::(Response::Nil) - })) + .with_inbound_service(inbound_service) .with_user_agent(user_agent) .with_latest_chain_tip(NoChainTip) .finish() @@ -101,7 +130,35 @@ pub fn connect_isolated_tcp_direct( addr: SocketAddr, user_agent: String, ) -> impl Future, BoxError>> { + let nil_inbound_service = + tower::service_fn(|_req| async move { Ok::(Response::Nil) }); + + connect_isolated_tcp_direct_with_inbound(network, addr, user_agent, nil_inbound_service) +} + +/// Creates an isolated Zcash peer connection to `addr` over a direct TCP connection. +/// This function is for testing purposes only. +/// +/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tcp_direct`] for details. +/// +/// # Privacy +/// +/// This function can make the isolated connection send different responses to peers, +/// which makes it stand out from other isolated connections from other peers. 
+pub fn connect_isolated_tcp_direct_with_inbound( + network: Network, + addr: SocketAddr, + user_agent: String, + inbound_service: InboundService, +) -> impl Future, BoxError>> +where + InboundService: + Service + Clone + Send + 'static, + InboundService::Future: Send, +{ tokio::net::TcpStream::connect(addr) .err_into() - .and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent)) + .and_then(move |tcp_stream| { + connect_isolated_with_inbound(network, tcp_stream, user_agent, inbound_service) + }) } diff --git a/zebra-network/src/isolated/tor.rs b/zebra-network/src/isolated/tor.rs index 781de8e2..e1ce2525 100644 --- a/zebra-network/src/isolated/tor.rs +++ b/zebra-network/src/isolated/tor.rs @@ -2,13 +2,13 @@ use std::sync::{Arc, Mutex}; -use arti_client::{TorAddr, TorClient, TorClientConfig}; +use arti_client::{DataStream, TorAddr, TorClient, TorClientConfig}; use tor_rtcompat::tokio::TokioRuntimeHandle; -use tower::util::BoxService; +use tower::{util::BoxService, Service}; use zebra_chain::parameters::Network; -use crate::{connect_isolated, BoxError, Request, Response}; +use crate::{connect_isolated, connect_isolated_with_inbound, BoxError, Request, Response}; #[cfg(test)] mod tests; @@ -45,6 +45,44 @@ pub async fn connect_isolated_tor( hostname: String, user_agent: String, ) -> Result, BoxError> { + let tor_stream = new_tor_stream(hostname).await?; + + // Calling connect_isolated_tor_with_inbound causes lifetime issues. + // + // TODO: fix the lifetime issues, and call connect_isolated_tor_with_inbound + // so the behaviour of both functions is consistent. + connect_isolated(network, tor_stream, user_agent).await +} + +/// Creates an isolated Zcash peer connection to `hostname` via Tor. +/// This function is for testing purposes only. +/// +/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tor`] for details. 
+/// +/// # Privacy +/// +/// This function can make the isolated connection send different responses to peers, +/// which makes it stand out from other isolated connections from other peers. +pub async fn connect_isolated_tor_with_inbound( + network: Network, + hostname: String, + user_agent: String, + inbound_service: InboundService, +) -> Result, BoxError> +where + InboundService: + Service + Clone + Send + 'static, + InboundService::Future: Send, +{ + let tor_stream = new_tor_stream(hostname).await?; + + connect_isolated_with_inbound(network, tor_stream, user_agent, inbound_service).await +} + +/// Creates a Zcash peer connection to `hostname` via Tor, and returns a tor stream. +/// +/// See [`connect_isolated`] for details. +async fn new_tor_stream(hostname: String) -> Result { let addr = TorAddr::from(hostname)?; // Initialize or clone the shared tor client instance @@ -55,7 +93,7 @@ pub async fn connect_isolated_tor( let tor_stream = tor_client.connect(addr, None).await?; - connect_isolated(network, tor_stream, user_agent).await + Ok(tor_stream) } /// Returns a new tor client instance, and updates [`SHARED_TOR_CLIENT`]. 
diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index c7acae0e..aaf9576c 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -150,6 +150,14 @@ mod protocol; #[cfg(feature = "tor")] pub use crate::isolated::tor::connect_isolated_tor; +#[cfg(all(feature = "tor", any(test, feature = "proptest-impl")))] +pub use crate::isolated::tor::connect_isolated_tor_with_inbound; + +#[cfg(any(test, feature = "proptest-impl"))] +pub use crate::isolated::{ + connect_isolated_tcp_direct_with_inbound, connect_isolated_with_inbound, +}; + pub use crate::{ address_book::AddressBook, config::Config, @@ -158,10 +166,13 @@ pub use crate::{ peer::{HandshakeError, PeerError, SharedPeerError}, peer_set::init, policies::RetryLimit, - protocol::internal::{Request, Response, ResponseStatus}, + protocol::internal::{InventoryResponse, Request, Response}, }; /// Types used in the definition of [`Request`] and [`Response`] messages. pub mod types { pub use crate::{meta_addr::MetaAddr, protocol::types::PeerServices}; + + #[cfg(any(test, feature = "proptest-impl"))] + pub use crate::protocol::external::InventoryHash; } diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index 08d8f671..2090441d 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -12,12 +12,14 @@ use super::{MetaAddr, MetaAddrChange, PeerServices}; /// /// This should be at least twice the number of [`PeerAddrState`]s, so the tests /// can cover multiple transitions through every state. +#[allow(dead_code)] pub const MAX_ADDR_CHANGE: usize = 15; /// The largest number of random addresses we want to add to an [`AddressBook`]. /// /// This should be at least the number of [`PeerAddrState`]s, so the tests can /// cover interactions between addresses in different states. 
+#[allow(dead_code)] pub const MAX_META_ADDR: usize = 8; impl MetaAddr { diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index b434e7ac..8d4876de 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -16,7 +16,7 @@ pub(crate) use client::tests::ReceiveRequestAttempt; #[cfg(test)] pub(crate) use handshake::register_inventory_status; -use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; +use client::{ClientRequestReceiver, InProgressClientRequest, MustUseClientResponseSender}; pub(crate) use client::{CancelHeartbeatTask, ClientRequest}; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 1630747f..3d9f9dd1 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -1,7 +1,10 @@ //! Handles outbound requests from our node to the network. use std::{ + collections::HashSet, future::Future, + iter, + net::SocketAddr, pin::Pin, task::{Context, Poll}, }; @@ -12,13 +15,14 @@ use futures::{ stream::{Stream, StreamExt}, FutureExt, }; -use tokio::task::JoinHandle; +use tokio::{sync::broadcast, task::JoinHandle}; use tower::Service; use crate::{ peer::error::AlreadyErrored, + peer_set::InventoryChange, protocol::{ - external::types::Version, + external::{types::Version, InventoryHash}, internal::{Request, Response}, }, }; @@ -37,6 +41,13 @@ pub struct Client { /// Used to send [`Request`]s to the remote peer. pub(crate) server_tx: mpsc::Sender, + /// Used to register missing inventory in client [`Response`]s, + /// so that the peer set can route retries to other clients. + pub(crate) inv_collector: broadcast::Sender, + + /// The peer address for registering missing inventory. + pub(crate) transient_addr: Option, + /// A slot for an error shared between the Connection and the Client that uses it. /// /// `None` unless the connection or client have errored. 
@@ -69,6 +80,13 @@ pub(crate) struct ClientRequest { /// future that may be moved around before it resolves. pub tx: oneshot::Sender>, + /// Used to register missing inventory in responses on `tx`, + /// so that the peer set can route retries to other clients. + pub inv_collector: Option>, + + /// The peer address for registering missing inventory. + pub transient_addr: Option, + /// The tracing context for the request, so that work the connection task does /// processing messages in the context of this request will have correct context. pub span: tracing::Span, @@ -89,6 +107,7 @@ pub(super) struct ClientRequestReceiver { pub(super) struct InProgressClientRequest { /// The actual request. pub request: Request, + /// The return message channel, included because `peer::Client::call` returns a /// future that may be moved around before it resolves. /// @@ -99,27 +118,53 @@ pub(super) struct InProgressClientRequest { /// `Ok(())`, it will assume that it is safe to unconditionally poll the /// `Receiver` tied to the `Sender` used to create the `ClientRequest`. /// + /// We also take advantage of this invariant to route inventory requests + /// away from peers that did not respond with that inventory. + /// /// We enforce this invariant via the type system, by converting /// `ClientRequest`s to `InProgressClientRequest`s when they are received by /// the background task. These conversions are implemented by /// `ClientRequestReceiver`. - pub tx: MustUseOneshotSender>, + pub tx: MustUseClientResponseSender, + /// The tracing context for the request, so that work the connection task does /// processing messages in the context of this request will have correct context. pub span: tracing::Span, } -/// A oneshot::Sender that must be used by calling `send()`. +/// A `oneshot::Sender` for client responses, that must be used by calling `send()`. +/// Also handles forwarding missing inventory to the inventory registry. /// /// Panics on drop if `tx` has not been used or canceled. 
/// Panics if `tx.send()` is used more than once. #[derive(Debug)] #[must_use = "tx.send() must be called before drop"] -pub(super) struct MustUseOneshotSender { - /// The sender for the oneshot channel. +pub(super) struct MustUseClientResponseSender { + /// The sender for the oneshot client response channel. /// /// `None` if `tx.send()` has been used. - pub tx: Option>, + pub tx: Option>>, + + /// Forwards missing inventory in the response to the inventory collector. + /// + /// Boxed to reduce the size of containing structures. + pub missing_inv: Option>, +} + +/// Forwards missing inventory in the response to the inventory registry. +#[derive(Debug)] +pub(super) struct MissingInventoryCollector { + /// A clone of the original request, if it is an inventory request. + /// + /// This struct is only ever created with inventory requests. + request: Request, + + /// Used to register missing inventory from responses, + /// so that the peer set can route retries to other clients. + collector: broadcast::Sender, + + /// The peer address for registering missing inventory. + transient_addr: SocketAddr, } impl std::fmt::Debug for Client { @@ -133,12 +178,17 @@ impl std::fmt::Debug for Client { impl From for InProgressClientRequest { fn from(client_request: ClientRequest) -> Self { - let ClientRequest { request, tx, span } = client_request; - InProgressClientRequest { + let ClientRequest { request, - tx: tx.into(), + tx, + inv_collector, + transient_addr, span, - } + } = client_request; + + let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr); + + InProgressClientRequest { request, tx, span } } } @@ -199,26 +249,52 @@ impl From> for ClientRequestReceiver { } } -impl MustUseOneshotSender { - /// Forwards `t` to `tx.send()`, and marks this sender as used. +impl MustUseClientResponseSender { + /// Returns a newly created client response sender for `tx`. 
+ /// + /// If `request` or the response contains missing inventory, + /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`. + pub fn new( + tx: oneshot::Sender>, + request: &Request, + inv_collector: Option>, + transient_addr: Option, + ) -> Self { + Self { + tx: Some(tx), + missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr), + } + } + + /// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`, + /// and marks this sender as used. /// /// Panics if `tx.send()` is used more than once. - pub fn send(mut self, t: T) -> Result<(), T> { + pub fn send( + mut self, + response: Result, + ) -> Result<(), Result> { + // Forward any missing inventory to the registry. + if let Some(missing_inv) = self.missing_inv.take() { + missing_inv.send(&response); + } + + // Forward the response to the internal requester. self.tx .take() .unwrap_or_else(|| { panic!( - "multiple uses of oneshot sender: oneshot must be used exactly once: {:?}", + "multiple uses of response sender: response must be sent exactly once: {:?}", self ) }) - .send(t) + .send(response) } /// Returns `tx.cancellation()`. /// /// Panics if `tx.send()` has previously been used. 
- pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, T> { + pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, Result> { self.tx .as_mut() .map(|tx| tx.cancellation()) @@ -239,13 +315,7 @@ impl MustUseOneshotSender { } } -impl From> for MustUseOneshotSender { - fn from(sender: oneshot::Sender) -> Self { - MustUseOneshotSender { tx: Some(sender) } - } -} - -impl Drop for MustUseOneshotSender { +impl Drop for MustUseClientResponseSender { #[instrument(skip(self))] fn drop(&mut self) { // we don't panic if we are shutting down anyway @@ -253,13 +323,100 @@ impl Drop for MustUseOneshotSender { // is_canceled() will not panic, because we check is_none() first assert!( self.tx.is_none() || self.is_canceled(), - "unused oneshot sender: oneshot must be used or canceled: {:?}", + "unused client response sender: oneshot must be used or canceled: {:?}", self ); } } } +impl MissingInventoryCollector { + /// Returns a newly created missing inventory collector, if needed. + /// + /// If `request` or the response contains missing inventory, + /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`. + pub fn new( + request: &Request, + inv_collector: Option>, + transient_addr: Option, + ) -> Option> { + if !request.is_inventory_download() { + return None; + } + + if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) { + Some(Box::new(MissingInventoryCollector { + request: request.clone(), + collector: inv_collector, + transient_addr, + })) + } else { + None + } + } + + /// Forwards any missing inventory to the registry. + /// + /// `zcashd` doesn't send `notfound` messages for blocks, + /// so we need to track missing blocks ourselves. + /// + /// This can sometimes send duplicate missing inventory, + /// but the registry ignores duplicates anyway. + pub fn send(self, response: &Result) { + let missing_inv: HashSet = match (self.request, response) { + // Missing block hashes from partial responses. 
+ (_, Ok(Response::Blocks(block_statuses))) => block_statuses + .iter() + .filter_map(|b| b.missing()) + .map(InventoryHash::Block) + .collect(), + + // Missing transaction IDs from partial responses. + (_, Ok(Response::Transactions(tx_statuses))) => tx_statuses + .iter() + .filter_map(|tx| tx.missing()) + .map(|tx| tx.into()) + .collect(), + + // Other response types never contain missing inventory. + (_, Ok(_)) => iter::empty().collect(), + + // We don't forward NotFoundRegistry errors, + // because the errors are generated locally from the registry, + // so those statuses are already in the registry. + // + // Unfortunately, we can't access the inner error variant here, + // due to TracedError. + (_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(), + + // Missing inventory from other errors, including NotFoundResponse, timeouts, + // and dropped connections. + (request, Err(_)) => { + // The request either contains blocks or transactions, + // but this is a convenient way to collect them both. + let missing_blocks = request + .block_hash_inventory() + .into_iter() + .map(InventoryHash::Block); + + let missing_txs = request + .transaction_id_inventory() + .into_iter() + .map(InventoryHash::from); + + missing_blocks.chain(missing_txs).collect() + } + }; + + if let Some(missing_inv) = + InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr) + { + // if all the receivers are closed, assume we're in tests or an isolated connection + let _ = self.collector.send(missing_inv); + } + } +} + impl Client { /// Check if this connection's heartbeat task has exited. fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { @@ -401,7 +558,13 @@ impl Service for Client { // request. 
let span = tracing::Span::current(); - match self.server_tx.try_send(ClientRequest { request, tx, span }) { + match self.server_tx.try_send(ClientRequest { + request, + tx, + inv_collector: Some(self.inv_collector.clone()), + transient_addr: self.transient_addr, + span, + }) { Err(e) => { if e.is_disconnected() { let ClientRequest { tx, .. } = e.into_inner(); diff --git a/zebra-network/src/peer/client/tests.rs b/zebra-network/src/peer/client/tests.rs index b26553b4..d11cc740 100644 --- a/zebra-network/src/peer/client/tests.rs +++ b/zebra-network/src/peer/client/tests.rs @@ -1,7 +1,7 @@ //! Tests for the [`Client`] part of peer connections, and some test utilities for mocking //! [`Client`] instances. -mod vectors; +#![cfg_attr(feature = "proptest-impl", allow(dead_code))] use std::time::Duration; @@ -9,13 +9,20 @@ use futures::{ channel::{mpsc, oneshot}, future::{self, AbortHandle, Future, FutureExt}, }; -use tokio::task::JoinHandle; +use tokio::{ + sync::broadcast::{self, error::TryRecvError}, + task::JoinHandle, +}; use crate::{ peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot}, + peer_set::InventoryChange, protocol::external::types::Version, }; +#[cfg(test)] +mod vectors; + /// The maximum time a mocked peer connection should be alive during a test. const MAX_PEER_CONNECTION_TIME: Duration = Duration::from_secs(10); @@ -23,6 +30,8 @@ const MAX_PEER_CONNECTION_TIME: Duration = Duration::from_secs(10); pub struct ClientTestHarness { client_request_receiver: Option>, shutdown_receiver: Option>, + #[allow(dead_code)] + inv_receiver: Option>, error_slot: ErrorSlot, version: Version, connection_aborter: AbortHandle, @@ -109,6 +118,42 @@ impl ClientTestHarness { } } + /// Drops the receiver endpoint of [`InventoryChange`]s, forcefully closing the channel. + /// + /// The inventory registry that would track the changes is mocked for testing. 
+ /// + /// Note: this closes the broadcast receiver, it doesn't have a separate `close()` method. + #[allow(dead_code)] + pub fn drop_inventory_change_receiver(&mut self) { + self.inv_receiver + .take() + .expect("inventory change receiver endpoint has already been dropped"); + } + + /// Tries to receive an [`InventoryChange`] sent by the [`Client`] instance. + /// + /// This method acts like a mock inventory registry, allowing tests to track the changes. + /// + /// TODO: make ReceiveRequestAttempt generic, and use it here. + #[allow(dead_code)] + pub(crate) fn try_to_receive_inventory_change(&mut self) -> Option { + let receive_result = self + .inv_receiver + .as_mut() + .expect("inventory change receiver endpoint has been dropped") + .try_recv(); + + match receive_result { + Ok(change) => Some(change), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => None, + Err(TryRecvError::Lagged(skipped_messages)) => unreachable!( + "unexpected lagged inventory receiver in tests, skipped {} messages", + skipped_messages, + ), + } + } + /// Returns the current error in the [`ErrorSlot`], if there is one. 
pub fn current_error(&self) -> Option { self.error_slot.try_get_error() @@ -228,6 +273,8 @@ where pub fn finish(self) -> (Client, ClientTestHarness) { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let (client_request_sender, client_request_receiver) = mpsc::channel(1); + let (inv_sender, inv_receiver) = broadcast::channel(5); + let error_slot = ErrorSlot::default(); let version = self.version.unwrap_or(Version(0)); @@ -239,6 +286,8 @@ where let client = Client { shutdown_tx: Some(shutdown_sender), server_tx: client_request_sender, + inv_collector: inv_sender, + transient_addr: None, error_slot: error_slot.clone(), version, connection_task, @@ -248,6 +297,7 @@ where let harness = ClientTestHarness { client_request_receiver: Some(client_request_receiver), shutdown_receiver: Some(shutdown_receiver), + inv_receiver: Some(inv_receiver), error_slot, version, connection_aborter, diff --git a/zebra-network/src/peer/client/tests/vectors.rs b/zebra-network/src/peer/client/tests/vectors.rs index 5130699b..bb80fcbc 100644 --- a/zebra-network/src/peer/client/tests/vectors.rs +++ b/zebra-network/src/peer/client/tests/vectors.rs @@ -1,11 +1,19 @@ //! Fixed peer [`Client`] test vectors. +use std::iter; + use futures::poll; +use tokio::sync::broadcast; use tower::ServiceExt; +use zebra_chain::block; use zebra_test::service_extensions::IsReady; -use crate::{peer::ClientTestHarness, PeerError}; +use crate::{ + peer::{client::MissingInventoryCollector, ClientTestHarness}, + protocol::external::InventoryHash, + PeerError, Request, SharedPeerError, +}; /// Test that a newly initialized client functions correctly before it is polled. #[tokio::test] @@ -217,3 +225,36 @@ async fn client_service_propagates_panic_from_heartbeat_task() { let _ = poll!(client.ready()); } + +/// Make sure MissingInventoryCollector ignores NotFoundRegistry errors. 
+/// +/// ## Correctness +/// +/// If the MissingInventoryCollector registered these locally generated errors, +/// our missing inventory errors could get constantly refreshed locally, +/// and we would never ask the peer if it has received the inventory. +#[test] +fn missing_inv_collector_ignores_local_registry_errors() { + zebra_test::init(); + + let block_hash = block::Hash([0; 32]); + let request = Request::BlocksByHash(iter::once(block_hash).collect()); + let response = Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![ + InventoryHash::from(block_hash), + ]))); + + let (inv_collector, mut inv_receiver) = broadcast::channel(1); + let transient_addr = "0.0.0.0:0".parse().unwrap(); + + // Keep the channel open, so we don't get a `Closed` error. + let _inv_channel_guard = inv_collector.clone(); + + let missing_inv = + MissingInventoryCollector::new(&request, Some(inv_collector), Some(transient_addr)) + .expect("unexpected invalid collector: arguments should be valid"); + + missing_inv.send(&response); + + let recv_result = inv_receiver.try_recv(); + assert_eq!(recv_result, Err(broadcast::error::TryRecvError::Empty)); +} diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index d8d4787c..87cdc31d 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -29,18 +29,18 @@ use crate::{ meta_addr::MetaAddr, peer::{ connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver, - ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, + ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError, SharedPeerError, }, peer_set::ConnectionTracker, protocol::{ external::{types::Nonce, InventoryHash, Message}, - internal::{Request, Response, ResponseStatus}, + internal::{InventoryResponse, Request, Response}, }, BoxError, }; -use ResponseStatus::*; +use InventoryResponse::*; mod peer_tx; @@ -151,7 +151,7 @@ 
impl Handler { (Handler::Peers, Message::Addr(addrs)) => Handler::Finished(Ok(Response::Peers(addrs))), // `zcashd` returns requested transactions in a single batch of messages. // Other transaction or non-transaction messages can come before or after the batch. - // After the transaction batch, `zcashd` sends `NotFound` if any transactions are missing: + // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing: // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617 ( Handler::TransactionsById { @@ -163,17 +163,17 @@ impl Handler { // assumptions: // - the transaction messages are sent in a single continuous batch // - missing transactions are silently skipped - // (there is no `NotFound` message at the end of the batch) + // (there is no `notfound` message at the end of the batch) if pending_ids.remove(&transaction.id) { // we are in the middle of the continuous transaction messages transactions.push(transaction); } else { // We got a transaction we didn't ask for. If the caller doesn't know any of the - // transactions, they should have sent a `NotFound` with all the hashes, rather + // transactions, they should have sent a `notfound` with all the hashes, rather // than an unsolicited transaction. // // So either: - // 1. The peer implements the protocol badly, skipping `NotFound`. + // 1. The peer implements the protocol badly, skipping `notfound`. // We should cancel the request, so we don't hang waiting for transactions // that will never arrive. // 2. The peer sent an unsolicited transaction. @@ -188,11 +188,11 @@ impl Handler { if ignored_msg.is_some() && transactions.is_empty() { // If we didn't get anything we wanted, retry the request. 
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect(); - Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) + Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids))) } else if pending_ids.is_empty() || ignored_msg.is_some() { // If we got some of what we wanted, let the internal client know. - let available = transactions.into_iter().map(ResponseStatus::Available); - let missing = pending_ids.into_iter().map(ResponseStatus::Missing); + let available = transactions.into_iter().map(InventoryResponse::Available); + let missing = pending_ids.into_iter().map(InventoryResponse::Missing); Handler::Finished(Ok(Response::Transactions( available.chain(missing).collect(), @@ -214,12 +214,12 @@ impl Handler { Message::NotFound(missing_invs), ) => { // assumptions: - // - the peer eventually returns a transaction or a `NotFound` entry + // - the peer eventually returns a transaction or a `notfound` entry // for each hash - // - all `NotFound` entries are contained in a single message - // - the `NotFound` message comes after the transaction messages + // - all `notfound` entries are contained in a single message + // - the `notfound` message comes after the transaction messages // - // If we're in sync with the peer, then the `NotFound` should contain the remaining + // If we're in sync with the peer, then the `notfound` should contain the remaining // hashes from the handler. If we're not in sync with the peer, we should return // what we got so far. let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect(); @@ -236,11 +236,11 @@ impl Handler { if transactions.is_empty() { // If we didn't get anything we wanted, retry the request. 
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect(); - Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids))) + Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids))) } else { // If we got some of what we wanted, let the internal client know. - let available = transactions.into_iter().map(ResponseStatus::Available); - let missing = pending_ids.into_iter().map(ResponseStatus::Missing); + let available = transactions.into_iter().map(InventoryResponse::Available); + let missing = pending_ids.into_iter().map(InventoryResponse::Missing); Handler::Finished(Ok(Response::Transactions( available.chain(missing).collect(), @@ -249,7 +249,7 @@ impl Handler { } // `zcashd` returns requested blocks in a single batch of messages. // Other blocks or non-blocks messages can come before or after the batch. - // `zcashd` silently skips missing blocks, rather than sending a final `NotFound` message. + // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message. // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523 ( Handler::BlocksByHash { @@ -261,7 +261,7 @@ impl Handler { // assumptions: // - the block messages are sent in a single continuous batch // - missing blocks are silently skipped - // (there is no `NotFound` message at the end of the batch) + // (there is no `notfound` message at the end of the batch) if pending_hashes.remove(&block.hash()) { // we are in the middle of the continuous block messages blocks.push(block); @@ -286,16 +286,20 @@ impl Handler { // when the response for the second request arrives. // // Ignoring the message gives us a chance to synchronize back to the correct - // request. + // request. If that doesn't happen, this request times out. // - // Peers can avoid these cascading errors by sending an explicit `notfound`. - // Zebra sends `notfound`, but `zcashd` doesn't. 
+ // In case 2, if peers respond with a `notfound` message, + // the cascading errors don't happen. The `notfound` message cancels our request, + // and we know we are in sync with the peer. + // + // Zebra sends `notfound` in response to block requests, but `zcashd` doesn't. + // So we need this message workaround, and the related inventory workarounds. ignored_msg = Some(Message::Block(block)); } if pending_hashes.is_empty() { // If we got everything we wanted, let the internal client know. - let available = blocks.into_iter().map(ResponseStatus::Available); + let available = blocks.into_iter().map(InventoryResponse::Available); Handler::Finished(Ok(Response::Blocks(available.collect()))) } else { // Keep on waiting for all the blocks we wanted, until we get them or time out. @@ -314,12 +318,12 @@ impl Handler { Message::NotFound(missing_invs), ) => { // assumptions: - // - the peer eventually returns a block or a `NotFound` entry + // - the peer eventually returns a block or a `notfound` entry // for each hash - // - all `NotFound` entries are contained in a single message - // - the `NotFound` message comes after the block messages + // - all `notfound` entries are contained in a single message + // - the `notfound` message comes after the block messages // - // If we're in sync with the peer, then the `NotFound` should contain the remaining + // If we're in sync with the peer, then the `notfound` should contain the remaining // hashes from the handler. If we're not in sync with the peer, we should return // what we got so far, and log an error. let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect(); @@ -336,11 +340,11 @@ impl Handler { if blocks.is_empty() { // If we didn't get anything we wanted, retry the request. 
let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect(); - Handler::Finished(Err(PeerError::NotFound(missing_block_hashes))) + Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes))) } else { // If we got some of what we wanted, let the internal client know. - let available = blocks.into_iter().map(ResponseStatus::Available); - let missing = pending_hashes.into_iter().map(ResponseStatus::Missing); + let available = blocks.into_iter().map(InventoryResponse::Available); + let missing = pending_hashes.into_iter().map(InventoryResponse::Missing); Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect()))) } @@ -387,7 +391,7 @@ pub(super) enum State { /// Awaiting a peer message we can interpret as a client request. AwaitingResponse { handler: Handler, - tx: MustUseOneshotSender>, + tx: MustUseClientResponseSender, span: tracing::Span, }, /// A failure has occurred and we are shutting down the connection. @@ -470,7 +474,7 @@ pub struct Connection { pub(super) client_rx: ClientRequestReceiver, /// A slot for an error shared between the Connection and the Client that uses it. - // + /// /// `None` unless the connection or client have errored. pub(super) error_slot: ErrorSlot, @@ -1243,7 +1247,7 @@ where } Response::Blocks(blocks) => { // Generate one tx message per block, - // then a notfound% message with all the missing block hashes. + // then a notfound message with all the missing block hashes. 
let mut missing_hashes = Vec::new(); for block in blocks.into_iter() { diff --git a/zebra-network/src/peer/connection/tests/prop.rs b/zebra-network/src/peer/connection/tests/prop.rs index fe600913..588d74bc 100644 --- a/zebra-network/src/peer/connection/tests/prop.rs +++ b/zebra-network/src/peer/connection/tests/prop.rs @@ -20,11 +20,11 @@ use zebra_test::mock_service::{MockService, PropTestAssertion}; use crate::{ peer::{connection::Connection, ClientRequest, ErrorSlot}, protocol::external::Message, - protocol::internal::ResponseStatus, + protocol::internal::InventoryResponse, Request, Response, SharedPeerError, }; -use ResponseStatus::*; +use InventoryResponse::*; proptest! { // The default value of proptest cases (256) causes this test to take more than ten seconds on @@ -156,6 +156,9 @@ async fn send_block_request( let client_request = ClientRequest { request, tx: response_sender, + // we skip inventory collection in these tests + inv_collector: None, + transient_addr: None, span: Span::none(), }; diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs index b9bce227..ba995333 100644 --- a/zebra-network/src/peer/connection/tests/vectors.rs +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -132,6 +132,8 @@ async fn connection_run_loop_message_ok() { let request = ClientRequest { request: Request::Peers, tx: request_tx, + inv_collector: None, + transient_addr: None, span: Span::current(), }; @@ -459,6 +461,8 @@ async fn connection_run_loop_send_timeout_nil_response() { let request = ClientRequest { request: Request::AdvertiseTransactionIds(HashSet::new()), tx: request_tx, + inv_collector: None, + transient_addr: None, span: Span::current(), }; @@ -532,6 +536,8 @@ async fn connection_run_loop_send_timeout_expect_response() { let request = ClientRequest { request: Request::Peers, tx: request_tx, + inv_collector: None, + transient_addr: None, span: Span::current(), }; @@ -605,6 +611,8 @@ async fn 
connection_run_loop_receive_timeout() { let request = ClientRequest { request: Request::Peers, tx: request_tx, + inv_collector: None, + transient_addr: None, span: Span::current(), }; diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 558836ab..7fb46e8b 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -82,9 +82,40 @@ pub enum PeerError { #[error("Internal services over capacity")] Overloaded, - /// We requested data that the peer couldn't find. - #[error("Remote peer could not find items: {0:?}")] - NotFound(Vec), + /// We requested data, but the peer replied with a `notfound` message. + /// (Or it didn't respond before the request finished.) + /// + /// This error happens when the peer doesn't have any of the requested data, + /// so that the original request can be retried. + /// + /// This is a temporary error. + /// + /// Zebra can try different peers if the request is retried, + /// or peers can download and verify the missing data. + /// + /// If the peer has some of the data, the request returns an [`Ok`] response, + /// with any `notfound` data marked as [`Missing`](InventoryResponse::Missing). + #[error("Remote peer could not find any of the items: {0:?}")] + NotFoundResponse(Vec), + + /// We requested data, but all our ready peers are marked as recently + /// [`Missing`](InventoryResponse::Missing) that data in our local inventory registry. + /// + /// This is a temporary error. + /// + /// Peers with the inventory can finish their requests and become ready, + /// or other peers can download and verify the missing data. + /// + /// # Correctness + /// + /// This error is produced using Zebra's local inventory registry, + /// without contacting any peers. + /// + /// Client responses containing this error must not be used to update the inventory registry. + /// This makes sure that we eventually expire our local cache of missing inventory, + /// and send requests to peers again.
+ #[error("All ready peers are registered as recently missing these items: {0:?}")] + NotFoundRegistry(Vec), } impl PeerError { @@ -103,7 +134,8 @@ impl PeerError { PeerError::Serialization(inner) => format!("Serialization({})", inner).into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::Overloaded => "Overloaded".into(), - PeerError::NotFound(_) => "NotFound".into(), + PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), + PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(), } } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 1f0e9d2a..d5d5b4d1 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -882,7 +882,7 @@ where // So we can just track peer activity based on Ping and Pong. // (This significantly improves performance, by reducing time system calls.) let inbound_ts_collector = address_book_updater.clone(); - let inv_collector = inv_collector.clone(); + let inbound_inv_collector = inv_collector.clone(); let ts_inner_conn_span = connection_span.clone(); let inv_inner_conn_span = connection_span.clone(); let peer_rx = peer_rx @@ -892,6 +892,7 @@ where let inbound_ts_collector = inbound_ts_collector.clone(); let span = debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector"); + async move { match &msg { Ok(msg) => { @@ -935,9 +936,10 @@ where .instrument(span) }) .then(move |msg| { - let inv_collector = inv_collector.clone(); + let inbound_inv_collector = inbound_inv_collector.clone(); let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter"); - register_inventory_status(msg, connected_addr, inv_collector).instrument(span) + register_inventory_status(msg, connected_addr, inbound_inv_collector) + .instrument(span) }) .boxed(); @@ -971,6 +973,8 @@ where let client = Client { shutdown_tx: Some(shutdown_tx), server_tx, + inv_collector, + transient_addr: connected_addr.get_transient_addr(), error_slot, 
version: remote_version, connection_task, @@ -1184,6 +1188,9 @@ async fn send_one_heartbeat( match server_tx.try_send(ClientRequest { request, tx, + // we're not requesting inventory, so we don't need to update the registry + inv_collector: None, + transient_addr: None, span: tracing::Span::current(), }) { Ok(()) => {} diff --git a/zebra-network/src/peer/minimum_peer_version/tests.rs b/zebra-network/src/peer/minimum_peer_version/tests.rs index 83700f5f..57f6fed5 100644 --- a/zebra-network/src/peer/minimum_peer_version/tests.rs +++ b/zebra-network/src/peer/minimum_peer_version/tests.rs @@ -1,5 +1,7 @@ //! Test utilities and tests for minimum network peer version requirements. +#![cfg_attr(feature = "proptest-impl", allow(dead_code))] + use zebra_chain::{ chain_tip::mock::{MockChainTip, MockChainTipSender}, parameters::Network, diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index 6d3db30f..d7c4a702 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -8,7 +8,6 @@ use std::{ net::SocketAddr, pin::Pin, task::{Context, Poll}, - time::Duration, }; use futures::{FutureExt, Stream, StreamExt}; @@ -18,17 +17,18 @@ use tokio::{ }; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream}; -use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne}; +use zebra_chain::serialization::AtLeastOne; use crate::{ - protocol::{external::InventoryHash, internal::ResponseStatus}, + constants::INVENTORY_ROTATION_INTERVAL, + protocol::{external::InventoryHash, internal::InventoryResponse}, BoxError, }; use self::update::Update; /// Underlying type for the alias InventoryStatus::* -use ResponseStatus::*; +use InventoryResponse::*; pub mod update; @@ -36,7 +36,7 @@ pub mod update; mod tests; /// A peer inventory status, which tracks a hash for both available and missing inventory. 
-pub type InventoryStatus = ResponseStatus; +pub type InventoryStatus = InventoryResponse; /// A peer inventory status change, used in the inventory status channel. /// @@ -106,7 +106,6 @@ impl InventoryChange { } /// Returns a new missing multiple inventory change, if `hashes` contains at least one change. - #[allow(dead_code)] pub fn new_missing_multi<'a>( hashes: impl IntoIterator, peer: SocketAddr, @@ -135,8 +134,8 @@ impl InventoryStatus { } impl InventoryStatus { - /// Get the inner item, regardless of status. - pub fn inner(&self) -> T { + /// Returns a clone of the inner item, regardless of status. + pub fn to_inner(&self) -> T { match self { Available(item) | Missing(item) => item.clone(), } @@ -146,11 +145,7 @@ impl InventoryStatus { impl InventoryRegistry { /// Returns a new Inventory Registry for `inv_stream`. pub fn new(inv_stream: broadcast::Receiver) -> Self { - let interval = Duration::from_secs( - POST_BLOSSOM_POW_TARGET_SPACING - .try_into() - .expect("non-negative"), - ); + let interval = INVENTORY_ROTATION_INTERVAL; // Don't do an immediate rotation, current and prev are already empty. let mut interval = tokio::time::interval_at(Instant::now() + interval, interval); @@ -270,7 +265,7 @@ impl InventoryRegistry { /// `Missing` markers are not updated until the registry rotates, for security reasons. fn register(&mut self, change: InventoryChange) { let new_status = change.marker(); - let (invs, addr) = change.inner(); + let (invs, addr) = change.to_inner(); for inv in invs { use InventoryHash::*; diff --git a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs index 5d880d55..acaf2795 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs @@ -29,7 +29,6 @@ proptest! 
{ // Start the runtime let runtime = zebra_test::init_async(); - let _guard = runtime.enter(); runtime.block_on(async move { // Check all combinations of: diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index e8e8a6f9..5d563180 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -654,7 +654,7 @@ where /// falling back to a ready peer that isn't missing the inventory. /// /// If all ready peers are missing the inventory, - /// returns a [`NotFound`](PeerError::NotFound) error. + /// returns a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error. /// /// Uses P2C to route requests to the least loaded peer in each list. fn route_inv( @@ -724,7 +724,9 @@ where // Avoid routing requests to peers that are missing inventory. // If we kept trying doomed requests, peers that are missing our requested inventory // could take up a large amount of our bandwidth and retry limits. - Err(SharedPeerError::from(PeerError::NotFound(vec![hash]))) + Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![ + hash, + ]))) } .map_err(Into::into) .boxed() diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index c9cb71ce..31be021e 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -542,7 +542,7 @@ fn peer_set_route_inv_all_missing_fail() { .downcast_ref::() .expect("peer set should return a boxed SharedPeerError") .inner_debug(), - "NotFound([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])" + "NotFoundRegistry([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])" ); }); } diff --git a/zebra-network/src/protocol/external/addr.rs b/zebra-network/src/protocol/external/addr.rs index cc070a63..a855e566 100644 --- a/zebra-network/src/protocol/external/addr.rs +++ 
b/zebra-network/src/protocol/external/addr.rs @@ -20,6 +20,7 @@ pub use in_version::AddrInVersion; pub(super) use v1::AddrV1; pub(super) use v2::AddrV2; +#[allow(unused_imports)] #[cfg(any(test, feature = "proptest-impl"))] pub(super) use v1::{ipv6_mapped_socket_addr, ADDR_V1_SIZE}; diff --git a/zebra-network/src/protocol/external/message.rs b/zebra-network/src/protocol/external/message.rs index a3674666..436e96ff 100644 --- a/zebra-network/src/protocol/external/message.rs +++ b/zebra-network/src/protocol/external/message.rs @@ -230,7 +230,7 @@ pub enum Message { /// /// `zcashd` returns requested items in a single batch of messages. /// Missing blocks are silently skipped. Missing transaction hashes are - /// included in a single `NotFound` message following the transactions. + /// included in a single `notfound` message following the transactions. /// Other item or non-item messages can come before or after the batch. /// /// The list contains zero or more inventory hashes. @@ -254,13 +254,15 @@ pub enum Message { /// A `notfound` message. /// + /// Zebra responds with this message when it doesn't have the requested blocks or transactions. + /// /// When a peer requests a list of transaction hashes, `zcashd` returns: /// - a batch of messages containing found transactions, then - /// - a `NotFound` message containing a list of transaction hashes that + /// - a `notfound` message containing a list of transaction hashes that /// aren't available in its mempool or state. /// /// But when a peer requests blocks or headers, any missing items are - /// silently skipped, without any `NotFound` messages. + /// silently skipped, without any `notfound` messages. /// /// The list contains zero or more inventory hashes. 
/// diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs index 8e8599d5..a4e8238e 100644 --- a/zebra-network/src/protocol/external/types.rs +++ b/zebra-network/src/protocol/external/types.rs @@ -1,7 +1,3 @@ -#![allow(clippy::unit_arg)] - -use crate::constants::{self, magics}; - use std::{cmp::max, fmt}; use zebra_chain::{ @@ -12,7 +8,9 @@ use zebra_chain::{ }, }; -#[cfg(test)] +use crate::constants::{self, magics}; + +#[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; /// A magic number identifying the network. diff --git a/zebra-network/src/protocol/internal.rs b/zebra-network/src/protocol/internal.rs index df603a34..a3745164 100644 --- a/zebra-network/src/protocol/internal.rs +++ b/zebra-network/src/protocol/internal.rs @@ -4,4 +4,4 @@ mod response_status; pub use request::Request; pub use response::Response; -pub use response_status::ResponseStatus; +pub use response_status::InventoryResponse; diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index e92b4880..e4b79b6e 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -249,4 +249,30 @@ impl Request { Request::MempoolTransactionIds => "MempoolTransactionIds", } } + + /// Returns true if the request is for block or transaction inventory downloads. + pub fn is_inventory_download(&self) -> bool { + matches!( + self, + Request::BlocksByHash(_) | Request::TransactionsById(_) + ) + } + + /// Returns the block hash inventory downloads from the request, if any. + pub fn block_hash_inventory(&self) -> HashSet { + if let Request::BlocksByHash(block_hashes) = self { + block_hashes.clone() + } else { + HashSet::new() + } + } + + /// Returns the transaction ID inventory downloads from the request, if any. 
+ pub fn transaction_id_inventory(&self) -> HashSet { + if let Request::TransactionsById(transaction_ids) = self { + transaction_ids.clone() + } else { + HashSet::new() + } + } } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index b1d89c62..eb54e037 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -7,12 +7,12 @@ use zebra_chain::{ transaction::{UnminedTx, UnminedTxId}, }; -use crate::{meta_addr::MetaAddr, protocol::internal::ResponseStatus}; +use crate::{meta_addr::MetaAddr, protocol::internal::InventoryResponse}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; -use ResponseStatus::*; +use InventoryResponse::*; /// A response to a network request, represented in internal format. #[derive(Clone, Debug, Eq, PartialEq)] @@ -66,15 +66,15 @@ pub enum Response { /// When Zebra doesn't have a block or transaction, it always sends `notfound`. /// `zcashd` sometimes sends no response, and sometimes sends `notfound`. // - // TODO: make this into a HashMap, ()>> - a unique list (#2244) - Blocks(Vec, block::Hash>>), + // TODO: make this into a HashMap, ()>> - a unique list (#2244) + Blocks(Vec, block::Hash>>), /// A list of found unmined transactions, and missing unmined transaction IDs. /// /// Each list contains zero or more entries. // - // TODO: make this into a HashMap> - a unique list (#2244) - Transactions(Vec>), + // TODO: make this into a HashMap> - a unique list (#2244) + Transactions(Vec>), } impl fmt::Display for Response { @@ -136,4 +136,9 @@ impl Response { Response::Transactions(_) => "Transactions", } } + + /// Returns true if the response is a block or transaction inventory download. 
+ pub fn is_inventory_download(&self) -> bool { + matches!(self, Response::Blocks(_) | Response::Transactions(_)) + } } diff --git a/zebra-network/src/protocol/internal/response_status.rs b/zebra-network/src/protocol/internal/response_status.rs index a62b9576..1d5baba0 100644 --- a/zebra-network/src/protocol/internal/response_status.rs +++ b/zebra-network/src/protocol/internal/response_status.rs @@ -5,7 +5,7 @@ use std::fmt; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; -use ResponseStatus::*; +use InventoryResponse::*; /// A generic peer inventory response status. /// @@ -13,7 +13,7 @@ use ResponseStatus::*; /// and `Missing` is used for inventory that is missing from the response. #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] -pub enum ResponseStatus { +pub enum InventoryResponse { /// An available inventory item. Available(A), @@ -21,37 +21,34 @@ pub enum ResponseStatus { Missing(M), } -impl fmt::Display for ResponseStatus { +impl fmt::Display for InventoryResponse { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str(self.command()) } } -impl ResponseStatus { +impl InventoryResponse { /// Returns the response status type as a string. pub fn command(&self) -> &'static str { match self { - ResponseStatus::Available(_) => "Available", - ResponseStatus::Missing(_) => "Missing", + InventoryResponse::Available(_) => "Available", + InventoryResponse::Missing(_) => "Missing", } } /// Returns true if the inventory item was available. - #[allow(dead_code)] pub fn is_available(&self) -> bool { matches!(self, Available(_)) } /// Returns true if the inventory item was missing. 
- #[allow(dead_code)] pub fn is_missing(&self) -> bool { matches!(self, Missing(_)) } - /// Maps a `ResponseStatus<A, M>` to `ResponseStatus<B, M>` by applying a function to a + /// Maps an `InventoryResponse<A, M>` to `InventoryResponse<B, M>` by applying a function to a /// contained [`Available`] value, leaving the [`Missing`] value untouched. - #[allow(dead_code)] - pub fn map_available B>(self, f: F) -> ResponseStatus { + pub fn map_available B>(self, f: F) -> InventoryResponse { // Based on Result::map from https://doc.rust-lang.org/src/core/result.rs.html#765 match self { Available(a) => Available(f(a)), @@ -59,10 +56,9 @@ impl ResponseStatus { } } - /// Maps a `ResponseStatus<A, M>` to `ResponseStatus<A, N>` by applying a function to a + /// Maps an `InventoryResponse<A, M>` to `InventoryResponse<A, N>` by applying a function to a /// contained [`Missing`] value, leaving the [`Available`] value untouched. - #[allow(dead_code)] - pub fn map_missing N>(self, f: F) -> ResponseStatus { + pub fn map_missing N>(self, f: F) -> InventoryResponse { // Based on Result::map_err from https://doc.rust-lang.org/src/core/result.rs.html#850 match self { Available(a) => Available(a), @@ -70,8 +66,8 @@ impl ResponseStatus { } } - /// Converts from `&ResponseStatus<A, M>` to `ResponseStatus<&A, &M>`. - pub fn as_ref(&self) -> ResponseStatus<&A, &M> { + /// Converts from `&InventoryResponse<A, M>` to `InventoryResponse<&A, &M>`. + pub fn as_ref(&self) -> InventoryResponse<&A, &M> { match self { Available(item) => Available(item), Missing(item) => Missing(item), @@ -79,7 +75,7 @@ impl ResponseStatus { } } -impl ResponseStatus { +impl InventoryResponse { /// Get the available inventory item, if present. pub fn available(&self) -> Option { if let Available(item) = self { @@ -90,7 +86,6 @@ impl ResponseStatus { } /// Get the missing inventory item, if present.
- #[allow(dead_code)] pub fn missing(&self) -> Option { if let Missing(item) = self { Some(item.clone()) diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index d38a7ff8..917b5539 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -24,7 +24,7 @@ metrics = "0.17.1" # The fix should be included in multiset 0.0.6. multiset = { git = "https://github.com/jmitchell/multiset", rev = "91ef8550b518f75ae87ae0d8771150f259fd34d5" } proptest = { version = "0.10.1", optional = true } -proptest-derive = { version = "0.3", optional = true } +proptest-derive = { version = "0.3.0", optional = true } regex = "1" rlimit = "0.5.4" rocksdb = "0.17.0" @@ -47,7 +47,7 @@ halo2 = "=0.1.0-beta.1" itertools = "0.10.3" jubjub = "0.8.0" proptest = "0.10.1" -proptest-derive = "0.3" +proptest-derive = "0.3.0" spandoc = "0.2" tokio = { version = "1.16.1", features = ["full"] } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 6ed7ea51..a79f7163 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -61,11 +61,12 @@ semver = "1.0.5" tempfile = "3.3.0" tokio = { version = "1.16.1", features = ["full", "test-util"] } -proptest = "0.10" -proptest-derive = "0.3" +proptest = "0.10.1" +proptest-derive = "0.3.0" zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } zebra-consensus = { path = "../zebra-consensus/", features = ["proptest-impl"] } +zebra-network = { path = "../zebra-network", features = ["proptest-impl"] } zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } zebra-test = { path = "../zebra-test" } diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 1fb84591..5267a4c2 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -31,7 +31,7 @@ use zebra_chain::{ use zebra_consensus::chain::VerifyChainError; use zebra_network::{ constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE}, - AddressBook, ResponseStatus, + AddressBook, InventoryResponse, 
}; // Re-use the syncer timeouts for consistency. @@ -40,7 +40,7 @@ use super::{ sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, }; -use ResponseStatus::*; +use InventoryResponse::*; pub(crate) mod downloads; @@ -319,6 +319,9 @@ impl Service for Inbound { async move { // Correctness: get the current time after acquiring the address book lock. + // + // This time is used to filter outdated peers, so it doesn't really matter + // if we get it when the future is created, or when it starts running. let now = Utc::now(); // Send a sanitized response @@ -358,17 +361,14 @@ impl Service for Inbound { // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 use futures::stream::TryStreamExt; hashes - .clone() - .into_iter() + .iter() + .cloned() .map(|hash| zs::Request::Block(hash.into())) .map(|request| state.clone().oneshot(request)) .collect::>() .try_filter_map(|response| async move { Ok(match response { zs::Response::Block(Some(block)) => Some(block), - // `zcashd` ignores missing blocks in GetData responses, - // rather than including them in a trailing `NotFound` - // message zs::Response::Block(None) => None, _ => unreachable!("wrong response from state"), }) diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index d29623ee..e5b4ca64 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -22,7 +22,7 @@ use zebra_chain::{ transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx}, }; use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; -use zebra_network::{AddressBook, Request, Response, ResponseStatus}; +use zebra_network::{AddressBook, InventoryResponse, Request, Response}; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -35,7 +35,7 @@ use crate::{ BoxError, }; -use ResponseStatus::*; +use 
InventoryResponse::*; /// Maximum time to wait for a network service request. /// diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index c4746a9e..dfac2da0 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -14,12 +14,13 @@ use tower::{ use zebra_chain::{ block::{self, Block}, parameters::Network, - transaction::{AuthDigest, Hash as TxHash, UnminedTxId, WtxId}, + serialization::ZcashDeserializeInto, + transaction::{AuthDigest, Hash as TxHash, Transaction, UnminedTx, UnminedTxId, WtxId}, }; use zebra_consensus::{chain::VerifyChainError, error::TransactionError, transaction}; use zebra_network::{ - connect_isolated_tcp_direct, Config as NetworkConfig, Request, Response, ResponseStatus, - SharedPeerError, + connect_isolated_tcp_direct_with_inbound, types::InventoryHash, Config as NetworkConfig, + InventoryResponse, PeerError, Request, Response, SharedPeerError, }; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -33,7 +34,7 @@ use crate::{ BoxError, }; -use ResponseStatus::*; +use InventoryResponse::*; /// Check that a network stack with an empty address book only contains the local listener port, /// by querying the inbound service via a local TCP connection. 
@@ -57,9 +58,9 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> { tx_gossip_task_handle, // real open socket addresses listen_addr, - ) = setup().await; + ) = setup(None).await; - // Use inbound directly + // Send a request to inbound directly let request = inbound_service.clone().oneshot(Request::Peers); let response = request.await; match response.as_ref() { @@ -78,7 +79,7 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> { ), }; - // Use the connected peer via a local TCP connection + // Send a request via the connected peer, via a local TCP connection, to the inbound service let request = connected_peer_service.clone().oneshot(Request::Peers); let response = request.await; match response.as_ref() { @@ -134,11 +135,11 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> { tx_gossip_task_handle, // real open socket addresses _listen_addr, - ) = setup().await; + ) = setup(None).await; let test_block = block::Hash([0x11; 32]); - // Use inbound directly + // Send a request to inbound directly let request = inbound_service .clone() .oneshot(Request::BlocksByHash(iter::once(test_block).collect())); @@ -159,26 +160,27 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> { ), }; - // Use the connected peer via a local TCP connection - let request = connected_peer_service + // Send a request via the connected peer, via a local TCP connection, to the inbound service + let response = connected_peer_service .clone() - .oneshot(Request::BlocksByHash(iter::once(test_block).collect())); - let response = request.await; - match response.as_ref() { - Err(missing_error) => { - let missing_error = missing_error - .downcast_ref::() - .expect("unexpected inner error type, expected SharedPeerError"); - assert_eq!( - missing_error.inner_debug(), - "NotFound([Block(block::Hash(\"1111111111111111111111111111111111111111111111111111111111111111\"))])" - ); - } - _ => 
unreachable!( - "peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFound(_)))`, \ + .oneshot(Request::BlocksByHash(iter::once(test_block).collect())) + .await; + if let Err(missing_error) = response.as_ref() { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. + let expected = PeerError::NotFoundResponse(vec![InventoryHash::Block(test_block)]); + let expected = SharedPeerError::from(expected); + assert_eq!(missing_error.inner_debug(), expected.inner_debug()); + } else { + unreachable!( + "peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \ actual result: {:?}", - response - ), + response + ) }; let block_gossip_result = block_gossip_task_handle.now_or_never(); @@ -201,8 +203,6 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> { /// Check that a network stack with an empty state responds to single transaction requests with `notfound`. /// /// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection. -/// -/// TODO: test a response with some Available and some Missing transactions. 
#[tokio::test] async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { let ( @@ -220,7 +220,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { tx_gossip_task_handle, // real open socket addresses _listen_addr, - ) = setup().await; + ) = setup(None).await; let test_tx = UnminedTxId::from_legacy_id(TxHash([0x22; 32])); let test_wtx: UnminedTxId = WtxId { @@ -231,7 +231,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { // Test both transaction ID variants, separately and together for txs in [vec![test_tx], vec![test_wtx], vec![test_tx, test_wtx]] { - // Use inbound directly + // Send a request to inbound directly let request = inbound_service .clone() .oneshot(Request::TransactionsById(txs.iter().copied().collect())); @@ -256,50 +256,55 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { ), }; - // Use the connected peer via a local TCP connection - let request = connected_peer_service + // Send a request via the connected peer, via a local TCP connection, to the inbound service + let response = connected_peer_service .clone() - .oneshot(Request::TransactionsById(txs.iter().copied().collect())); - let response = request.await; - match response.as_ref() { - Err(missing_error) => { - let missing_error = missing_error - .downcast_ref::() - .expect("unexpected inner error type, expected SharedPeerError"); + .oneshot(Request::TransactionsById(txs.iter().copied().collect())) + .await; + if let Err(missing_error) = response.as_ref() { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); - // Unfortunately, we can't access SharedPeerError's inner type, - // so we can't compare the actual responses. 
- if txs == vec![test_tx] { - assert_eq!( - missing_error.inner_debug(), - "NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])", - ); - } else if txs == vec![test_wtx] { - assert_eq!( - missing_error.inner_debug(), - "NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])", - ); - } else if txs == vec![test_tx, test_wtx] { - // The response order is unstable, because it depends on concurrent inbound futures. - // In #2244 we will fix this by replacing response Vecs with HashSets. - assert!( - missing_error.inner_debug() == - "NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\")), Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])" - || - missing_error.inner_debug() == - "NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") }), Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])", - "unexpected response: {:?}", - missing_error.inner_debug(), - ); - } else { - unreachable!("unexpected test case"); - } + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. + if txs.len() <= 1 { + let expected = PeerError::NotFoundResponse( + txs.iter().copied().map(InventoryHash::from).collect(), + ); + let expected = SharedPeerError::from(expected); + assert_eq!(missing_error.inner_debug(), expected.inner_debug()); + } else { + // The response order is unstable, because it depends on concurrent inbound futures. 
+ // In #2244 we will fix this by replacing response Vecs with HashSets. + // + // Assume there are only 2 transactions. + let expected1: Vec = + txs.iter().copied().map(InventoryHash::from).collect(); + let expected2: Vec = + txs.iter().rev().copied().map(InventoryHash::from).collect(); + + let expected: Vec = [expected1, expected2] + .into_iter() + .map(PeerError::NotFoundResponse) + .map(|error| SharedPeerError::from(error).inner_debug()) + .collect(); + let actual = missing_error.inner_debug(); + + assert!( + expected.iter().any(|expected| expected == &actual), + "unexpected response: {:?} \ + expected one of: {:?}", + actual, + expected, + ); } - _ => unreachable!( - "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFound(_)))`, \ + } else { + unreachable!( + "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \ + actual result: {:?}", + response + ) }; } @@ -320,12 +325,286 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { Ok(()) } +/// Check that a network stack: +/// - returns a `NotFoundResponse` error when a peer responds with an unrelated transaction, and +/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer. +/// +/// The requests are coming from the full stack to the isolated peer, +/// so this is the reverse of the previous tests. +/// +/// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection, +/// with an unrelated transaction test responder. +#[tokio::test] +async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> { + // We respond with an unrelated transaction, so the peer gives up on the request. 
+ let unrelated_response: Transaction = + zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?; + let unrelated_response = Response::Transactions(vec![Available(unrelated_response.into())]); + + let ( + // real services + _connected_peer_service, + _inbound_service, + peer_set, + _mempool_service, + _state_service, + // mocked services + _mock_block_verifier, + _mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + _listen_addr, + ) = setup(Some(unrelated_response)).await; + + let test_tx5 = UnminedTxId::from_legacy_id(TxHash([0x55; 32])); + let test_wtx67: UnminedTxId = WtxId { + id: TxHash([0x66; 32]), + auth_digest: AuthDigest([0x77; 32]), + } + .into(); + let test_tx8 = UnminedTxId::from_legacy_id(TxHash([0x88; 32])); + let test_wtx91: UnminedTxId = WtxId { + id: TxHash([0x99; 32]), + auth_digest: AuthDigest([0x11; 32]), + } + .into(); + + // Test both transaction ID variants, separately and together. + // These IDs all need to be different, to avoid polluting the inventory registry between tests. + for txs in [vec![test_tx5], vec![test_wtx67], vec![test_tx8, test_wtx91]] { + // Send a request via the peer set, via a local TCP connection, + // to the isolated peer's `unrelated_response` inbound service + let response = peer_set + .clone() + .oneshot(Request::TransactionsById(txs.iter().copied().collect())) + .await; + + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. 
+ if let Err(missing_error) = response.as_ref() { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + + if txs.len() <= 1 { + let expected = PeerError::NotFoundResponse( + txs.iter().copied().map(InventoryHash::from).collect(), + ); + let expected = SharedPeerError::from(expected); + assert_eq!(missing_error.inner_debug(), expected.inner_debug()); + } else { + // The response order is unstable, because it depends on concurrent inbound futures. + // In #2244 we will fix this by replacing response Vecs with HashSets. + // + // Assume there are only 2 transactions. + let expected1: Vec = + txs.iter().copied().map(InventoryHash::from).collect(); + let expected2: Vec = + txs.iter().rev().copied().map(InventoryHash::from).collect(); + + let expected: Vec = [expected1, expected2] + .into_iter() + .map(PeerError::NotFoundResponse) + .map(|error| SharedPeerError::from(error).inner_debug()) + .collect(); + let actual = missing_error.inner_debug(); + + assert!( + expected.iter().any(|expected| expected == &actual), + "unexpected response: {:?} \ + expected one of: {:?}", + actual, + expected, + ); + } + } else { + unreachable!( + "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \ + actual result: {:?}", + response + ) + }; + + // The peer set only does routing for single-transaction requests. + // (But the inventory tracker tracks the response to requests of any size.) + for tx in &txs { + // Now send the same request to the peer set, + // but expect a local failure from the inventory registry. + let response = peer_set + .clone() + .oneshot(Request::TransactionsById(iter::once(tx).copied().collect())) + .await; + + // The only ready peer in the PeerSet failed the same request, + // so we expect the peer set to return a `NotFoundRegistry` error immediately. 
+ // + // If these asserts fail, then the PeerSet isn't returning inv routing error responses. + // (Or the missing inventory from the previous response wasn't registered correctly.) + if let Err(missing_error) = response.as_ref() { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. + let expected = PeerError::NotFoundRegistry(vec![InventoryHash::from(*tx)]); + let expected = SharedPeerError::from(expected); + assert_eq!(missing_error.inner_debug(), expected.inner_debug()); + } else { + unreachable!( + "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundRegistry(_)))`, \ + actual result: {:?}", + response + ) + }; + } + } + + let block_gossip_result = block_gossip_task_handle.now_or_never(); + assert!( + matches!(block_gossip_result, None), + "unexpected error or panic in block gossip task: {:?}", + block_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + +/// Check that a network stack: +/// - returns a partial notfound response, when a peer partially responds to a multi-transaction request, +/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer. +/// +/// The requests are coming from the full stack to the isolated peer. +#[tokio::test] +async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> { + // We repeatedly respond with the same transaction, so the peer gives up on the second response. 
+ let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?; + let repeated_tx: UnminedTx = repeated_tx.into(); + let repeated_response = Response::Transactions(vec![ + Available(repeated_tx.clone()), + Available(repeated_tx.clone()), + ]); + + let ( + // real services + _connected_peer_service, + _inbound_service, + peer_set, + _mempool_service, + _state_service, + // mocked services + _mock_block_verifier, + _mock_tx_verifier, + // real tasks + block_gossip_task_handle, + tx_gossip_task_handle, + // real open socket addresses + _listen_addr, + ) = setup(Some(repeated_response)).await; + + let missing_tx_id = UnminedTxId::from_legacy_id(TxHash([0x22; 32])); + + let txs = [missing_tx_id, repeated_tx.id]; + + // Send a request via the peer set, via a local TCP connection, + // to the isolated peer's `repeated_response` inbound service + let response = peer_set + .clone() + .oneshot(Request::TransactionsById(txs.iter().copied().collect())) + .await; + + if let Ok(Response::Transactions(tx_response)) = response.as_ref() { + let available: Vec = tx_response + .iter() + .filter_map(InventoryResponse::available) + .collect(); + let missing: Vec = tx_response + .iter() + .filter_map(InventoryResponse::missing) + .collect(); + + assert_eq!(available, vec![repeated_tx]); + assert_eq!(missing, vec![missing_tx_id]); + } else { + unreachable!( + "peer::Connection should map partial `TransactionsById` responses as `Ok(Response::Transactions(_))`, \ + actual result: {:?}", + response + ) + }; + + // Now send another request to the peer set with only the missing transaction, + // but expect a local failure from the inventory registry. + // + // The peer set only does routing for single-transaction requests. + // (But the inventory tracker tracks the response to requests of any size.) 
+ let response = peer_set + .clone() + .oneshot(Request::TransactionsById( + iter::once(missing_tx_id).collect(), + )) + .await; + + // The only ready peer in the PeerSet failed the same request, + // so we expect the peer set to return a `NotFoundRegistry` error immediately. + // + // If these asserts fail, then the PeerSet isn't returning inv routing error responses. + // (Or the missing inventory from the previous response wasn't registered correctly.) + if let Err(missing_error) = response.as_ref() { + let missing_error = missing_error + .downcast_ref::() + .expect("unexpected inner error type, expected SharedPeerError"); + + // Unfortunately, we can't access SharedPeerError's inner type, + // so we can't compare the actual responses. + let expected = PeerError::NotFoundRegistry(vec![InventoryHash::from(missing_tx_id)]); + let expected = SharedPeerError::from(expected); + assert_eq!(missing_error.inner_debug(), expected.inner_debug()); + } else { + unreachable!( + "peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundRegistry(_)))`, \ + actual result: {:?}", + response + ) + }; + + let block_gossip_result = block_gossip_task_handle.now_or_never(); + assert!( + matches!(block_gossip_result, None), + "unexpected error or panic in block gossip task: {:?}", + block_gossip_result, + ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + + Ok(()) +} + /// Setup a real Zebra network stack, with a connected peer using a real isolated network stack. /// +/// The isolated peer responds to every request with `isolated_peer_response`. +/// (If no response is provided, the isolated peer ignores inbound requests.) +/// /// Uses fake verifiers, and does not run a block syncer task. 
-async fn setup() -> ( +async fn setup( + isolated_peer_response: Option, +) -> ( // real services - // connected peer + // connected peer which responds with isolated_peer_response Buffer< BoxService, zebra_network::Request, @@ -462,11 +741,23 @@ async fn setup() -> ( peer_set.clone(), )); + // Set up the inbound service response for the isolated peer + let isolated_peer_response = isolated_peer_response.unwrap_or(Response::Nil); + let response_inbound_service = tower::service_fn(move |_req| { + let isolated_peer_response = isolated_peer_response.clone(); + async move { Ok::(isolated_peer_response) } + }); + let user_agent = "test".to_string(); + // Open a fake peer connection to the inbound listener, using the isolated connection API - let connected_peer_service = - connect_isolated_tcp_direct(network, listen_addr, "test".to_string()) - .await - .expect("local listener connection succeeds"); + let connected_peer_service = connect_isolated_tcp_direct_with_inbound( + network, + listen_addr, + user_agent, + response_inbound_service, + ) + .await + .expect("local listener connection succeeds"); let connected_peer_service = ServiceBuilder::new() .buffer(10) .service(connected_peer_service); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index cfbad120..a699d086 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -853,6 +853,8 @@ where BlockDownloadVerifyError::DownloadFailed(ref source) if format!("{:?}", source).contains("NotFound") => { + // Covers both NotFoundResponse and NotFoundRegistry errors. + // // TODO: improve this by checking the type (#2908) // restart after a certain number of NotFound errors? 
debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing"); diff --git a/zebrad/src/components/sync/tests/timing.rs b/zebrad/src/components/sync/tests/timing.rs index 6fd76b02..3f3f4cd1 100644 --- a/zebrad/src/components/sync/tests/timing.rs +++ b/zebrad/src/components/sync/tests/timing.rs @@ -10,10 +10,18 @@ use futures::future; use tokio::time::{timeout, Duration}; use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING}; -use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT}; +use zebra_network::constants::{ + DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT, INVENTORY_ROTATION_INTERVAL, +}; +use zebra_state::ChainTipSender; -use super::super::*; -use crate::config::ZebradConfig; +use crate::{ + components::sync::{ + ChainSync, BLOCK_DOWNLOAD_RETRY_LIMIT, BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT, + GENESIS_TIMEOUT_RETRY, SYNC_RESTART_DELAY, + }, + config::ZebradConfig, +}; /// Make sure the timeout values are consistent with each other. #[test] @@ -78,6 +86,20 @@ fn ensure_timeouts_consistent() { "a syncer tip crawl should complete before most new blocks" ); + // This is a compromise between two failure modes: + // - some peers have the inventory, but they weren't ready last time we checked, + // so we want to retry soon + // - all peers are missing the inventory, so we want to wait for a while before retrying + assert!( + INVENTORY_ROTATION_INTERVAL < SYNC_RESTART_DELAY, + "we should expire some inventory every time the syncer resets" + ); + assert!( + SYNC_RESTART_DELAY < 2 * INVENTORY_ROTATION_INTERVAL, + "we should give the syncer at least one retry attempt, \ + before we expire all inventory" + ); + // The default peer crawler interval should be at least // `HANDSHAKE_TIMEOUT` lower than all other crawler intervals. 
// @@ -133,7 +155,7 @@ fn request_genesis_is_rate_limited() { }); // create an empty latest chain tip - let (_sender, latest_chain_tip, _change) = zs::ChainTipSender::new(None, Network::Mainnet); + let (_sender, latest_chain_tip, _change) = ChainTipSender::new(None, Network::Mainnet); // create a verifier service that will always panic as it will never be called let verifier_service = diff --git a/zebrad/src/components/sync/tests/vectors.rs b/zebrad/src/components/sync/tests/vectors.rs index b0ad3257..871ffdfb 100644 --- a/zebrad/src/components/sync/tests/vectors.rs +++ b/zebrad/src/components/sync/tests/vectors.rs @@ -11,7 +11,7 @@ use zebra_chain::{ serialization::ZcashDeserializeInto, }; use zebra_consensus::Config as ConsensusConfig; -use zebra_network::ResponseStatus; +use zebra_network::InventoryResponse; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -26,7 +26,7 @@ use crate::{ config::ZebradConfig, }; -use ResponseStatus::*; +use InventoryResponse::*; /// Maximum time to wait for a request to any test service. ///