Handle 'mempool' messages as 'GetMempool' requests

With a 'Transactions' response that gets turned into an 'Inv(Vec<InventoryHash::Tx>)' message.

We don't yet handle a response from our peer for a 'mempool', which will have to be
a more generic 'Inv' type because we might receive transaction hashes we don't know about yet.

Pertains to #26
This commit is contained in:
Deirdre Connolly 2019-11-16 00:08:53 -05:00 committed by Deirdre Connolly
parent 98079c9d77
commit 189d89a7fc
3 changed files with 28 additions and 2 deletions

View File

@ -10,12 +10,14 @@ use tokio::{
timer::{delay_for, Delay}, timer::{delay_for, Delay},
}; };
use tower::Service; use tower::Service;
use zebra_chain::serialization::SerializationError;
use zebra_chain::{serialization::SerializationError, transaction::TransactionHash};
use crate::{ use crate::{
constants, constants,
protocol::{ protocol::{
internal::{Request, Response}, internal::{Request, Response},
inv::InventoryHash,
message::Message, message::Message,
}, },
BoxedStdError, BoxedStdError,
@ -224,6 +226,12 @@ where
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
.map(|()| AwaitingResponse(Ping(nonce), tx)), .map(|()| AwaitingResponse(Ping(nonce), tx)),
(AwaitingRequest, GetMempool) => self
.peer_tx
.send(Message::Mempool)
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse(GetMempool, tx)),
// XXX timeout handling here? // XXX timeout handling here?
} { } {
Ok(new_state) => { Ok(new_state) => {
@ -305,6 +313,7 @@ where
let req = match msg { let req = match msg {
Message::Addr(addrs) => Some(Request::PushPeers(addrs)), Message::Addr(addrs) => Some(Request::PushPeers(addrs)),
Message::GetAddr => Some(Request::GetPeers), Message::GetAddr => Some(Request::GetPeers),
Message::Mempool => Some(Request::GetMempool),
_ => { _ => {
debug!("unhandled message type"); debug!("unhandled message type");
None None
@ -351,6 +360,16 @@ where
self.fail_with(e.into()); self.fail_with(e.into());
} }
} }
Response::Transactions(txs) => {
let hashes = txs
.into_iter()
.map(|tx| InventoryHash::Tx(TransactionHash::from(tx)))
.collect::<Vec<_>>();
if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
self.fail_with(e.into());
}
}
} }
} }
} }

View File

@ -4,6 +4,8 @@
//! responses, so that we have unified types to pass around. No serialization //! responses, so that we have unified types to pass around. No serialization
//! is performed as these are only internal types. //! is performed as these are only internal types.
use zebra_chain::transaction::Transaction;
use crate::meta_addr::MetaAddr; use crate::meta_addr::MetaAddr;
use super::types::Nonce; use super::types::Nonce;
@ -20,6 +22,9 @@ pub enum Request {
// internally for connection management. You should not expect to // internally for connection management. You should not expect to
// be firing or handling `Ping` requests or `Pong` responses. // be firing or handling `Ping` requests or `Pong` responses.
Ping(Nonce), Ping(Nonce),
/// Requests the transactions the remote server has verified but
/// not yet confirmed.
GetMempool,
} }
/// A response to a network request, represented in internal format. /// A response to a network request, represented in internal format.
@ -29,4 +34,6 @@ pub enum Response {
Ok, Ok,
/// A list of peers, used to respond to `GetPeers`. /// A list of peers, used to respond to `GetPeers`.
Peers(Vec<MetaAddr>), Peers(Vec<MetaAddr>),
/// A list of transactions, such as in response to `GetMempool`.
Transactions(Vec<Transaction>),
} }

View File

@ -54,7 +54,7 @@ impl ConnectCmd {
// Connect only to the specified peer. // Connect only to the specified peer.
config.initial_mainnet_peers = vec![self.addr.to_string()]; config.initial_mainnet_peers = vec![self.addr.to_string()];
let (mut peer_set, address_book) = zebra_network::init(config, node).await; let (mut peer_set, _address_book) = zebra_network::init(config, node).await;
info!("waiting for peer_set ready"); info!("waiting for peer_set ready");
peer_set.ready().await.map_err(Error::from_boxed_compat)?; peer_set.ready().await.map_err(Error::from_boxed_compat)?;