diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 70ed31db..6218e69e 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -20,7 +20,7 @@ pub use client::tests::ClientTestHarness; #[cfg(not(test))] use client::ClientRequest; #[cfg(test)] -pub(crate) use client::ClientRequest; +pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest}; use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 11ebb837..1d33d93d 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -737,11 +737,16 @@ where /// Broadcasts the same request to lots of ready peers, ignoring return values. fn route_broadcast(&mut self, req: Request) -> >::Future { - // Round up, so that if we have one ready peer, it gets the request - let half_ready_peers = (self.ready_services.len() + 1) / 2; - // Broadcasts ignore the response - self.route_multiple(req, half_ready_peers) + self.route_multiple(req, self.number_of_peers_to_broadcast()) + } + + /// Given a number of ready peers calculate to how many of them Zebra will + /// actually send the request to. Return this number. + pub(crate) fn number_of_peers_to_broadcast(&self) -> usize { + // We are currently sending broadcast messages to half of the total peers. + // Round up, so that if we have one ready peer, it gets the request. + (self.ready_services.len() + 1) / 2 } /// Logs the peer set size. diff --git a/zebra-network/src/peer_set/set/tests/prop.rs b/zebra-network/src/peer_set/set/tests/prop.rs index 9b820c92..b803444c 100644 --- a/zebra-network/src/peer_set/set/tests/prop.rs +++ b/zebra-network/src/peer_set/set/tests/prop.rs @@ -4,13 +4,17 @@ use futures::FutureExt; use proptest::prelude::*; use tower::{discover::Discover, BoxError, ServiceExt}; -use zebra_chain::{block, chain_tip::ChainTip, parameters::Network}; +use zebra_chain::{ + block, chain_tip::ChainTip, parameters::Network, serialization::ZcashDeserializeInto, +}; use super::{BlockHeightPairAcrossNetworkUpgrades, PeerSetBuilder, PeerVersions}; use crate::{ - peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion}, + constants::CURRENT_NETWORK_PROTOCOL_VERSION, + peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion, ReceiveRequestAttempt}, peer_set::PeerSet, protocol::external::types::Version, + Request, }; proptest! { @@ -90,17 +94,90 @@ proptest! { Ok::<_, TestCaseError>(()) })?; } + + /// Test if requests are broadcasted to the right number of peers. + #[test] + fn broadcast_to_peers( + total_number_of_peers in (1..100usize) + ) { + // Get a dummy block hash to help us construct a valid request to be broadcasted + let block: block::Block = zebra_test::vectors::BLOCK_MAINNET_10_BYTES + .zcash_deserialize_into() + .unwrap(); + let block_hash = block::Hash::from(&block); + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + let peer_versions = vec![CURRENT_NETWORK_PROTOCOL_VERSION; total_number_of_peers]; + let peer_versions = PeerVersions { + peer_versions, + }; + + // Get peers and handles + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Build a peerset + runtime.block_on(async move { + let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Get the total number of active peers + let total_number_of_active_peers = check_if_only_up_to_date_peers_are_live( + &mut peer_set, + &mut handles, + CURRENT_NETWORK_PROTOCOL_VERSION, + )?; + + // Since peer addresses are unique, and versions are valid, every peer should be active + prop_assert_eq!(total_number_of_peers, total_number_of_active_peers); + + // Get the number of peers to broadcast + let number_of_peers_to_broadcast = peer_set.number_of_peers_to_broadcast(); + + // The number of peers to broadcast should be at least 1, + // and if possible, it should be less than the number of ready peers. + // (Since there are no requests, all active peers should be ready.) + prop_assert!(number_of_peers_to_broadcast >= 1); + if total_number_of_active_peers > 1 { + prop_assert!(number_of_peers_to_broadcast < total_number_of_active_peers); + } + + // Send a request to all peers + let _ = peer_set.route_broadcast(Request::AdvertiseBlock(block_hash)); + + // Check how many peers received the request + let mut received = 0; + for mut h in handles { + if let ReceiveRequestAttempt::Request(client_request) = h.try_to_receive_outbound_client_request() { + prop_assert_eq!(client_request.request, Request::AdvertiseBlock(block_hash)); + received += 1; + }; + } + + // Make sure the message was broadcasted to the right number of peers + prop_assert_eq!(received, number_of_peers_to_broadcast); + + Ok::<_, TestCaseError>(()) + })?; + } } /// Check if only peers with up-to-date protocol versions are live. /// /// This will poll the `peer_set` to allow it to drop outdated peers, and then check the peer /// `harnesses` to assert that only up-to-date peers are kept by the `peer_set`. +/// Returns the number of up-to-date peers on success. fn check_if_only_up_to_date_peers_are_live( peer_set: &mut PeerSet, harnesses: &mut Vec, minimum_version: Version, -) -> Result<(), TestCaseError> +) -> Result where D: Discover + Unpin, D::Error: Into, @@ -118,6 +195,7 @@ where prop_assert!(matches!(poll_result, Some(Ok(_)))); } + let mut number_of_connected_peers = 0; for harness in harnesses { let is_outdated = harness.version() < minimum_version; let is_connected = harness.wants_connection_heartbeats(); @@ -128,7 +206,11 @@ where is_connected, is_outdated, ); + + if is_connected { + number_of_connected_peers += 1; + } } - Ok(()) + Ok(number_of_connected_peers) } diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs index f404835e..8e8599d5 100644 --- a/zebra-network/src/protocol/external/types.rs +++ b/zebra-network/src/protocol/external/types.rs @@ -98,7 +98,10 @@ impl Version { /// Returns the minimum specified network protocol version for `network` and /// `network_upgrade`. - fn min_specified_for_upgrade(network: Network, network_upgrade: NetworkUpgrade) -> Version { + pub(crate) fn min_specified_for_upgrade( + network: Network, + network_upgrade: NetworkUpgrade, + ) -> Version { // TODO: Should we reject earlier protocol versions during our initial // sync? zcashd accepts 170_002 or later during its initial sync. Version(match (network, network_upgrade) {