ZIP-401: weighted random mempool eviction (#2889)

* ZIP-401 weighted random mempool eviction

* rename zcash.mempool.total_cost.bytes to zcash.mempool.cost.bytes

Co-authored-by: teor <teor@riseup.net>

* Remove duplicated lines

* Add cost() method to UnminedTx

Update serialization failure messages

* More docs quoting ZIP-401 rules

* Change mempool::Storage::new() to handle Copy-less HashMap, HashSet

* mempool: tidy cost types and evict_one()

* More consensus rule docs

* Refactor calculating mempool costs for Unmined transactions

* Add a note on asymptotic performance of calculating weights of txs in mempool

* Bump test mempool / storage config to avoid weighted random cost limits

* Use mempool tx_cost_limit = u64::MAX for some tests

* Remove failing tests for now

* Allow(clippy::field_reassign_with_default) because of a move on a type that doesn't impl Copy

* Fix mistaken doctest formatting

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>

* Increase test timeout for Windows builds

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
Deirdre Connolly 2021-10-26 20:21:19 -04:00 committed by GitHub
parent a166964a34
commit 0381c2347b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 207 additions and 411 deletions

View File

@ -12,7 +12,7 @@ jobs:
test:
name: Test (+${{ matrix.rust }}) on ${{ matrix.os }}
# The large timeout is to accommodate Windows builds
timeout-minutes: 60
timeout-minutes: 75
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false

1
Cargo.lock generated
View File

@ -4984,6 +4984,7 @@ dependencies = [
"pin-project 1.0.7",
"proptest",
"proptest-derive",
"rand 0.8.4",
"regex",
"semver 1.0.3",
"sentry",

View File

@ -29,6 +29,24 @@ use crate::{
use UnminedTxId::*;
/// The minimum cost value for a transaction in the mempool.
///
/// Contributes to the randomized, weighted eviction of transactions from the
/// mempool when it reaches a max size, also based on the total cost.
///
/// > Each transaction has a cost, which is an integer defined as:
/// >
/// > max(serialized transaction size in bytes, 4000)
/// >
/// > The threshold 4000 for the cost function is chosen so that the size in bytes
/// > of a typical fully shielded Sapling transaction (with, say, 2 shielded outputs
/// > and up to 5 shielded inputs) will fall below the threshold. This has the effect
/// > of ensuring that such transactions are not evicted preferentially to typical
/// > transparent transactions because of their size.
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
const MEMPOOL_TRANSACTION_COST_THRESHOLD: u64 = 4000;
/// A unique identifier for an unmined transaction, regardless of version.
///
/// "The transaction ID of a version 4 or earlier transaction is the SHA-256d hash
@ -195,11 +213,13 @@ impl fmt::Display for UnminedTx {
impl From<Transaction> for UnminedTx {
fn from(transaction: Transaction) -> Self {
let size = transaction.zcash_serialized_size().expect(
"unexpected serialization failure: all structurally valid transactions have a size",
);
Self {
id: (&transaction).into(),
size: transaction
.zcash_serialized_size()
.expect("all transactions have a size"),
size,
transaction: Arc::new(transaction),
}
}
@ -207,36 +227,42 @@ impl From<Transaction> for UnminedTx {
impl From<&Transaction> for UnminedTx {
fn from(transaction: &Transaction) -> Self {
let size = transaction.zcash_serialized_size().expect(
"unexpected serialization failure: all structurally valid transactions have a size",
);
Self {
id: transaction.into(),
transaction: Arc::new(transaction.clone()),
size: transaction
.zcash_serialized_size()
.expect("all transactions have a size"),
size,
}
}
}
impl From<Arc<Transaction>> for UnminedTx {
fn from(transaction: Arc<Transaction>) -> Self {
let size = transaction.zcash_serialized_size().expect(
"unexpected serialization failure: all structurally valid transactions have a size",
);
Self {
id: transaction.as_ref().into(),
size: transaction
.zcash_serialized_size()
.expect("all transactions have a size"),
transaction,
size,
}
}
}
impl From<&Arc<Transaction>> for UnminedTx {
fn from(transaction: &Arc<Transaction>) -> Self {
let size = transaction.zcash_serialized_size().expect(
"unexpected serialization failure: all structurally valid transactions have a size",
);
Self {
id: transaction.as_ref().into(),
transaction: transaction.clone(),
size: transaction
.zcash_serialized_size()
.expect("all transactions have a size"),
size,
}
}
}
@ -271,4 +297,45 @@ impl VerifiedUnminedTx {
miner_fee,
}
}
/// The cost in bytes of the transaction, as defined in [ZIP-401].
///
/// A reflection of the work done by the network in processing them (proof
/// and signature verification; networking overheads; size of in-memory data
/// structures).
///
/// > Each transaction has a cost, which is an integer defined as:
/// >
/// > max(serialized transaction size in bytes, 4000)
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
pub fn cost(&self) -> u64 {
std::cmp::max(
self.transaction.size as u64,
MEMPOOL_TRANSACTION_COST_THRESHOLD,
)
}
/// The computed _eviction weight_ of a verified unmined transaction as part
/// of the mempool set.
///
/// Consensus rule:
///
/// > Each transaction also has an eviction weight, which is cost +
/// > low_fee_penalty, where low_fee_penalty is 16000 if the transaction pays
/// > a fee less than the conventional fee, otherwise 0. The conventional fee
/// > is currently defined as 1000 zatoshis
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
pub fn eviction_weight(self) -> u64 {
let conventional_fee = 1000;
let low_fee_penalty = if u64::from(self.miner_fee) < conventional_fee {
16_000
} else {
0
};
self.cost() + low_fee_penalty
}
}

View File

@ -46,6 +46,8 @@ atty = "0.2.14"
sentry = { version = "0.21.0", default-features = false, features = ["backtrace", "contexts", "reqwest", "rustls"] }
sentry-tracing = { git = "https://github.com/kellpossible/sentry-tracing.git", rev = "f1a4a4a16b5ff1022ae60be779eb3fb928ce9b0f" }
rand = "0.8.4"
[build-dependencies]
vergen = { version = "5.1.16", default-features = false, features = ["cargo", "git"] }

View File

@ -189,6 +189,9 @@ impl ActiveState {
/// of that have yet to be confirmed by the Zcash network. A transaction is
/// confirmed when it has been included in a block ('mined').
pub struct Mempool {
/// The configurable options for the mempool, persisted between states.
config: Config,
/// The state of the mempool.
active_state: ActiveState,
@ -236,6 +239,7 @@ impl Mempool {
tokio::sync::watch::channel(HashSet::new());
let mut service = Mempool {
config: config.clone(),
active_state: ActiveState::Disabled,
sync_status,
debug_enable_at_height: config.debug_enable_at_height.map(Height),
@ -305,7 +309,7 @@ impl Mempool {
self.state.clone(),
));
self.active_state = ActiveState::Enabled {
storage: Default::default(),
storage: storage::Storage::new(&self.config),
tx_downloads,
};
} else {

View File

@ -23,8 +23,11 @@ pub struct Config {
///
/// This limits the total serialized byte size of all transactions in the mempool.
///
/// Consensus rule:
/// > There MUST be a configuration option mempooltxcostlimit, which SHOULD default to 80000000.
///
/// This corresponds to `mempooltxcostlimit` from [ZIP-401](https://zips.z.cash/zip-0401#specification).
pub tx_cost_limit: u32,
pub tx_cost_limit: u64,
/// The mempool transaction eviction age limit.
///

View File

@ -17,7 +17,7 @@ use thiserror::Error;
use zebra_chain::transaction::{self, UnminedTx, UnminedTxId, VerifiedUnminedTx};
use self::verified_set::VerifiedSet;
use super::{downloads::TransactionDownloadVerifyError, MempoolError};
use super::{config, downloads::TransactionDownloadVerifyError, MempoolError};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
@ -127,6 +127,10 @@ pub struct Storage {
/// Any transaction with the same `transaction::Hash` is invalid.
chain_rejected_same_effects:
HashMap<SameEffectsChainRejectionError, HashSet<transaction::Hash>>,
/// Max total cost of the verified mempool set, beyond which transactions
/// are evicted to make room.
tx_cost_limit: u64,
}
impl Drop for Storage {
@ -136,6 +140,13 @@ impl Drop for Storage {
}
impl Storage {
#[allow(clippy::field_reassign_with_default)]
pub(crate) fn new(config: &config::Config) -> Self {
let mut default: Storage = Default::default();
default.tx_cost_limit = config.tx_cost_limit;
default
}
/// Insert a [`VerifiedUnminedTx`] into the mempool, caching any rejections.
///
/// Returns an error if the mempool's verified transactions or rejection caches
@ -172,20 +183,22 @@ impl Storage {
}
// Once inserted, we evict transactions over the pool size limit.
while self.verified.transaction_count() > MEMPOOL_SIZE {
let evicted_tx = self
while self.verified.transaction_count() > MEMPOOL_SIZE
|| self.verified.total_cost() > self.tx_cost_limit
{
let victim_tx = self
.verified
.evict_one()
.expect("mempool is empty, but was expected to be full");
self.reject(
evicted_tx.transaction.id,
victim_tx.transaction.id,
SameEffectsChainRejectionError::RandomlyEvicted.into(),
);
// If this transaction gets evicted, set its result to the same error
// (we could return here, but we still want to check the mempool size)
if evicted_tx.transaction.id == tx_id {
if victim_tx.transaction.id == tx_id {
result = Err(SameEffectsChainRejectionError::RandomlyEvicted.into());
}
}
@ -333,6 +346,9 @@ impl Storage {
self.tip_rejected_same_effects.insert(txid.mined_id(), e);
}
RejectionError::SameEffectsChain(e) => {
// TODO: track evicted victims times, removing those older than
// config.eviction_memory_time, as well as FIFO more than
// MAX_EVICTION_MEMORY_ENTRIES
self.chain_rejected_same_effects
.entry(e)
.or_default()

View File

@ -17,6 +17,7 @@ use zebra_chain::{
};
use crate::components::mempool::{
config::Config,
storage::{
MempoolError, RejectionError, SameEffectsTipRejectionError, Storage,
MAX_EVICTION_MEMORY_ENTRIES, MEMPOOL_SIZE,
@ -46,7 +47,11 @@ proptest! {
input in any::<SpendConflictTestInput>(),
mut rejection_template in any::<UnminedTxId>()
) {
let mut storage = Storage::default();
let mut storage = Storage::new(
&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
let (first_transaction, second_transaction) = input.conflicting_transactions();
let input_permutations = vec![
@ -99,7 +104,10 @@ proptest! {
transactions in vec(any::<VerifiedUnminedTx>(), MEMPOOL_SIZE + 1).prop_map(SummaryDebug),
mut rejection_template in any::<UnminedTxId>()
) {
let mut storage = Storage::default();
let mut storage: Storage = Storage::new(&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
// Make unique IDs by converting the index to bytes, and writing it to each ID
let unique_ids = (0..MAX_EVICTION_MEMORY_ENTRIES as u32).map(move |index| {
@ -158,7 +166,10 @@ proptest! {
rejection_error in any::<RejectionError>(),
mut rejection_template in any::<UnminedTxId>()
) {
let mut storage = Storage::default();
let mut storage: Storage = Storage::new(&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
// Make unique IDs by converting the index to bytes, and writing it to each ID
let unique_ids = (0..(MAX_EVICTION_MEMORY_ENTRIES + 1) as u32).map(move |index| {
@ -195,7 +206,10 @@ proptest! {
/// same nullifier.
#[test]
fn conflicting_transactions_are_rejected(input in any::<SpendConflictTestInput>()) {
let mut storage = Storage::default();
let mut storage: Storage = Storage::new(&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
let (first_transaction, second_transaction) = input.conflicting_transactions();
let input_permutations = vec![
@ -227,7 +241,10 @@ proptest! {
#[test]
fn rejected_transactions_are_properly_rolled_back(input in any::<SpendConflictTestInput>())
{
let mut storage = Storage::default();
let mut storage: Storage = Storage::new(&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
let (first_unconflicting_transaction, second_unconflicting_transaction) =
input.clone().unconflicting_transactions();
@ -280,7 +297,10 @@ proptest! {
/// others.
#[test]
fn removal_of_multiple_transactions(input in any::<MultipleTransactionRemovalTestInput>()) {
let mut storage = Storage::default();
let mut storage: Storage = Storage::new(&Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
// Insert all input transactions, and keep track of the IDs of the one that were actually
// inserted.

View File

@ -11,7 +11,7 @@ use zebra_chain::{
};
use crate::components::mempool::{
storage::tests::unmined_transactions_in_blocks, storage::*, Mempool,
config, storage::tests::unmined_transactions_in_blocks, storage::*, Mempool,
};
#[test]
@ -21,7 +21,10 @@ fn mempool_storage_crud_exact_mainnet() {
let network = Network::Mainnet;
// Create an empty storage instance
let mut storage: Storage = Default::default();
let mut storage: Storage = Storage::new(&config::Config {
tx_cost_limit: u64::MAX,
..Default::default()
});
// Get one (1) unmined transaction
let unmined_tx = unmined_transactions_in_blocks(.., network)
@ -49,7 +52,10 @@ fn mempool_storage_crud_same_effects_mainnet() {
let network = Network::Mainnet;
// Create an empty storage instance
let mut storage: Storage = Default::default();
let mut storage: Storage = Storage::new(&config::Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
// Get one (1) unmined transaction
let unmined_tx = unmined_transactions_in_blocks(.., network)
@ -71,79 +77,6 @@ fn mempool_storage_crud_same_effects_mainnet() {
assert!(!storage.contains_transaction_exact(&unmined_tx.transaction.id));
}
#[test]
fn mempool_storage_basic() -> Result<()> {
zebra_test::init();
mempool_storage_basic_for_network(Network::Mainnet)?;
mempool_storage_basic_for_network(Network::Testnet)?;
Ok(())
}
fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
// Create an empty storage
let mut storage: Storage = Default::default();
// Get transactions from the first 10 blocks of the Zcash blockchain
let unmined_transactions: Vec<_> = unmined_transactions_in_blocks(..=10, network).collect();
let total_transactions = unmined_transactions.len();
// Insert them all to the storage
for unmined_transaction in unmined_transactions.clone() {
storage.insert(unmined_transaction)?;
}
// Separate transactions into the ones expected to be in the mempool and those expected to be
// rejected.
let rejected_transaction_count = total_transactions - MEMPOOL_SIZE;
let expected_to_be_rejected = &unmined_transactions[..rejected_transaction_count];
let expected_in_mempool = &unmined_transactions[rejected_transaction_count..];
// Only MEMPOOL_SIZE should land in verified
assert_eq!(storage.verified.transaction_count(), MEMPOOL_SIZE);
// The rest of the transactions will be in rejected
assert_eq!(
storage.rejected_transaction_count(),
rejected_transaction_count
);
// Make sure the last MEMPOOL_SIZE transactions we sent are in the verified
for tx in expected_in_mempool {
assert!(storage.contains_transaction_exact(&tx.transaction.id));
}
// Anything greater should not be in the verified
for tx in expected_to_be_rejected {
assert!(!storage.contains_transaction_exact(&tx.transaction.id));
}
// Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE`
let all_ids: HashSet<UnminedTxId> = unmined_transactions
.iter()
.map(|tx| tx.transaction.id)
.collect();
// Convert response to a `HashSet`, because the order of the response doesn't matter.
let rejected_response: HashSet<UnminedTxId> =
storage.rejected_transactions(all_ids).into_iter().collect();
let rejected_ids = expected_to_be_rejected
.iter()
.map(|tx| tx.transaction.id)
.collect();
assert_eq!(rejected_response, rejected_ids);
// Make sure the first id stored is now rejected
assert!(storage.contains_rejected(&expected_to_be_rejected[0].transaction.id));
// Make sure the last id stored is not rejected
assert!(!storage.contains_rejected(&expected_in_mempool[0].transaction.id));
Ok(())
}
#[test]
fn mempool_expired_basic() -> Result<()> {
zebra_test::init();
@ -156,7 +89,10 @@ fn mempool_expired_basic() -> Result<()> {
fn mempool_expired_basic_for_network(network: Network) -> Result<()> {
// Create an empty storage
let mut storage: Storage = Default::default();
let mut storage: Storage = Storage::new(&config::Config {
tx_cost_limit: 160_000_000,
..Default::default()
});
let block: Block = match network {
Network::Mainnet => {

View File

@ -30,6 +30,9 @@ pub struct VerifiedSet {
/// serialized.
transactions_serialized_size: usize,
/// The total cost of the verified transactions in the set.
total_cost: u64,
/// The set of spent out points by the verified transactions.
spent_outpoints: HashSet<transparent::OutPoint>,
@ -61,6 +64,13 @@ impl VerifiedSet {
self.transactions.len()
}
/// Returns the total cost of the verified transactions in the set.
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
pub fn total_cost(&self) -> u64 {
self.total_cost
}
/// Returns `true` if the set of verified transactions contains the transaction with the
/// specified `id`.
pub fn contains(&self, id: &UnminedTxId) -> bool {
@ -77,6 +87,7 @@ impl VerifiedSet {
self.sapling_nullifiers.clear();
self.orchard_nullifiers.clear();
self.transactions_serialized_size = 0;
self.total_cost = 0;
self.update_metrics();
}
@ -97,6 +108,7 @@ impl VerifiedSet {
self.cache_outputs_from(&transaction.transaction.transaction);
self.transactions_serialized_size += transaction.transaction.size;
self.total_cost += transaction.cost();
self.transactions.push_front(transaction);
self.update_metrics();
@ -104,15 +116,41 @@ impl VerifiedSet {
Ok(())
}
/// Evict one transaction from the set to open space for another transaction.
/// Evict one transaction from the set, returns the victim transaction.
///
/// Removes a transaction with probability in direct proportion to the
/// eviction weight, as per [ZIP-401].
///
/// Consensus rule:
///
/// > Each transaction also has an eviction weight, which is cost +
/// > low_fee_penalty, where low_fee_penalty is 16000 if the transaction pays
/// > a fee less than the conventional fee, otherwise 0. The conventional fee
/// > is currently defined as 1000 zatoshis
///
/// # Note
///
/// Collecting and calculating weights is O(n). But in practice n is limited
/// to 20,000 (mempooltxcostlimit/min(cost)), so the actual cost shouldn't
/// be too bad.
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
pub fn evict_one(&mut self) -> Option<VerifiedUnminedTx> {
if self.transactions.is_empty() {
None
} else {
// TODO: use random weighted eviction as specified in ZIP-401 (#2780)
let last_index = self.transactions.len() - 1;
use rand::distributions::{Distribution, WeightedIndex};
use rand::prelude::thread_rng;
Some(self.remove(last_index))
let weights: Vec<u64> = self
.transactions
.iter()
.map(|tx| tx.clone().eviction_weight())
.collect();
let dist = WeightedIndex::new(weights).unwrap();
Some(self.remove(dist.sample(&mut thread_rng())))
}
}
@ -154,6 +192,7 @@ impl VerifiedSet {
.expect("invalid transaction index");
self.transactions_serialized_size -= removed_tx.transaction.size;
self.total_cost -= removed_tx.cost();
self.remove_outputs(&removed_tx.transaction);
self.update_metrics();
@ -228,5 +267,6 @@ impl VerifiedSet {
"zcash.mempool.size.bytes",
self.transactions_serialized_size as _
);
metrics::gauge!("zcash.mempool.cost.bytes", u64::from(self.total_cost) as _);
}
}

View File

@ -1,20 +1,19 @@
//! Randomised property tests for the mempool.
use proptest::collection::vec;
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use tokio::time;
use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{block, parameters::Network, transaction::VerifiedUnminedTx};
use zebra_chain::{parameters::Network, transaction::VerifiedUnminedTx};
use zebra_consensus::{error::TransactionError, transaction as tx};
use zebra_network as zn;
use zebra_state::{self as zs, ChainTipBlock, ChainTipSender};
use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::components::{
mempool::{self, Mempool},
mempool::{config::Config, Mempool},
sync::{RecentSyncLengths, SyncStatus},
};
@ -27,8 +26,6 @@ type MockState = MockService<zs::Request, zs::Response, PropTestAssertion>;
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PropTestAssertion, TransactionError>;
const CHAIN_LENGTH: usize = 10;
proptest! {
/// Test if the mempool storage is cleared on a chain reset.
#[test]
@ -84,94 +81,6 @@ proptest! {
})?;
}
/// Test if the mempool storage is cleared on multiple chain resets.
#[test]
fn storage_is_cleared_on_chain_resets(
network in any::<Network>(),
mut previous_chain_tip in any::<ChainTipBlock>(),
mut transactions in vec(any::<VerifiedUnminedTx>(), 0..CHAIN_LENGTH),
fake_chain_tips in vec(any::<FakeChainTip>(), 0..CHAIN_LENGTH),
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(network);
time::pause();
mempool.enable(&mut recent_syncs).await;
// Set the initial chain tip.
chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.clone());
// Call the mempool so that it is aware of the initial chain tip.
mempool.dummy_call().await;
for (fake_chain_tip, transaction) in fake_chain_tips.iter().zip(transactions.iter_mut()) {
// Obtain a new chain tip based on the previous one.
let chain_tip = fake_chain_tip.to_chain_tip_block(&previous_chain_tip);
// Adjust the transaction expiry height based on the new chain
// tip height so that the mempool does not evict the transaction
// when there is a chain growth.
if let Some(expiry_height) = transaction.transaction.transaction.expiry_height() {
if chain_tip.height >= expiry_height {
let mut tmp_tx = (*transaction.transaction.transaction).clone();
// Set a new expiry height that is greater than the
// height of the current chain tip.
*tmp_tx.expiry_height_mut() = block::Height(chain_tip.height.0 + 1);
transaction.transaction = tmp_tx.into();
}
}
// Insert the dummy transaction into the mempool.
mempool
.storage()
.insert(transaction.clone())
.expect("Inserting a transaction should succeed");
// Set the new chain tip.
chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone());
// Call the mempool so that it is aware of the new chain tip.
mempool.dummy_call().await;
match fake_chain_tip {
FakeChainTip::Grow(_) => {
// The mempool should not be empty because we had a regular chain growth.
prop_assert_ne!(mempool.storage().transaction_count(), 0);
}
FakeChainTip::Reset(_) => {
// The mempool should be empty because we had a chain tip reset.
prop_assert_eq!(mempool.storage().transaction_count(), 0);
},
}
// Remember the current chain tip so that the next one can refer to it.
previous_chain_tip = chain_tip;
}
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
Ok(())
})?;
}
/// Test if the mempool storage is cleared if the syncer falls behind and starts to catch up.
#[test]
fn storage_is_cleared_if_syncer_falls_behind(
@ -248,7 +157,10 @@ fn setup(
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);
let (mempool, _transaction_receiver) = Mempool::new(
&mempool::Config::default(),
&Config {
tx_cost_limit: 160_000_000,
..Default::default()
},
Buffer::new(BoxService::new(peer_set.clone()), 1),
Buffer::new(BoxService::new(state_service.clone()), 1),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
@ -273,21 +185,3 @@ enum FakeChainTip {
Grow(ChainTipBlock),
Reset(ChainTipBlock),
}
impl FakeChainTip {
/// Returns a new [`ChainTipBlock`] placed on top of the previous block if
/// the chain is supposed to grow. Otherwise returns a [`ChainTipBlock`]
/// that does not reference the previous one.
fn to_chain_tip_block(&self, previous: &ChainTipBlock) -> ChainTipBlock {
match self {
Self::Grow(chain_tip_block) => ChainTipBlock {
hash: chain_tip_block.hash,
height: block::Height(previous.height.0 + 1),
transaction_hashes: chain_tip_block.transaction_hashes.clone(),
previous_block_hash: previous.hash,
},
Self::Reset(chain_tip_block) => chain_tip_block.clone(),
}
}
}

View File

@ -1,6 +1,6 @@
//! Fixed test vectors for the mempool.
use std::{collections::HashSet, sync::Arc};
use std::sync::Arc;
use color_eyre::Report;
use tokio::time;
@ -25,196 +25,6 @@ type StateService = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>,
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PanicAssertion, TransactionError>;
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let last_transaction = unmined_transactions.next_back().unwrap();
let more_transactions = unmined_transactions;
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test `Request::TransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Test `Request::TransactionsById`
let genesis_transactions_hash_set = genesis_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionsById(
genesis_transactions_hash_set.clone(),
))
.await
.unwrap();
let transactions = match response {
Response::Transactions(transactions) => transactions,
_ => unreachable!("will never happen in this test"),
};
// Make sure the transaction from the blockchain test vector is the same as the
// response of `Request::TransactionsById`
assert_eq!(genesis_transaction.transaction, transactions[0]);
// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
// Skip the last (will be used later)
for tx in more_transactions {
service.storage().insert(tx.clone())?;
}
// Test `Request::RejectedTransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
.unwrap();
let rejected_ids = match response {
Response::RejectedTransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(rejected_ids, genesis_transaction_ids);
// Test `Request::Queue`
// Use the ID of the last transaction in the list
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![last_transaction.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
Ok(())
}
#[tokio::test]
async fn mempool_queue() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// Get transactions to use in the test
let unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let mut transactions = unmined_transactions;
// Split unmined_transactions into:
// [rejected_tx, transactions..., stored_tx, new_tx]
//
// The first transaction to be added in the mempool which will be eventually
// put in the rejected list
let rejected_tx = transactions.next().unwrap().clone();
// A transaction not in the mempool that will be Queued
let new_tx = transactions.next_back().unwrap();
// The last transaction that will be added in the mempool (and thus not rejected)
let stored_tx = transactions.next_back().unwrap().clone();
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert [rejected_tx, transactions..., stored_tx] into the mempool storage.
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(rejected_tx.clone())?;
// Insert more transactions into the mempool storage.
// This will cause the `rejected_tx` to be moved into rejected.
for tx in transactions {
service.storage().insert(tx.clone())?;
}
service.storage().insert(stored_tx.clone())?;
// Test `Request::Queue` for a new transaction
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![new_tx.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
// Test `Request::Queue` for a transaction already in the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![stored_tx.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::InMempool));
// Test `Request::Queue` for a transaction rejected by the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![rejected_tx.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(
queued_responses[0],
Err(MempoolError::StorageEffectsChain(
SameEffectsChainRejectionError::RandomlyEvicted
))
);
Ok(())
}
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
@ -673,7 +483,10 @@ async fn setup(
let (sync_status, recent_syncs) = SyncStatus::new();
let (mempool, _mempool_transaction_receiver) = Mempool::new(
&mempool::Config::default(),
&mempool::Config {
tx_cost_limit: u64::MAX,
..Default::default()
},
Buffer::new(BoxService::new(peer_set.clone()), 1),
state_service.clone(),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),