diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index ea922f16..03390e09 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -203,9 +203,9 @@ impl fmt::Display for Request { Request::Ping(_) => "Ping".to_string(), Request::BlocksByHash(hashes) => { - format!("BlocksByHash {{ hashes: {} }}", hashes.len()) + format!("BlocksByHash({})", hashes.len()) } - Request::TransactionsById(ids) => format!("TransactionsById {{ ids: {} }}", ids.len()), + Request::TransactionsById(ids) => format!("TransactionsById({})", ids.len()), Request::FindBlocks { known_blocks, stop } => format!( "FindBlocks {{ known_blocks: {}, stop: {} }}", @@ -220,7 +220,7 @@ impl fmt::Display for Request { Request::PushTransaction(_) => "PushTransaction".to_string(), Request::AdvertiseTransactionIds(ids) => { - format!("AdvertiseTransactionIds {{ ids: {} }}", ids.len()) + format!("AdvertiseTransactionIds({})", ids.len()) } Request::AdvertiseBlock(_) => "AdvertiseBlock".to_string(), diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index dbc5beba..bd8e2199 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -450,6 +450,9 @@ impl Service for Mempool { if !send_to_peers_ids.is_empty() { tracing::trace!(?send_to_peers_ids, "sending new transactions to peers"); + // TODO: + // - if the transaction gossip task is slow, we can overwrite unsent IDs here + // - does this happen often enough to be worth a fix? self.transaction_sender.send(send_to_peers_ids)?; } } diff --git a/zebrad/src/components/mempool/gossip.rs b/zebrad/src/components/mempool/gossip.rs index b55dc37c..cca47234 100644 --- a/zebrad/src/components/mempool/gossip.rs +++ b/zebrad/src/components/mempool/gossip.rs @@ -3,18 +3,18 @@ //! This module is just a function [`gossip_mempool_transaction_id`] that waits for mempool //! insertion events received in a channel and broadcasts the transactions to peers. -use tower::{timeout::Timeout, Service, ServiceExt}; - -use zebra_network as zn; - -use tokio::sync::watch; -use zebra_chain::transaction::UnminedTxId; - use std::collections::HashSet; -use crate::BoxError; +use tokio::sync::watch; +use tower::{timeout::Timeout, Service, ServiceExt}; -use crate::components::sync::TIPS_RESPONSE_TIMEOUT; +use zebra_chain::transaction::UnminedTxId; +use zebra_network as zn; + +use crate::{components::sync::TIPS_RESPONSE_TIMEOUT, BoxError}; + +/// The maximum number of times we will delay sending because there is a new change. +pub const MAX_CHANGES_BEFORE_SEND: usize = 10; /// Runs continuously, gossiping new [`UnminedTxId`] to peers. /// @@ -37,14 +37,37 @@ where let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT); loop { - // once we get new data in the channel, broadcast to peers - receiver.changed().await?; + let mut combined_changes = 1; + + // once we get new data in the channel, broadcast to peers, + // the mempool automatically combines some transactions that arrive close together + receiver.changed().await?; + let mut txs = receiver.borrow().clone(); + tokio::task::yield_now().await; + + // also combine transactions that arrived shortly after this one + while receiver.has_changed()? && combined_changes < MAX_CHANGES_BEFORE_SEND { + // Correctness + // - set the has_changed() flag to false using borrow_and_update() + // - clone() so we don't hold the watch channel lock while modifying txs + let extra_txs = receiver.borrow_and_update().clone(); + txs.extend(extra_txs.iter()); + + combined_changes += 1; + + tokio::task::yield_now().await; + } - let txs = receiver.borrow().clone(); let txs_len = txs.len(); let request = zn::Request::AdvertiseTransactionIds(txs); - info!(?request, "sending mempool transaction broadcast"); + // TODO: rate-limit this info level log? + info!(%request, changes = %combined_changes, "sending mempool transaction broadcast"); + debug!( + ?request, + changes = ?combined_changes, + "full list of mempool transactions in broadcast" + ); // broadcast requests don't return errors, and we'd just want to ignore them anyway let _ = broadcast_network.ready().await?.call(request).await;