diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 3f2924b7..4d4e1f2d 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -10,6 +10,7 @@ use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, }; +use mem::swap; use oneshot::error::TryRecvError; use tokio::sync::oneshot; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; @@ -131,6 +132,12 @@ impl Inbound { state, } } + + fn take_setup(&mut self) -> Setup { + let mut network = Setup::FailedInit; + std::mem::swap(&mut self.network, &mut network); + network + } } impl Service for Inbound { @@ -145,64 +152,58 @@ impl Service for Inbound { // and reporting unreadiness might cause unwanted load-shedding, since // the load-shed middleware is unable to distinguish being unready due // to load from being unready while waiting on setup. - if matches!(self.network, Setup::AwaitingNetwork { .. }) { - // Unfortunately, we can't match, swap, and destructure at the same time - let mut awaiting_state = Setup::FailedInit; - mem::swap(&mut self.network, &mut awaiting_state); - if let Setup::AwaitingNetwork { + let mut result = Ok(()); + + self.network = match self.take_setup() { + Setup::AwaitingNetwork { mut network_setup, verifier, - } = awaiting_state - { - match network_setup.try_recv() { - Ok((outbound, address_book)) => { - let downloads = Box::pin(Downloads::new( - Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), - Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), - self.state.clone(), - )); - self.network = Setup::Initialized { - address_book, - downloads, - }; - } - Err(TryRecvError::Empty) => { - // There's no setup data yet, so keep waiting for it - self.network = Setup::AwaitingNetwork { - network_setup, - verifier, - }; - } - Err(error @ TryRecvError::Closed) => { - // Mark the service as failed, because network setup failed - error!(?error, "inbound network setup failed"); - let error: SharedRecvError = error.into(); - self.network = Setup::FailedRecv { - error: error.clone(), - }; - return Poll::Ready(Err(error.into())); + } => match network_setup.try_recv() { + Ok((outbound, address_book)) => { + let downloads = Box::pin(Downloads::new( + Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), + Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT), + self.state.clone(), + )); + Setup::Initialized { + address_book, + downloads, } } + Err(TryRecvError::Empty) => { + // There's no setup data yet, so keep waiting for it + Setup::AwaitingNetwork { + network_setup, + verifier, + } + } + Err(error @ TryRecvError::Closed) => { + // Mark the service as failed, because network setup failed + error!(?error, "inbound network setup failed"); + let error: SharedRecvError = error.into(); + result = Err(error.clone().into()); + Setup::FailedRecv { error } + } + }, + // Make sure we left the network setup in a valid state + Setup::FailedInit => unreachable!("incomplete Inbound initialization"), + // If network setup failed, report service failure + Setup::FailedRecv { error } => { + result = Err(error.clone().into()); + Setup::FailedRecv { error } } - } - - // Unfortunately, we can't combine these matches into an exhaustive match statement, - // because they use mutable references, or they depend on the state we've just modified. - - // Make sure we left the network setup in a valid state - if matches!(self.network, Setup::FailedInit) { - unreachable!("incomplete Inbound initialization"); - } - - // If network setup failed, report service failure - if let Setup::FailedRecv { error } = &mut self.network { - return Poll::Ready(Err(error.clone().into())); - } - - // Clean up completed download tasks, ignoring their results - if let Setup::Initialized { downloads, .. } = &mut self.network { - while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} - } + // Clean up completed download tasks, ignoring their results + Setup::Initialized { + address_book, + mut downloads, + } => { + while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} + Setup::Initialized { + address_book, + downloads, + } + } + }; // TODO: // * do we want to propagate backpressure from the download queue or its outbound network? @@ -213,7 +214,7 @@ impl Service for Inbound { // So we might also want to propagate backpressure from its buffer. // * if we want to propagate backpressure, add a ReadyCache for each service, to ensure // that each poll_ready has a matching call. See #1593 for details. - Poll::Ready(Ok(())) + Poll::Ready(result) } #[instrument(name = "inbound", skip(self, req))]