diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index 2fc9a2af..ef5fbdb4 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -294,8 +294,7 @@ impl Transaction { /// Get this transaction's expiry height, if any. pub fn expiry_height(&self) -> Option { match self { - Transaction::V1 { .. } => None, - Transaction::V2 { .. } => None, + Transaction::V1 { .. } | Transaction::V2 { .. } => None, Transaction::V3 { expiry_height, .. } | Transaction::V4 { expiry_height, .. } | Transaction::V5 { expiry_height, .. } => match expiry_height { @@ -308,6 +307,32 @@ impl Transaction { } } + /// Modify the expiry height of this transaction. + /// + /// # Panics + /// + /// - if called on a v1 or v2 transaction + #[cfg(any(test, feature = "proptest-impl"))] + pub fn expiry_height_mut(&mut self) -> &mut block::Height { + match self { + Transaction::V1 { .. } | Transaction::V2 { .. } => { + panic!("v1 and v2 transactions are not supported") + } + Transaction::V3 { + ref mut expiry_height, + .. + } + | Transaction::V4 { + ref mut expiry_height, + .. + } + | Transaction::V5 { + ref mut expiry_height, + .. + } => expiry_height, + } + } + /// Get this transaction's network upgrade field, if any. /// This field is serialized as `nConsensusBranchId` ([7.1]). /// diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index afa80bb6..282e149f 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -83,7 +83,7 @@ impl StartCmd { )); let (peer_set, address_book) = - zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await; + zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await; info!("initializing syncer"); let (syncer, sync_status) = @@ -96,6 +96,7 @@ impl StartCmd { state, tx_verifier, sync_status.clone(), + latest_chain_tip, chain_tip_change.clone(), )); let mempool = ServiceBuilder::new().buffer(20).service(mempool_service); diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 41169df2..3828a009 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -25,7 +25,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion}; #[tokio::test] async fn mempool_requests_for_transactions() { - let (inbound_service, added_transactions, _, mut peer_set) = setup(true).await; + let (inbound_service, added_transactions, _, mut peer_set, _) = setup(true).await; let added_transaction_ids: Vec = added_transactions .clone() @@ -74,7 +74,7 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { // use the first transaction that is not coinbase let tx = block.transactions[1].clone(); - let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await; + let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; // Test `Request::PushTransaction` let request = inbound_service @@ -123,7 +123,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { let test_transaction_id = test_transaction.unmined_id(); let txs = HashSet::from_iter([test_transaction_id]); - let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await; + let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; // Test `Request::AdvertiseTransactionIds` let request = inbound_service @@ -173,6 +173,178 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { Ok(()) } +#[tokio::test] +async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { + // Get a block that has at least one non coinbase transaction + let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // Use the first transaction that is not coinbase to test expiration + let tx1 = &*(block.transactions[1]).clone(); + let mut tx1_id = tx1.unmined_id(); + + // Change the expiration height of the transaction to block 3 + let mut tx1 = tx1.clone(); + *tx1.expiry_height_mut() = zebra_chain::block::Height(3); + + // Use the second transaction that is not coinbase to trigger `remove_expired_transactions()` + let tx2 = block.transactions[2].clone(); + let mut tx2_id = tx2.unmined_id(); + + // Get services + let (inbound_service, _, mut tx_verifier, _peer_set, state_service) = setup(false).await; + + // Push test transaction + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx1.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + tx1_id = responder.request().tx_id(); + responder.respond(tx1_id); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx1_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // Add a new block to the state (make the chain tip advance) + let block_one: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_one.clone().into(), + )) + .await + .unwrap(); + + // Make sure tx1 is still in the mempool as it is not expired yet. + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx1_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // As our test transaction will expire at a block height greater or equal to 3 we need to push block 3. + let block_two: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_two.clone().into(), + )) + .await + .unwrap(); + + // Push a second transaction to trigger `remove_expired_transactions()` + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx2.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + tx2_id = responder.request().tx_id(); + responder.respond(tx2_id); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + // Only tx2 will be in the mempool while tx1 was expired + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx2_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + // Add all the rest of the continous blocks we have to test tx2 will never expire. + let more_blocks: Vec> = vec![ + zebra_test::vectors::BLOCK_MAINNET_4_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_5_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_6_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_7_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_8_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_9_BYTES + .zcash_deserialize_into() + .unwrap(), + zebra_test::vectors::BLOCK_MAINNET_10_BYTES + .zcash_deserialize_into() + .unwrap(), + ]; + for block in more_blocks { + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block.clone().into(), + )) + .await + .unwrap(); + + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + // tx2 is still in the mempool as the blockchain progress because the zero expiration height + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![tx2_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + } + + Ok(()) +} + async fn setup( add_transactions: bool, ) -> ( @@ -180,6 +352,14 @@ async fn setup( Option>, MockService, MockService, + Buffer< + BoxService< + zebra_state::Request, + zebra_state::Response, + Box, + >, + zebra_state::Request, + >, ) { let network = Network::Mainnet; let consensus_config = ConsensusConfig::default(); @@ -187,7 +367,7 @@ async fn setup( let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); let address_book = Arc::new(std::sync::Mutex::new(address_book)); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _latest_chain_tip, chain_tip_change) = + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config.clone(), network); let mut state_service = ServiceBuilder::new().buffer(1).service(state); @@ -218,12 +398,27 @@ async fn setup( .await .unwrap(); + // Also push block 1. + // Block one is a network upgrade and the mempool will be cleared at it, + // let all our tests start after this event. + let block_one: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .clone() + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block_one.clone().into(), + )) + .await + .unwrap(); + let mut mempool_service = Mempool::new( network, buffered_peer_set.clone(), state_service.clone(), buffered_tx_verifier.clone(), sync_status, + latest_chain_tip, chain_tip_change, ); @@ -258,6 +453,7 @@ async fn setup( added_transactions, mock_tx_verifier, peer_set, + state_service, ) } diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 0787e1d7..d1d55a21 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -12,6 +12,7 @@ use futures::{future::FutureExt, stream::Stream}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ + chain_tip::ChainTip, parameters::Network, transaction::{UnminedTx, UnminedTxId}, }; @@ -100,6 +101,8 @@ pub struct Mempool { #[allow(dead_code)] sync_status: SyncStatus, + /// Allow efficient access to the best tip of the blockchain. + latest_chain_tip: zs::LatestChainTip, /// Allows the detection of chain tip resets. #[allow(dead_code)] chain_tip_change: ChainTipChange, @@ -125,11 +128,13 @@ impl Mempool { state: State, tx_verifier: TxVerifier, sync_status: SyncStatus, + latest_chain_tip: zs::LatestChainTip, chain_tip_change: ChainTipChange, ) -> Self { Mempool { active_state: ActiveState::Disabled, sync_status, + latest_chain_tip, chain_tip_change, outbound, state, @@ -246,7 +251,7 @@ impl Service for Mempool { storage.clear(); } - // Clean up completed download tasks and add to mempool if successful + // Clean up completed download tasks and add to mempool if successful. while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { if let Ok(tx) = r { // Storage handles conflicting transactions or a full mempool internally, @@ -254,6 +259,11 @@ impl Service for Mempool { let _ = storage.insert(tx); } } + + // Remove expired transactions from the mempool. + if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { + remove_expired_transactions(storage, tip_height); + } } ActiveState::Disabled => { // When the mempool is disabled we still return that the service is ready. @@ -261,6 +271,7 @@ impl Service for Mempool { // which may not be the desired behaviour. } } + Poll::Ready(Ok(())) } @@ -322,3 +333,22 @@ impl Service for Mempool { } } } + +/// Remove transactions from the mempool if they have not been mined after a specified height. +/// +/// https://zips.z.cash/zip-0203#specification +fn remove_expired_transactions( + storage: &mut storage::Storage, + tip_height: zebra_chain::block::Height, +) { + let ids = storage.tx_ids().iter().copied().collect(); + let transactions = storage.transactions(ids); + + for t in transactions { + if let Some(expiry_height) = t.transaction.expiry_height() { + if tip_height >= expiry_height { + storage.remove(&t.id); + } + } + } +} diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index b7965900..dfef11a9 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -16,7 +16,7 @@ async fn mempool_service_basic() -> Result<(), Report> { let state_config = StateConfig::ephemeral(); let peer_set = MockService::build().for_unit_tests(); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _latest_chain_tip, chain_tip_change) = + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config.clone(), network); let state_service = ServiceBuilder::new().buffer(1).service(state); @@ -39,6 +39,7 @@ async fn mempool_service_basic() -> Result<(), Report> { state_service.clone(), tx_verifier, sync_status, + latest_chain_tip, chain_tip_change, ); @@ -136,7 +137,7 @@ async fn mempool_queue() -> Result<(), Report> { let state_config = StateConfig::ephemeral(); let peer_set = MockService::build().for_unit_tests(); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _latest_chain_tip, chain_tip_change) = + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config.clone(), network); let state_service = ServiceBuilder::new().buffer(1).service(state); @@ -165,6 +166,7 @@ async fn mempool_queue() -> Result<(), Report> { state_service.clone(), tx_verifier, sync_status, + latest_chain_tip, chain_tip_change, ); @@ -238,7 +240,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { let peer_set = MockService::build().for_unit_tests(); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); + let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); let (_chain_verifier, tx_verifier) = zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) @@ -258,6 +260,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { state_service.clone(), tx_verifier, sync_status, + latest_chain_tip, chain_tip_change, );