Test for message broadcast to the right number of peers (#3284)

* create a test for message broadcast to peers

* remove dbg comment

* use `prop_assert_eq`

* use a random number of peers with constant version

* fix some docs

* check total peers is equal to active peers

* increase coverage

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Alfredo Garcia 2021-12-30 21:26:50 -03:00 committed by GitHub
parent d2f71f01d6
commit 5f772ebf65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 10 deletions

View File

@ -20,7 +20,7 @@ pub use client::tests::ClientTestHarness;
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::ClientRequest;
pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest};
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

View File

@ -737,11 +737,16 @@ where
/// Broadcasts the same request to lots of ready peers, ignoring return values.
fn route_broadcast(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
// Round up, so that if we have one ready peer, it gets the request
let half_ready_peers = (self.ready_services.len() + 1) / 2;
// Broadcasts ignore the response
self.route_multiple(req, half_ready_peers)
self.route_multiple(req, self.number_of_peers_to_broadcast())
}
/// Given a number of ready peers calculate to how many of them Zebra will
/// actually send the request to. Return this number.
pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
// We are currently sending broadcast messages to half of the total peers.
// Round up, so that if we have one ready peer, it gets the request.
(self.ready_services.len() + 1) / 2
}
/// Logs the peer set size.

View File

@ -4,13 +4,17 @@ use futures::FutureExt;
use proptest::prelude::*;
use tower::{discover::Discover, BoxError, ServiceExt};
use zebra_chain::{block, chain_tip::ChainTip, parameters::Network};
use zebra_chain::{
block, chain_tip::ChainTip, parameters::Network, serialization::ZcashDeserializeInto,
};
use super::{BlockHeightPairAcrossNetworkUpgrades, PeerSetBuilder, PeerVersions};
use crate::{
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion},
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion, ReceiveRequestAttempt},
peer_set::PeerSet,
protocol::external::types::Version,
Request,
};
proptest! {
@ -90,17 +94,90 @@ proptest! {
Ok::<_, TestCaseError>(())
})?;
}
/// Test if requests are broadcasted to the right number of peers.
#[test]
fn broadcast_to_peers(
total_number_of_peers in (1..100usize)
) {
// Get a dummy block hash to help us construct a valid request to be broadcasted
let block: block::Block = zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap();
let block_hash = block::Hash::from(&block);
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
let peer_versions = vec![CURRENT_NETWORK_PROTOCOL_VERSION; total_number_of_peers];
let peer_versions = PeerVersions {
peer_versions,
};
// Get peers and handles
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
// Build a peerset
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
// Get the total number of active peers
let total_number_of_active_peers = check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut handles,
CURRENT_NETWORK_PROTOCOL_VERSION,
)?;
// Since peer addresses are unique, and versions are valid, every peer should be active
prop_assert_eq!(total_number_of_peers, total_number_of_active_peers);
// Get the number of peers to broadcast
let number_of_peers_to_broadcast = peer_set.number_of_peers_to_broadcast();
// The number of peers to broadcast should be at least 1,
// and if possible, it should be less than the number of ready peers.
// (Since there are no requests, all active peers should be ready.)
prop_assert!(number_of_peers_to_broadcast >= 1);
if total_number_of_active_peers > 1 {
prop_assert!(number_of_peers_to_broadcast < total_number_of_active_peers);
}
// Send a request to all peers
let _ = peer_set.route_broadcast(Request::AdvertiseBlock(block_hash));
// Check how many peers received the request
let mut received = 0;
for mut h in handles {
if let ReceiveRequestAttempt::Request(client_request) = h.try_to_receive_outbound_client_request() {
prop_assert_eq!(client_request.request, Request::AdvertiseBlock(block_hash));
received += 1;
};
}
// Make sure the message was broadcasted to the right number of peers
prop_assert_eq!(received, number_of_peers_to_broadcast);
Ok::<_, TestCaseError>(())
})?;
}
}
/// Check if only peers with up-to-date protocol versions are live.
///
/// This will poll the `peer_set` to allow it to drop outdated peers, and then check the peer
/// `harnesses` to assert that only up-to-date peers are kept by the `peer_set`.
/// Returns the number of up-to-date peers on success.
fn check_if_only_up_to_date_peers_are_live<D, C>(
peer_set: &mut PeerSet<D, C>,
harnesses: &mut Vec<ClientTestHarness>,
minimum_version: Version,
) -> Result<(), TestCaseError>
) -> Result<usize, TestCaseError>
where
D: Discover<Key = SocketAddr, Service = LoadTrackedClient> + Unpin,
D::Error: Into<BoxError>,
@ -118,6 +195,7 @@ where
prop_assert!(matches!(poll_result, Some(Ok(_))));
}
let mut number_of_connected_peers = 0;
for harness in harnesses {
let is_outdated = harness.version() < minimum_version;
let is_connected = harness.wants_connection_heartbeats();
@ -128,7 +206,11 @@ where
is_connected,
is_outdated,
);
if is_connected {
number_of_connected_peers += 1;
}
}
Ok(())
Ok(number_of_connected_peers)
}

View File

@ -98,7 +98,10 @@ impl Version {
/// Returns the minimum specified network protocol version for `network` and
/// `network_upgrade`.
fn min_specified_for_upgrade(network: Network, network_upgrade: NetworkUpgrade) -> Version {
pub(crate) fn min_specified_for_upgrade(
network: Network,
network_upgrade: NetworkUpgrade,
) -> Version {
// TODO: Should we reject earlier protocol versions during our initial
// sync? zcashd accepts 170_002 or later during its initial sync.
Version(match (network, network_upgrade) {