Revert "Stop ignoring some completed Responses" (#3274)

* Revert "Stop ignoring some completed Responses"

This reverts commit 0383562e1098ee2b49a4b5dd1b37646e6512782f from PR #3120,
but keeps the metrics and logging changes since that commit.

* Document why the request handling needs to happen in this order
This commit is contained in:
teor 2021-12-22 00:55:20 +10:00 committed by GitHub
parent a8d2e45e08
commit 8db0528165
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 42 deletions

View File

@ -580,7 +580,7 @@ where
// &mut self. This is a sign that we don't properly // &mut self. This is a sign that we don't properly
// factor the state required for inbound and // factor the state required for inbound and
// outbound requests. // outbound requests.
let mut request_msg = match self.state { let request_msg = match self.state {
State::AwaitingResponse { State::AwaitingResponse {
ref mut handler, .. ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)), } => span.in_scope(|| handler.process_message(peer_msg)),
@ -593,7 +593,22 @@ where
self.update_state_metrics(None); self.update_state_metrics(None);
// Check whether the handler is finished // # Correctness
//
// Handle any unsolicited messages first, to clear the queue.
// Then check for responses to our request messages.
//
// This significantly reduces our message failure rate.
// (Otherwise, every unsolicited message can disrupt our pending request.)
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(msg).await;
} else {
// Otherwise, check whether the handler is finished
// processing messages and update the state. // processing messages and update the state.
// //
// Replace the state with a temporary value, // Replace the state with a temporary value,
@ -616,13 +631,12 @@ where
} else { } else {
debug!(error = ?response, "error in peer response to Zebra request"); debug!(error = ?response, "error in peer response to Zebra request");
} }
let _ = tx.send(response.map_err(Into::into)); let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest State::AwaitingRequest
} }
pending @ State::AwaitingResponse { .. } => { pending @ State::AwaitingResponse { .. } => {
// Drop the un-consumed request message, // Drop the new request message from the remote peer,
// because we can't process multiple messages at the same time. // because we can't process multiple requests at the same time.
debug!( debug!(
new_request = %request_msg new_request = %request_msg
.as_ref() .as_ref()
@ -631,7 +645,6 @@ where
awaiting_response = %pending, awaiting_response = %pending,
"ignoring new request while awaiting a response" "ignoring new request while awaiting a response"
); );
request_msg = None;
pending pending
}, },
_ => unreachable!( _ => unreachable!(
@ -639,15 +652,6 @@ where
self.client_rx self.client_rx
), ),
}; };
self.update_state_metrics(None);
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(msg).await;
} }
} }
Either::Left((Either::Right(_), _peer_fut)) => { Either::Left((Either::Right(_), _peer_fut)) => {