From 6906f87eadd9df2cba990e1b3e721b6ee25e057e Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Wed, 10 Feb 2021 15:28:45 -0800 Subject: [PATCH] introduce Transition enum --- zebra-network/src/peer.rs | 1 - zebra-network/src/peer/client.rs | 25 +- zebra-network/src/peer/connection.rs | 574 +++++++++++---------------- zebra-network/src/peer/error.rs | 18 +- zebra-network/src/peer/handshake.rs | 7 +- 5 files changed, 247 insertions(+), 378 deletions(-) diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 1711ee9f..0f9c6be0 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -15,7 +15,6 @@ use client::ClientRequest; use client::ClientRequestReceiver; use client::InProgressClientRequest; use client::MustUseOneshotSender; -use error::ErrorSlot; pub use client::Client; pub use connection::Connection; diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index d3a5640e..edc36382 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -6,14 +6,14 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, - future, ready, + ready, stream::{Stream, StreamExt}, }; use tower::Service; use crate::protocol::internal::{Request, Response}; -use super::{ErrorSlot, PeerError, SharedPeerError}; +use super::{PeerError, SharedPeerError}; /// The "client" duplex half of a peer connection. pub struct Client { @@ -21,7 +21,6 @@ pub struct Client { // This is always Some except when we take it on drop. pub(super) shutdown_tx: Option>, pub(super) server_tx: mpsc::Sender, - pub(super) error_slot: ErrorSlot, } /// A message from the `peer::Client` to the `peer::Server`. @@ -98,13 +97,6 @@ impl From for InProgressClientRequest { } } -impl ClientRequestReceiver { - /// Forwards to `inner.close()` - pub fn close(&mut self) { - self.inner.close() - } -} - impl Stream for ClientRequestReceiver { type Item = InProgressClientRequest; @@ -199,10 +191,7 @@ impl Service for Client { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if ready!(self.server_tx.poll_ready(cx)).is_err() { - Poll::Ready(Err(self - .error_slot - .try_get_error() - .expect("failed servers must set their error slot"))) + Poll::Ready(Err(PeerError::ConnectionClosed.into())) } else { Poll::Ready(Ok(())) } @@ -221,13 +210,7 @@ impl Service for Client { match self.server_tx.try_send(ClientRequest { request, span, tx }) { Err(e) => { if e.is_disconnected() { - let ClientRequest { tx, .. } = e.into_inner(); - let _ = tx.send(Err(PeerError::ConnectionClosed.into())); - future::ready(Err(self - .error_slot - .try_get_error() - .expect("failed servers must set their error slot"))) - .boxed() + async { Err(PeerError::ConnectionClosed.into()) }.boxed() } else { // sending fails when there's not enough // channel space, but we called poll_ready diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 893b6388..cc09e341 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -7,14 +7,18 @@ //! And it's unclear if these assumptions match the `zcashd` implementation. //! It should be refactored into a cleaner set of request/response pairs (#1515). -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::HashSet, + convert::{TryFrom, TryInto}, + sync::Arc, +}; use futures::{ future::{self, Either}, prelude::*, stream::Stream, }; -use tokio::time::{sleep, Sleep}; +use tokio::time::Sleep; use tower::Service; use tracing_futures::Instrument; @@ -25,7 +29,6 @@ use zebra_chain::{ }; use crate::{ - constants, protocol::{ external::{types::Nonce, InventoryHash, Message}, internal::{Request, Response}, @@ -34,7 +37,7 @@ use crate::{ }; use super::{ - ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, + ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender, PeerError, SharedPeerError, }; @@ -317,13 +320,151 @@ pub(super) enum State { tx: MustUseOneshotSender>, span: tracing::Span, }, - /// A failure has occurred and we are shutting down the connection. - Failed, +} + +impl State { + async fn step(self, conn: &mut Connection, peer_rx: &mut Rx) -> Transition + where + Rx: Stream> + Unpin, + S: Service, + S::Error: Into, + Tx: Sink + Unpin, + { + match self { + State::AwaitingRequest => { + trace!("awaiting client request or peer message"); + match future::select(peer_rx.next(), conn.client_rx.next()).await { + Either::Left((None, _)) => Transition::Exit(PeerError::ConnectionClosed.into()), + Either::Left((Some(Err(e)), _)) => Transition::Exit(e.into()), + Either::Left((Some(Ok(msg)), _)) => { + match conn.handle_message_as_request(msg).await { + Ok(()) => Transition::AwaitRequest, + Err(e) => Transition::Exit(e.into()), + } + } + Either::Right((None, _)) => { + trace!("client_rx closed, ending connection"); + Transition::Exit(PeerError::ConnectionDropped.into()) + } + Either::Right((Some(req), _)) => { + if req.tx.is_canceled() { + metrics::counter!("peer.canceled", 1); + tracing::debug!("ignoring canceled request"); + return Transition::AwaitRequest; + } + + let span = req.span.clone(); + conn.handle_client_request(req).instrument(span).await + } + } + } + // We're awaiting a response to a client request, + // so wait on either a peer message, or on a request cancellation. + State::AwaitingResponse { + span, + mut tx, + mut handler, + } => { + // we have to get rid of the span reference so we can tamper with the state + let span = span.clone(); + trace!(parent: &span, "awaiting response to client request"); + let timer_ref = conn + .request_timer + .as_mut() + .expect("timeout must be set while awaiting response"); + let cancel = future::select(timer_ref, tx.cancellation()); + match future::select(peer_rx.next(), cancel) + .instrument(span.clone()) + .await + { + Either::Left((None, _)) => Transition::ExitResponse { + e: PeerError::ConnectionClosed.into(), + tx, + }, + Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx }, + Either::Left((Some(Ok(peer_msg)), _cancel)) => { + let request_msg = span.in_scope(|| handler.process_message(peer_msg)); + // If the message was not consumed, check whether it + // should be handled as a request. + if let Some(msg) = request_msg { + // do NOT instrument with the request span, this is + // independent work + match conn.handle_message_as_request(msg).await { + Ok(()) => Transition::AwaitRequest, + Err(e) => Transition::Exit(e.into()), + } + } else { + // Otherwise, check whether the handler is finished + // processing messages and update the state. + match handler { + Handler::Finished(response) => { + let _ = tx.send(response.map_err(Into::into)); + Transition::AwaitRequest + } + _ => Transition::AwaitResponse { tx, handler, span }, + } + } + } + Either::Right((Either::Left(_), _peer_fut)) => { + trace!(parent: &span, "client request timed out"); + let e = PeerError::ClientRequestTimeout; + match handler { + Handler::Ping(_) => Transition::ExitResponse { e: e.into(), tx }, + _ => { + let _ = tx.send(Err(e.into())); + Transition::AwaitRequest + } + } + } + Either::Right((Either::Right(_), _peer_fut)) => { + trace!(parent: &span, "client request was cancelled"); + Transition::AwaitRequest + } + } + } + } + } +} + +/// Enum describing the next state transition that should be taken after any +/// given `step`. +enum Transition { + AwaitRequest, + AwaitResponse { + handler: Handler, + tx: MustUseOneshotSender>, + span: tracing::Span, + }, + // Exiting while no client response is expected + Exit(SharedPeerError), + // Exiting while processing a client response + ExitResponse { + tx: MustUseOneshotSender>, + e: SharedPeerError, + }, +} + +impl TryFrom for State { + type Error = SharedPeerError; + + fn try_from(trans: Transition) -> Result { + match trans { + Transition::AwaitRequest => Ok(State::AwaitingRequest), + Transition::AwaitResponse { handler, tx, span } => { + Ok(State::AwaitingResponse { handler, tx, span }) + } + Transition::Exit(e) => Err(e), + Transition::ExitResponse { tx, e } => { + let _ = tx.send(Err(e.clone())); + Err(e) + } + } + } } /// The state associated with a peer connection. pub struct Connection { - pub(super) state: State, + pub(super) state: Option, /// A timeout for a client request. This is stored separately from /// State so that we can move the future out of it independently of /// other state handling. @@ -332,8 +473,6 @@ pub struct Connection { /// A `mpsc::Receiver` that converts its results to /// `InProgressClientRequest` pub(super) client_rx: ClientRequestReceiver, - /// A slot for an error shared between the Connection and the Client that uses it. - pub(super) error_slot: ErrorSlot, //pub(super) peer_rx: Rx, pub(super) peer_tx: Tx, } @@ -369,287 +508,60 @@ where // If there is a pending request, we wait only on an incoming peer message, and // check whether it can be interpreted as a response to the pending request. loop { - match self.state { - State::AwaitingRequest => { - trace!("awaiting client request or peer message"); - match future::select(peer_rx.next(), self.client_rx.next()).await { - Either::Left((None, _)) => { - self.fail_with(PeerError::ConnectionClosed); - } - Either::Left((Some(Err(e)), _)) => self.fail_with(e), - Either::Left((Some(Ok(msg)), _)) => { - match self.handle_message_as_request(msg).await { - Ok(()) => {} - Err(e) => self.fail_with(e), - } - } - Either::Right((None, _)) => { - trace!("client_rx closed, ending connection"); - return; - } - Either::Right((Some(req), _)) => { - let span = req.span.clone(); - self.handle_client_request(req).instrument(span).await - } - } - } - // We're awaiting a response to a client request, - // so wait on either a peer message, or on a request cancellation. - State::AwaitingResponse { - ref span, - ref mut tx, - .. - } => { - // we have to get rid of the span reference so we can tamper with the state - let span = span.clone(); - trace!(parent: &span, "awaiting response to client request"); - let timer_ref = self - .request_timer - .as_mut() - .expect("timeout must be set while awaiting response"); - let cancel = future::select(timer_ref, tx.cancellation()); - match future::select(peer_rx.next(), cancel) - .instrument(span.clone()) - .await + let transition = self + .state + .take() + .expect("state only None during steps") + .step(&mut self, &mut peer_rx) + .await; + + self.state = match transition.try_into() { + Ok(state) => Some(state), + Err(e) => { + while let Some(InProgressClientRequest { tx, span, .. }) = + self.client_rx.next().await { - Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), - Either::Left((Some(Err(e)), _)) => self.fail_with(e), - Either::Left((Some(Ok(peer_msg)), _cancel)) => { - // Try to process the message using the handler. - // This extremely awkward construction avoids - // keeping a live reference to handler across the - // call to handle_message_as_request, which takes - // &mut self. This is a sign that we don't properly - // factor the state required for inbound and - // outbound requests. - let request_msg = match self.state { - State::AwaitingResponse { - ref mut handler, .. - } => span.in_scope(|| handler.process_message(peer_msg)), - _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}", - self.state, - peer_msg, - self.client_rx, - ), - }; - // If the message was not consumed, check whether it - // should be handled as a request. - if let Some(msg) = request_msg { - // do NOT instrument with the request span, this is - // independent work - match self.handle_message_as_request(msg).await { - Ok(()) => {} - Err(e) => self.fail_with(e), - } - } else { - // Otherwise, check whether the handler is finished - // processing messages and update the state. - self.state = match self.state { - State::AwaitingResponse { - handler: Handler::Finished(response), - tx, - .. - } => { - let _ = tx.send(response.map_err(Into::into)); - State::AwaitingRequest - } - pending @ State::AwaitingResponse { .. } => pending, - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; - } - } - Either::Right((Either::Left(_), _peer_fut)) => { - trace!(parent: &span, "client request timed out"); - let e = PeerError::ClientRequestTimeout; - self.state = match self.state { - // Special case: ping timeouts fail the connection. - State::AwaitingResponse { - handler: Handler::Ping(_), - .. - } => { - self.fail_with(e); - State::Failed - } - // Other request timeouts fail the request. - State::AwaitingResponse { tx, .. } => { - let _ = tx.send(Err(e.into())); - State::AwaitingRequest - } - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; - } - Either::Right((Either::Right(_), _peer_fut)) => { - trace!(parent: &span, "client request was cancelled"); - self.state = State::AwaitingRequest; - } - } - } - // We've failed, but we need to flush all pending client - // requests before we can return and complete the future. - State::Failed => { - match self.client_rx.next().await { - Some(InProgressClientRequest { tx, span, .. }) => { - trace!( - parent: &span, - "sending an error response to a pending request on a failed connection" - ); - let e = self - .error_slot - .try_get_error() - .expect("cannot enter failed state without setting error slot"); - let _ = tx.send(Err(e)); - // Continue until we've errored all queued reqs - continue; - } - None => return, + trace!( + parent: &span, + "sending an error response to a pending request on a failed connection" + ); + let _ = tx.send(Err(e.clone())); } + return; } } } } - /// Marks the peer as having failed with error `e`. - fn fail_with(&mut self, e: E) - where - E: Into, - { - let e = e.into(); - debug!(%e, - connection_state = ?self.state, - client_receiver = ?self.client_rx, - "failing peer service with error"); - // Update the shared error slot - let mut guard = self - .error_slot - .0 - .lock() - .expect("mutex should be unpoisoned"); - if let Some(original_error) = guard.clone() { - // This panic typically happens due to these bugs: - // * we mark a connection as failed without using fail_with - // * we call fail_with without checking for a failed connection - // state - // - // See the original bug #1510 and PR #1531, and the later bug #1599 - // and PR #1600. - panic!( - "calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}", - self.state, - original_error, - e, - self.client_rx - ); - } else { - *guard = Some(e); - } - // Drop the guard immediately to release the mutex. - std::mem::drop(guard); - - // We want to close the client channel and set State::Failed so - // that we can flush any pending client requests. However, we may have - // an outstanding client request in State::AwaitingResponse, so - // we need to deal with it first if it exists. - self.client_rx.close(); - let old_state = std::mem::replace(&mut self.state, State::Failed); - if let State::AwaitingResponse { tx, .. } = old_state { - // We know the slot has Some(e) because we just set it above, - // and the error slot is never unset. - let e = self.error_slot.try_get_error().unwrap(); - let _ = tx.send(Err(e)); - } - } - /// Handle an incoming client request, possibly generating outgoing messages to the /// remote peer. /// + /// Correctness: This function MUST only be called while in the AwaitingRequest state + /// /// NOTE: the caller should use .instrument(msg.span) to instrument the function. - async fn handle_client_request(&mut self, req: InProgressClientRequest) { + async fn handle_client_request(&mut self, req: InProgressClientRequest) -> Transition { trace!(?req.request); - use State::*; - - if req.tx.is_canceled() { - metrics::counter!("peer.canceled", 1); - tracing::debug!("ignoring canceled request"); - return; - } - - let new_state_result = self._handle_client_request(req).await; - - // Updates state or fails. - match new_state_result { - Ok(AwaitingRequest) => { - self.state = AwaitingRequest; - self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT)); - } - Ok(new_state @ AwaitingResponse { .. }) => { - self.state = new_state; - self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT)); - } - Err((e, tx)) => { - let e = SharedPeerError::from(e); - let _ = tx.send(Err(e.clone())); - self.fail_with(e); - } - // unreachable states - Ok(Failed) => unreachable!( - "failed client requests must use fail_with(error) to reach a Failed state." - ), - }; - } - - async fn _handle_client_request( - &mut self, - req: InProgressClientRequest, - ) -> Result< - State, - ( - SerializationError, - MustUseOneshotSender>, - ), - > { use Request::*; - use State::*; let InProgressClientRequest { request, tx, span } = req; - match (&self.state, request) { - (Failed, request) => panic!( - "failed connection cannot handle new request: {:?}, client_receiver: {:?}", - request, - self.client_rx - ), - (pending @ AwaitingResponse { .. }, request) => panic!( - "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}", - request, - pending, - self.client_rx - ), - (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::Peers, - tx, - span, - }, - ), - Err(e) => Err((e, tx)), + match request { + Peers => match self.peer_tx.send(Message::GetAddr).await { + Ok(()) => Transition::AwaitResponse { + handler: Handler::Peers, + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, }, - (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::Ping(nonce), - tx, - span, - }, - ), - Err(e) => Err((e, tx)), + Ping(nonce) => match self.peer_tx.send(Message::Ping(nonce)).await { + Ok(()) => Transition::AwaitResponse { + handler: Handler::Ping(nonce), + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, }, - (AwaitingRequest, BlocksByHash(hashes)) => { + BlocksByHash(hashes) => { match self .peer_tx .send(Message::GetData( @@ -657,20 +569,18 @@ where )) .await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::BlocksByHash { - blocks: Vec::with_capacity(hashes.len()), - hashes, - }, - tx, - span, + Ok(()) => Transition::AwaitResponse { + handler: Handler::BlocksByHash { + blocks: Vec::with_capacity(hashes.len()), + hashes, }, - ), - Err(e) => Err((e, tx)), + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, TransactionsByHash(hashes)) => { + TransactionsByHash(hashes) => { match self .peer_tx .send(Message::GetData( @@ -678,75 +588,65 @@ where )) .await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::TransactionsByHash { - transactions: Vec::with_capacity(hashes.len()), - hashes, - }, - tx, - span, + Ok(()) => Transition::AwaitResponse { + handler: Handler::TransactionsByHash { + transactions: Vec::with_capacity(hashes.len()), + hashes, }, - ), - Err(e) => Err((e, tx)), + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, FindBlocks { known_blocks, stop }) => { + FindBlocks { known_blocks, stop } => { match self .peer_tx .send(Message::GetBlocks { known_blocks, stop }) .await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::FindBlocks, - tx, - span, - }, - ), - Err(e) => Err((e, tx)), + Ok(()) => Transition::AwaitResponse { + handler: Handler::FindBlocks, + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, FindHeaders { known_blocks, stop }) => { + FindHeaders { known_blocks, stop } => { match self .peer_tx .send(Message::GetHeaders { known_blocks, stop }) .await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::FindHeaders, - tx, - span, - }, - ), - Err(e) => Err((e, tx)), + Ok(()) => Transition::AwaitResponse { + handler: Handler::FindHeaders, + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, MempoolTransactions) => { - match self.peer_tx.send(Message::Mempool).await { - Ok(()) => Ok( - AwaitingResponse { - handler: Handler::MempoolTransactions, - tx, - span, - }, - ), - Err(e) => Err((e, tx)), - } - } - (AwaitingRequest, PushTransaction(transaction)) => { + MempoolTransactions => match self.peer_tx.send(Message::Mempool).await { + Ok(()) => Transition::AwaitResponse { + handler: Handler::MempoolTransactions, + tx, + span, + }, + Err(e) => Transition::ExitResponse { e: e.into(), tx }, + }, + PushTransaction(transaction) => { match self.peer_tx.send(Message::Tx(transaction)).await { Ok(()) => { // Since we're not waiting for further messages, we need to // send a response before dropping tx. let _ = tx.send(Ok(Response::Nil)); - Ok(AwaitingRequest) - }, - Err(e) => Err((e, tx)), + Transition::AwaitRequest + } + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, AdvertiseTransactions(hashes)) => { + AdvertiseTransactions(hashes) => { match self .peer_tx .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) @@ -756,20 +656,20 @@ where // Since we're not waiting for further messages, we need to // send a response before dropping tx. let _ = tx.send(Ok(Response::Nil)); - Ok(AwaitingRequest) - }, - Err(e) => Err((e, tx)), + Transition::AwaitRequest + } + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } - (AwaitingRequest, AdvertiseBlock(hash)) => { + AdvertiseBlock(hash) => { match self.peer_tx.send(Message::Inv(vec![hash.into()])).await { Ok(()) => { // Since we're not waiting for further messages, we need to // send a response before dropping tx. let _ = tx.send(Ok(Response::Nil)); - Ok(AwaitingRequest) - }, - Err(e) => Err((e, tx)), + Transition::AwaitRequest + } + Err(e) => Transition::ExitResponse { e: e.into(), tx }, } } } @@ -902,7 +802,7 @@ where }; match rsp { - Response::Nil => { /* generic success, do nothing */ }, + Response::Nil => { /* generic success, do nothing */ } Response::Peers(addrs) => self.peer_tx.send(Message::Addr(addrs)).await?, Response::Transactions(transactions) => { // Generate one tx message per transaction. diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index d18f6b93..8436fb54 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use thiserror::Error; @@ -28,6 +28,9 @@ pub enum PeerError { /// The remote peer closed the connection. #[error("Peer closed connection")] ConnectionClosed, + /// The local client closed the connection. + #[error("Internal client dropped connection")] + ConnectionDropped, /// The remote peer did not respond to a [`peer::Client`] request in time. #[error("Client request timed out")] ClientRequestTimeout, @@ -64,19 +67,6 @@ pub enum PeerError { NotFound(Vec), } -#[derive(Default, Clone)] -pub(super) struct ErrorSlot(pub(super) Arc>>); - -impl ErrorSlot { - pub fn try_get_error(&self) -> Option { - self.0 - .lock() - .expect("error mutex should be unpoisoned") - .as_ref() - .cloned() - } -} - /// An error during a handshake with a remote peer. #[derive(Error, Debug)] pub enum HandshakeError { diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 2d859724..92fc366d 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -30,7 +30,7 @@ use crate::{ BoxError, Config, PeerAddrState, }; -use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError}; +use super::{Client, Connection, HandshakeError, PeerError}; /// A [`Service`] that handshakes with a remote peer and constructs a /// client/server pair. @@ -349,12 +349,10 @@ where // in this block, see constants.rs for more. let (server_tx, server_rx) = mpsc::channel(0); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let slot = ErrorSlot::default(); let client = Client { shutdown_tx: Some(shutdown_tx), server_tx: server_tx.clone(), - error_slot: slot.clone(), }; let (peer_tx, peer_rx) = stream.split(); @@ -434,10 +432,9 @@ where use super::connection; let server = Connection { - state: connection::State::AwaitingRequest, + state: Some(connection::State::AwaitingRequest), svc: inbound_service, client_rx: server_rx.into(), - error_slot: slot, peer_tx, request_timer: None, };