Remove services that are never `call`ed from Inbound

Uses the `ServiceExt::oneshot` design pattern from #1593.
This commit is contained in:
teor 2021-01-22 20:41:05 +10:00
parent 25b6491929
commit 2a25b9ee72
1 changed files with 33 additions and 31 deletions

View File

@ -53,7 +53,7 @@ pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
/// responding to block gossip by attempting to download and validate advertised /// responding to block gossip by attempting to download and validate advertised
/// blocks. /// blocks.
pub struct Inbound { pub struct Inbound {
// invariant: address_book, outbound, downloads are Some if network_setup is None // invariant: address_book and downloads are Some if network_setup or verifier are None
// //
// why not use an enum for the inbound state? because it would mean // why not use an enum for the inbound state? because it would mean
// match-wrapping the body of Service::call rather than just expect()ing // match-wrapping the body of Service::call rather than just expect()ing
@ -62,28 +62,29 @@ pub struct Inbound {
// Setup // Setup
/// A oneshot channel used to receive the address_book and outbound services /// A oneshot channel used to receive the address_book and outbound services
/// after the network is set up. /// after the network is set up.
///
/// `None` after the network is set up.
network_setup: Option<oneshot::Receiver<SetupData>>, network_setup: Option<oneshot::Receiver<SetupData>>,
/// A service that verifies downloaded blocks. Given to `downloads`
/// after the network is set up.
///
/// `None` after the network is set up and `downloads` is created.
verifier: Option<Verifier>,
// Services // Services
/// A list of peer addresses. /// A service that maintains a list of peer addresses.
///
/// `None` until the network is set up.
address_book: Option<Arc<Mutex<zn::AddressBook>>>, address_book: Option<Arc<Mutex<zn::AddressBook>>>,
/// A service that downloads and verifies gossipped blocks. /// A stream that downloads and verifies gossipped blocks.
downloads: Option<Pin<Box<Downloads<Timeout<Outbound>, Timeout<Verifier>, State>>>>,
/// A service that forwards requests to connected peers, and returns their
/// responses.
/// ///
/// Only used for readiness checks, and via `downloads`. /// `None` until the network is set up.
outbound: Option<Outbound>, downloads: Option<Pin<Box<Downloads<Timeout<Outbound>, Timeout<Verifier>, State>>>>,
/// A service that manages cached blockchain state. /// A service that manages cached blockchain state.
state: State, state: State,
/// A service that verifies downloaded blocks.
///
/// Only used for readiness checks, and via `downloads`.
verifier: Verifier,
} }
impl Inbound { impl Inbound {
@ -94,11 +95,10 @@ impl Inbound {
) -> Self { ) -> Self {
Self { Self {
network_setup: Some(network_setup), network_setup: Some(network_setup),
verifier: Some(verifier),
address_book: None, address_book: None,
downloads: None, downloads: None,
outbound: None,
state, state,
verifier,
} }
} }
} }
@ -119,14 +119,18 @@ impl Service<zn::Request> for Inbound {
use oneshot::error::TryRecvError; use oneshot::error::TryRecvError;
match rx.try_recv() { match rx.try_recv() {
Ok((outbound, address_book)) => { Ok((outbound, address_book)) => {
self.outbound = Some(outbound.clone());
self.address_book = Some(address_book); self.address_book = Some(address_book);
self.network_setup = None;
self.downloads = Some(Box::pin(Downloads::new( self.downloads = Some(Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT), Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(self.verifier.clone(), BLOCK_VERIFY_TIMEOUT), Timeout::new(
self.verifier
.take()
.expect("verifier is Some when network_setup is Some"),
BLOCK_VERIFY_TIMEOUT,
),
self.state.clone(), self.state.clone(),
))); )));
self.network_setup = None;
} }
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
self.network_setup = Some(rx); self.network_setup = Some(rx);
@ -141,23 +145,21 @@ impl Service<zn::Request> for Inbound {
}; };
} }
// Clean up completed download tasks // Clean up completed download tasks, ignoring their results
if let Some(downloads) = self.downloads.as_mut() { if let Some(downloads) = self.downloads.as_mut() {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
} }
// Now report readiness based on readiness of the inner services, if they're available. // Now report readiness based on readiness of the inner services, if they're available.
// XXX do we want to propagate backpressure from the network here? //
match ( // TODO: do we want to propagate backpressure from the download queue or its outbound network here?
self.state.poll_ready(cx), // currently, the download queue waits for the outbound network in the download future, and
self.outbound // drops new requests after it reaches a hard-coded limit. This is the "load shed directly"
.as_mut() // pattern from #1618.
.map(|svc| svc.poll_ready(cx)) match self.state.poll_ready(cx) {
.unwrap_or(Poll::Ready(Ok(()))), Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
) { Poll::Pending => Poll::Pending,
(Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)), Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
} }
} }