diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml
index 12d2d7b8..92210033 100644
--- a/zebra-network/Cargo.toml
+++ b/zebra-network/Cargo.toml
@@ -13,8 +13,11 @@ rand = "0.7"
 byteorder = "1.3"
 chrono = "0.4"
 failure = "0.1"
-tokio = "=0.2.0-alpha.5"
+tokio = "=0.2.0-alpha.6"
+tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
 tracing = { git = "https://github.com/tokio-rs/tracing" }
 tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
+futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
-zebra-chain = { path = "../zebra-chain" }
\ No newline at end of file
+
+zebra-chain = { path = "../zebra-chain" }
diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs
index f0d4035a..2607f796 100644
--- a/zebra-network/src/lib.rs
+++ b/zebra-network/src/lib.rs
@@ -14,7 +14,8 @@ pub use network::Network;
 pub mod protocol;
 
-// XXX make this private once connect is removed
-pub mod meta_addr;
-// XXX make this private once connect is removed
+// XXX revisit privacy once we finish encapsulation.
 pub mod constants;
+pub mod meta_addr;
+pub mod peer;
+pub mod timestamp_collector;
diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs
new file mode 100644
index 00000000..8d402dd9
--- /dev/null
+++ b/zebra-network/src/peer.rs
@@ -0,0 +1,22 @@
+//! Peer handling.
+
+/// Handles outbound requests from our node to the network.
+pub mod client;
+/// Asynchronously connects to peers.
+pub mod connector;
+/// Handles inbound requests from the network to our node.
+pub mod server;
+
+/// An error related to a peer connection.
+#[derive(Fail, Debug, Clone)]
+pub enum PeerError {
+    /// Wrapper around `failure::Error` that can be `Clone`.
+    #[fail(display = "{}", _0)]
+    Inner(std::sync::Arc<failure::Error>),
+}
+
+impl From<failure::Error> for PeerError {
+    fn from(e: failure::Error) -> PeerError {
+        PeerError::Inner(std::sync::Arc::new(e))
+    }
+}
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
new file mode 100644
index 00000000..47949676
--- /dev/null
+++ b/zebra-network/src/peer/client.rs
@@ -0,0 +1,82 @@
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures::{
+    channel::{mpsc, oneshot},
+    future, ready,
+};
+use tokio::prelude::*;
+use tower::Service;
+
+use crate::protocol::internal::{Request, Response};
+
+use super::{server::ErrorSlot, PeerError};
+
+/// The "client" duplex half of a peer connection.
+pub struct PeerClient {
+    pub(super) span: tracing::Span,
+    pub(super) server_tx: mpsc::Sender<ClientRequest>,
+    pub(super) error_slot: ErrorSlot,
+}
+
+/// A message from the `PeerClient` to the `PeerServer`, containing both a
+/// request and a return message channel. The return channel is included
+/// because `PeerClient::call` returns a future that may be moved around
+/// before it resolves, so the future must have ownership of the channel
+/// on which it receives the response.
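+///
+/// An illustrative sketch of the round trip (names here are shorthand for the
+/// fields and locals used in this file, not a verbatim excerpt):
+///
+/// ```ignore
+/// let (tx, rx) = oneshot::channel();
+/// // The client half enqueues the request together with its reply channel...
+/// server_tx.try_send(ClientRequest(request, tx))?;
+/// // ...and the future returned by `call` owns `rx` and awaits the reply.
+/// let response = rx.await;
+/// ```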
+#[derive(Debug)]
+pub(super) struct ClientRequest(
+    pub(super) Request,
+    pub(super) oneshot::Sender<Result<Response, PeerError>>,
+);
+
+impl Service<Request> for PeerClient {
+    type Response = Response;
+    type Error = PeerError;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if let Err(_) = ready!(self.server_tx.poll_ready(cx)) {
+            Poll::Ready(Err(self
+                .error_slot
+                .try_get_error()
+                .expect("failed PeerServers must set their error slot")))
+        } else {
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        use futures::future::FutureExt;
+        use tracing_futures::Instrument;
+        let (tx, rx) = oneshot::channel();
+        match self.server_tx.try_send(ClientRequest(req, tx)) {
+            Err(e) => {
+                if e.is_disconnected() {
+                    future::ready(Err(self
+                        .error_slot
+                        .try_get_error()
+                        .expect("failed PeerServers must set their error slot")))
+                    .instrument(self.span.clone())
+                    .boxed()
+                } else {
+                    // sending fails when there's not enough
+                    // channel space, but we called poll_ready
+                    panic!("called call without poll_ready");
+                }
+            }
+            // need a bit of yoga to get result types to align,
+            // because the oneshot future can error
+            Ok(()) => rx
+                .map(|val| match val {
+                    Ok(Ok(rsp)) => Ok(rsp),
+                    Ok(Err(e)) => Err(e),
+                    Err(_) => Err(format_err!("oneshot died").into()),
+                })
+                .instrument(self.span.clone())
+                .boxed(),
+        }
+    }
+}
diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs
new file mode 100644
index 00000000..e61c9387
--- /dev/null
+++ b/zebra-network/src/peer/connector.rs
@@ -0,0 +1,164 @@
+use std::{
+    net::SocketAddr,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use chrono::Utc;
+use failure::Error;
+use futures::channel::mpsc;
+use tokio::{codec::Framed, net::TcpStream, prelude::*};
+use tower::Service;
+use tracing::{span, Level};
+use tracing_futures::Instrument;
+
+use zebra_chain::types::BlockHeight;
+
+use crate::{
+    constants,
+    protocol::{codec::*, internal::*, message::*, types::*},
+    timestamp_collector::{PeerLastSeen, TimestampCollector},
+    Network,
+};
+
+use super::{
+    client::PeerClient,
+    server::{ErrorSlot, PeerServer, ServerState},
+};
+
+/// A [`Service`] that connects to a remote peer and constructs a client/server pair.
+pub struct PeerConnector<S> {
+    network: Network,
+    internal_service: S,
+    sender: mpsc::Sender<PeerLastSeen>,
+}
+
+impl<S> PeerConnector<S>
+where
+    S: Service<Request, Response = Response> + Clone + Send + 'static,
+    S::Future: Send,
+    //S::Error: Into<Error>,
+{
+    /// XXX replace with a builder
+    pub fn new(network: Network, internal_service: S, collector: &TimestampCollector) -> Self {
+        let sender = collector.sender_handle();
+        PeerConnector {
+            network,
+            internal_service,
+            sender,
+        }
+    }
+}
+
+impl<S> Service<SocketAddr> for PeerConnector<S>
+where
+    S: Service<Request, Response = Response> + Clone + Send + 'static,
+    S::Future: Send,
+    S::Error: Send,
+    //S::Error: Into<Error>,
+{
+    type Response = PeerClient;
+    type Error = Error;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        // XXX when this asks a second service for
+        // an address to connect to, it should call inner.ready
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, addr: SocketAddr) -> Self::Future {
+        let connector_span = span!(Level::INFO, "connector", addr = ?addr);
+        let connection_span = span!(Level::INFO, "peer", addr = ?addr);
+
+        // Clone these upfront, so they can be moved into the future.
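+        // (`Service::call` must return a future that is `'static` and cannot
+        // borrow from `self`, so everything the connection setup needs is
+        // cloned here before being moved into the async block.)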
+        let network = self.network.clone();
+        let internal_service = self.internal_service.clone();
+        let sender = self.sender.clone();
+
+        let fut = async move {
+            info!("beginning connection");
+            let mut stream = Framed::new(
+                TcpStream::connect(addr).await?,
+                Codec::builder().for_network(network).finish(),
+            );
+
+            // XXX construct the Version message from a config
+            let version = Message::Version {
+                version: constants::CURRENT_VERSION,
+                services: PeerServices::NODE_NETWORK,
+                timestamp: Utc::now(),
+                address_recv: (PeerServices::NODE_NETWORK, addr),
+                address_from: (
+                    PeerServices::NODE_NETWORK,
+                    "127.0.0.1:9000".parse().unwrap(),
+                ),
+                nonce: Nonce::default(),
+                user_agent: "Zebra Peer".to_owned(),
+                start_height: BlockHeight(0),
+                relay: false,
+            };
+
+            stream.send(version).await?;
+
+            let _remote_version = stream
+                .next()
+                .await
+                .ok_or_else(|| format_err!("stream closed during handshake"))??;
+
+            stream.send(Message::Verack).await?;
+            let _remote_verack = stream
+                .next()
+                .await
+                .ok_or_else(|| format_err!("stream closed during handshake"))??;
+
+            // XXX here is where we would set the version to the minimum of the
+            // two versions, etc. -- actually is it possible to edit the `Codec`
+            // after using it to make a framed adapter?
+
+            // Construct a PeerClient, PeerServer pair
+
+            let (tx, rx) = mpsc::channel(0);
+            let slot = ErrorSlot::default();
+
+            let client = PeerClient {
+                span: connection_span.clone(),
+                server_tx: tx,
+                error_slot: slot.clone(),
+            };
+
+            let (peer_tx, peer_rx) = stream.split();
+
+            let server = PeerServer {
+                state: ServerState::AwaitingRequest,
+                svc: internal_service,
+                client_rx: rx,
+                error_slot: slot,
+                peer_tx,
+            };
+
+            let hooked_peer_rx = peer_rx
+                .then(move |msg| {
+                    let mut sender = sender.clone();
+                    async move {
+                        if let Ok(_) = msg {
+                            use futures::sink::SinkExt;
+                            let _ = sender.send((addr, Utc::now())).await;
+                        }
+                        msg
+                    }
+                })
+                .boxed();
+
+            tokio::spawn(
+                server
+                    .run(hooked_peer_rx)
+                    .instrument(connection_span)
+                    .boxed(),
+            );
+
+            Ok(client)
+        };
+        fut.instrument(connector_span).boxed()
+    }
+}
diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs
new file mode 100644
index 00000000..ef4d4359
--- /dev/null
+++ b/zebra-network/src/peer/server.rs
@@ -0,0 +1,329 @@
+use std::sync::{Arc, Mutex};
+
+use failure::Error;
+use futures::{
+    channel::{mpsc, oneshot},
+    stream::Stream,
+};
+use tokio::prelude::*;
+use tower::Service;
+
+use crate::protocol::{
+    internal::{Request, Response},
+    message::Message,
+};
+
+use super::{client::ClientRequest, PeerError};
+
+#[derive(Default, Clone)]
+pub(super) struct ErrorSlot(Arc<Mutex<Option<PeerError>>>);
+
+impl ErrorSlot {
+    pub fn try_get_error(&self) -> Option<PeerError> {
+        self.0
+            .lock()
+            .expect("error mutex should be unpoisoned")
+            .as_ref()
+            .map(|e| e.clone())
+    }
+}
+
+pub(super) enum ServerState {
+    /// Awaiting a client request or a peer message.
+    AwaitingRequest,
+    /// Awaiting a peer message we can interpret as a response to a client request.
+    AwaitingResponse(Request, oneshot::Sender<Result<Response, PeerError>>),
+    /// A failure has occurred and we are shutting down the server.
+    Failed,
+}
+
+/// The "server" duplex half of a peer connection.
+pub struct PeerServer<S, Tx> {
+    pub(super) state: ServerState,
+    pub(super) svc: S,
+    pub(super) client_rx: mpsc::Receiver<ClientRequest>,
+    /// A slot shared between the PeerServer and PeerClient for storing an error.
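+    /// (When the connection fails, `fail_with` stores the error here so that
+    /// the `PeerClient` can report the underlying cause instead of a generic
+    /// channel-closed error.)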
+    pub(super) error_slot: ErrorSlot,
+    //pub(super) peer_rx: Rx,
+    pub(super) peer_tx: Tx,
+}
+
+impl<S, Tx> PeerServer<S, Tx>
+where
+    S: Service<Request, Response = Response>,
+    S::Error: Send,
+    //S::Error: Into<Error>,
+    Tx: Sink<Message> + Unpin,
+    Tx::Error: Into<Error>,
+{
+    /// Run this peer server to completion.
+    pub async fn run<Rx>(mut self, mut peer_rx: Rx)
+    where
+        Rx: Stream<Item = Result<Message, Error>> + Unpin,
+    {
+        // At a high level, the event loop we want is as follows: we check for any
+        // incoming messages from the remote peer, check if they should be interpreted
+        // as a response to a pending client request, and if not, interpret them as a
+        // request from the remote peer to our node.
+        //
+        // We also need to handle those client requests in the first place. The client
+        // requests are received from the corresponding `PeerClient` over a bounded
+        // channel (with bound 1, to minimize buffering), but there is no relationship
+        // between the stream of client requests and the stream of peer messages, so we
+        // cannot ignore one kind while waiting on the other. Moreover, we cannot accept
+        // a second client request while the first one is still pending.
+        //
+        // To do this, we inspect the current request state.
+        //
+        // If there is no pending request, we wait on either an incoming peer message or
+        // an incoming request, whichever comes first.
+        //
+        // If there is a pending request, we wait only on an incoming peer message, and
+        // check whether it can be interpreted as a response to the pending request.
+
+        use futures::future::FutureExt;
+        use futures::select;
+
+        // This future represents the next message received from the peer.
+        // It needs to be stored outside of the event loop, so that we can overwrite
+        // it with the new "next message future" every time we get a new message.
+        let mut peer_rx_fut = peer_rx.next().fuse();
+        loop {
+            match self.state {
+                // We're awaiting a client request, so listen for both
+                // client requests and peer messages simultaneously.
+                ServerState::AwaitingRequest => select! {
+                    req = self.client_rx.next() => {
+                        match req {
+                            Some(req) => self.handle_client_request(req).await,
+                            None => {
+                                trace!("client_rx closed, shutting down");
+                                return;
+                            }
+                        }
+                    }
+                    msg = peer_rx_fut => {
+                        peer_rx_fut = peer_rx.next().fuse();
+                        match msg {
+                            None => {
+                                trace!("peer stream closed, shutting down");
+                                return;
+                            }
+                            // We got a peer message but it was malformed.
+                            //Some(Err(e)) => self.fail_with(e.into()),
+                            // XXX remove this when we parse all message types
+                            Some(Err(e)) => {
+                                error!(%e);
+                            }
+                            // We got a peer message and it was well-formed.
+                            Some(Ok(msg)) => self.handle_message_as_request(msg).await,
+                        }
+                    }
+                },
+                // We're awaiting a response to a client request,
+                // so only listen to peer messages, not further requests.
+                ServerState::AwaitingResponse { .. } => {
+                    let msg = peer_rx_fut.await;
+                    peer_rx_fut = peer_rx.next().fuse();
+                    match msg {
+                        // The peer channel has closed -- no more messages.
+                        // However, we still need to flush pending client requests.
+                        None => self.fail_with(format_err!("peer closed connection").into()),
+                        // We got a peer message but it was malformed.
+                        //Some(Err(e)) => self.fail_with(e.into()),
+                        // XXX remove this when we parse all message types
+                        Some(Err(e)) => {
+                            error!(%e);
+                        }
+                        // We got a peer message and it was well-formed.
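+                        // First try to interpret it as a response to the
+                        // pending request; if it is not one, fall through and
+                        // treat it as a new request from the peer.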
+                        Some(Ok(msg)) => match self.handle_message_as_response(msg) {
+                            None => continue,
+                            Some(msg) => self.handle_message_as_request(msg).await,
+                        },
+                    }
+                }
+                // We've failed, but we need to flush all pending client
+                // requests before we can return and complete the future.
+                ServerState::Failed => {
+                    match self.client_rx.next().await {
+                        Some(ClientRequest(_, tx)) => {
+                            let e = self
+                                .error_slot
+                                .try_get_error()
+                                .expect("cannot enter failed state without setting error slot");
+                            let _ = tx.send(Err(e));
+                            // Continue until we've errored all queued reqs
+                            continue;
+                        }
+                        None => return,
+                    }
+                }
+            }
+        }
+    }
+
+    /// Marks the peer as having failed with error `e`.
+    fn fail_with(&mut self, e: PeerError) {
+        trace!(%e, "failing peer service with error");
+        // Update the shared error slot
+        let mut guard = self
+            .error_slot
+            .0
+            .lock()
+            .expect("mutex should be unpoisoned");
+        if guard.is_some() {
+            panic!("called fail_with on already-failed server state");
+        } else {
+            *guard = Some(e);
+        }
+        // Drop the guard immediately to release the mutex.
+        std::mem::drop(guard);
+
+        // We want to close the client channel and set ServerState::Failed so
+        // that we can flush any pending client requests. However, we may have
+        // an outstanding client request in ServerState::AwaitingResponse, so
+        // we need to deal with it first if it exists.
+        self.client_rx.close();
+        let old_state = std::mem::replace(&mut self.state, ServerState::Failed);
+        if let ServerState::AwaitingResponse(_, tx) = old_state {
+            // We know the slot has Some(e) because we just set it above,
+            // and the error slot is never unset.
+            let e = self.error_slot.try_get_error().unwrap();
+            let _ = tx.send(Err(e));
+        }
+    }
+
+    /// Handle an incoming client request, possibly generating outgoing messages to the
+    /// remote peer.
+    async fn handle_client_request(&mut self, msg: ClientRequest) {
+        trace!(?msg);
+        use Request::*;
+        use ServerState::*;
+        let ClientRequest(req, tx) = msg;
+
+        // Inner match returns Result with the new state or an error.
+        // Outer match updates state or fails.
+        match match (&self.state, req) {
+            (Failed, _) => panic!("failed service cannot handle requests"),
+            (AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
+            (AwaitingRequest, GetPeers) => self
+                .peer_tx
+                .send(Message::GetAddr)
+                .await
+                .map_err(|e| e.into().into())
+                .map(|()| AwaitingResponse(GetPeers, tx)),
+            (AwaitingRequest, PushPeers(addrs)) => self
+                .peer_tx
+                .send(Message::Addr(addrs))
+                .await
+                .map_err(|e| e.into().into())
+                .map(|()| {
+                    // PushPeers does not have a response, so we return OK as
+                    // soon as we send the request. Sending through a oneshot
+                    // can only fail if the rx end is dropped before calling
+                    // send, which we can safely ignore (the request future was
+                    // cancelled).
+                    let _ = tx.send(Ok(Response::Ok));
+                    AwaitingRequest
+                }),
+        } {
+            Ok(new_state) => self.state = new_state,
+            Err(e) => self.fail_with(e),
+        }
+    }
+
+    /// Try to handle `msg` as a response to a client request, possibly consuming
+    /// it in the process.
+    ///
+    /// Taking ownership of the message means that we can pass ownership of its
+    /// contents to responses without additional copies. If the message is not
+    /// interpretable as a response, we return ownership to the caller.
+    fn handle_message_as_response(&mut self, msg: Message) -> Option<Message> {
+        trace!(?msg);
+        // This function is where we statefully interpret Bitcoin/Zcash messages
+        // into responses to messages in the internal request/response protocol.
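+        // For example, an incoming `addr` message is consumed as the
+        // `Response::Peers` answer to a pending `GetPeers` request.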
+        // This conversion is done by a sequence of (request, message) match arms,
+        // each of which contains the conversion logic for that pair.
+        use Request::*;
+        use ServerState::*;
+        let mut ignored_msg = None;
+        // We want to be able to consume the state, but it's behind a mutable
+        // reference, so we can't move it out of self without swapping in a
+        // placeholder, even if we immediately overwrite the placeholder.
+        let tmp_state = std::mem::replace(&mut self.state, AwaitingRequest);
+        self.state = match (tmp_state, msg) {
+            (AwaitingResponse(GetPeers, tx), Message::Addr(addrs)) => {
+                tx.send(Ok(Response::Peers(addrs)))
+                    .expect("response oneshot should be unused");
+                AwaitingRequest
+            }
+            // By default, messages are not responses.
+            (state, msg) => {
+                ignored_msg = Some(msg);
+                state
+            }
+        };
+
+        ignored_msg
+    }
+
+    async fn handle_message_as_request(&mut self, msg: Message) {
+        trace!(?msg);
+        // These messages are transport-related, handle them separately:
+        match msg {
+            Message::Version { .. } => {
+                self.fail_with(format_err!("got version message after handshake").into());
+                return;
+            }
+            Message::Verack { .. } => {
+                self.fail_with(format_err!("got verack message after handshake").into());
+                return;
+            }
+            Message::Ping(nonce) => {
+                match self.peer_tx.send(Message::Pong(nonce)).await {
+                    Ok(()) => {}
+                    Err(e) => self.fail_with(e.into().into()),
+                }
+                return;
+            }
+            _ => {}
+        }
+
+        // Interpret `msg` as a request from the remote peer to our node,
+        // and try to construct an appropriate request object.
+        let req = match msg {
+            Message::Addr(addrs) => Some(Request::PushPeers(addrs)),
+            _ => None,
+        };
+
+        match req {
+            Some(req) => self.drive_peer_request(req).await,
+            None => {}
+        }
+    }
+
+    /// Given a `req` originating from the peer, drive it to completion and send
+    /// any appropriate messages to the remote peer. If an error occurs while
+    /// processing the request (e.g., the service is shedding load), then we call
+    /// fail_with to terminate the entire peer connection, shrinking the number
+    /// of connected peers.
+    async fn drive_peer_request(&mut self, req: Request) {
+        trace!(?req);
+        use tower::ServiceExt;
+        // XXX Drop the errors on the floor for now so that
+        // we can ignore error type alignment
+        match self.svc.ready().await {
+            Err(_) => self.fail_with(format_err!("svc err").into()),
+            Ok(()) => {}
+        }
+        match self.svc.call(req).await {
+            Err(_) => self.fail_with(format_err!("svc err").into()),
+            Ok(Response::Ok) => { /* generic success, do nothing */ }
+            Ok(Response::Peers(addrs)) => {
+                if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
+                    self.fail_with(e.into().into());
+                }
+            }
+        }
+    }
+}
diff --git a/zebra-network/src/protocol.rs b/zebra-network/src/protocol.rs
index 24cbfb04..38ac65db 100644
--- a/zebra-network/src/protocol.rs
+++ b/zebra-network/src/protocol.rs
@@ -5,3 +5,10 @@ pub mod message;
 pub mod types;
 
 pub mod inv;
+
+// XXX at some later point the above should move to an `external` submodule, so
+// that we have
+// - protocol::external::{all_bitcoin_zcash_types};
+// - protocol::internal::{all_internal_req_rsp_types};
+
+pub mod internal;
diff --git a/zebra-network/src/protocol/codec.rs b/zebra-network/src/protocol/codec.rs
index c1f47e05..327f4ab1 100644
--- a/zebra-network/src/protocol/codec.rs
+++ b/zebra-network/src/protocol/codec.rs
@@ -98,7 +98,6 @@ impl Encoder for Codec {
     type Item = Message;
     type Error = Error;
 
-    #[instrument(skip(src))]
     fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
         // XXX(HACK): this is inefficient and does an extra allocation.
         // instead, we should have a size estimator for the message, reserve
@@ -136,7 +135,7 @@ impl Encoder for Codec {
             FilterClear { .. } => b"filterclear\0",
             MerkleBlock { .. } => b"merkleblock\0",
         };
-        trace!(?command, len = body.len());
+        trace!(?item, len = body.len());
 
         // XXX this should write directly into the buffer,
         // but leave it for now until we fix the issue above.
@@ -237,7 +236,6 @@ impl Decoder for Codec {
     type Item = Message;
     type Error = Error;
 
-    #[instrument(skip(src))]
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         match self.state {
             DecodeState::Head => {
@@ -329,7 +327,7 @@ impl Decoder for Codec {
             // We need Ok(Some(msg)) to signal that we're done decoding.
             // This is also convenient for tracing the parse result.
             .map(|msg| {
-                trace!(?msg);
+                trace!("finished message decoding");
                 Some(msg)
             })
diff --git a/zebra-network/src/protocol/internal.rs b/zebra-network/src/protocol/internal.rs
new file mode 100644
index 00000000..64bd9e49
--- /dev/null
+++ b/zebra-network/src/protocol/internal.rs
@@ -0,0 +1,25 @@
+//! Message types for the internal request/response protocol.
+//!
+//! These are currently defined just as enums with all possible requests and
+//! responses, so that we have unified types to pass around. No serialization
+//! is performed as these are only internal types.
+
+use crate::meta_addr::MetaAddr;
+
+/// A network request, represented in internal format.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Request {
+    /// Requests additional peers from the server.
+    GetPeers,
+    /// Advertises peers to the remote server.
+    PushPeers(Vec<MetaAddr>),
+}
+
+/// A response to a network request, represented in internal format.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Response {
+    /// Generic success.
+    Ok,
+    /// A list of peers, used to respond to `GetPeers`.
+    Peers(Vec<MetaAddr>),
+}
diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs
new file mode 100644
index 00000000..30efd560
--- /dev/null
+++ b/zebra-network/src/timestamp_collector.rs
@@ -0,0 +1,120 @@
+//! Management of peer liveness / last-seen information.
+
+use std::{
+    collections::{BTreeMap, HashMap},
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+};
+
+use chrono::{DateTime, Utc};
+use futures::channel::mpsc;
+use tokio::prelude::*;
+
+/// A type alias for a timestamp event sent to a `TimestampCollector`.
+pub(crate) type PeerLastSeen = (SocketAddr, DateTime<Utc>);
+
+/// Maintains a lookup table from peer addresses to last-seen times.
+///
+/// On creation, the `TimestampCollector` spawns a worker task to process new
+/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
+/// worker task and state are shared among all of the clones.
+///
+/// XXX add functionality for querying the timestamp data
+#[derive(Clone, Debug)]
+pub struct TimestampCollector {
+    // We do not expect mutex contention to be a problem, because
+    // the dominant accessor is the collector worker, and it has a long
+    // event buffer to hide latency if other tasks block it temporarily.
+    data: Arc<Mutex<TimestampData>>,
+    shutdown: Arc<ShutdownSignal>,
+    worker_tx: mpsc::Sender<PeerLastSeen>,
+}
+
+#[derive(Default, Debug)]
+struct TimestampData {
+    by_addr: HashMap<SocketAddr, DateTime<Utc>>,
+    by_time: BTreeMap<DateTime<Utc>, SocketAddr>,
+}
+
+impl TimestampData {
+    fn update(&mut self, event: PeerLastSeen) {
+        use std::collections::hash_map::Entry;
+        let (addr, timestamp) = event;
+        trace!(?addr, ?timestamp);
+        match self.by_addr.entry(addr) {
+            Entry::Occupied(mut entry) => {
+                let last_timestamp = entry.get();
+                self.by_time
+                    .remove(last_timestamp)
+                    .expect("cannot have by_addr entry without by_time entry");
+                entry.insert(timestamp);
+                self.by_time.insert(timestamp, addr);
+            }
+            Entry::Vacant(entry) => {
+                entry.insert(timestamp);
+                self.by_time.insert(timestamp, addr);
+            }
+        }
+    }
+}
+
+impl TimestampCollector {
+    /// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
+    pub fn new() -> TimestampCollector {
+        let data = Arc::new(Mutex::new(TimestampData::default()));
+        // We need to make a copy now so we can move data into the async block.
+        let data2 = data.clone();
+
+        const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
+        let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(0);
+
+        // Construct and then spawn a worker.
+        let worker = async move {
+            use futures::select;
+            loop {
+                select! {
+                    _ = shutdown_rx.next() => return,
+                    msg = worker_rx.next() => {
+                        match msg {
+                            Some(event) => {
+                                data2
+                                    .lock()
+                                    .expect("mutex should be unpoisoned")
+                                    .update(event)
+                            }
+                            None => return,
+                        }
+                    }
+                }
+            }
+        };
+        tokio::spawn(worker.boxed());
+
+        TimestampCollector {
+            data,
+            worker_tx,
+            shutdown: Arc::new(ShutdownSignal { tx: shutdown_tx }),
+        }
+    }
+
+    pub(crate) fn sender_handle(&self) -> mpsc::Sender<PeerLastSeen> {
+        self.worker_tx.clone()
+    }
+}
+
+/// Sends a signal when dropped.
+#[derive(Debug)]
+struct ShutdownSignal {
+    /// Sending () signals that the task holding the rx end should terminate.
+    ///
+    /// This should really be a oneshot but calling a oneshot consumes it,
+    /// and we can't move out of self in Drop.
+    tx: mpsc::Sender<()>,
+}
+
+impl Drop for ShutdownSignal {
+    fn drop(&mut self) {
+        self.tx.try_send(()).expect("tx is only used in drop");
+    }
+}
diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index 8c59902f..f0a9306b 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -12,18 +12,16 @@ gumdrop = "0.6"
 lazy_static = "1"
 serde = { version = "1", features = ["serde_derive"] }
 toml = "0.5"
-tokio = "=0.2.0-alpha.5"
+tokio = "=0.2.0-alpha.6"
 # Replace with git to pick up instrument derive changes, revert on release.
 #tracing = "0.1"
 tracing = { git = "https://github.com/tokio-rs/tracing" }
 tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
 tracing-subscriber = { git = "https://github.com/tokio-rs/tracing" }
 tracing-log = { git = "https://github.com/tokio-rs/tracing" }
-# Can't use published alpha because of conflicts tracking pin-project alphas
-#hyper = "=0.13.0-alpha.1"
-hyper = { git = "https://github.com/hyperium/hyper" }
-futures-core-preview = { version = "=0.3.0-alpha.18" }
-futures-util-preview = { version = "=0.3.0-alpha.18" }
+hyper = { git = "https://github.com/hyperium/hyper"}
+tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
+futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
 
 zebra-chain = { path = "../zebra-chain" }
 zebra-network = { path = "../zebra-network" }
diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs
index f6719fe5..999874c5 100644
--- a/zebrad/src/commands/connect.rs
+++ b/zebrad/src/commands/connect.rs
@@ -26,7 +26,7 @@ impl Runnable for ConnectCmd {
         // Combine the connect future with an infinite wait
         // so that the program has to be explicitly killed and
         // won't die before all tracing messages are written.
-        let fut = futures_util::future::join(
+        let fut = futures::future::join(
            async {
                 match self.connect().await {
                     Ok(()) => {}
@@ -51,64 +51,40 @@ impl Runnable for ConnectCmd {
 
 impl ConnectCmd {
     async fn connect(&self) -> Result<(), failure::Error> {
-        use chrono::Utc;
-        use tokio::{codec::Framed, net::TcpStream, prelude::*};
-
-        use zebra_chain::types::BlockHeight;
         use zebra_network::{
-            constants,
-            protocol::{codec::*, message::*, types::*},
+            peer::connector::PeerConnector,
+            protocol::internal::{Request, Response},
+            timestamp_collector::TimestampCollector,
             Network,
         };
 
-        info!("connecting");
+        info!("begin tower-based peer handling test stub");
 
-        let mut stream = Framed::new(
-            TcpStream::connect(self.addr).await?,
-            Codec::builder().for_network(Network::Mainnet).finish(),
+        use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
+
+        let node = Buffer::new(
+            service_fn(|req| {
+                async move {
+                    info!(?req);
+                    Ok::<Response, failure::Error>(Response::Ok)
+                }
+            }),
+            1,
         );
 
-        let version = Message::Version {
-            version: constants::CURRENT_VERSION,
-            services: PeerServices::NODE_NETWORK,
-            timestamp: Utc::now(),
-            address_recv: (PeerServices::NODE_NETWORK, self.addr),
-            // We just make something up because at this stage the `connect` command
-            // doesn't run a server or anything -- will the zcashd respond on the
-            // same tcp connection or try to open one to the bogus address below?
-            address_from: (
-                PeerServices::NODE_NETWORK,
-                "127.0.0.1:9000".parse().unwrap(),
-            ),
-            nonce: Nonce(1),
-            user_agent: "Zebra Connect".to_owned(),
-            start_height: BlockHeight(0),
-            relay: false,
-        };
+        let collector = TimestampCollector::new();
 
-        info!(version = ?version);
+        let mut pc = PeerConnector::new(Network::Mainnet, node, &collector);
+        // no need to call ready because pc is always ready
+        let mut client = pc.call(self.addr.clone()).await?;
 
-        stream.send(version).await?;
+        client.ready().await?;
+        let rsp = client.call(Request::GetPeers).await?;
+        info!(?rsp);
 
-        let resp_version: Message = stream.next().await.expect("expected data")?;
-
-        info!(resp_version = ?resp_version);
-
-        stream.send(Message::Verack).await?;
-
-        let resp_verack = stream.next().await.expect("expected data")?;
-        info!(resp_verack = ?resp_verack);
-
-        while let Some(maybe_msg) = stream.next().await {
-            match maybe_msg {
-                Ok(msg) => match msg {
-                    Message::Ping(nonce) => {
-                        stream.send(Message::Pong(nonce)).await?;
-                    }
-                    _ => warn!("Unknown message"),
-                },
-                Err(e) => error!("{}", e),
-            };
+        loop {
+            // empty loop ensures we don't exit the application,
+            // and this is throwaway code
         }
 
         Ok(())