Refactor and document correctness for std::sync::Mutex in ErrorSlot
This commit is contained in:
parent
3f45735f3f
commit
905b90d6a1
|
|
@ -516,6 +516,12 @@ where
|
||||||
parent: &span,
|
parent: &span,
|
||||||
"sending an error response to a pending request on a failed connection"
|
"sending an error response to a pending request on a failed connection"
|
||||||
);
|
);
|
||||||
|
// Correctness
|
||||||
|
//
|
||||||
|
// Error slots use a threaded `std::sync::Mutex`, so
|
||||||
|
// accessing the slot can block the async task's
|
||||||
|
// current thread. So we only hold the lock for long
|
||||||
|
// enough to get a reference to the error.
|
||||||
let e = self
|
let e = self
|
||||||
.error_slot
|
.error_slot
|
||||||
.try_get_error()
|
.try_get_error()
|
||||||
|
|
@ -532,6 +538,10 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Marks the peer as having failed with error `e`.
|
/// Marks the peer as having failed with error `e`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// If `self` has already failed with a previous error.
|
||||||
fn fail_with<E>(&mut self, e: E)
|
fn fail_with<E>(&mut self, e: E)
|
||||||
where
|
where
|
||||||
E: Into<SharedPeerError>,
|
E: Into<SharedPeerError>,
|
||||||
|
|
@ -541,32 +551,28 @@ where
|
||||||
connection_state = ?self.state,
|
connection_state = ?self.state,
|
||||||
client_receiver = ?self.client_rx,
|
client_receiver = ?self.client_rx,
|
||||||
"failing peer service with error");
|
"failing peer service with error");
|
||||||
|
|
||||||
// Update the shared error slot
|
// Update the shared error slot
|
||||||
let mut guard = self
|
//
|
||||||
.error_slot
|
// # Correctness
|
||||||
.0
|
//
|
||||||
.lock()
|
// Error slots use a threaded `std::sync::Mutex`, so accessing the slot
|
||||||
.expect("mutex should be unpoisoned");
|
// can block the async task's current thread. We only perform a single
|
||||||
if let Some(original_error) = guard.clone() {
|
// slot update per `Client`, and panic to enforce this constraint.
|
||||||
|
if self.error_slot.try_update_error(e).is_err() {
|
||||||
// This panic typically happens due to these bugs:
|
// This panic typically happens due to these bugs:
|
||||||
// * we mark a connection as failed without using fail_with
|
// * we mark a connection as failed without using fail_with
|
||||||
// * we call fail_with without checking for a failed connection
|
// * we call fail_with without checking for a failed connection
|
||||||
// state
|
// state
|
||||||
|
// * we continue processing messages after calling fail_with
|
||||||
//
|
//
|
||||||
// See the original bug #1510 and PR #1531, and the later bug #1599
|
// See the original bug #1510 and PR #1531, and the later bug #1599
|
||||||
// and PR #1600.
|
// and PR #1600.
|
||||||
panic!(
|
panic!("calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} client receiver: {:?}",
|
||||||
"calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}",
|
|
||||||
self.state,
|
self.state,
|
||||||
original_error,
|
|
||||||
e,
|
|
||||||
self.client_rx
|
self.client_rx
|
||||||
);
|
);
|
||||||
} else {
|
|
||||||
*guard = Some(e);
|
|
||||||
}
|
}
|
||||||
// Drop the guard immediately to release the mutex.
|
|
||||||
std::mem::drop(guard);
|
|
||||||
|
|
||||||
// We want to close the client channel and set State::Failed so
|
// We want to close the client channel and set State::Failed so
|
||||||
// that we can flush any pending client requests. However, we may have
|
// that we can flush any pending client requests. However, we may have
|
||||||
|
|
@ -575,8 +581,14 @@ where
|
||||||
self.client_rx.close();
|
self.client_rx.close();
|
||||||
let old_state = std::mem::replace(&mut self.state, State::Failed);
|
let old_state = std::mem::replace(&mut self.state, State::Failed);
|
||||||
if let State::AwaitingResponse { tx, .. } = old_state {
|
if let State::AwaitingResponse { tx, .. } = old_state {
|
||||||
|
// # Correctness
|
||||||
|
//
|
||||||
// We know the slot has Some(e) because we just set it above,
|
// We know the slot has Some(e) because we just set it above,
|
||||||
// and the error slot is never unset.
|
// and the error slot is never unset.
|
||||||
|
//
|
||||||
|
// Accessing the error slot locks a threaded std::sync::Mutex, which
|
||||||
|
// can block the current async task thread. We briefly lock the mutex
|
||||||
|
// to get a reference to the error.
|
||||||
let e = self.error_slot.try_get_error().unwrap();
|
let e = self.error_slot.try_get_error().unwrap();
|
||||||
let _ = tx.send(Err(e));
|
let _ = tx.send(Err(e));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Arc;
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
|
@ -64,10 +64,25 @@ pub enum PeerError {
|
||||||
NotFound(Vec<InventoryHash>),
|
NotFound(Vec<InventoryHash>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A shared error slot for peer errors.
|
||||||
|
///
|
||||||
|
/// # Correctness
|
||||||
|
///
|
||||||
|
/// Error slots are shared between sync and async code. In async code, the error
|
||||||
|
/// mutex should be held for as short a time as possible. This avoids blocking
|
||||||
|
/// the async task thread on acquiring the mutex.
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub(super) struct ErrorSlot(pub(super) Arc<Mutex<Option<SharedPeerError>>>);
|
pub(super) struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
|
||||||
|
|
||||||
impl ErrorSlot {
|
impl ErrorSlot {
|
||||||
|
/// Read the current error in the slot.
|
||||||
|
///
|
||||||
|
/// Returns `None` if there is no error in the slot.
|
||||||
|
///
|
||||||
|
/// # Correctness
|
||||||
|
///
|
||||||
|
/// Briefly locks the error slot's threaded `std::sync::Mutex`, to get a
|
||||||
|
/// reference to the error in the slot.
|
||||||
pub fn try_get_error(&self) -> Option<SharedPeerError> {
|
pub fn try_get_error(&self) -> Option<SharedPeerError> {
|
||||||
self.0
|
self.0
|
||||||
.lock()
|
.lock()
|
||||||
|
|
@ -75,8 +90,31 @@ impl ErrorSlot {
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.cloned()
|
.cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the current error in the slot.
|
||||||
|
///
|
||||||
|
/// Returns `Err(AlreadyErrored)` if there was already an error in the slot.
|
||||||
|
///
|
||||||
|
/// # Correctness
|
||||||
|
///
|
||||||
|
/// Briefly locks the error slot's threaded `std::sync::Mutex`, to check for
|
||||||
|
/// a previous error, then update the error in the slot.
|
||||||
|
pub fn try_update_error(&self, e: SharedPeerError) -> Result<(), AlreadyErrored> {
|
||||||
|
let mut guard = self.0.lock().expect("error mutex should be unpoisoned");
|
||||||
|
|
||||||
|
if let Some(original_error) = guard.clone() {
|
||||||
|
error!(?original_error, new_error = ?e, "peer connection already errored");
|
||||||
|
Err(AlreadyErrored)
|
||||||
|
} else {
|
||||||
|
*guard = Some(e);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The `ErrorSlot` already contains an error.
|
||||||
|
pub struct AlreadyErrored;
|
||||||
|
|
||||||
/// An error during a handshake with a remote peer.
|
/// An error during a handshake with a remote peer.
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum HandshakeError {
|
pub enum HandshakeError {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue