diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index baf4da96..7b4350db 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use futures::{ @@ -9,12 +10,15 @@ use futures::{ use tokio::time::{delay_for, Delay}; use tower::Service; -use zebra_chain::{serialization::SerializationError, transaction::TransactionHash}; +use zebra_chain::{ + block::{Block, BlockHeaderHash}, + serialization::SerializationError, +}; use crate::{ constants, protocol::{ - external::{InventoryHash, Message}, + external::{types::Nonce, Message}, internal::{Request, Response}, }, BoxedStdError, @@ -22,11 +26,77 @@ use crate::{ use super::{ClientRequest, ErrorSlot, PeerError, SharedPeerError}; +pub(super) enum Handler { + /// Indicates that the handler has finished processing the request. + Finished(Result), + Ping(Nonce), + GetPeers, + GetBlocksByHash { + hashes: HashSet, + blocks: Vec, + }, +} + +impl Handler { + /// Try to handle `msg` as a response to a client request, possibly consuming + /// it in the process. + /// + /// Taking ownership of the message means that we can pass ownership of its + /// contents to responses without additional copies. If the message is not + /// interpretable as a response, we return ownership to the caller. + fn process_message(&mut self, msg: Message) -> Option { + trace!(?msg); + // This function is where we statefully interpret Bitcoin/Zcash messages + // into responses to messages in the internal request/response protocol. + // This conversion is done by a sequence of (request, message) match arms, + // each of which contains the conversion logic for that pair. + use Handler::*; + let mut ignored_msg = None; + // XXX can this be avoided? + let tmp_state = std::mem::replace(self, Finished(Ok(Response::Ok))); + *self = match (tmp_state, msg) { + (Ping(req_nonce), Message::Pong(rsp_nonce)) => { + if req_nonce == rsp_nonce { + Finished(Ok(Response::Ok)) + } else { + Ping(req_nonce) + } + } + (GetPeers, Message::Addr(addrs)) => Finished(Ok(Response::Peers(addrs))), + ( + GetBlocksByHash { + mut hashes, + mut blocks, + }, + Message::Block(block), + ) => { + if hashes.remove(&BlockHeaderHash::from(block.as_ref())) { + blocks.push(*block); + if hashes.is_empty() { + Finished(Ok(Response::Blocks(blocks))) + } else { + GetBlocksByHash { hashes, blocks } + } + } else { + Finished(Err(Arc::new(PeerError::WrongBlock).into())) + } + } + // By default, messages are not responses. + (state, msg) => { + ignored_msg = Some(msg); + state + } + }; + + ignored_msg + } +} + pub(super) enum State { /// Awaiting a client request or a peer message. AwaitingRequest, /// Awaiting a peer message we can interpret as a client request. - AwaitingResponse(Request, oneshot::Sender>), + AwaitingResponse(Handler, oneshot::Sender>), /// A failure has occurred and we are shutting down the connection. Failed, } @@ -84,9 +154,7 @@ where Either::Left((None, _)) => { self.fail_with(PeerError::ConnectionClosed); } - // XXX switch back to hard failure when we parse all message types - //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), - Either::Left((Some(Err(e)), _)) => error!(%e), + Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Ok(msg)), _)) => { self.handle_message_as_request(msg).await } @@ -106,29 +174,53 @@ where .expect("timeout must be set while awaiting response"); match future::select(peer_rx.next(), timer_ref).await { Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), - // XXX switch back to hard failure when we parse all message types - //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), - Either::Left((Some(Err(peer_err)), _timer)) => error!(%peer_err), + Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Ok(peer_msg)), _timer)) => { - match self.handle_message_as_response(peer_msg) { - None => continue, - Some(msg) => self.handle_message_as_request(msg).await, + // Try to process the message using the handler. + // This extremely awkward construction avoids + // keeping a live reference to handler across the + // call to handle_message_as_request, which takes + // &mut self. This is a sign that we don't properly + // factor the state required for inbound and + // outbound requests. + let request_msg = match self.state { + State::AwaitingResponse(ref mut handler, _) => { + handler.process_message(peer_msg) + } + _ => unreachable!(), + }; + // If the message was not consumed, check whether it + // should be handled as a request. + if let Some(msg) = request_msg { + self.handle_message_as_request(msg).await; + } else { + // Otherwise, check whether the handler is finished + // processing messages and update the state. + self.state = match self.state { + State::AwaitingResponse(Handler::Finished(response), tx) => { + let _ = tx.send(response); + State::AwaitingRequest + } + pending @ State::AwaitingResponse(_, _) => pending, + _ => unreachable!(), + }; } } Either::Right(((), _peer_fut)) => { trace!("client request timed out"); - // Re-matching lets us take ownership of tx + let e = PeerError::ClientRequestTimeout; self.state = match self.state { - State::AwaitingResponse(Request::Ping(_), _) => { - self.fail_with(PeerError::ClientRequestTimeout); + // Special case: ping timeouts fail the connection. + State::AwaitingResponse(Handler::Ping(_), _) => { + self.fail_with(e); State::Failed } + // Other request timeouts fail the request. State::AwaitingResponse(_, tx) => { - let e = PeerError::ClientRequestTimeout; let _ = tx.send(Err(Arc::new(e).into())); State::AwaitingRequest } - _ => panic!("unreachable"), + _ => unreachable!(), }; } } @@ -197,12 +289,12 @@ where match match (&self.state, req) { (Failed, _) => panic!("failed connection cannot handle requests"), (AwaitingResponse { .. }, _) => panic!("tried to update pending request"), - (AwaitingRequest, GetPeers) => self + (AwaitingRequest, Peers) => self .peer_tx .send(Message::GetAddr) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(GetPeers, tx)), + .map(|()| AwaitingResponse(Handler::GetPeers, tx)), (AwaitingRequest, PushPeers(addrs)) => self .peer_tx .send(Message::Addr(addrs)) @@ -222,14 +314,23 @@ where .send(Message::Ping(nonce)) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(Ping(nonce), tx)), - (AwaitingRequest, GetMempool) => self + .map(|()| AwaitingResponse(Handler::Ping(nonce), tx)), + (AwaitingRequest, BlocksByHash(hashes)) => self .peer_tx - .send(Message::Mempool) + .send(Message::GetData( + hashes.iter().map(|h| (*h).into()).collect(), + )) .await .map_err(|e| e.into()) - .map(|()| AwaitingResponse(GetMempool, tx)), - // XXX timeout handling here? + .map(|()| { + AwaitingResponse( + Handler::GetBlocksByHash { + blocks: Vec::with_capacity(hashes.len()), + hashes, + }, + tx, + ) + }), } { Ok(new_state) => { self.state = new_state; @@ -239,68 +340,6 @@ where } } - /// Try to handle `msg` as a response to a client request, possibly consuming - /// it in the process. - /// - /// Taking ownership of the message means that we can pass ownership of its - /// contents to responses without additional copies. If the message is not - /// interpretable as a response, we return ownership to the caller. - fn handle_message_as_response(&mut self, msg: Message) -> Option { - trace!(?msg); - // This function is where we statefully interpret Bitcoin/Zcash messages - // into responses to messages in the internal request/response protocol. - // This conversion is done by a sequence of (request, message) match arms, - // each of which contains the conversion logic for that pair. - use Request::*; - use State::*; - let mut ignored_msg = None; - // We want to be able to consume the state, but it's behind a mutable - // reference, so we can't move it out of self without swapping in a - // placeholder, even if we immediately overwrite the placeholder. - let tmp_state = std::mem::replace(&mut self.state, AwaitingRequest); - self.state = match (tmp_state, msg) { - (AwaitingResponse(GetPeers, tx), Message::Addr(addrs)) => { - tx.send(Ok(Response::Peers(addrs))) - .expect("response oneshot should be unused"); - AwaitingRequest - } - // In this special case, we ignore tx, because we handle ping/pong - // messages internally; the "shadow client" only serves to generate - // outbound pings for us to process. - (AwaitingResponse(Ping(req_nonce), _tx), Message::Pong(res_nonce)) => { - if req_nonce != res_nonce { - self.fail_with(PeerError::HeartbeatNonceMismatch); - } - AwaitingRequest - } - ( - AwaitingResponse(_, tx), - Message::Reject { - message, - ccode, - reason, - data, - }, - ) => { - tx.send(Err(SharedPeerError::from(Arc::new(PeerError::Rejected)))) - .expect("response oneshot should be unused"); - - error!( - "{:?} message rejected: {:?}, {:?}, {:?}", - message, ccode, reason, data - ); - AwaitingRequest - } - // By default, messages are not responses. - (state, msg) => { - ignored_msg = Some(msg); - state - } - }; - - ignored_msg - } - async fn handle_message_as_request(&mut self, msg: Message) { trace!(?msg); // These messages are transport-related, handle them separately: @@ -339,8 +378,7 @@ where // and try to construct an appropriate request object. let req = match msg { Message::Addr(addrs) => Some(Request::PushPeers(addrs)), - Message::GetAddr => Some(Request::GetPeers), - Message::Mempool => Some(Request::GetMempool), + Message::GetAddr => Some(Request::Peers), _ => { debug!("unhandled message type"); None @@ -391,14 +429,12 @@ where self.fail_with(e.into()); } } - Response::Transactions(txs) => { - let hashes = txs - .into_iter() - .map(|tx| InventoryHash::Tx(TransactionHash::from(tx))) - .collect::>(); - - if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await { - self.fail_with(e.into()); + Response::Blocks(blocks) => { + // Generate one block message per block. + for block in blocks.into_iter() { + if let Err(e) = self.peer_tx.send(Message::Block(Box::new(block))).await { + self.fail_with(e.into()); + } } } } diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 734da959..f5cb15f1 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex}; use thiserror::Error; -use zebra_chain::serialization::SerializationError; +use zebra_chain::{serialization::SerializationError}; /// A wrapper around `Arc` that implements `Error`. #[derive(Error, Debug, Clone)] @@ -33,9 +33,6 @@ pub enum PeerError { /// already complete. #[error("Remote peer sent handshake messages after handshake")] DuplicateHandshake, - /// A badly-behaved remote peer sent the wrong nonce in response to a heartbeat `Ping`. - #[error("Remote peer sent the wrong heartbeat nonce")] - HeartbeatNonceMismatch, /// This node's internal services were overloaded, so the connection was dropped /// to shed load. #[error("Internal services over capacity")] @@ -52,6 +49,9 @@ pub enum PeerError { // appropriate error when a `Reject` message is received. #[error("Received a Reject message")] Rejected, + /// The remote peer responded with a block we didn't ask for. + #[error("Remote peer responded with a block we didn't ask for.")] + WrongBlock, } #[derive(Default, Clone)] diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index a7a29744..0b4b1408 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -108,7 +108,7 @@ where let mut responses = FuturesUnordered::new(); for _ in 0..2usize { self.peer_service.ready().await?; - responses.push(self.peer_service.call(Request::GetPeers)); + responses.push(self.peer_service.call(Request::Peers)); } while let Some(rsp) = responses.next().await { if let Ok(Response::Peers(addrs)) = rsp { diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index bfeb5bff..0156aab4 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -1,20 +1,33 @@ +use std::collections::HashSet; + use crate::meta_addr::MetaAddr; +use zebra_chain::block::BlockHeaderHash; use super::super::types::Nonce; /// A network request, represented in internal format. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug)] pub enum Request { /// Requests additional peers from the server. - GetPeers, + Peers, /// Advertises peers to the remote server. + // XXX potentially remove this -- we don't use it? PushPeers(Vec), /// Heartbeats triggered on peer connection start. - // This is included as a bit of a hack, it should only be used - // internally for connection management. You should not expect to - // be firing or handling `Ping` requests or `Pong` responses. + /// + /// This is included as a bit of a hack, it should only be used + /// internally for connection management. You should not expect to + /// be firing or handling `Ping` requests or `Pong` responses. Ping(Nonce), - /// Requests the transactions the remote server has verified but - /// not yet confirmed. - GetMempool, + /// Request block data by block hashes. + /// + /// This uses a `HashSet` rather than a `Vec` for two reasons. First, it + /// automatically deduplicates the requested blocks. Second, the internal + /// protocol translator needs to maintain a `HashSet` anyways, in order to + /// keep track of which requested blocks have been received and when the + /// request is ready. Rather than force the internals to always convert into + /// a `HashSet`, we require the caller to pass one, so that if the caller + /// didn't start with a `Vec` but with, e.g., an iterator, they can collect + /// directly into a `HashSet` and save work. + BlocksByHash(HashSet), } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 8441e507..13dd6a76 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -1,12 +1,12 @@ use std::error::Error; // XXX clean module layout of zebra_chain -use zebra_chain::transaction::Transaction; +use zebra_chain::block::Block; use crate::meta_addr::MetaAddr; /// A response to a network request, represented in internal format. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug)] pub enum Response { /// Generic success. Ok, @@ -14,8 +14,8 @@ pub enum Response { Error, /// A list of peers, used to respond to `GetPeers`. Peers(Vec), - /// A list of transactions, such as in response to `GetMempool`. - Transactions(Vec), + /// A list of blocks. + Blocks(Vec), } impl From for Response diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index 038f6011..de77460e 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -43,7 +43,7 @@ impl Runnable for ConnectCmd { impl ConnectCmd { async fn connect(&self) -> Result<(), Error> { - use zebra_network::{AddressBook, Request, Response}; + use zebra_network::{Request, Response}; info!("begin tower-based peer handling test stub"); use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; @@ -70,44 +70,48 @@ impl ConnectCmd { .await .map_err(|e| Error::from(ErrorKind::Io.context(e)))?; - info!("peer_set became ready, constructing addr requests"); + info!("peer_set became ready"); - use futures::stream::{FuturesUnordered, StreamExt}; + peer_set.ready().await.unwrap(); - let mut addr_reqs = FuturesUnordered::new(); - for i in 0..10usize { - info!(i, "awaiting peer_set ready"); - peer_set - .ready() - .await - .map_err(|e| Error::from(ErrorKind::Io.context(e)))?; - info!(i, "calling peer_set"); - addr_reqs.push(peer_set.call(Request::GetPeers)); + use zebra_chain::block::BlockHeaderHash; + use zebra_chain::serialization::ZcashDeserialize; + let hash_415000 = BlockHeaderHash::zcash_deserialize( + &[ + 104, 97, 133, 175, 186, 67, 219, 26, 10, 37, 145, 232, 63, 170, 25, 37, 8, 250, 47, + 43, 38, 113, 231, 60, 121, 55, 171, 1, 0, 0, 0, 0, + ][..], + ) + .unwrap(); + let rsp = peer_set + .call(Request::BlocksByHash( + std::iter::once(hash_415000).collect(), + )) + .await; + + info!(?rsp); + + let block_415000 = if let Ok(Response::Blocks(blocks)) = rsp { + blocks[0].clone() + } else { + panic!("did not get block"); + }; + + let hash_414999 = block_415000.header.previous_block_hash; + + let two_blocks = + Request::BlocksByHash([hash_415000, hash_414999].iter().cloned().collect()); + info!(?two_blocks); + peer_set.ready().await.unwrap(); + let mut rsp = peer_set.call(two_blocks.clone()).await; + info!(?rsp); + while let Err(_) = rsp { + info!("retry"); + peer_set.ready().await.unwrap(); + rsp = peer_set.call(two_blocks.clone()).await; + info!(?rsp); } - use tracing::Level; - let mut all_addrs = AddressBook::new(span!(Level::TRACE, "connect stub addressbook")); - while let Some(Ok(Response::Peers(addrs))) = addr_reqs.next().await { - info!(addrs.len = addrs.len(), "got address response"); - - let prev_count = all_addrs.peers().count(); - all_addrs.extend(addrs.into_iter()); - let count = all_addrs.peers().count(); - info!( - new_addrs = count - prev_count, - count, "added addrs to addressbook" - ); - } - - let addrs = all_addrs.drain_newest().collect::>(); - - info!(addrs.len = addrs.len(), ab.len = all_addrs.peers().count()); - let mut head = Vec::new(); - head.extend_from_slice(&addrs[0..5]); - let mut tail = Vec::new(); - tail.extend_from_slice(&addrs[addrs.len() - 5..]); - info!(addrs.first = ?head, addrs.last = ?tail); - let eternity = future::pending::<()>(); eternity.await; diff --git a/zebrad/src/commands/seed.rs b/zebrad/src/commands/seed.rs index fc5210da..698ac352 100644 --- a/zebrad/src/commands/seed.rs +++ b/zebrad/src/commands/seed.rs @@ -72,7 +72,7 @@ impl Service for SeedService { }; let response = match req { - Request::GetPeers => { + Request::Peers => { // Collect a list of known peers from the address book // and sanitize their timestamps. let mut peers = address_book