From d90e709ce15734014a9e3271dd1054b5282eba2d Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 23 Nov 2020 23:52:40 -0800 Subject: [PATCH] network: tidy peer set implementation - rename functions more descriptively - create a common `take_ready_service` function - organize poll_ functions separately --- zebra-network/src/peer_set/set.rs | 189 +++++++++++++++--------------- 1 file changed, 95 insertions(+), 94 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index fe9d50d7..ad95dc84 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -83,9 +83,9 @@ where { discover: D, /// A preselected index for a ready service. - /// INVARIANT: If `next_idx` is `Some(i)`, `i` must be a valid index for `ready_services`. - /// This means that every change to `ready_services` must invalidate or correct `next_idx`. - next_idx: Option, + /// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`. + /// This means that every change to `ready_services` must invalidate or correct it. + preselected_p2c_index: Option, ready_services: IndexMap, cancel_handles: HashMap>, unready_services: FuturesUnordered>, @@ -120,7 +120,7 @@ where ) -> Self { Self { discover, - next_idx: None, + preselected_p2c_index: None, ready_services: IndexMap::new(), cancel_handles: HashMap::new(), unready_services: FuturesUnordered::new(), @@ -131,61 +131,7 @@ where } } - fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { - use futures::ready; - loop { - match ready!(Pin::new(&mut self.discover).poll_discover(cx)) - .ok_or("discovery stream closed")? - .map_err(Into::into)? - { - Change::Remove(key) => { - trace!(?key, "got Change::Remove from Discover"); - self.remove(&key); - } - Change::Insert(key, svc) => { - trace!(?key, "got Change::Insert from Discover"); - self.remove(&key); - self.push_unready(key, svc); - } - } - } - } - - fn remove(&mut self, key: &D::Key) { - if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) { - // swap_remove perturbs the position of the last element of - // ready_services, so we may have invalidated self.next_idx, in - // which case we need to fix it. Specifically, swap_remove swaps the - // position of the removee and the last element, then drops the - // removee from the end, so we compare the active and removed indices: - // - // We just removed one element, so this was the index of the last element. - let last_idx = self.ready_services.len(); - self.next_idx = match self.next_idx { - None => None, // No active index - Some(j) if j == i => None, // We removed j - Some(j) if j == last_idx => Some(i), // We swapped i and j - Some(j) => Some(j), // We swapped an unrelated service. - }; - // No Heisenservices: they must be ready or unready. - assert!(!self.cancel_handles.contains_key(key)); - } else if let Some(handle) = self.cancel_handles.remove(key) { - let _ = handle.send(()); - } - } - - fn push_unready(&mut self, key: D::Key, svc: D::Service) { - let (tx, rx) = oneshot::channel(); - self.cancel_handles.insert(key, tx); - self.unready_services.push(UnreadyService { - key: Some(key), - service: Some(svc), - cancel: rx, - _req: PhantomData, - }); - } - - fn check_for_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { + fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { if self.guards.is_empty() { match self.handle_rx.try_recv() { Ok(handles) => { @@ -237,8 +183,71 @@ where } } + fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { + use futures::ready; + loop { + match ready!(Pin::new(&mut self.discover).poll_discover(cx)) + .ok_or("discovery stream closed")? + .map_err(Into::into)? + { + Change::Remove(key) => { + trace!(?key, "got Change::Remove from Discover"); + self.remove(&key); + } + Change::Insert(key, svc) => { + trace!(?key, "got Change::Insert from Discover"); + self.remove(&key); + self.push_unready(key, svc); + } + } + } + } + + /// Takes a ready service by key, preserving `preselected_p2c_index` if possible. + fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> { + if let Some((i, key, svc)) = self.ready_services.swap_remove_full(key) { + // swap_remove perturbs the position of the last element of + // ready_services, so we may have invalidated self.next_idx, in + // which case we need to fix it. Specifically, swap_remove swaps the + // position of the removee and the last element, then drops the + // removee from the end, so we compare the active and removed indices: + // + // We just removed one element, so this was the index of the last element. + let last_idx = self.ready_services.len(); + self.preselected_p2c_index = match self.preselected_p2c_index { + None => None, // No active index + Some(j) if j == i => None, // We removed j + Some(j) if j == last_idx => Some(i), // We swapped i and j + Some(j) => Some(j), // We swapped an unrelated service. + }; + // No Heisenservices: they must be ready or unready. + assert!(!self.cancel_handles.contains_key(&key)); + Some((key, svc)) + } else { + None + } + } + + fn remove(&mut self, key: &D::Key) { + if let Some(_) = self.take_ready_service(key) { + } else if let Some(handle) = self.cancel_handles.remove(key) { + let _ = handle.send(()); + } + } + + fn push_unready(&mut self, key: D::Key, svc: D::Service) { + let (tx, rx) = oneshot::channel(); + self.cancel_handles.insert(key, tx); + self.unready_services.push(UnreadyService { + key: Some(key), + service: Some(svc), + cancel: rx, + _req: PhantomData, + }); + } + /// Performs P2C on inner services to select a ready service. - fn select_next_ready_index(&mut self) -> Option { + fn preselect_p2c_index(&mut self) -> Option { match self.ready_services.len() { 0 => None, 1 => Some(0), @@ -248,8 +257,8 @@ where (idxs.index(0), idxs.index(1)) }; - let a_load = self.ready_index_load(a); - let b_load = self.ready_index_load(b); + let a_load = self.query_load(a); + let b_load = self.query_load(b); let selected = if a_load <= b_load { a } else { b }; @@ -261,7 +270,7 @@ where } /// Accesses a ready endpoint by index and returns its current load. - fn ready_index_load(&self, index: usize) -> ::Metric { + fn query_load(&self, index: usize) -> ::Metric { let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); svc.load() } @@ -269,7 +278,7 @@ where /// Routes a request using P2C load-balancing. fn route_p2c(&mut self, req: Request) -> >::Future { let index = self - .next_idx + .preselected_p2c_index .take() .expect("ready service must have valid preselected index"); @@ -290,30 +299,21 @@ where req: Request, hash: InventoryHash, ) -> >::Future { - let candidate_index = self + let peer = self .inventory_registry .peers(&hash) - .find_map(|addr| self.ready_services.get_index_of(addr)); - - match candidate_index { - Some(index) => { - let (key, mut svc) = self - .ready_services - .swap_remove_index(index) - .expect("found index must be valid"); - // We changed ready_services, so next_idx is invalid - self.next_idx = None; + .find(|&key| self.ready_services.contains_key(key)) + .cloned(); + match peer.and_then(|key| self.take_ready_service(&key)) { + Some((key, mut svc)) => { tracing::debug!(?hash, ?key, "routing based on inventory"); let fut = svc.call(req); self.push_unready(key, svc); fut.map_err(Into::into).boxed() } None => { - tracing::debug!( - ?hash, - "could not find ready peer for inventory hash, falling back to p2c" - ); + tracing::debug!(?hash, "no ready peer for inventory, falling back to p2c"); self.route_p2c(req) } } @@ -324,7 +324,7 @@ where // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services, // blocking us from passing &mut self to push_unready. let ready_services = std::mem::take(&mut self.ready_services); - self.next_idx = None; // We changed ready_services, so next_idx is invalid + self.preselected_p2c_index = None; // All services are now unready. let futs = FuturesUnordered::new(); for (key, mut svc) in ready_services { @@ -342,6 +342,15 @@ where } .boxed() } + + fn update_metrics(&self) { + let num_ready = self.ready_services.len(); + let num_unready = self.unready_services.len(); + let num_peers = num_ready + num_unready; + metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap()); + metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap()); + metrics::gauge!("pool.num_peers", num_peers.try_into().unwrap()); + } } impl Service for PeerSet @@ -359,27 +368,19 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.check_for_background_errors(cx)?; + self.poll_background_errors(cx)?; // Process peer discovery updates. let _ = self.poll_discover(cx)?; self.inventory_registry.poll_inventory(cx)?; - - // Poll unready services to drive them to readiness. self.poll_unready(cx); - let num_ready = self.ready_services.len(); - let num_unready = self.unready_services.len(); - metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap(),); - metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap(),); - metrics::gauge!( - "pool.num_peers", - (num_ready + num_unready).try_into().unwrap(), - ); + + self.update_metrics(); loop { // Re-check that the pre-selected service is ready, in case // something has happened since (e.g., it failed, peer closed // connection, ...) - if let Some(index) = self.next_idx { + if let Some(index) = self.preselected_p2c_index { let (key, service) = self .ready_services .get_index_mut(index) @@ -406,9 +407,9 @@ where } trace!("preselected service was not ready, reselecting"); - self.next_idx = self.select_next_ready_index(); + self.preselected_p2c_index = self.preselect_p2c_index(); - if self.next_idx.is_none() { + if self.preselected_p2c_index.is_none() { trace!("no ready services, sending demand signal"); let _ = self.demand_signal.try_send(()); return Poll::Pending;