diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index 743ca6b6..90e92a64 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -10,12 +10,14 @@ use tokio::{ timer::{delay_for, Delay}, }; use tower::Service; -use zebra_chain::serialization::SerializationError; + +use zebra_chain::{serialization::SerializationError, transaction::TransactionHash}; use crate::{ constants, protocol::{ internal::{Request, Response}, + inv::InventoryHash, message::Message, }, BoxedStdError, @@ -224,6 +226,12 @@ where .await .map_err(|e| e.into()) .map(|()| AwaitingResponse(Ping(nonce), tx)), + (AwaitingRequest, GetMempool) => self + .peer_tx + .send(Message::Mempool) + .await + .map_err(|e| e.into()) + .map(|()| AwaitingResponse(GetMempool, tx)), // XXX timeout handling here? } { Ok(new_state) => { @@ -305,6 +313,7 @@ where let req = match msg { Message::Addr(addrs) => Some(Request::PushPeers(addrs)), Message::GetAddr => Some(Request::GetPeers), + Message::Mempool => Some(Request::GetMempool), _ => { debug!("unhandled message type"); None @@ -351,6 +360,16 @@ where self.fail_with(e.into()); } } + Response::Transactions(txs) => { + let hashes = txs + .into_iter() + .map(|tx| InventoryHash::Tx(TransactionHash::from(tx))) + .collect::>(); + + if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await { + self.fail_with(e.into()); + } + } } } } diff --git a/zebra-network/src/protocol/internal.rs b/zebra-network/src/protocol/internal.rs index 34a1ed22..d7d804f9 100644 --- a/zebra-network/src/protocol/internal.rs +++ b/zebra-network/src/protocol/internal.rs @@ -4,6 +4,8 @@ //! responses, so that we have unified types to pass around. No serialization //! is performed as these are only internal types. +use zebra_chain::transaction::Transaction; + use crate::meta_addr::MetaAddr; use super::types::Nonce; @@ -20,6 +22,9 @@ pub enum Request { // internally for connection management. You should not expect to // be firing or handling `Ping` requests or `Pong` responses. Ping(Nonce), + /// Requests the transactions the remote server has verified but + /// not yet confirmed. + GetMempool, } /// A response to a network request, represented in internal format. @@ -29,4 +34,6 @@ pub enum Response { Ok, /// A list of peers, used to respond to `GetPeers`. Peers(Vec), + /// A list of transactions, such as in response to `GetMempool`. + Transactions(Vec), } diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index e29524b2..225de8b6 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -54,7 +54,7 @@ impl ConnectCmd { // Connect only to the specified peer. config.initial_mainnet_peers = vec![self.addr.to_string()]; - let (mut peer_set, address_book) = zebra_network::init(config, node).await; + let (mut peer_set, _address_book) = zebra_network::init(config, node).await; info!("waiting for peer_set ready"); peer_set.ready().await.map_err(Error::from_boxed_compat)?;