Stop using ServiceExt::call_all due to buffer bugs
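ServiceExt::call_all leaks a tower::buffer::Buffer reservation every time the underlying service returns Poll::Pending, so we can't use it in Zebra. Instead, use a loop in the returned future that waits for readiness and calls the service once per request. See #1593 for details.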
This commit is contained in:
parent
64bc45cd2e
commit
94eb91305b
|
|
@ -530,13 +530,6 @@ impl Service<Request> for StateService {
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
// ## Correctness:
|
|
||||||
//
|
|
||||||
// This function must not return Poll::Pending, unless:
|
|
||||||
// 1. We remove all instances of `call_all` on the state service, or fix the leaked
|
|
||||||
// service reservation in the `CallAll` implementation:
|
|
||||||
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
|
||||||
// 2. We schedule the current task for wakeup via the `Context`
|
|
||||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ use std::{
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{FutureExt, TryFutureExt},
|
future::{FutureExt, TryFutureExt},
|
||||||
stream::{Stream, TryStreamExt},
|
stream::Stream,
|
||||||
};
|
};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
|
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
|
||||||
|
|
@ -52,12 +52,6 @@ pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
|
||||||
/// behind the current tip, while the `Inbound` service is *externally driven*,
|
/// behind the current tip, while the `Inbound` service is *externally driven*,
|
||||||
/// responding to block gossip by attempting to download and validate advertised
|
/// responding to block gossip by attempting to download and validate advertised
|
||||||
/// blocks.
|
/// blocks.
|
||||||
///
|
|
||||||
/// ## Correctness
|
|
||||||
///
|
|
||||||
/// The `state` service must not return `Poll::Pending`. If it does, a bug in the
|
|
||||||
/// `ServiceExt::call_all` implementation might cause the `state` buffer to fill
|
|
||||||
/// up, and make Zebra hang.
|
|
||||||
pub struct Inbound {
|
pub struct Inbound {
|
||||||
// invariants:
|
// invariants:
|
||||||
// * Before setup: address_book and downloads are None, and the *_setup members are Some
|
// * Before setup: address_book and downloads are None, and the *_setup members are Some
|
||||||
|
|
@ -160,18 +154,14 @@ impl Service<zn::Request> for Inbound {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO:
|
// TODO:
|
||||||
// * do we want to propagate backpressure from the download queue or its outbound network here?
|
// * do we want to propagate backpressure from the download queue or its outbound network?
|
||||||
// currently, the download queue waits for the outbound network in the download future, and
|
// currently, the download queue waits for the outbound network in the download future,
|
||||||
// drops new requests after it reaches a hard-coded limit. This is the "load shed directly"
|
// and drops new requests after it reaches a hard-coded limit. This is the
|
||||||
// pattern from #1618.
|
// "load shed directly" pattern from #1618.
|
||||||
// * if we want to propagate backpressure, add a ReadyCache to ensure that each poll_ready
|
// * currently, the state service is always ready, unless its buffer is full.
|
||||||
// has a matching call. See #1593 for details.
|
// So we might also want to propagate backpressure from its buffer.
|
||||||
|
// * if we want to propagate backpressure, add a ReadyCache for each service, to ensure
|
||||||
// Ignore state readiness, to avoid reserving its buffer slots.
|
// that each poll_ready has a matching call. See #1593 for details.
|
||||||
// We can't use a state ReadyCache, because call_all uses state directly.
|
|
||||||
// We can't call state.poll_ready, because:
|
|
||||||
// * call_all also calls poll_ready
|
|
||||||
// * some requests don't use the state
|
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -194,34 +184,26 @@ impl Service<zn::Request> for Inbound {
|
||||||
zn::Request::BlocksByHash(hashes) => {
|
zn::Request::BlocksByHash(hashes) => {
|
||||||
// Correctness:
|
// Correctness:
|
||||||
//
|
//
|
||||||
// We don't need to use ServiceExt::oneshot here, because
|
// We can't use `call_all` here, because it leaks buffer slots:
|
||||||
// call_all uses poll_ready internally.
|
|
||||||
//
|
|
||||||
// The state must not return Poll::Pending, because call_all
|
|
||||||
// leaks a buffer reservation every time that happens
|
|
||||||
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
||||||
let state = self.state.clone();
|
let mut state = self.state.clone();
|
||||||
let requests = futures::stream::iter(
|
async move {
|
||||||
hashes
|
let mut blocks = Vec::new();
|
||||||
.into_iter()
|
for hash in hashes {
|
||||||
.map(|hash| zs::Request::Block(hash.into())),
|
let request = zs::Request::Block(hash.into());
|
||||||
);
|
// we can't use ServiceExt::oneshot here, due to lifetime issues
|
||||||
|
match state.ready_and().await?.call(request).await? {
|
||||||
state
|
zs::Response::Block(Some(block)) => blocks.push(block),
|
||||||
.call_all(requests)
|
|
||||||
.try_filter_map(|rsp| {
|
|
||||||
futures::future::ready(match rsp {
|
|
||||||
zs::Response::Block(Some(block)) => Ok(Some(block)),
|
|
||||||
// `zcashd` ignores missing blocks in GetData responses,
|
// `zcashd` ignores missing blocks in GetData responses,
|
||||||
// rather than including them in a trailing `NotFound`
|
// rather than including them in a trailing `NotFound`
|
||||||
// message
|
// message
|
||||||
zs::Response::Block(None) => Ok(None),
|
zs::Response::Block(None) => {}
|
||||||
_ => unreachable!("wrong response from state"),
|
_ => unreachable!("wrong response from state"),
|
||||||
})
|
}
|
||||||
})
|
}
|
||||||
.try_collect::<Vec<_>>()
|
Ok(zn::Response::Blocks(blocks))
|
||||||
.map_ok(zn::Response::Blocks)
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::TransactionsByHash(_transactions) => {
|
zn::Request::TransactionsByHash(_transactions) => {
|
||||||
// `zcashd` returns a list of found transactions, followed by a
|
// `zcashd` returns a list of found transactions, followed by a
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue