From f3ee76f202b584926e218ce4ff7204fef49fb490 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 9 Sep 2021 10:04:44 -0300 Subject: [PATCH] Verify inbound PushTransactions (#2727) * Verify inbound PushTransactions * Add GossipedTx and refactor downloader to use it * remove grafana changes * remove TODOs * Tidy the transaction fetching in mempool downloader Co-authored-by: Alfredo Garcia Co-authored-by: Deirdre Connolly --- zebrad/src/components/inbound.rs | 16 +++-- zebrad/src/components/mempool/downloads.rs | 77 ++++++++++++++++------ 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 2e54a4cf..a7e127f0 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -352,18 +352,20 @@ impl Service for Inbound { }) .boxed() } - zn::Request::PushTransaction(_transaction) => { - debug!("ignoring unimplemented request"); - // TODO: send to Tx Download & Verify Stream - // https://github.com/ZcashFoundation/zebra/issues/2692 + zn::Request::PushTransaction(transaction) => { + if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup { + tx_downloads.download_if_needed_and_verify(transaction.into()); + } else { + info!( + "ignoring `AdvertiseTransactionIds` request from remote peer during network setup" + ); + } async { Ok(zn::Response::Nil) }.boxed() } zn::Request::AdvertiseTransactionIds(transactions) => { if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup { - // TODO: check if we're close to the tip before proceeding? - // what do we do if it's not? for txid in transactions { - tx_downloads.download_and_verify(txid); + tx_downloads.download_if_needed_and_verify(txid.into()); } } else { info!( diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 14b5f3ba..27245a28 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -16,7 +16,7 @@ use tokio::{sync::oneshot, task::JoinHandle}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; -use zebra_chain::transaction::UnminedTxId; +use zebra_chain::transaction::{UnminedTx, UnminedTxId}; use zebra_consensus::transaction as tx; use zebra_network as zn; use zebra_state as zs; @@ -83,6 +83,34 @@ pub enum DownloadAction { FullQueue, } +/// A gossiped transaction, which can be the transaction itself or just its ID. +pub enum GossipedTx { + Id(UnminedTxId), + Tx(UnminedTx), +} + +impl GossipedTx { + /// Return the [`UnminedTxId`] of a gossiped transaction. + fn id(&self) -> UnminedTxId { + match self { + GossipedTx::Id(txid) => *txid, + GossipedTx::Tx(tx) => tx.id, + } + } +} + +impl From for GossipedTx { + fn from(txid: UnminedTxId) -> Self { + GossipedTx::Id(txid) + } +} + +impl From for GossipedTx { + fn from(tx: UnminedTx) -> Self { + GossipedTx::Tx(tx) + } +} + /// Represents a [`Stream`] of download and verification tasks. #[pin_project] #[derive(Debug)] @@ -194,11 +222,13 @@ where } } - /// Queue a transaction for download and verification. + /// Queue a transaction for download (if needed) and verification. /// /// Returns the action taken in response to the queue request. - #[instrument(skip(self, txid), fields(txid = %txid))] - pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction { + #[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))] + pub fn download_if_needed_and_verify(&mut self, gossiped_tx: GossipedTx) -> DownloadAction { + let txid = gossiped_tx.id(); + if self.cancel_handles.contains_key(&txid) { tracing::debug!( ?txid, @@ -228,7 +258,7 @@ where let mut mempool = self.mempool.clone(); let fut = async move { - Self::should_download(&mut state, &mut mempool, txid).await?; + Self::should_download_or_verify(&mut state, &mut mempool, txid).await?; let height = match state.oneshot(zs::Request::Tip).await { Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()), @@ -238,19 +268,25 @@ where }?; let height = (height + 1).ok_or_else(|| eyre!("no next height"))?; - let tx = if let zn::Response::Transactions(txs) = network - .oneshot(zn::Request::TransactionsById( - std::iter::once(txid).collect(), - )) - .await? - { - txs.into_iter() - .next() - .expect("successful response has the transaction in it") - } else { - unreachable!("wrong response to transaction request"); + let tx = match gossiped_tx { + GossipedTx::Id(txid) => { + let req = zn::Request::TransactionsById(std::iter::once(txid).collect()); + + let tx = match network.oneshot(req).await? { + zn::Response::Transactions(mut txs) => txs + .pop() + .expect("successful response has the transaction in it"), + _ => unreachable!("wrong response to transaction request"), + }; + + metrics::counter!("gossip.downloaded.transaction.count", 1); + tx + } + GossipedTx::Tx(tx) => { + metrics::counter!("gossip.pushed.transaction.count", 1); + tx + } }; - metrics::counter!("gossip.downloaded.transaction.count", 1); let result = verifier .oneshot(tx::Request::Mempool { @@ -302,11 +338,12 @@ where DownloadAction::AddedToQueue } - /// Check if transaction should be downloaded and verified. + /// Check if transaction should be downloaded and/or verified. /// /// If it is already in the mempool (or in its rejected list) - /// or in state, then it shouldn't be downloaded (and an error is returned). - async fn should_download( + /// or in state, then it shouldn't be downloaded/verified + /// (and an error is returned). + async fn should_download_or_verify( state: &mut ZS, mempool: &mut ZM, txid: UnminedTxId,