Wake waiting tower-batch tasks on drop
When other tower-batch tasks drop, wake any tasks that are waiting for a semaphore permit. Otherwise, tower-batch can hang. We currently pin tower in our workspace to: d4d1c67 hedge: use auto-resizing histograms (tower-rs/tower#484) Copy tower/src/semaphore.rs from that commit, to pick up tower-rs/tower#480.
This commit is contained in:
parent
76e4b8f693
commit
47084ea85e
|
|
@ -1,6 +1,8 @@
|
|||
// Copied from tower/src/semaphore.rs
|
||||
// When/if tower-batch is upstreamed, delete this file
|
||||
// and use the common tower semaphore implementation
|
||||
// Copied from tower/src/semaphore.rs, commit:
|
||||
// d4d1c67 hedge: use auto-resizing histograms (#484)
|
||||
//
|
||||
// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's:
|
||||
// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556)
|
||||
|
||||
pub(crate) use self::sync::OwnedSemaphorePermit as Permit;
|
||||
use futures_core::ready;
|
||||
|
|
@ -9,7 +11,7 @@ use std::{
|
|||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
sync::{Arc, Weak},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync;
|
||||
|
|
@ -20,13 +22,32 @@ pub(crate) struct Semaphore {
|
|||
state: State,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Close {
|
||||
semaphore: Weak<sync::Semaphore>,
|
||||
permits: usize,
|
||||
}
|
||||
|
||||
enum State {
|
||||
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
|
||||
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + 'static>>),
|
||||
Ready(Permit),
|
||||
Empty,
|
||||
}
|
||||
|
||||
impl Semaphore {
|
||||
pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
|
||||
let semaphore = Arc::new(sync::Semaphore::new(permits));
|
||||
let close = Close {
|
||||
semaphore: Arc::downgrade(&semaphore),
|
||||
permits,
|
||||
};
|
||||
let semaphore = Self {
|
||||
semaphore,
|
||||
state: State::Empty,
|
||||
};
|
||||
(semaphore, close)
|
||||
}
|
||||
|
||||
pub(crate) fn new(permits: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(sync::Semaphore::new(permits)),
|
||||
|
|
@ -76,3 +97,23 @@ impl fmt::Debug for State {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Close {
|
||||
/// Close the semaphore, waking any remaining tasks currently awaiting a permit.
|
||||
pub(crate) fn close(self) {
|
||||
// The maximum number of permits that a `tokio::sync::Semaphore`
|
||||
// can hold is usize::MAX >> 3. If we attempt to add more than that
|
||||
// number of permits, the semaphore will panic.
|
||||
// XXX(eliza): another shift is kinda janky but if we add (usize::MAX
|
||||
// > 3 - initial permits) the semaphore impl panics (I think due to a
|
||||
// bug in tokio?).
|
||||
// TODO(eliza): Tokio should _really_ just expose `Semaphore::close`
|
||||
// publicly so we don't have to do this nonsense...
|
||||
const MAX: usize = std::usize::MAX >> 4;
|
||||
if let Some(semaphore) = self.semaphore.upgrade() {
|
||||
// If we added `MAX - available_permits`, any tasks that are
|
||||
// currently holding permits could drop them, overflowing the max.
|
||||
semaphore.add_permits(MAX - self.permits);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue