From f6e62b0f5ea2e885ebe53223a2eeaa025f5a24ad Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Tue, 15 Oct 2019 20:38:26 -0700 Subject: [PATCH] Remove failure from zebra-chain, zebra-network. Failure uses a distinct Fail trait rather than the standard library's Error trait, which causes a lot of interoperability problems with tower and other Error-using crates. Since failure was created, the standard library's Error trait was improved, and its conveniences are now available without the custom Fail trait using `thiserror` (for easy error derives) and `anyhow` (for a better boxed Error). --- zebra-chain/Cargo.toml | 2 +- zebra-chain/src/lib.rs | 3 - zebra-chain/src/serialization.rs | 18 ++---- zebra-network/Cargo.toml | 2 +- zebra-network/src/lib.rs | 2 - zebra-network/src/peer.rs | 24 +------ zebra-network/src/peer/client.rs | 22 +++---- zebra-network/src/peer/connector.rs | 16 ++--- zebra-network/src/peer/error.rs | 53 ++++++++++++++++ zebra-network/src/peer/server.rs | 86 ++++++++++++-------------- zebra-network/src/peer_set/discover.rs | 5 +- zebra-network/src/protocol/codec.rs | 53 +++++++--------- zebra-network/src/protocol/inv.rs | 2 +- 13 files changed, 146 insertions(+), 142 deletions(-) create mode 100644 zebra-network/src/peer/error.rs diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index f81dbc31..644e54aa 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -8,8 +8,8 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +thiserror = "1" byteorder = "1.3" chrono = "0.4" -failure = "0.1" #hex = "0.4" This conflicts with tracing-subscriber? sha2 = "0.8" diff --git a/zebra-chain/src/lib.rs b/zebra-chain/src/lib.rs index ac407ede..d97b71ed 100644 --- a/zebra-chain/src/lib.rs +++ b/zebra-chain/src/lib.rs @@ -1,9 +1,6 @@ //! Blockchain-related datastructures for Zebra. 🦓 #![deny(missing_docs)] -#[macro_use] -extern crate failure; - mod merkle_tree; mod sha256d_writer; diff --git a/zebra-chain/src/serialization.rs b/zebra-chain/src/serialization.rs index 44ec9903..0aa0e88e 100644 --- a/zebra-chain/src/serialization.rs +++ b/zebra-chain/src/serialization.rs @@ -10,25 +10,19 @@ use std::io; use std::net::{IpAddr, SocketAddr}; use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use thiserror::Error; /// A serialization error. // XXX refine error types -- better to use boxed errors? -#[derive(Fail, Debug)] +#[derive(Error, Debug)] pub enum SerializationError { /// An underlying IO error. - #[fail(display = "io error {}", _0)] - IoError(io::Error), + #[error("io error")] + Io(#[from] io::Error), /// The data to be deserialized was malformed. // XXX refine errors - #[fail(display = "parse error: {}", _0)] - ParseError(&'static str), -} - -// Allow upcasting io::Error to SerializationError -impl From for SerializationError { - fn from(e: io::Error) -> Self { - Self::IoError(e) - } + #[error("parse error: {0}")] + Parse(&'static str), } /// Consensus-critical serialization for Zcash. diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 9652bca6..5244c08f 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -13,7 +13,7 @@ bytes = "0.4" rand = "0.7" byteorder = "1.3" chrono = "0.4" -failure = "0.1" +thiserror = "1" serde = { version = "1", features = ["serde_derive"] } pin-project = "0.4" # indexmap has rayon support for parallel iteration, diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index cc006747..c0513f02 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -7,8 +7,6 @@ extern crate pin_project; #[macro_use] extern crate serde; #[macro_use] -extern crate failure; -#[macro_use] extern crate tracing; #[macro_use] extern crate bitflags; diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index e3993e72..def9ac74 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -4,30 +4,12 @@ mod client; /// Asynchronously connects to peers. mod connector; +/// Peer-related errors. +mod error; /// Handles inbound requests from the network to our node. mod server; pub use client::PeerClient; pub use connector::PeerConnector; +pub use error::{PeerError, SharedPeerError}; pub use server::PeerServer; - -/// An error related to a peer connection. -#[derive(Fail, Debug, Clone)] -pub enum PeerError { - /// Wrapper around `failure::Error` that can be `Clone`. - #[fail(display = "{}", _0)] - Inner(std::sync::Arc), -} - -impl From for PeerError { - fn from(e: failure::Error) -> PeerError { - PeerError::Inner(std::sync::Arc::new(e)) - } -} - -// XXX hack -impl Into for PeerError { - fn into(self) -> crate::BoxedStdError { - Box::new(format_err!("dropped error info").compat()) - } -} diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 560d7d57..f890ac88 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -12,7 +12,7 @@ use tower::Service; use crate::protocol::internal::{Request, Response}; -use super::{server::ErrorSlot, PeerError}; +use super::{error::ErrorSlot, SharedPeerError}; /// The "client" duplex half of a peer connection. pub struct PeerClient { @@ -29,12 +29,12 @@ pub struct PeerClient { #[derive(Debug)] pub(super) struct ClientRequest( pub(super) Request, - pub(super) oneshot::Sender>, + pub(super) oneshot::Sender>, ); impl Service for PeerClient { type Response = Response; - type Error = PeerError; + type Error = SharedPeerError; type Future = Pin> + Send + 'static>>; @@ -52,6 +52,7 @@ impl Service for PeerClient { fn call(&mut self, req: Request) -> Self::Future { use futures::future::FutureExt; use tracing_futures::Instrument; + let (tx, rx) = oneshot::channel(); match self.server_tx.try_send(ClientRequest(req, tx)) { Err(e) => { @@ -68,16 +69,15 @@ impl Service for PeerClient { panic!("called call without poll_ready"); } } - // need a bit of yoga to get result types to align, - // because the oneshot future can error - Ok(()) => rx - .map(|val| match val { - Ok(Ok(rsp)) => Ok(rsp), - Ok(Err(e)) => Err(e), - Err(_) => Err(format_err!("oneshot died").into()), + Ok(()) => { + // The reciever end of the oneshot is itself a future. + rx.map(|oneshot_recv_result| { + oneshot_recv_result + .expect("ClientRequest oneshot sender must not be dropped before send") }) .instrument(self.span.clone()) - .boxed(), + .boxed() + } } } } diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 08ba7b1b..bf76126f 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -5,7 +5,6 @@ use std::{ }; use chrono::Utc; -use failure::Error; use futures::channel::mpsc; use tokio::{codec::Framed, net::TcpStream, prelude::*}; use tower::Service; @@ -21,10 +20,7 @@ use crate::{ BoxedStdError, Config, Network, }; -use super::{ - client::PeerClient, - server::{ErrorSlot, PeerServer, ServerState}, -}; +use super::{error::ErrorSlot, server::ServerState, PeerClient, PeerError, PeerServer}; /// A [`Service`] that connects to a remote peer and constructs a client/server pair. pub struct PeerConnector { @@ -64,12 +60,12 @@ where impl Service for PeerConnector where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + 'static, S::Future: Send, S::Error: Send + Into, { type Response = PeerClient; - type Error = Error; + type Error = PeerError; type Future = Pin>>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { @@ -93,7 +89,7 @@ where debug!("opening tcp stream"); let mut stream = Framed::new( - TcpStream::connect(addr).await?, + TcpStream::connect(addr).await.expect("PeerError does not contain an io::Error variant, but this code will be removed in the next PR, so there's no need to handle this error"), Codec::builder().for_network(network).finish(), ); @@ -121,7 +117,7 @@ where let remote_version = stream .next() .await - .ok_or_else(|| format_err!("stream closed during handshake"))??; + .ok_or_else(|| PeerError::ConnectionClosed)??; debug!( ?remote_version, @@ -132,7 +128,7 @@ where let remote_verack = stream .next() .await - .ok_or_else(|| format_err!("stream closed during handshake"))??; + .ok_or_else(|| PeerError::ConnectionClosed)??; debug!(?remote_verack, "got remote peer's verack"); diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs new file mode 100644 index 00000000..6c79ec08 --- /dev/null +++ b/zebra-network/src/peer/error.rs @@ -0,0 +1,53 @@ +use std::sync::{Arc, Mutex}; + +use thiserror::Error; + +use zebra_chain::serialization::SerializationError; + +/// A wrapper around `Arc` that implements `Error`. +#[derive(Error, Debug, Clone)] +#[error("{0}")] +pub struct SharedPeerError(#[from] Arc); + +/// An error related to peer connection handling. +#[derive(Error, Debug)] +pub enum PeerError { + /// The remote peer closed the connection. + #[error("Peer closed connection")] + ConnectionClosed, + /// The [`PeerClient`] half of the [`PeerClient`]/[`PeerServer`] pair died before + /// the [`PeerServer`] half did. + #[error("PeerClient instance died")] + DeadPeerClient, + /// The [`PeerServer`] half of the [`PeerServer`]/[`PeerClient`] pair died before + /// the [`PeerClient`] half did. + #[error("PeerServer instance died")] + DeadPeerServer, + /// The remote peer did not respond to a [`PeerClient`] request in time. + #[error("Client request timed out")] + ClientRequestTimeout, + /// A serialization error occurred while reading or writing a message. + #[error("Serialization error")] + Serialization(#[from] SerializationError), + /// A badly-behaved remote peer sent a handshake message after the handshake was + /// already complete. + #[error("Remote peer sent handshake messages after handshake")] + DuplicateHandshake, + /// This node's internal services were overloaded, so the connection was dropped + /// to shed load. + #[error("Internal services over capacity")] + Overloaded, +} + +#[derive(Default, Clone)] +pub(super) struct ErrorSlot(pub(super) Arc>>); + +impl ErrorSlot { + pub fn try_get_error(&self) -> Option { + self.0 + .lock() + .expect("error mutex should be unpoisoned") + .as_ref() + .map(|e| e.clone()) + } +} diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index bd5a529f..7794e539 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -1,6 +1,5 @@ use std::sync::{Arc, Mutex}; -use failure::Error; use futures::{ channel::{mpsc, oneshot}, future::{self, Either}, @@ -11,6 +10,7 @@ use tokio::{ timer::{delay_for, Delay}, }; use tower::Service; +use zebra_chain::serialization::SerializationError; use crate::{ constants, @@ -21,26 +21,13 @@ use crate::{ BoxedStdError, }; -use super::{client::ClientRequest, PeerError}; - -#[derive(Default, Clone)] -pub(super) struct ErrorSlot(Arc>>); - -impl ErrorSlot { - pub fn try_get_error(&self) -> Option { - self.0 - .lock() - .expect("error mutex should be unpoisoned") - .as_ref() - .map(|e| e.clone()) - } -} +use super::{client::ClientRequest, error::ErrorSlot, PeerError, SharedPeerError}; pub(super) enum ServerState { /// Awaiting a client request or a peer message. AwaitingRequest, /// Awaiting a peer message we can interpret as a client request. - AwaitingResponse(Request, oneshot::Sender>), + AwaitingResponse(Request, oneshot::Sender>), /// A failure has occurred and we are shutting down the server. Failed, } @@ -62,15 +49,14 @@ pub struct PeerServer { impl PeerServer where - S: Service, + S: Service, S::Error: Into, - Tx: Sink + Unpin, - Tx::Error: Into, + Tx: Sink + Unpin, { /// Run this peer server to completion. pub async fn run(mut self, mut peer_rx: Rx) where - Rx: Stream> + Unpin, + Rx: Stream> + Unpin, { // At a high level, the event loop we want is as follows: we check for any // incoming messages from the remote peer, check if they should be interpreted @@ -122,9 +108,7 @@ where .as_mut() .expect("timeout must be set while awaiting response"); match future::select(peer_rx.next(), timer_ref).await { - Either::Left((None, _)) => { - self.fail_with(format_err!("peer closed connection").into()) - } + Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), // XXX switch back to hard failure when we parse all message types //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Err(peer_err)), _timer)) => error!(%peer_err), @@ -139,7 +123,8 @@ where // Re-matching lets us take ownership of tx self.state = match self.state { ServerState::AwaitingResponse(_, tx) => { - let _ = tx.send(Err(format_err!("request timed out").into())); + let e = PeerError::ClientRequestTimeout; + let _ = tx.send(Err(Arc::new(e).into())); ServerState::AwaitingRequest } _ => panic!("unreachable"), @@ -179,7 +164,7 @@ where if guard.is_some() { panic!("called fail_with on already-failed server state"); } else { - *guard = Some(e); + *guard = Some(Arc::new(e).into()); } // Drop the guard immediately to release the mutex. std::mem::drop(guard); @@ -215,13 +200,13 @@ where .peer_tx .send(Message::GetAddr) .await - .map_err(|e| e.into().into()) + .map_err(|e| e.into()) .map(|()| AwaitingResponse(GetPeers, tx)), (AwaitingRequest, PushPeers(addrs)) => self .peer_tx .send(Message::Addr(addrs)) .await - .map_err(|e| e.into().into()) + .map_err(|e| e.into()) .map(|()| { // PushPeers does not have a response, so we return OK as // soon as we send the request. Sending through a oneshot @@ -280,17 +265,17 @@ where // These messages are transport-related, handle them separately: match msg { Message::Version { .. } => { - self.fail_with(format_err!("got version message after handshake").into()); + self.fail_with(PeerError::DuplicateHandshake); return; } Message::Verack { .. } => { - self.fail_with(format_err!("got verack message after handshake").into()); + self.fail_with(PeerError::DuplicateHandshake); return; } Message::Ping(nonce) => { match self.peer_tx.send(Message::Pong(nonce)).await { Ok(()) => {} - Err(e) => self.fail_with(e.into().into()), + Err(e) => self.fail_with(e.into()), } return; } @@ -317,26 +302,31 @@ where /// of connected peers. async fn drive_peer_request(&mut self, req: Request) { trace!(?req); - use tower::ServiceExt; - if let Err(e) = self - .svc - .ready() - .await - .map_err(|e| Error::from_boxed_compat(e.into())) - { - self.fail_with(e.into()); + use tower::{load_shed::error::Overloaded, ServiceExt}; + + if let Err(_) = self.svc.ready().await { + // Treat all service readiness errors as Overloaded + self.fail_with(PeerError::Overloaded); } - match self - .svc - .call(req) - .await - .map_err(|e| Error::from_boxed_compat(e.into())) - { - Err(e) => self.fail_with(e.into()), - Ok(Response::Ok) => { /* generic success, do nothing */ } - Ok(Response::Peers(addrs)) => { + + let rsp = match self.svc.call(req).await { + Err(e) => { + if e.is::() { + self.fail_with(PeerError::Overloaded); + } else { + // We could send a reject to the remote peer. + error!(%e); + } + return; + } + Ok(rsp) => rsp, + }; + + match rsp { + Response::Ok => { /* generic success, do nothing */ } + Response::Peers(addrs) => { if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await { - self.fail_with(e.into().into()); + self.fail_with(e.into()); } } } diff --git a/zebra-network/src/peer_set/discover.rs b/zebra-network/src/peer_set/discover.rs index add4a33c..a803c315 100644 --- a/zebra-network/src/peer_set/discover.rs +++ b/zebra-network/src/peer_set/discover.rs @@ -4,11 +4,10 @@ use std::{ task::{Context, Poll}, }; -use failure::Error; use tokio::prelude::*; use tower::discover::{Change, Discover}; -use crate::peer::PeerClient; +use crate::peer::{PeerClient, PeerError}; /// A [`tower::discover::Discover`] implementation to report new `PeerClient`s. /// @@ -22,7 +21,7 @@ pub struct PeerDiscover { impl Discover for PeerDiscover { type Key = SocketAddr; type Service = PeerClient; - type Error = Error; + type Error = PeerError; fn poll_discover( self: Pin<&mut Self>, diff --git a/zebra-network/src/protocol/codec.rs b/zebra-network/src/protocol/codec.rs index 25c6f8f8..5e28e131 100644 --- a/zebra-network/src/protocol/codec.rs +++ b/zebra-network/src/protocol/codec.rs @@ -5,12 +5,13 @@ use std::io::{Cursor, Read, Write}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::BytesMut; use chrono::{TimeZone, Utc}; -use failure::Error; use tokio::codec::{Decoder, Encoder}; use zebra_chain::{ block::{Block, BlockHeader, BlockHeaderHash}, - serialization::{ReadZcashExt, WriteZcashExt, ZcashDeserialize, ZcashSerialize}, + serialization::{ + ReadZcashExt, SerializationError as Error, WriteZcashExt, ZcashDeserialize, ZcashSerialize, + }, transaction::Transaction, types::{BlockHeight, Sha256dChecksum}, }; @@ -273,7 +274,7 @@ impl Codec { // FilterAdd => {} // FilterClear => {} // MerkleBlock => {} - _ => bail!("unimplemented message type"), + _ => return Err(Error::Parse("unimplemented message type")), } Ok(()) } @@ -296,6 +297,7 @@ impl Decoder for Codec { type Error = Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + use Error::Parse; match self.state { DecodeState::Head => { // First check that the src buffer contains an entire header. @@ -316,14 +318,12 @@ impl Decoder for Codec { let checksum = Sha256dChecksum(header_reader.read_4_bytes()?); trace!(?self.state, ?magic, ?command, body_len, ?checksum, "read header from src buffer"); - ensure!( - magic == self.builder.network.magic(), - "supplied magic did not meet expectations" - ); - ensure!( - body_len < self.builder.max_len, - "body length exceeded maximum size", - ); + if magic != self.builder.network.magic() { + return Err(Parse("supplied magic did not meet expectations")); + } + if body_len >= self.builder.max_len { + return Err(Parse("body length exceeded maximum size")); + } // Reserve buffer space for the expected body and the following header. src.reserve(body_len + HEADER_LEN); @@ -354,10 +354,11 @@ impl Decoder for Codec { let body = src.split_to(body_len); self.state = DecodeState::Head; - ensure!( - checksum == Sha256dChecksum::from(&body[..]), - "supplied message checksum does not match computed checksum" - ); + if checksum != Sha256dChecksum::from(&body[..]) { + return Err(Parse( + "supplied message checksum does not match computed checksum", + )); + } let body_reader = Cursor::new(&body); match &command { @@ -381,7 +382,7 @@ impl Decoder for Codec { b"filteradd\0\0\0" => self.read_filteradd(body_reader), b"filterclear\0" => self.read_filterclear(body_reader), b"merkleblock\0" => self.read_merkleblock(body_reader), - _ => bail!("unknown command"), + _ => return Err(Parse("unknown command")), } // We need Ok(Some(msg)) to signal that we're done decoding. // This is also convenient for tracing the parse result. @@ -415,7 +416,7 @@ impl Codec { relay: match reader.read_u8()? { 0 => false, 1 => true, - _ => bail!("non-bool value supplied in relay field"), + _ => return Err(Error::Parse("non-bool value supplied in relay field")), }, }) } @@ -433,8 +434,7 @@ impl Codec { } fn read_reject(&self, mut _reader: R) -> Result { - trace!("reject"); - bail!("unimplemented message type") + return Err(Error::Parse("reject messages are not implemented")); } fn read_addr(&self, mut reader: R) -> Result { @@ -544,28 +544,23 @@ impl Codec { } fn read_mempool(&self, mut _reader: R) -> Result { - trace!("mempool"); - bail!("unimplemented message type") + return Err(Error::Parse("mempool messages are not implemented")); } fn read_filterload(&self, mut _reader: R) -> Result { - trace!("filterload"); - bail!("unimplemented message type") + return Err(Error::Parse("filterload messages are not implemented")); } fn read_filteradd(&self, mut _reader: R) -> Result { - trace!("filteradd"); - bail!("unimplemented message type") + return Err(Error::Parse("filteradd messages are not implemented")); } fn read_filterclear(&self, mut _reader: R) -> Result { - trace!("filterclear"); - bail!("unimplemented message type") + return Err(Error::Parse("filterclear messages are not implemented")); } fn read_merkleblock(&self, mut _reader: R) -> Result { - trace!("merkleblock"); - bail!("unimplemented message type") + return Err(Error::Parse("merkleblock messages are not implemented")); } } diff --git a/zebra-network/src/protocol/inv.rs b/zebra-network/src/protocol/inv.rs index 072e1909..f3c6aeac 100644 --- a/zebra-network/src/protocol/inv.rs +++ b/zebra-network/src/protocol/inv.rs @@ -63,7 +63,7 @@ impl ZcashDeserialize for InventoryHash { 1 => Ok(InventoryHash::Tx(TransactionHash(bytes))), 2 => Ok(InventoryHash::Block(BlockHeaderHash(bytes))), 3 => Ok(InventoryHash::FilteredBlock(BlockHeaderHash(bytes))), - _ => Err(SerializationError::ParseError("invalid inventory code")), + _ => Err(SerializationError::Parse("invalid inventory code")), } } }