Add peer set tracing and unreachable panics (#1468)
Add some extra tracing and panics to double-check our assumptions about the peer set state machine.
This commit is contained in:
parent
2d1698a120
commit
8e2f08221f
|
|
@ -106,6 +106,7 @@ where
|
||||||
// existing peers, but we don't make too many because update may be
|
// existing peers, but we don't make too many because update may be
|
||||||
// called while the peer set is already loaded.
|
// called while the peer set is already loaded.
|
||||||
let mut responses = FuturesUnordered::new();
|
let mut responses = FuturesUnordered::new();
|
||||||
|
trace!("sending GetPeers requests");
|
||||||
// Yes this loops only once (for now), until we add fanout back.
|
// Yes this loops only once (for now), until we add fanout back.
|
||||||
for _ in 0..1usize {
|
for _ in 0..1usize {
|
||||||
self.peer_service.ready_and().await?;
|
self.peer_service.ready_and().await?;
|
||||||
|
|
@ -115,7 +116,7 @@ where
|
||||||
if let Ok(Response::Peers(addrs)) = rsp {
|
if let Ok(Response::Peers(addrs)) = rsp {
|
||||||
let addr_len = addrs.len();
|
let addr_len = addrs.len();
|
||||||
let prev_len = self.gossiped.len();
|
let prev_len = self.gossiped.len();
|
||||||
// Filter new addresses to ensure that gossiped
|
// Filter new addresses to ensure that gossiped addresses are actually new
|
||||||
let failed = &self.failed;
|
let failed = &self.failed;
|
||||||
let peer_set = &self.peer_set;
|
let peer_set = &self.peer_set;
|
||||||
let new_addrs = addrs
|
let new_addrs = addrs
|
||||||
|
|
@ -151,6 +152,11 @@ where
|
||||||
metrics::gauge!("candidate_set.disconnected", self.disconnected.len() as i64);
|
metrics::gauge!("candidate_set.disconnected", self.disconnected.len() as i64);
|
||||||
metrics::gauge!("candidate_set.gossiped", self.gossiped.len() as i64);
|
metrics::gauge!("candidate_set.gossiped", self.gossiped.len() as i64);
|
||||||
metrics::gauge!("candidate_set.failed", self.failed.len() as i64);
|
metrics::gauge!("candidate_set.failed", self.failed.len() as i64);
|
||||||
|
debug!(
|
||||||
|
disconnected_peers = self.disconnected.len(),
|
||||||
|
gossiped_peers = self.gossiped.len(),
|
||||||
|
failed_peers = self.failed.len()
|
||||||
|
);
|
||||||
let guard = self.peer_set.lock().unwrap();
|
let guard = self.peer_set.lock().unwrap();
|
||||||
self.disconnected
|
self.disconnected
|
||||||
.drain_oldest()
|
.drain_oldest()
|
||||||
|
|
|
||||||
|
|
@ -314,9 +314,10 @@ where
|
||||||
let _ = demand_tx.try_send(());
|
let _ = demand_tx.try_send(());
|
||||||
}
|
}
|
||||||
Right((Some(Ok(change)), _)) => {
|
Right((Some(Ok(change)), _)) => {
|
||||||
// in fact all changes are Insert so this branch is always taken
|
|
||||||
if let Change::Insert(ref addr, _) = change {
|
if let Change::Insert(ref addr, _) = change {
|
||||||
debug!(candidate.addr = ?addr, "successfully dialed new peer");
|
debug!(candidate.addr = ?addr, "successfully dialed new peer");
|
||||||
|
} else {
|
||||||
|
unreachable!("unexpected handshake result: all changes should be Insert");
|
||||||
}
|
}
|
||||||
success_tx.send(Ok(change)).await?;
|
success_tx.send(Ok(change)).await?;
|
||||||
}
|
}
|
||||||
|
|
@ -328,9 +329,18 @@ where
|
||||||
// turned into a connection, so add it back:
|
// turned into a connection, so add it back:
|
||||||
let _ = demand_tx.try_send(());
|
let _ = demand_tx.try_send(());
|
||||||
}
|
}
|
||||||
// If we don't match one of these patterns, shutdown.
|
// We don't expect to see these patterns during normal operation
|
||||||
_ => break,
|
Left((Left((None, _)), _)) => {
|
||||||
|
unreachable!("demand_rx never fails, because we hold demand_tx");
|
||||||
|
}
|
||||||
|
Left((Right((None, _)), _)) => {
|
||||||
|
unreachable!("crawl_timer never terminates");
|
||||||
|
}
|
||||||
|
Right((None, _)) => {
|
||||||
|
unreachable!(
|
||||||
|
"handshakes never terminates, because it contains a future that never resolves"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue