Shut down channels and tasks on PeerSet Drop (#3078)

* Shut down channels and tasks on PeerSet Drop

* Document all the PeerSet fields

* Close the peer set background task handle on shutdown

* Receive background tasks during shutdown

Also, split receiving and polling background tasks into separate methods.
This commit is contained in:
teor 2021-11-23 11:29:34 +10:00 committed by GitHub
parent e054d57622
commit b39f4ca5aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 144 additions and 29 deletions

View File

@ -108,37 +108,80 @@ pub struct CancelClientWork;
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
pub struct PeerSet<D>
where
D: Discover<Key = SocketAddr>,
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
/// Provides new and deleted peer [`Change`]s to the peer set,
/// via the [`Discover`] trait implementation.
discover: D,
/// Connected peers that are ready to receive requests from Zebra,
/// or send requests to Zebra.
ready_services: IndexMap<D::Key, D::Service>,
/// A preselected index for a ready service.
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index: Option<usize>,
ready_services: IndexMap<D::Key, D::Service>,
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
/// Stores gossiped inventory from connected peers.
/// Used to route inventory requests to peers that are likely to have it.
inventory_registry: InventoryRegistry,
/// Connected peers that are handling a Zebra request,
/// or Zebra is handling one of their requests.
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
/// Channels used to cancel the request that an unready service is doing.
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
/// A channel that asks the peer crawler task to connect to more peers.
demand_signal: mpsc::Sender<MorePeers>,
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
///
/// The join handles passed into the PeerSet are used populate the `guards` member
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
/// Unordered set of handles to background tasks associated with the `PeerSet`
///
/// These guards are checked for errors as part of `poll_ready` which lets
/// the `PeerSet` propagate errors from background tasks back to the user
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
inventory_registry: InventoryRegistry,
/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,
/// A shared list of peer addresses.
///
/// Used for logging diagnostics.
address_book: Arc<std::sync::Mutex<AddressBook>>,
/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,
/// The configured limit for inbound and outbound connections.
///
/// The peer set panics if this size is exceeded.
/// If that happens, our connection limit code has a bug.
peerset_total_connection_limit: usize,
}
impl<D> Drop for PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
fn drop(&mut self) {
self.shut_down_tasks_and_channels()
}
}
impl<D> PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
@ -169,15 +212,22 @@ where
address_book: Arc<std::sync::Mutex<AddressBook>>,
) -> Self {
Self {
// Ready peers
discover,
preselected_p2c_index: None,
ready_services: IndexMap::new(),
cancel_handles: HashMap::new(),
unready_services: FuturesUnordered::new(),
demand_signal,
guards: futures::stream::FuturesUnordered::new(),
handle_rx,
preselected_p2c_index: None,
inventory_registry: InventoryRegistry::new(inv_stream),
// Unready peers
unready_services: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
demand_signal,
// Background tasks
handle_rx,
guards: futures::stream::FuturesUnordered::new(),
// Metrics
last_peer_log: None,
address_book,
peerset_total_connection_limit: config.peerset_total_connection_limit(),
@ -189,42 +239,107 @@ where
/// If any background task exits, shuts down all other background tasks,
/// and returns an error.
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Ok(handles) => {
for handle in handles {
self.guards.push(handle);
}
}
Err(TryRecvError::Closed) => unreachable!(
"try_recv will never be called if the futures have already been received"
),
Err(TryRecvError::Empty) => return Ok(()),
}
if let Some(result) = self.receive_tasks_if_needed() {
return result;
}
let exit_error = match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => return Ok(()),
match Pin::new(&mut self.guards).poll_next(cx) {
// All background tasks are still running.
Poll::Pending => Ok(()),
Poll::Ready(Some(res)) => {
info!(
background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks"
);
self.shut_down_tasks_and_channels();
// Flatten the join result and inner result,
// then turn Ok() task exits into errors.
res.map_err(Into::into)
// TODO: replace with Result::flatten when it stabilises (#70142)
.and_then(convert::identity)
.and(Err("a peer set background task exited".into()))
}
Poll::Ready(None) => Err("all peer set background tasks have exited".into()),
};
Poll::Ready(None) => {
self.shut_down_tasks_and_channels();
Err("all peer set background tasks have exited".into())
}
}
}
/// Receive background tasks, if they've been sent on the channel,
/// but not consumed yet.
///
/// Returns a result representing the current task state,
/// or `None` if the background tasks should be polled to check their state.
fn receive_tasks_if_needed(&mut self) -> Option<Result<(), BoxError>> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
// The tasks haven't been sent yet.
Err(TryRecvError::Empty) => Some(Ok(())),
// The tasks have been sent, but not consumed.
Ok(handles) => {
// Currently, the peer set treats an empty background task set as an error.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
assert!(
!handles.is_empty(),
"the peer set requires at least one background task"
);
for handle in handles {
self.guards.push(handle);
}
None
}
// The tasks have been sent and consumed, but then they exited.
//
// Correctness: the peer set must receive at least one task.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
Err(TryRecvError::Closed) => {
Some(Err("all peer set background tasks have exited".into()))
}
}
} else {
None
}
}
/// Shut down:
/// - services by dropping the service lists
/// - background tasks via their join handles or cancel handles
/// - channels by closing the channel
fn shut_down_tasks_and_channels(&mut self) {
// Drop services and cancel their background tasks.
self.preselected_p2c_index = None;
self.ready_services = IndexMap::new();
for (_peer_key, handle) in self.cancel_handles.drain() {
let _ = handle.send(CancelClientWork);
}
self.unready_services = FuturesUnordered::new();
// Close the MorePeers channel for all senders,
// so we don't add more peers to a shut down peer set.
self.demand_signal.close_channel();
// Shut down background tasks.
self.handle_rx.close();
self.receive_tasks_if_needed();
for guard in self.guards.iter() {
guard.abort();
}
exit_error
// TODO: implement graceful shutdown for InventoryRegistry (#1678)
}
fn poll_unready(&mut self, cx: &mut Context<'_>) {