Partially complete heartbeats to peer

This commit is contained in:
Deirdre Connolly 2019-10-18 03:51:57 -04:00 committed by Deirdre Connolly
parent 62e423bad8
commit adffc4239d
4 changed files with 22 additions and 1 deletions

View File

@ -70,7 +70,7 @@ impl Service<Request> for PeerClient {
} }
} }
Ok(()) => { Ok(()) => {
// The reciever end of the oneshot is itself a future. // The receiver end of the oneshot is itself a future.
rx.map(|oneshot_recv_result| { rx.map(|oneshot_recv_result| {
oneshot_recv_result oneshot_recv_result
.expect("ClientRequest oneshot sender must not be dropped before send") .expect("ClientRequest oneshot sender must not be dropped before send")

View File

@ -33,6 +33,9 @@ pub enum PeerError {
/// already complete. /// already complete.
#[error("Remote peer sent handshake messages after handshake")] #[error("Remote peer sent handshake messages after handshake")]
DuplicateHandshake, DuplicateHandshake,
/// A badly-behaved remote peer sent the wrong nonce in response to a heartbeat `Ping`.
#[error("Remote peer sent the wrong heartbeat nonce")]
HeartbeatNonceMismatch,
/// This node's internal services were overloaded, so the connection was dropped /// This node's internal services were overloaded, so the connection was dropped
/// to shed load. /// to shed load.
#[error("Internal services over capacity")] #[error("Internal services over capacity")]

View File

@ -121,6 +121,7 @@ where
Either::Right(((), _peer_fut)) => { Either::Right(((), _peer_fut)) => {
trace!("client request timed out"); trace!("client request timed out");
// Re-matching lets us take ownership of tx // Re-matching lets us take ownership of tx
// XXX check here for Ping heartbeat timeout?
self.state = match self.state { self.state = match self.state {
ServerState::AwaitingResponse(_, tx) => { ServerState::AwaitingResponse(_, tx) => {
let e = PeerError::ClientRequestTimeout; let e = PeerError::ClientRequestTimeout;
@ -216,6 +217,13 @@ where
let _ = tx.send(Ok(Response::Ok)); let _ = tx.send(Ok(Response::Ok));
AwaitingRequest AwaitingRequest
}), }),
(AwaitingRequest, Ping(nonce)) => self
.peer_tx
.send(Message::Ping(nonce))
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse(Ping(nonce), tx)),
// XXX timeout handling here?
} { } {
Ok(new_state) => { Ok(new_state) => {
self.state = new_state; self.state = new_state;
@ -250,6 +258,12 @@ where
.expect("response oneshot should be unused"); .expect("response oneshot should be unused");
AwaitingRequest AwaitingRequest
} }
(AwaitingResponse(Ping(req_nonce), tx), Message::Pong(res_nonce)) => {
if req_nonce != res_nonce {
self.fail_with(PeerError::HeartbeatNonceMismatch);
}
AwaitingRequest
}
// By default, messages are not responses. // By default, messages are not responses.
(state, msg) => { (state, msg) => {
ignored_msg = Some(msg); ignored_msg = Some(msg);

View File

@ -6,6 +6,8 @@
use crate::meta_addr::MetaAddr; use crate::meta_addr::MetaAddr;
use super::types::Nonce;
/// A network request, represented in internal format. /// A network request, represented in internal format.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum Request { pub enum Request {
@ -13,6 +15,8 @@ pub enum Request {
GetPeers, GetPeers,
/// Advertises peers to the remote server. /// Advertises peers to the remote server.
PushPeers(Vec<MetaAddr>), PushPeers(Vec<MetaAddr>),
/// Heartbeats triggered on peer connection start.
Ping(Nonce),
} }
/// A response to a network request, represented in internal format. /// A response to a network request, represented in internal format.