From 65877cb4b173e9ded7cc905030a49436e4f6c9c7 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Fri, 18 Sep 2020 14:11:29 -0700 Subject: [PATCH] zebrad: make Inbound propagate backpressure --- zebrad/src/components/inbound.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 40503a50..a811b4d8 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -50,10 +50,11 @@ impl Service for Inbound { type Future = Pin> + Send + 'static>>; - #[instrument(skip(self, _cx))] - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + #[instrument(skip(self, cx))] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { use oneshot::error::TryRecvError; match self.network_setup.take() { + // If we're waiting for setup, check if we became ready Some(mut rx) => match rx.try_recv() { Ok((outbound, address_book)) => { self.outbound = Some(outbound); @@ -73,7 +74,17 @@ impl Service for Inbound { Poll::Pending } }, - None => Poll::Ready(Ok(())), + // Otherwise, check readiness of services we might call to propagate backpressure. + None => { + match ( + self.state.poll_ready(cx), + self.outbound.as_mut().unwrap().poll_ready(cx), + ) { + (Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + (Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending, + (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), + } + } } }