Initial tower-based peer implementation. (#17)

Add a tower-based peer implementation.  

Tower provides middleware for request/response-oriented protocols, while Bitcoin/Zcash peers just exchange messages, each of which may be a request or a response depending on context.  To bridge this mismatch we define our own internal request/response protocol, and implement a per-peer event loop that scans incoming messages and interprets them either as requests from the remote peer to our node, or as responses to requests we made previously.  This event loop runs in the `PeerServer` task, and a corresponding `PeerClient: tower::Service` can send it requests.  Both halves are created by a `PeerConnector: tower::Service`, which dials a remote peer and performs the handshake.
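
The `connect` command at the bottom of this diff exercises the new stack end to end; condensed into one place, the intended usage looks roughly like the hypothetical `demo` function below. This is a test-stub sketch only: it assumes a running tokio runtime and the crate versions pinned in this change, and the inbound service just logs each request and answers `Response::Ok`.

```rust
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
use zebra_network::{
    peer::connector::PeerConnector,
    protocol::internal::{Request, Response},
    timestamp_collector::TimestampCollector,
    Network,
};

async fn demo(addr: std::net::SocketAddr) -> Result<(), failure::Error> {
    // A stub inbound service: whatever the remote peer asks of our node,
    // log it and report generic success.
    let node = Buffer::new(
        service_fn(|req| {
            async move {
                tracing::info!(?req);
                Ok::<Response, failure::Error>(Response::Ok)
            }
        }),
        1,
    );

    // The connector dials the peer, runs the handshake, spawns a PeerServer
    // task for the connection, and hands back the PeerClient half.
    let collector = TimestampCollector::new();
    let mut connector = PeerConnector::new(Network::Mainnet, node, &collector);
    // PeerConnector::poll_ready is always ready, so we can call directly.
    let mut client = connector.call(addr).await?;

    // The client is itself a tower::Service speaking the internal protocol.
    client.ready().await?;
    let rsp = client.call(Request::GetPeers).await?;
    tracing::info!(?rsp);
    Ok(())
}
```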
Henry de Valence, 2019-10-07 15:36:16 -07:00 (committed by GitHub)
parent 5939857fbb
commit ed608f7231
12 changed files with 789 additions and 64 deletions


@ -13,8 +13,11 @@ rand = "0.7"
byteorder = "1.3"
chrono = "0.4"
failure = "0.1"
tokio = "=0.2.0-alpha.5"
tokio = "=0.2.0-alpha.6"
tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
tracing = { git = "https://github.com/tokio-rs/tracing" }
tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
zebra-chain = { path = "../zebra-chain" }


@ -14,7 +14,8 @@ pub use network::Network;
pub mod protocol;
// XXX make this private once connect is removed
pub mod meta_addr;
// XXX make this private once connect is removed
// XXX revisit privacy once we finish encapsulation.
pub mod constants;
pub mod meta_addr;
pub mod peer;
pub mod timestamp_collector;

zebra-network/src/peer.rs (new file, 22 lines)

@ -0,0 +1,22 @@
//! Peer handling.
/// Handles outbound requests from our node to the network.
pub mod client;
/// Asynchronously connects to peers.
pub mod connector;
/// Handles inbound requests from the network to our node.
pub mod server;
/// An error related to a peer connection.
#[derive(Fail, Debug, Clone)]
pub enum PeerError {
/// Wrapper around `failure::Error` that can be `Clone`.
#[fail(display = "{}", _0)]
Inner(std::sync::Arc<failure::Error>),
}
impl From<failure::Error> for PeerError {
fn from(e: failure::Error) -> PeerError {
PeerError::Inner(std::sync::Arc::new(e))
}
}


@ -0,0 +1,82 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{
channel::{mpsc, oneshot},
future, ready,
};
use tokio::prelude::*;
use tower::Service;
use crate::protocol::internal::{Request, Response};
use super::{server::ErrorSlot, PeerError};
/// The "client" duplex half of a peer connection.
pub struct PeerClient {
pub(super) span: tracing::Span,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
}
/// A message from the `PeerClient` to the `PeerServer`, containing both a
/// request and a return channel for the response. The return channel is
/// included because `PeerClient::call` returns a future that may be moved
/// around before it resolves, so the future must own the channel on which it
/// receives the response.
#[derive(Debug)]
pub(super) struct ClientRequest(
pub(super) Request,
pub(super) oneshot::Sender<Result<Response, PeerError>>,
);
impl Service<Request> for PeerClient {
type Response = Response;
type Error = PeerError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Err(_) = ready!(self.server_tx.poll_ready(cx)) {
Poll::Ready(Err(self
.error_slot
.try_get_error()
.expect("failed PeerServers must set their error slot")))
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, req: Request) -> Self::Future {
use futures::future::FutureExt;
use tracing_futures::Instrument;
let (tx, rx) = oneshot::channel();
match self.server_tx.try_send(ClientRequest(req, tx)) {
Err(e) => {
if e.is_disconnected() {
future::ready(Err(self
.error_slot
.try_get_error()
.expect("failed PeerServers must set their error slot")))
.instrument(self.span.clone())
.boxed()
} else {
// sending fails when there's not enough
// channel space, but we called poll_ready
panic!("called call without poll_ready");
}
}
// need a bit of yoga to get result types to align,
// because the oneshot future can error
Ok(()) => rx
.map(|val| match val {
Ok(Ok(rsp)) => Ok(rsp),
Ok(Err(e)) => Err(e),
Err(_) => Err(format_err!("oneshot died").into()),
})
.instrument(self.span.clone())
.boxed(),
}
}
}
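
The `ClientRequest` plumbing above is easiest to see as one linear flow: `call` pairs the request with a fresh oneshot sender, pushes both into the bounded channel to the `PeerServer`, and the returned future simply waits on the oneshot. Below is a condensed sketch of that round trip, as it might be written next to the `PeerServer` (where `ClientRequest` is visible); the span instrumentation and error-slot handling of the real `PeerClient::call` are omitted.

```rust
use futures::channel::{mpsc, oneshot};

use crate::protocol::internal::{Request, Response};
use super::{client::ClientRequest, PeerError};

async fn round_trip(
    mut server_tx: mpsc::Sender<ClientRequest>,
    req: Request,
) -> Result<Response, PeerError> {
    // Pair the request with a oneshot sender for the eventual response.
    let (tx, rx) = oneshot::channel();
    // Hand both to the PeerServer event loop; poll_ready was called first,
    // so the bounded channel is guaranteed to have capacity.
    server_tx
        .try_send(ClientRequest(req, tx))
        .expect("called poll_ready before call");
    // The future returned by PeerClient::call resolves when the PeerServer
    // sends the interpreted peer message back through the oneshot.
    rx.await.expect("PeerServer dropped the pending request")
}
```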


@ -0,0 +1,164 @@
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use chrono::Utc;
use failure::Error;
use futures::channel::mpsc;
use tokio::{codec::Framed, net::TcpStream, prelude::*};
use tower::Service;
use tracing::{span, Level};
use tracing_futures::Instrument;
use zebra_chain::types::BlockHeight;
use crate::{
constants,
protocol::{codec::*, internal::*, message::*, types::*},
timestamp_collector::{PeerLastSeen, TimestampCollector},
Network,
};
use super::{
client::PeerClient,
server::{ErrorSlot, PeerServer, ServerState},
};
/// A [`Service`] that connects to a remote peer and constructs a client/server pair.
pub struct PeerConnector<S> {
network: Network,
internal_service: S,
sender: mpsc::Sender<PeerLastSeen>,
}
impl<S> PeerConnector<S>
where
S: Service<Request, Response = Response> + Clone + Send + 'static,
S::Future: Send,
//S::Error: Into<Error>,
{
/// XXX replace with a builder
pub fn new(network: Network, internal_service: S, collector: &TimestampCollector) -> Self {
let sender = collector.sender_handle();
PeerConnector {
network,
internal_service,
sender,
}
}
}
impl<S> Service<SocketAddr> for PeerConnector<S>
where
S: Service<Request, Response = Response> + Clone + Send + 'static,
S::Future: Send,
S::Error: Send,
//S::Error: Into<Error>,
{
type Response = PeerClient;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// XXX when this asks a second service for
// an address to connect to, it should call inner.ready
Poll::Ready(Ok(()))
}
fn call(&mut self, addr: SocketAddr) -> Self::Future {
let connector_span = span!(Level::INFO, "connector", addr = ?addr);
let connection_span = span!(Level::INFO, "peer", addr = ?addr);
// Clone these upfront, so they can be moved into the future.
let network = self.network.clone();
let internal_service = self.internal_service.clone();
let sender = self.sender.clone();
let fut = async move {
info!("beginning connection");
let mut stream = Framed::new(
TcpStream::connect(addr).await?,
Codec::builder().for_network(network).finish(),
);
// XXX construct the Version message from a config
let version = Message::Version {
version: constants::CURRENT_VERSION,
services: PeerServices::NODE_NETWORK,
timestamp: Utc::now(),
address_recv: (PeerServices::NODE_NETWORK, addr),
address_from: (
PeerServices::NODE_NETWORK,
"127.0.0.1:9000".parse().unwrap(),
),
nonce: Nonce::default(),
user_agent: "Zebra Peer".to_owned(),
start_height: BlockHeight(0),
relay: false,
};
stream.send(version).await?;
let _remote_version = stream
.next()
.await
.ok_or_else(|| format_err!("stream closed during handshake"))??;
stream.send(Message::Verack).await?;
let _remote_verack = stream
.next()
.await
.ok_or_else(|| format_err!("stream closed during handshake"))??;
// XXX here is where we would set the version to the minimum of the
// two versions, etc. -- actually is it possible to edit the `Codec`
// after using it to make a framed adapter?
// Construct a PeerClient, PeerServer pair
let (tx, rx) = mpsc::channel(0);
let slot = ErrorSlot::default();
let client = PeerClient {
span: connection_span.clone(),
server_tx: tx,
error_slot: slot.clone(),
};
let (peer_tx, peer_rx) = stream.split();
let server = PeerServer {
state: ServerState::AwaitingRequest,
svc: internal_service,
client_rx: rx,
error_slot: slot,
peer_tx,
};
let hooked_peer_rx = peer_rx
.then(move |msg| {
let mut sender = sender.clone();
async move {
if let Ok(_) = msg {
use futures::sink::SinkExt;
let _ = sender.send((addr, Utc::now())).await;
}
msg
}
})
.boxed();
tokio::spawn(
server
.run(hooked_peer_rx)
.instrument(connection_span)
.boxed(),
);
Ok(client)
};
fut.instrument(connector_span).boxed()
}
}


@ -0,0 +1,329 @@
use std::sync::{Arc, Mutex};
use failure::Error;
use futures::{
channel::{mpsc, oneshot},
stream::Stream,
};
use tokio::prelude::*;
use tower::Service;
use crate::protocol::{
internal::{Request, Response},
message::Message,
};
use super::{client::ClientRequest, PeerError};
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(Arc<Mutex<Option<PeerError>>>);
impl ErrorSlot {
pub fn try_get_error(&self) -> Option<PeerError> {
self.0
.lock()
.expect("error mutex should be unpoisoned")
.as_ref()
.map(|e| e.clone())
}
}
pub(super) enum ServerState {
/// Awaiting a client request or a peer message.
AwaitingRequest,
/// Awaiting a peer message we can interpret as a response to the pending client request.
AwaitingResponse(Request, oneshot::Sender<Result<Response, PeerError>>),
/// A failure has occurred and we are shutting down the server.
Failed,
}
/// The "server" duplex half of a peer connection.
pub struct PeerServer<S, Tx> {
pub(super) state: ServerState,
pub(super) svc: S,
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
/// A slot shared between the PeerServer and PeerClient for storing an error.
pub(super) error_slot: ErrorSlot,
//pub(super) peer_rx: Rx,
pub(super) peer_tx: Tx,
}
impl<S, Tx> PeerServer<S, Tx>
where
S: Service<Request, Response = Response>,
S::Error: Send,
//S::Error: Into<Error>,
Tx: Sink<Message> + Unpin,
Tx::Error: Into<Error>,
{
/// Run this peer server to completion.
pub async fn run<Rx>(mut self, mut peer_rx: Rx)
where
Rx: Stream<Item = Result<Message, Error>> + Unpin,
{
// At a high level, the event loop we want is as follows: we check for any
// incoming messages from the remote peer, check if they should be interpreted
// as a response to a pending client request, and if not, interpret them as a
// request from the remote peer to our node.
//
// We also need to handle those client requests in the first place. The client
// requests are received from the corresponding `PeerClient` over a bounded
// channel (with bound 1, to minimize buffering), but there is no relationship
// between the stream of client requests and the stream of peer messages, so we
// cannot ignore one kind while waiting on the other. Moreover, we cannot accept
// a second client request while the first one is still pending.
//
// To do this, we inspect the current request state.
//
// If there is no pending request, we wait on either an incoming peer message or
// an incoming request, whichever comes first.
//
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.
use futures::future::FutureExt;
use futures::select;
// This future represents the next message received from the peer.
// It needs to be stored outside of the event loop, so that we can overwrite
// it with the new "next message future" every time we get a new message.
let mut peer_rx_fut = peer_rx.next().fuse();
loop {
match self.state {
// We're awaiting a client request, so listen for both
// client requests and peer messages simultaneously.
ServerState::AwaitingRequest => select! {
req = self.client_rx.next() => {
match req {
Some(req) => self.handle_client_request(req).await,
None => {
trace!("client_rx closed, shutting down");
return;
}
}
}
msg = peer_rx_fut => {
peer_rx_fut = peer_rx.next().fuse();
match msg {
None => {
trace!("peer stream closed, shutting down");
return;
}
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => self.handle_message_as_request(msg).await,
}
}
},
// We're awaiting a response to a client request,
// so only listen to peer messages, not further requests.
ServerState::AwaitingResponse { .. } => {
let msg = peer_rx_fut.await;
peer_rx_fut = peer_rx.next().fuse();
match msg {
// The peer channel has closed -- no more messages.
// However, we still need to flush pending client requests.
None => self.fail_with(format_err!("peer closed connection").into()),
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => match self.handle_message_as_response(msg) {
None => continue,
Some(msg) => self.handle_message_as_request(msg).await,
},
}
}
// We've failed, but we need to flush all pending client
// requests before we can return and complete the future.
ServerState::Failed => {
match self.client_rx.next().await {
Some(ClientRequest(_, tx)) => {
let e = self
.error_slot
.try_get_error()
.expect("cannot enter failed state without setting error slot");
let _ = tx.send(Err(e));
// Continue until we've errored all queued reqs
continue;
}
None => return,
}
}
}
}
}
/// Marks the peer as having failed with error `e`.
fn fail_with(&mut self, e: PeerError) {
trace!(%e, "failing peer service with error");
// Update the shared error slot
let mut guard = self
.error_slot
.0
.lock()
.expect("mutex should be unpoisoned");
if guard.is_some() {
panic!("called fail_with on already-failed server state");
} else {
*guard = Some(e);
}
// Drop the guard immediately to release the mutex.
std::mem::drop(guard);
// We want to close the client channel and set ServerState::Failed so
// that we can flush any pending client requests. However, we may have
// an outstanding client request in ServerState::AwaitingResponse, so
// we need to deal with it first if it exists.
self.client_rx.close();
let old_state = std::mem::replace(&mut self.state, ServerState::Failed);
if let ServerState::AwaitingResponse(_, tx) = old_state {
// We know the slot has Some(e) because we just set it above,
// and the error slot is never unset.
let e = self.error_slot.try_get_error().unwrap();
let _ = tx.send(Err(e));
}
}
/// Handle an incoming client request, possibly generating outgoing messages to the
/// remote peer.
async fn handle_client_request(&mut self, msg: ClientRequest) {
trace!(?msg);
use Request::*;
use ServerState::*;
let ClientRequest(req, tx) = msg;
// Inner match returns Result with the new state or an error.
// Outer match updates state or fails.
match match (&self.state, req) {
(Failed, _) => panic!("failed service cannot handle requests"),
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
(AwaitingRequest, GetPeers) => self
.peer_tx
.send(Message::GetAddr)
.await
.map_err(|e| e.into().into())
.map(|()| AwaitingResponse(GetPeers, tx)),
(AwaitingRequest, PushPeers(addrs)) => self
.peer_tx
.send(Message::Addr(addrs))
.await
.map_err(|e| e.into().into())
.map(|()| {
// PushPeers does not have a response, so we return OK as
// soon as we send the request. Sending through a oneshot
// can only fail if the rx end is dropped before calling
// send, which we can safely ignore (the request future was
// cancelled).
let _ = tx.send(Ok(Response::Ok));
AwaitingRequest
}),
} {
Ok(new_state) => self.state = new_state,
Err(e) => self.fail_with(e),
}
}
/// Try to handle `msg` as a response to a client request, possibly consuming
/// it in the process.
///
/// Taking ownership of the message means that we can pass ownership of its
/// contents to responses without additional copies. If the message is not
/// interpretable as a response, we return ownership to the caller.
fn handle_message_as_response(&mut self, msg: Message) -> Option<Message> {
trace!(?msg);
// This function is where we statefully interpret Bitcoin/Zcash messages
// into responses to messages in the internal request/response protocol.
// This conversion is done by a sequence of (request, message) match arms,
// each of which contains the conversion logic for that pair.
use Request::*;
use ServerState::*;
let mut ignored_msg = None;
// We want to be able to consume the state, but it's behind a mutable
// reference, so we can't move it out of self without swapping in a
// placeholder, even if we immediately overwrite the placeholder.
let tmp_state = std::mem::replace(&mut self.state, AwaitingRequest);
self.state = match (tmp_state, msg) {
(AwaitingResponse(GetPeers, tx), Message::Addr(addrs)) => {
tx.send(Ok(Response::Peers(addrs)))
.expect("response oneshot should be unused");
AwaitingRequest
}
// By default, messages are not responses.
(state, msg) => {
ignored_msg = Some(msg);
state
}
};
ignored_msg
}
async fn handle_message_as_request(&mut self, msg: Message) {
trace!(?msg);
// These messages are transport-related, handle them separately:
match msg {
Message::Version { .. } => {
self.fail_with(format_err!("got version message after handshake").into());
return;
}
Message::Verack { .. } => {
self.fail_with(format_err!("got verack message after handshake").into());
return;
}
Message::Ping(nonce) => {
match self.peer_tx.send(Message::Pong(nonce)).await {
Ok(()) => {}
Err(e) => self.fail_with(e.into().into()),
}
return;
}
_ => {}
}
// Interpret `msg` as a request from the remote peer to our node,
// and try to construct an appropriate request object.
let req = match msg {
Message::Addr(addrs) => Some(Request::PushPeers(addrs)),
_ => None,
};
match req {
Some(req) => self.drive_peer_request(req).await,
None => {}
}
}
/// Given a `req` originating from the peer, drive it to completion and send
/// any appropriate messages to the remote peer. If an error occurs while
/// processing the request (e.g., the service is shedding load), then we call
/// fail_with to terminate the entire peer connection, shrinking the number
/// of connected peers.
async fn drive_peer_request(&mut self, req: Request) {
trace!(?req);
use tower::ServiceExt;
// XXX Drop the errors on the floor for now so that
// we can ignore error type alignment
match self.svc.ready().await {
Err(_) => self.fail_with(format_err!("svc err").into()),
Ok(()) => {}
}
match self.svc.call(req).await {
Err(_) => self.fail_with(format_err!("svc err").into()),
Ok(Response::Ok) => { /* generic success, do nothing */ }
Ok(Response::Peers(addrs)) => {
if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
self.fail_with(e.into().into());
}
}
}
}
}


@ -5,3 +5,10 @@ pub mod message;
pub mod types;
pub mod inv;
// XXX at some later point the above should move to an `external` submodule, so
// that we have
// - protocol::external::{all_bitcoin_zcash_types};
// - protocol::internal::{all_internal_req_rsp_types};
pub mod internal;


@ -98,7 +98,6 @@ impl Encoder for Codec {
type Item = Message;
type Error = Error;
#[instrument(skip(src))]
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
// XXX(HACK): this is inefficient and does an extra allocation.
// instead, we should have a size estimator for the message, reserve
@ -136,7 +135,7 @@ impl Encoder for Codec {
FilterClear { .. } => b"filterclear\0",
MerkleBlock { .. } => b"merkleblock\0",
};
trace!(?command, len = body.len());
trace!(?item, len = body.len());
// XXX this should write directly into the buffer,
// but leave it for now until we fix the issue above.
@ -237,7 +236,6 @@ impl Decoder for Codec {
type Item = Message;
type Error = Error;
#[instrument(skip(src))]
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.state {
DecodeState::Head => {
@ -329,7 +327,7 @@ impl Decoder for Codec {
// We need Ok(Some(msg)) to signal that we're done decoding.
// This is also convenient for tracing the parse result.
.map(|msg| {
trace!(?msg);
trace!("finished message decoding");
Some(msg)
})
}


@ -0,0 +1,25 @@
//! Message types for the internal request/response protocol.
//!
//! These are currently defined just as enums with all possible requests and
//! responses, so that we have unified types to pass around. No serialization
//! is performed as these are only internal types.
use crate::meta_addr::MetaAddr;
/// A network request, represented in internal format.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Request {
/// Requests additional peers from the server.
GetPeers,
/// Advertises peers to the remote server.
PushPeers(Vec<MetaAddr>),
}
/// A response to a network request, represented in internal format.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Response {
/// Generic success.
Ok,
/// A list of peers, used to respond to `GetPeers`.
Peers(Vec<MetaAddr>),
}
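
For orientation, the `PeerServer` above translates between these internal types and the Bitcoin/Zcash wire messages. The sketch below restates that mapping as two standalone functions; it is illustrative only (assuming crate-internal code where `Message` is in scope), and the real logic lives in `handle_client_request` and `handle_message_as_response`.

```rust
use crate::protocol::{
    internal::{Request, Response},
    message::Message,
};

/// The wire message sent for each internal request.
fn request_to_wire(req: Request) -> Message {
    match req {
        Request::GetPeers => Message::GetAddr,
        // PushPeers has no wire response; the PeerServer completes it with
        // Response::Ok as soon as the Addr message has been sent.
        Request::PushPeers(addrs) => Message::Addr(addrs),
    }
}

/// The wire message that completes a pending GetPeers request.
fn wire_to_response(msg: Message) -> Option<Response> {
    match msg {
        Message::Addr(addrs) => Some(Response::Peers(addrs)),
        // Anything else is not a response; it is handled as a request
        // from the remote peer instead.
        _ => None,
    }
}
```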


@ -0,0 +1,120 @@
//! Management of peer liveness / last-seen information.
use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
sync::{Arc, Mutex},
};
use chrono::{DateTime, Utc};
use futures::channel::mpsc;
use tokio::prelude::*;
/// A type alias for a timestamp event sent to a `TimestampCollector`.
pub(crate) type PeerLastSeen = (SocketAddr, DateTime<Utc>);
/// Maintains a lookup table from peer addresses to last-seen times.
///
/// On creation, the `TimestampCollector` spawns a worker task to process new
/// timestamp events. The resulting `TimestampCollector` can be cloned, and the
/// worker task and state are shared among all of the clones.
///
/// XXX add functionality for querying the timestamp data
#[derive(Clone, Debug)]
pub struct TimestampCollector {
// We do not expect mutex contention to be a problem, because
// the dominant accessor is the collector worker, and it has a long
// event buffer to hide latency if other tasks block it temporarily.
data: Arc<Mutex<TimestampData>>,
shutdown: Arc<ShutdownSignal>,
worker_tx: mpsc::Sender<PeerLastSeen>,
}
#[derive(Default, Debug)]
struct TimestampData {
by_addr: HashMap<SocketAddr, DateTime<Utc>>,
by_time: BTreeMap<DateTime<Utc>, SocketAddr>,
}
impl TimestampData {
fn update(&mut self, event: PeerLastSeen) {
use std::collections::hash_map::Entry;
let (addr, timestamp) = event;
trace!(?addr, ?timestamp);
match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => {
let last_timestamp = entry.get();
self.by_time
.remove(last_timestamp)
.expect("cannot have by_addr entry without by_time entry");
entry.insert(timestamp);
self.by_time.insert(timestamp, addr);
}
Entry::Vacant(entry) => {
entry.insert(timestamp);
self.by_time.insert(timestamp, addr);
}
}
}
}
impl TimestampCollector {
/// Constructs a new `TimestampCollector`, spawning a worker task to process updates.
pub fn new() -> TimestampCollector {
let data = Arc::new(Mutex::new(TimestampData::default()));
// We need to make a copy now so we can move data into the async block.
let data2 = data.clone();
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(0);
// Construct and then spawn a worker.
let worker = async move {
use futures::select;
loop {
select! {
_ = shutdown_rx.next() => return,
msg = worker_rx.next() => {
match msg {
Some(event) => {
data2
.lock()
.expect("mutex should be unpoisoned")
.update(event)
}
None => return,
}
}
}
}
};
tokio::spawn(worker.boxed());
TimestampCollector {
data,
worker_tx,
shutdown: Arc::new(ShutdownSignal { tx: shutdown_tx }),
}
}
pub(crate) fn sender_handle(&self) -> mpsc::Sender<PeerLastSeen> {
self.worker_tx.clone()
}
}
/// Sends a signal when dropped.
#[derive(Debug)]
struct ShutdownSignal {
/// Sending () signals that the task holding the rx end should terminate.
///
/// This should really be a oneshot, but sending on a oneshot consumes the
/// sender, and we can't move out of self in Drop.
tx: mpsc::Sender<()>,
}
impl Drop for ShutdownSignal {
fn drop(&mut self) {
self.tx.try_send(()).expect("tx is only used in drop");
}
}
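
Since `sender_handle` is `pub(crate)`, producers inside the crate feed the collector by cloning the sender and pushing `(SocketAddr, DateTime<Utc>)` events into it; this is what `PeerConnector::call` does for every successfully parsed peer message. A minimal crate-internal sketch:

```rust
use std::net::SocketAddr;

use chrono::Utc;
use futures::sink::SinkExt;

use crate::timestamp_collector::TimestampCollector;

/// Record that we heard from `addr` just now.
async fn record_last_seen(collector: &TimestampCollector, addr: SocketAddr) {
    let mut sender = collector.sender_handle();
    // A send error means the worker task has shut down; the connector
    // ignores that case, and so does this sketch.
    let _ = sender.send((addr, Utc::now())).await;
}
```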


@ -12,18 +12,16 @@ gumdrop = "0.6"
lazy_static = "1"
serde = { version = "1", features = ["serde_derive"] }
toml = "0.5"
tokio = "=0.2.0-alpha.5"
tokio = "=0.2.0-alpha.6"
# Replace with git to pick up instrument derive changes, revert on release.
#tracing = "0.1"
tracing = { git = "https://github.com/tokio-rs/tracing" }
tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing" }
tracing-log = { git = "https://github.com/tokio-rs/tracing" }
# Can't use published alpha because of conflicts tracking pin-project alphas
#hyper = "=0.13.0-alpha.1"
hyper = { git = "https://github.com/hyperium/hyper" }
futures-core-preview = { version = "=0.3.0-alpha.18" }
futures-util-preview = { version = "=0.3.0-alpha.18" }
hyper = { git = "https://github.com/hyperium/hyper"}
tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
zebra-chain = { path = "../zebra-chain" }
zebra-network = { path = "../zebra-network" }


@ -26,7 +26,7 @@ impl Runnable for ConnectCmd {
// Combine the connect future with an infinite wait
// so that the program has to be explicitly killed and
// won't die before all tracing messages are written.
let fut = futures_util::future::join(
let fut = futures::future::join(
async {
match self.connect().await {
Ok(()) => {}
@ -51,64 +51,40 @@ impl Runnable for ConnectCmd {
impl ConnectCmd {
async fn connect(&self) -> Result<(), failure::Error> {
use chrono::Utc;
use tokio::{codec::Framed, net::TcpStream, prelude::*};
use zebra_chain::types::BlockHeight;
use zebra_network::{
constants,
protocol::{codec::*, message::*, types::*},
peer::connector::PeerConnector,
protocol::internal::{Request, Response},
timestamp_collector::TimestampCollector,
Network,
};
info!("connecting");
info!("begin tower-based peer handling test stub");
let mut stream = Framed::new(
TcpStream::connect(self.addr).await?,
Codec::builder().for_network(Network::Mainnet).finish(),
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
let node = Buffer::new(
service_fn(|req| {
async move {
info!(?req);
Ok::<Response, failure::Error>(Response::Ok)
}
}),
1,
);
let version = Message::Version {
version: constants::CURRENT_VERSION,
services: PeerServices::NODE_NETWORK,
timestamp: Utc::now(),
address_recv: (PeerServices::NODE_NETWORK, self.addr),
// We just make something up because at this stage the `connect` command
// doesn't run a server or anything -- will the zcashd respond on the
// same tcp connection or try to open one to the bogus address below?
address_from: (
PeerServices::NODE_NETWORK,
"127.0.0.1:9000".parse().unwrap(),
),
nonce: Nonce(1),
user_agent: "Zebra Connect".to_owned(),
start_height: BlockHeight(0),
relay: false,
};
let collector = TimestampCollector::new();
info!(version = ?version);
let mut pc = PeerConnector::new(Network::Mainnet, node, &collector);
// no need to call ready because pc is always ready
let mut client = pc.call(self.addr.clone()).await?;
stream.send(version).await?;
client.ready().await?;
let rsp = client.call(Request::GetPeers).await?;
info!(?rsp);
let resp_version: Message = stream.next().await.expect("expected data")?;
info!(resp_version = ?resp_version);
stream.send(Message::Verack).await?;
let resp_verack = stream.next().await.expect("expected data")?;
info!(resp_verack = ?resp_verack);
while let Some(maybe_msg) = stream.next().await {
match maybe_msg {
Ok(msg) => match msg {
Message::Ping(nonce) => {
stream.send(Message::Pong(nonce)).await?;
}
_ => warn!("Unknown message"),
},
Err(e) => error!("{}", e),
};
loop {
// empty loop ensures we don't exit the application,
// and this is throwaway code
}
Ok(())