From a5e89f4f2baf28d1c360c4d6a474496081896152 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 23 Feb 2021 10:54:48 +1000 Subject: [PATCH] Revert "accidental drop on mustusesender" This reverts commit 5ec8d09e0dd9a9233bf0c4b02f2ad0646cc1d982. --- zebra-network/src/peer/connection.rs | 46 ++++++++++------------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index aca6fd95..cc09e341 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -18,7 +18,7 @@ use futures::{ prelude::*, stream::Stream, }; -use tokio::time::{sleep, Sleep}; +use tokio::time::Sleep; use tower::Service; use tracing_futures::Instrument; @@ -29,7 +29,6 @@ use zebra_chain::{ }; use crate::{ - constants, protocol::{ external::{types::Nonce, InventoryHash, Message}, internal::{Request, Response}, @@ -378,15 +377,11 @@ impl State { .instrument(span.clone()) .await { - Either::Left((None, _)) => { - Transition::ExitResponse { - e: PeerError::ConnectionClosed.into(), - tx, - } - } - Either::Left((Some(Err(e)), _)) => { - Transition::ExitResponse { e: e.into(), tx } - } + Either::Left((None, _)) => Transition::ExitResponse { + e: PeerError::ConnectionClosed.into(), + tx, + }, + Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx }, Either::Left((Some(Ok(peer_msg)), _cancel)) => { let request_msg = span.in_scope(|| handler.process_message(peer_msg)); // If the message was not consumed, check whether it @@ -395,11 +390,8 @@ impl State { // do NOT instrument with the request span, this is // independent work match conn.handle_message_as_request(msg).await { - Ok(()) => { - Transition::AwaitResponse { tx, handler, span } - // Transition::AwaitRequest - } - Err(e) => Transition::ExitResponse { e: e.into(), tx }, + Ok(()) => Transition::AwaitRequest, + Err(e) => Transition::Exit(e.into()), } } else { // Otherwise, check whether the handler is finished @@ -523,22 +515,18 @@ where .step(&mut self, &mut peer_rx) .await; - if matches!(transition, Transition::AwaitResponse { .. }) { - self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT)); - } - self.state = match transition.try_into() { Ok(state) => Some(state), Err(e) => { - // while let Some(InProgressClientRequest { tx, span, .. }) = - // self.client_rx.next().await - // { - // trace!( - // parent: &span, - // "sending an error response to a pending request on a failed connection" - // ); - // let _ = tx.send(Err(e.clone())); - // } + while let Some(InProgressClientRequest { tx, span, .. }) = + self.client_rx.next().await + { + trace!( + parent: &span, + "sending an error response to a pending request on a failed connection" + ); + let _ = tx.send(Err(e.clone())); + } return; } }