//! Transaction Queue. //! //! All transactions that are sent from RPC methods should be added to this queue for retries. //! Transactions can fail to be inserted to the mempool immediately by different reasons, //! like having not mined utxos. //! //! The [`Queue`] is just an `IndexMap` of transactions with insertion date. //! We use this data type because we want the transactions in the queue to be in order. //! The [`Runner`] component will do the processing in it's [`Runner::run()`] method. use std::{collections::HashSet, sync::Arc}; use chrono::Duration; use indexmap::IndexMap; use tokio::{ sync::broadcast::{self, error::TryRecvError}, time::Instant, }; use tower::{Service, ServiceExt}; use zebra_chain::{ block::Height, chain_tip::ChainTip, parameters::{Network, NetworkUpgrade}, transaction::{Transaction, UnminedTx, UnminedTxId}, }; use zebra_node_services::{ mempool::{Gossip, Request, Response}, BoxError, }; use zebra_state::{MinedTx, ReadRequest, ReadResponse}; #[cfg(test)] mod tests; /// The approximate target number of blocks a transaction can be in the queue. const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5; /// Size of the queue and channel. pub const CHANNEL_AND_QUEUE_CAPACITY: usize = 20; /// The height to use in spacing calculation if we don't have a chain tip. const NO_CHAIN_TIP_HEIGHT: Height = Height(1); #[derive(Clone, Debug)] /// The queue is a container of transactions that are going to be /// sent to the mempool again. pub struct Queue { transactions: IndexMap, Instant)>, } #[derive(Debug)] /// The runner will make the processing of the transactions in the queue. pub struct Runner { queue: Queue, receiver: broadcast::Receiver, tip_height: Height, } impl Queue { /// Start a new queue pub fn start() -> (Runner, broadcast::Sender) { let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY); let queue = Queue { transactions: IndexMap::new(), }; let runner = Runner { queue, receiver, tip_height: Height(0), }; (runner, sender) } /// Get the transactions in the queue. pub fn transactions(&self) -> IndexMap, Instant)> { self.transactions.clone() } /// Insert a transaction to the queue. pub fn insert(&mut self, unmined_tx: UnminedTx) { self.transactions .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now())); // remove if queue is over capacity if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY { self.remove_first(); } } /// Remove a transaction from the queue. pub fn remove(&mut self, unmined_id: UnminedTxId) { self.transactions.swap_remove(&unmined_id); } /// Remove the oldest transaction from the queue. pub fn remove_first(&mut self) { self.transactions.shift_remove_index(0); } } impl Runner { /// Get the queue transactions as a `HashSet` of unmined ids. fn transactions_as_hash_set(&self) -> HashSet { let transactions = self.queue.transactions(); transactions.iter().map(|t| *t.0).collect() } /// Get the queue transactions as a `Vec` of transactions. fn transactions_as_vec(&self) -> Vec> { let transactions = self.queue.transactions(); transactions.iter().map(|t| t.1 .0.clone()).collect() } /// Update the `tip_height` field with a new height. pub fn update_tip_height(&mut self, height: Height) { self.tip_height = height; } /// Retry sending to mempool if needed. /// /// Creates a loop that will run each time a new block is mined. /// In this loop, get the transactions that are in the queue and: /// - Check if they are now in the mempool and if so, delete the transaction from the queue. /// - Check if the transaction is now part of a block in the state and if so, /// delete the transaction from the queue. /// - With the transactions left in the queue, retry sending them to the mempool ignoring /// the result of this operation. /// /// Additionally, each iteration of the above loop, will receive and insert to the queue /// transactions that are pending in the channel. pub async fn run( mut self, mempool: Mempool, state: State, tip: Tip, network: Network, ) where Mempool: Service + Clone + 'static, State: Service + Clone + Send + Sync + 'static, Tip: ChainTip + Clone + Send + Sync + 'static, { loop { // if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing let tip_height = match tip.best_tip_height() { Some(height) => height, _ => NO_CHAIN_TIP_HEIGHT, }; // get spacing between blocks let spacing = NetworkUpgrade::target_spacing_for_height(&network, tip_height); // sleep until the next block tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await; // get transactions from the channel loop { let tx = match self.receiver.try_recv() { Ok(tx) => tx, Err(TryRecvError::Empty) => break, Err(TryRecvError::Lagged(skipped_count)) => { tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions"); continue; } Err(TryRecvError::Closed) => { tracing::info!( "sendrawtransaction queue was closed: is Zebra shutting down?" ); return; } }; self.queue.insert(tx.clone()); } // skip some work if stored tip height is the same as the one arriving // TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare) if tip_height != self.tip_height { // update the chain tip self.update_tip_height(tip_height); if !self.queue.transactions().is_empty() { // remove what is expired self.remove_expired(spacing); // remove if any of the queued transactions is now in the mempool let in_mempool = Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await; self.remove_committed(in_mempool); // remove if any of the queued transactions is now in the state let in_state = Self::check_state(state.clone(), self.transactions_as_hash_set()).await; self.remove_committed(in_state); // retry what is left in the queue let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await; } } } } /// Remove transactions that are expired according to number of blocks and current spacing between blocks. fn remove_expired(&mut self, spacing: Duration) { // Have some extra time to to make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE` // times, as the main loop also takes some time to run. let extra_time = Duration::seconds(5); let duration_to_expire = Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time; let transactions = self.queue.transactions(); let now = Instant::now(); for tx in transactions.iter() { let tx_time = tx.1 .1 .checked_add( duration_to_expire .to_std() .expect("should never be less than zero"), ) .expect("this is low numbers, should always be inside bounds"); if now > tx_time { self.queue.remove(*tx.0); } } } /// Remove transactions from the queue that had been inserted to the state or the mempool. fn remove_committed(&mut self, to_remove: HashSet) { for r in to_remove { self.queue.remove(r); } } /// Check the mempool for given transactions. /// /// Returns transactions that are in the mempool. async fn check_mempool( mempool: Mempool, transactions: HashSet, ) -> HashSet where Mempool: Service + Clone + 'static, { let mut response = HashSet::new(); if !transactions.is_empty() { let request = Request::TransactionsById(transactions); // ignore any error coming from the mempool let mempool_response = mempool.oneshot(request).await; if let Ok(Response::Transactions(txs)) = mempool_response { for tx in txs { response.insert(tx.id); } } } response } /// Check the state for given transactions. /// /// Returns transactions that are in the state. async fn check_state( state: State, transactions: HashSet, ) -> HashSet where State: Service + Clone + Send + Sync + 'static, { let mut response = HashSet::new(); for t in transactions { let request = ReadRequest::Transaction(t.mined_id()); // ignore any error coming from the state let state_response = state.clone().oneshot(request).await; if let Ok(ReadResponse::Transaction(Some(MinedTx { tx, .. }))) = state_response { response.insert(tx.unmined_id()); } } response } /// Retry sending given transactions to mempool. /// /// Returns the transaction ids that were retried. async fn retry( mempool: Mempool, transactions: Vec>, ) -> HashSet where Mempool: Service + Clone + 'static, { let mut retried = HashSet::new(); for tx in transactions { let unmined = UnminedTx::from(tx); let gossip = Gossip::Tx(unmined.clone()); let request = Request::Queue(vec![gossip]); // Send to mempool and ignore any error let _ = mempool.clone().oneshot(request).await; // return what we retried but don't delete from the queue, // we might retry again in a next call. retried.insert(unmined.id); } retried } }