Fix task handling bugs, so peers are more likely to be available (#3191)

* Tweak crawler timings so peers are more likely to be available

* Tweak min peer connection interval so we try all peers

* Let other tasks run between fanouts, so we're more likely to choose different peers

* Let other tasks run between retries, so we're more likely to choose different peers

* Let other tasks run after peer crawler DemandDrop

This makes it more likely that peers will become ready.
teor 2021-12-20 09:02:31 +10:00 committed by GitHub
parent cb213210b1
commit 6cbd7dce43
13 changed files with 534 additions and 415 deletions
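The last three changes in the summary above share one pattern: a cooperative `tokio::task::yield_now().await` between successive uses of the shared peer set, so the peer set has a chance to mark a different peer as ready. A minimal, illustrative sketch of that pattern follows (hypothetical helper and names; Zebra's real loops push the response futures into a `FuturesUnordered` inside the candidate set, mempool crawler, and syncer, as the diffs below show):

use tower::{Service, ServiceExt};

/// Send `req` to up to `fanout_limit` peers, yielding between attempts
/// so we are more likely to pick a different ready peer each time.
async fn fanout_request<S, Req>(
    peer_service: &mut S,
    req: Req,
    fanout_limit: usize,
) -> Result<Vec<S::Response>, S::Error>
where
    S: Service<Req>,
    Req: Clone,
{
    let mut responses = Vec::new();
    for attempt in 0..fanout_limit {
        if attempt > 0 {
            // Let other tasks run, so we're more likely to choose a different peer.
            tokio::task::yield_now().await;
        }
        // Wait for a ready peer, then send this attempt's request.
        let response = peer_service.ready().await?.call(req.clone()).await?;
        responses.push(response);
    }
    Ok(responses)
}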

@@ -138,7 +138,7 @@ pub(crate) const CONSENSUS_BRANCH_IDS: &[(NetworkUpgrade, ConsensusBranchId)] =
const PRE_BLOSSOM_POW_TARGET_SPACING: i64 = 150;
/// The target block spacing after Blossom activation.
const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75;
pub const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75;
/// The averaging window for difficulty threshold arithmetic mean calculations.
///

@@ -242,7 +242,7 @@ impl Default for Config {
network: Network::Mainnet,
initial_mainnet_peers: mainnet_peers,
initial_testnet_peers: testnet_peers,
crawl_new_peer_interval: Duration::from_secs(60),
crawl_new_peer_interval: constants::DEFAULT_CRAWL_NEW_PEER_INTERVAL,
// The default peerset target size should be large enough to ensure
// nodes have a reliable set of peers. But it should also be limited

@@ -64,7 +64,21 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);
/// This avoids explicit synchronization, but relies on the peer
/// connector actually setting up channels and these heartbeats in a
/// specific manner that matches up with this math.
pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(60 + 20 + 20 + 20);
pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20);
/// The default peer address crawler interval.
///
/// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT)
/// lower than all other crawler intervals.
///
/// This makes the following sequence of events more likely:
/// 1. a peer address crawl,
/// 2. new peer connections,
/// 3. peer requests from other crawlers.
///
/// Using a prime number makes sure that peer address crawls
/// don't synchronise with other crawls.
pub const DEFAULT_CRAWL_NEW_PEER_INTERVAL: Duration = Duration::from_secs(61);
/// The maximum duration since a peer was last seen to consider it reachable.
///
@@ -89,7 +103,9 @@ pub const MAX_RECENT_PEER_AGE: Duration32 = Duration32::from_days(3);
/// Regular interval for sending keepalive `Ping` messages to each
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);
///
/// Using a prime number makes sure that heartbeats don't synchronise with crawls.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59);
/// The minimum time between successive calls to [`CandidateSet::next()`][Self::next].
///
@@ -97,15 +113,17 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);
///
/// Zebra resists distributed denial of service attacks by making sure that new peer connections
/// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25);
/// The minimum time between successive calls to [`CandidateSet::update()`][Self::update].
///
/// Using a prime number makes sure that peer address crawls don't synchronise with other crawls.
///
/// ## Security
///
/// Zebra resists distributed denial of service attacks by making sure that requests for more
/// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart.
pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(30);
pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(31);
/// The combined timeout for all the requests in [`CandidateSet::update()`][Self::update].
///
@@ -255,6 +273,8 @@ pub mod magics {
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use super::*;
/// This assures that the `Duration` value we are computing for
@@ -287,6 +307,22 @@ mod tests {
assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos,
"The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA.");
assert!(
u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
* MIN_PEER_CONNECTION_INTERVAL
< MIN_PEER_RECONNECTION_DELAY,
"each peer should get at least one connection attempt in each connection interval",
);
assert!(
MIN_PEER_RECONNECTION_DELAY.as_secs()
/ (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
* MIN_PEER_CONNECTION_INTERVAL)
.as_secs()
<= 2,
"each peer should only have a few connection attempts in each connection interval",
);
}
/// Make sure that peer age limits are consistent with each other.

@@ -73,7 +73,7 @@ pub use crate::{
meta_addr::PeerAddrState,
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::{RetryErrors, RetryLimit},
policies::RetryLimit,
protocol::internal::{Request, Response},
};

@@ -259,9 +259,14 @@ where
let mut more_peers = None;
// Launch requests
//
// TODO: launch each fanout in its own task (might require tokio 1.6)
for _ in 0..fanout_limit {
for attempt in 0..fanout_limit {
if attempt > 0 {
// Let other tasks run, so we're more likely to choose a different peer.
//
// TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
tokio::task::yield_now().await;
}
let peer_service = self.peer_service.ready().await?;
responses.push(peer_service.call(Request::Peers));
}

@@ -696,7 +696,6 @@ where
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many open connections or in-flight handshakes, dropping demand signal");
continue;
}
DemandHandshake { candidate } => {
// Increment the connection count before we spawn the connection.

@@ -1,4 +1,6 @@
use futures::future;
use std::pin::Pin;
use futures::{Future, FutureExt};
use tower::retry::Policy;
/// A very basic retry policy with a limited number of retry attempts.
@@ -19,14 +21,26 @@ impl RetryLimit {
}
impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E> for RetryLimit {
type Future = future::Ready<Self>;
type Future = Pin<Box<dyn Future<Output = Self> + Send + 'static>>;
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if let Err(e) = result {
if self.remaining_tries > 0 {
tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying");
Some(future::ready(RetryLimit {
remaining_tries: self.remaining_tries - 1,
}))
let remaining_tries = self.remaining_tries - 1;
Some(
async move {
// Let other tasks run, so we're more likely to choose a different peer,
// and so that any notfound inv entries win the race to the PeerSet.
//
// TODO: move syncer retries into the PeerSet,
// so we always choose different peers (#3235)
tokio::task::yield_now().await;
RetryLimit { remaining_tries }
}
.boxed(),
)
} else {
None
}
@@ -39,24 +53,3 @@ impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E>
Some(req.clone())
}
}
/// A very basic retry policy that always retries failed requests.
///
/// XXX remove this when https://github.com/tower-rs/tower/pull/414 lands.
#[derive(Clone, Debug)]
pub struct RetryErrors;
impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
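For context, `RetryLimit` is a policy for `tower::retry::Retry`, which consults the policy after each failed request; boxing the policy future lets it yield to other tasks before handing back the next attempt. Below is a hedged usage sketch: it assumes the `RetryLimit::new` constructor and `Clone` derive elided from the `impl RetryLimit` hunk above, and an illustrative peer service rather than Zebra's actual syncer wiring.

use tower::{retry::Retry, Service, ServiceExt};
use zebra_network::{Request, Response, RetryLimit};

/// Wrap a peer service so failed requests are retried a few times,
/// with the policy yielding to other tasks before each retry.
async fn with_retries<S>(peer_service: S) -> Result<Response, S::Error>
where
    S: Service<Request, Response = Response> + Clone,
    S::Error: std::fmt::Debug,
{
    // `Retry` clones the request via `Policy::clone_request` before each attempt.
    let mut retrying = Retry::new(RetryLimit::new(3), peer_service);
    retrying.ready().await?.call(Request::Peers).await
}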

@@ -69,7 +69,13 @@ mod tests;
const FANOUT: usize = 3;
/// The delay between crawl events.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
///
/// This should be less than the target block interval,
/// so that we crawl peer mempools at least once per block.
///
/// Using a prime number makes sure that mempool crawler fanouts
/// don't synchronise with other crawls.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
/// The time to wait for a peer response.
///
@@ -194,7 +200,14 @@ where
let mut requests = FuturesUnordered::new();
// get readiness for one peer at a time, to avoid peer set contention
for _ in 0..FANOUT {
for attempt in 0..FANOUT {
if attempt > 0 {
// Let other tasks run, so we're more likely to choose a different peer.
//
// TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
tokio::task::yield_now().await;
}
let mut peer_set = peer_set.clone();
// end the task on permanent peer set errors
let peer_set = peer_set.ready().await?;

@@ -1,369 +1,4 @@
use std::{collections::HashSet, time::Duration};
//! Tests for the mempool crawler.
(The remaining removed lines of the old mempool crawler tests are identical to the new `prop` module added in the next file, and are not repeated here.)
mod prop;
mod timing;

@@ -0,0 +1,371 @@
//! Randomised property tests for the mempool crawler.
use std::{collections::HashSet, time::Duration};
use proptest::{
collection::{hash_set, vec},
prelude::*,
};
use tokio::time;
use zebra_chain::{parameters::Network, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_state::ChainTipSender;
use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::components::{
mempool::{
self,
crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
downloads::Gossip,
error::MempoolError,
Config,
},
sync::RecentSyncLengths,
};
/// The number of iterations to crawl while testing.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `CRAWL_ITERATIONS` times the timeout for receiving a request (see more information in
/// [`MockServiceBuilder::with_max_request_delay`]).
const CRAWL_ITERATIONS: usize = 4;
/// The maximum number of transactions crawled from a mocked peer.
const MAX_CRAWLED_TX: usize = 10;
/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);
/// A [`MockService`] representing the network service.
type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
/// A [`MockService`] representing the mempool service.
type MockMempool = MockService<mempool::Request, mempool::Response, PropTestAssertion>;
proptest! {
/// Test if crawler periodically crawls for transaction IDs.
///
/// The crawler should periodically perform a fanned-out series of requests to obtain
/// transaction IDs from other peers. These requests should only be sent if the mempool is
/// enabled, i.e., if the block synchronizer is likely close to the chain tip.
#[test]
fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
let runtime = zebra_test::init_async();
// Add a dummy last element, so that all of the original values are used.
sync_lengths.push(0);
runtime.block_on(async move {
let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender,
) = setup_crawler();
time::pause();
for sync_length in sync_lengths {
let mempool_is_enabled = sync_status.is_close_to_tip();
for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
if mempool_is_enabled {
respond_with_transaction_ids(&mut peer_set, HashSet::new()).await?;
} else {
peer_set.expect_no_requests().await?;
}
}
peer_set.expect_no_requests().await?;
time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}
// Applying the update event at the end of the test iteration means that the first
// iteration runs with an empty recent sync lengths vector. A dummy element is
// appended to the events so that all of the original values are applied.
recent_sync_lengths.push_extend_tips_length(sync_length);
}
Ok::<(), TestCaseError>(())
})?;
}
/// Test if crawled transactions are forwarded to the [`Mempool`][mempool::Mempool] service.
///
/// The transaction IDs sent by other peers to the crawler should be forwarded to the
/// [`Mempool`][mempool::Mempool] service so that they can be downloaded, verified and added to
/// the mempool.
#[test]
fn crawled_transactions_are_forwarded_to_downloader(
transaction_ids in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
) {
let runtime = zebra_test::init_async();
let transaction_id_count = transaction_ids.len();
runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler();
time::pause();
// Mock end of chain sync to enable the mempool crawler.
SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
crawler_iteration(&mut peer_set, vec![transaction_ids.clone()]).await?;
respond_to_queue_request(
&mut mempool,
transaction_ids,
vec![Ok(()); transaction_id_count],
).await?;
mempool.expect_no_requests().await?;
Ok::<(), TestCaseError>(())
})?;
}
/// Test if errors while forwarding transaction IDs do not stop the crawler.
///
/// The crawler should continue operating normally if some transactions fail to download or
/// even if the mempool service fails to enqueue the transactions to be downloaded.
#[test]
fn transaction_id_forwarding_errors_dont_stop_the_crawler(
service_call_error in any::<MempoolError>(),
transaction_ids_for_call_failure in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
transaction_ids_and_responses in
vec(any::<(UnminedTxId, Result<(), MempoolError>)>(), 1..MAX_CRAWLED_TX),
transaction_ids_for_return_to_normal in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
) {
let runtime = zebra_test::init_async();
// Make transaction_ids_and_responses unique
let unique_transaction_ids_and_responses: HashSet<UnminedTxId> = transaction_ids_and_responses.iter().map(|(id, _result)| id).copied().collect();
let transaction_ids_and_responses: Vec<(UnminedTxId, Result<(), MempoolError>)> = unique_transaction_ids_and_responses.iter().map(|unique_id| transaction_ids_and_responses.iter().find(|(id, _result)| id == unique_id).unwrap()).cloned().collect();
runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler();
time::pause();
// Mock end of chain sync to enable the mempool crawler.
SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
// Prepare to simulate download errors.
let download_result_count = transaction_ids_and_responses.len();
let mut transaction_ids_for_download_errors = HashSet::with_capacity(download_result_count);
let mut download_result_list = Vec::with_capacity(download_result_count);
for (transaction_id, result) in transaction_ids_and_responses {
transaction_ids_for_download_errors.insert(transaction_id);
download_result_list.push(result);
}
// First crawl iteration:
// 1. Fails with a mempool call error
// 2. Some downloads fail
// Rest: no crawled transactions
crawler_iteration(
&mut peer_set,
vec![
transaction_ids_for_call_failure.clone(),
transaction_ids_for_download_errors.clone(),
],
)
.await?;
// First test with an error returned from the Mempool service.
respond_to_queue_request_with_error(
&mut mempool,
transaction_ids_for_call_failure,
service_call_error,
).await?;
// Then test a failure to download transactions.
respond_to_queue_request(
&mut mempool,
transaction_ids_for_download_errors,
download_result_list,
).await?;
mempool.expect_no_requests().await?;
// Wait until next crawl iteration.
time::sleep(RATE_LIMIT_DELAY).await;
// Second crawl iteration:
// The mempool should continue crawling normally.
crawler_iteration(
&mut peer_set,
vec![transaction_ids_for_return_to_normal.clone()],
)
.await?;
let response_list = vec![Ok(()); transaction_ids_for_return_to_normal.len()];
respond_to_queue_request(
&mut mempool,
transaction_ids_for_return_to_normal,
response_list,
).await?;
mempool.expect_no_requests().await?;
Ok::<(), TestCaseError>(())
})?;
}
}
/// Spawn a crawler instance using mock services.
fn setup_crawler() -> (
MockPeerSet,
MockMempool,
SyncStatus,
RecentSyncLengths,
ChainTipSender,
) {
let peer_set = MockService::build().for_prop_tests();
let mempool = MockService::build().for_prop_tests();
let (sync_status, recent_sync_lengths) = SyncStatus::new();
// the network should be irrelevant here
let (chain_tip_sender, _latest_chain_tip, chain_tip_change) =
ChainTipSender::new(None, Network::Mainnet);
Crawler::spawn(
&Config::default(),
peer_set.clone(),
mempool.clone(),
sync_status.clone(),
chain_tip_change,
);
(
peer_set,
mempool,
sync_status,
recent_sync_lengths,
chain_tip_sender,
)
}
/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.
async fn respond_with_transaction_ids(
peer_set: &mut MockPeerSet,
transaction_ids: HashSet<UnminedTxId>,
) -> Result<(), TestCaseError> {
peer_set
.expect_request(zn::Request::MempoolTransactionIds)
.await?
.respond(zn::Response::TransactionIds(
transaction_ids.into_iter().collect(),
));
Ok(())
}
/// Intercept fanned-out requests for mempool transaction IDs and answer with the `responses`.
///
/// Each item in `responses` is a list of transaction IDs to send back to a single request.
/// Therefore, each item represents the response sent by a peer in the network.
///
/// If there are fewer items in `responses` than the [`FANOUT`] number, then the remaining requests are
/// answered with an empty list of transaction IDs.
///
/// # Panics
///
/// If `responses` contains more items than the [`FANOUT`] number.
async fn crawler_iteration(
peer_set: &mut MockPeerSet,
responses: Vec<HashSet<UnminedTxId>>,
) -> Result<(), TestCaseError> {
let empty_responses = FANOUT
.checked_sub(responses.len())
.expect("Too many responses to be sent in a single crawl iteration");
for response in responses {
respond_with_transaction_ids(peer_set, response).await?;
}
for _ in 0..empty_responses {
respond_with_transaction_ids(peer_set, HashSet::new()).await?;
}
peer_set.expect_no_requests().await?;
Ok(())
}
/// Intercept a request for the mempool to download and verify transactions.
///
/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
/// it will be answered with a list of results, one for each transaction requested to be
/// downloaded.
///
/// # Panics
///
/// If `response` and `expected_transaction_ids` have different sizes.
async fn respond_to_queue_request(
mempool: &mut MockMempool,
expected_transaction_ids: HashSet<UnminedTxId>,
response: Vec<Result<(), MempoolError>>,
) -> Result<(), TestCaseError> {
mempool
.expect_request_that(|req| {
if let mempool::Request::Queue(req) = req {
let ids: HashSet<UnminedTxId> = req
.iter()
.filter_map(|gossip| {
if let Gossip::Id(id) = gossip {
Some(*id)
} else {
None
}
})
.collect();
ids == expected_transaction_ids
} else {
false
}
})
.await?
.respond(mempool::Response::Queued(response));
Ok(())
}
/// Intercept a request for the mempool to download and verify transactions, and answer with an error.
///
/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
/// it will be answered with `error`, as if the service had an internal failure that prevented it
/// from queuing the transactions for downloading.
async fn respond_to_queue_request_with_error(
mempool: &mut MockMempool,
expected_transaction_ids: HashSet<UnminedTxId>,
error: MempoolError,
) -> Result<(), TestCaseError> {
mempool
.expect_request_that(|req| {
if let mempool::Request::Queue(req) = req {
let ids: HashSet<UnminedTxId> = req
.iter()
.filter_map(|gossip| {
if let Gossip::Id(id) = gossip {
Some(*id)
} else {
None
}
})
.collect();
ids == expected_transaction_ids
} else {
false
}
})
.await?
.respond(Err(error));
Ok(())
}

@@ -0,0 +1,29 @@
//! Timing tests for the mempool crawler.
use std::convert::TryInto;
use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING;
use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT};
use crate::components::mempool::crawler::RATE_LIMIT_DELAY;
#[test]
fn ensure_timing_consistent() {
assert!(
RATE_LIMIT_DELAY.as_secs()
< POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("not negative"),
"a mempool crawl should complete before most new blocks"
);
// The default peer crawler interval should be at least
// `HANDSHAKE_TIMEOUT` lower than all other crawler intervals.
//
// See `DEFAULT_CRAWL_NEW_PEER_INTERVAL` for details.
assert!(
DEFAULT_CRAWL_NEW_PEER_INTERVAL.as_secs() + HANDSHAKE_TIMEOUT.as_secs()
< RATE_LIMIT_DELAY.as_secs(),
"an address crawl and peer connections should complete before most syncer restarts"
);
}
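With the constants in this commit, these timing checks work out numerically: DEFAULT_CRAWL_NEW_PEER_INTERVAL (61 s) + HANDSHAKE_TIMEOUT (4 s) = 65 s, which is below both RATE_LIMIT_DELAY (73 s) and SYNC_RESTART_DELAY (67 s), and both of those crawl intervals are below POST_BLOSSOM_POW_TARGET_SPACING (75 s).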

@@ -156,12 +156,14 @@ pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(180);
/// This delay is particularly important on instances with slow or unreliable
/// networks, and on testnet, which has a small number of slow peers.
///
/// Using a prime number makes sure that syncer fanouts don't synchronise with other crawls.
///
/// ## Correctness
///
/// If this delay is removed (or set too low), the syncer will
/// sometimes get stuck in a failure loop, due to leftover downloads from
/// previous sync runs.
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(61);
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);
/// Controls how long we wait to retry a failed attempt to download
/// and verify the genesis block.
@@ -433,7 +435,14 @@ where
tracing::debug!(?block_locator, "got block locator");
let mut requests = FuturesUnordered::new();
for _ in 0..FANOUT {
for attempt in 0..FANOUT {
if attempt > 0 {
// Let other tasks run, so we're more likely to choose a different peer.
//
// TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
tokio::task::yield_now().await;
}
requests.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
@@ -552,7 +561,14 @@ where
for tip in tips {
tracing::debug!(?tip, "asking peers to extend chain tip");
let mut responses = FuturesUnordered::new();
for _ in 0..FANOUT {
for attempt in 0..FANOUT {
if attempt > 0 {
// Let other tasks run, so we're more likely to choose a different peer.
//
// TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
tokio::task::yield_now().await;
}
responses.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: vec![tip.tip],

@@ -1,12 +1,16 @@
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
use std::{
convert::TryInto,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
};
use futures::future;
use tokio::time::{timeout, Duration};
use zebra_chain::parameters::Network;
use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING};
use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT};
use super::super::*;
use crate::config::ZebradConfig;
@@ -61,10 +65,28 @@ fn ensure_timeouts_consistent() {
// This constraint makes genesis retries more likely to succeed
assert!(
GENESIS_TIMEOUT_RETRY.as_secs() > zebra_network::constants::HANDSHAKE_TIMEOUT.as_secs()
GENESIS_TIMEOUT_RETRY.as_secs() > HANDSHAKE_TIMEOUT.as_secs()
&& GENESIS_TIMEOUT_RETRY.as_secs() < BLOCK_DOWNLOAD_TIMEOUT.as_secs(),
"Genesis retries should wait for new peers, but they shouldn't wait too long"
);
assert!(
SYNC_RESTART_DELAY.as_secs()
< POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("not negative"),
"a syncer tip crawl should complete before most new blocks"
);
// The default peer crawler interval should be at least
// `HANDSHAKE_TIMEOUT` lower than all other crawler intervals.
//
// See `DEFAULT_CRAWL_NEW_PEER_INTERVAL` for details.
assert!(
DEFAULT_CRAWL_NEW_PEER_INTERVAL.as_secs() + HANDSHAKE_TIMEOUT.as_secs()
< SYNC_RESTART_DELAY.as_secs(),
"an address crawl and peer connections should complete before most syncer tips crawls"
);
}
/// Test that calls to [`ChainSync::request_genesis`] are rate limited.