From 469fa6b917a7395fcd6e0ab2bf0d757790fc4fd7 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 5 Jan 2022 09:43:30 +1000 Subject: [PATCH] 1. Fix some address crawler timing issues (#3293) * Stop holding completed messages until the next inbound message * Add more info to network message block download debug logs * Simplify address metrics logs * Try handling inbound messages as responses, then try as a new request * Improve address book logging * Fix a race between the first heartbeat and getaddr requests * Temporarily reduce the getaddr fanout to 1 * Update metrics when exiting the Connection run loop * Downgrade some debug logs to trace --- zebra-network/src/address_book.rs | 18 ++ zebra-network/src/address_book_updater.rs | 11 +- zebra-network/src/constants.rs | 9 +- zebra-network/src/peer/connection.rs | 213 ++++++++++-------- zebra-network/src/peer/handshake.rs | 19 +- zebra-network/src/peer_set/initialize.rs | 6 + zebra-network/src/peer_set/set.rs | 4 +- .../src/protocol/external/message.rs | 18 +- .../src/protocol/internal/response.rs | 14 ++ 9 files changed, 207 insertions(+), 105 deletions(-) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 7986b191..5324ce37 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -279,6 +279,7 @@ impl AddressBook { ?previous, total_peers = self.by_addr.len(), recent_peers = self.recently_live_peers(chrono_now).count(), + "calculated updated address book entry", ); if let Some(updated) = updated { @@ -303,6 +304,15 @@ impl AddressBook { self.by_addr.insert(updated.addr, updated); + debug!( + ?change, + ?updated, + ?previous, + total_peers = self.by_addr.len(), + recent_peers = self.recently_live_peers(chrono_now).count(), + "updated address book entry", + ); + // Security: Limit the number of peers in the address book. // // We only delete outdated peers when we have too many peers. 
@@ -317,6 +327,14 @@ impl AddressBook { .expect("just checked there is at least one peer"); self.by_addr.remove(&surplus_peer.addr); + + debug!( + surplus = ?surplus_peer, + ?updated, + total_peers = self.by_addr.len(), + recent_peers = self.recently_live_peers(chrono_now).count(), + "removed surplus address book entry", + ); } assert!(self.len() <= self.addr_limit); diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs index 294f5d7d..49afe2db 100644 --- a/zebra-network/src/address_book_updater.rs +++ b/zebra-network/src/address_book_updater.rs @@ -49,14 +49,17 @@ impl AddressBookUpdater { // based on the maximum number of inbound and outbound peers. let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); - let address_book = - AddressBook::new(local_listener, span!(Level::TRACE, "address book updater")); + let address_book = AddressBook::new(local_listener, span!(Level::TRACE, "address book")); let address_metrics = address_book.address_metrics_watcher(); let address_book = Arc::new(std::sync::Mutex::new(address_book)); let worker_address_book = address_book.clone(); let worker = move || { + info!("starting the address book updater"); + while let Some(event) = worker_rx.blocking_recv() { + trace!(?event, "got address book change"); + // # Correctness // // Briefly hold the address book threaded mutex, to update the @@ -67,7 +70,9 @@ impl AddressBookUpdater { .update(event); } - Err(AllAddressBookUpdaterSendersClosed.into()) + let error = Err(AllAddressBookUpdaterSendersClosed.into()); + info!(?error, "stopping address book updater"); + error }; // Correctness: spawn address book accesses on a blocking thread, diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 2dffaeb9..67867f0d 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -133,14 +133,19 @@ pub const PEER_GET_ADDR_TIMEOUT: Duration = Duration::from_secs(8); /// The 
number of GetAddr requests sent when crawling for new peers. /// -/// ## SECURITY +/// # Security /// /// The fanout should be greater than 2, so that Zebra avoids getting a majority /// of its initial address book entries from a single peer. /// /// Zebra regularly crawls for new peers, initiating a new crawl every /// [`crawl_new_peer_interval`](crate::config::Config.crawl_new_peer_interval). -pub const GET_ADDR_FANOUT: usize = 3; +/// +/// TODO: Restore the fanout to 3, once fanouts are limited to the number of ready peers (#2214) +/// +/// In #3110, we changed the fanout to 1, to make sure we actually use cached address responses. +/// With a fanout of 3, we were dropping a lot of responses, because the overall crawl timed out. +pub const GET_ADDR_FANOUT: usize = 1; /// The maximum number of addresses allowed in an `addr` or `addrv2` message. /// diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 48fb47de..61d3a63e 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -525,7 +525,14 @@ where } Either::Left((Some(Err(e)), _)) => self.fail_with(e), Either::Left((Some(Ok(msg)), _)) => { - self.handle_message_as_request(msg).await + let unhandled_msg = self.handle_message_as_request(msg).await; + + if let Some(unhandled_msg) = unhandled_msg { + debug!( + %unhandled_msg, + "ignoring unhandled request while awaiting a request" + ); + } } Either::Right((None, _)) => { trace!("client_rx closed, ending connection"); @@ -593,32 +600,19 @@ where self.update_state_metrics(None); - // # Correctness + // Check whether the handler is finished processing messages, + // and update the state. + // (Some messages can indicate that a response has finished, + // even if the message wasn't consumed as a response or a request.) // - // Handle any unsolicited messages first, to clear the queue. - // Then check for responses to our request messages. 
- // - // This significantly reduces our message failure rate. - // (Otherwise, every unsolicited message can disrupt our pending request.) - - // If the message was not consumed, check whether it - // should be handled as a request. - if let Some(msg) = request_msg { - // do NOT instrument with the request span, this is - // independent work - self.handle_message_as_request(msg).await; - } else { - // Otherwise, check whether the handler is finished - // processing messages and update the state. - // - // Replace the state with a temporary value, - // so we can take ownership of the response sender. - self.state = match std::mem::replace(&mut self.state, State::Failed) { - State::AwaitingResponse { - handler: Handler::Finished(response), - tx, - .. - } => { + // Replace the state with a temporary value, + // so we can take ownership of the response sender. + self.state = match std::mem::replace(&mut self.state, State::Failed) { + State::AwaitingResponse { + handler: Handler::Finished(response), + tx, + .. + } => { if let Ok(response) = response.as_ref() { debug!(%response, "finished receiving peer response to Zebra request"); // Add a metric for inbound responses to outbound requests. @@ -631,27 +625,36 @@ where } else { debug!(error = ?response, "error in peer response to Zebra request"); } - let _ = tx.send(response.map_err(Into::into)); - State::AwaitingRequest - } - pending @ State::AwaitingResponse { .. } => { - // Drop the new request message from the remote peer, - // because we can't process multiple requests at the same time. 
- debug!( - new_request = %request_msg - .as_ref() - .map(|m| m.to_string()) - .unwrap_or_else(|| "None".into()), - awaiting_response = %pending, - "ignoring new request while awaiting a response" - ); - pending - }, - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; + let _ = tx.send(response.map_err(Into::into)); + State::AwaitingRequest + } + pending @ State::AwaitingResponse { .. } => + pending + , + _ => unreachable!( + "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", + self.client_rx + ), + }; + + self.update_state_metrics(None); + + // If the message was not consumed as a response, + // check whether it can be handled as a request. + let unused_msg = if let Some(request_msg) = request_msg { + // do NOT instrument with the request span, this is + // independent work + self.handle_message_as_request(request_msg).await + } else { + None + }; + + if let Some(unused_msg) = unused_msg { + debug!( + %unused_msg, + %self.state, + "ignoring peer message: not a response or a request", + ); } } Either::Left((Either::Right(_), _peer_fut)) => { @@ -697,10 +700,13 @@ where } } + let error = self.error_slot.try_get_error(); assert!( - self.error_slot.try_get_error().is_some(), + error.is_some(), "closing connections must call fail_with() or shutdown() to set the error slot" ); + + self.update_state_metrics(error.expect("checked is_some").to_string()); } /// Fail this connection. @@ -937,12 +943,15 @@ where }; } + /// Handle `msg` as a request from a peer to this Zebra instance. + /// + /// If the message is not handled, it is returned. 
// This function has its own span, because we're creating a new work // context (namely, the work of processing the inbound msg as a request) - #[instrument(name = "msg_as_req", skip(self, msg), fields(%msg))] - async fn handle_message_as_request(&mut self, msg: Message) { + #[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))] + async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> { trace!(?msg); - debug!(state = %self.state, %msg, "received peer request to Zebra"); + debug!(state = %self.state, %msg, "received inbound peer message"); self.update_state_metrics(format!("In::Msg::{}", msg.command())); @@ -952,40 +961,40 @@ where if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await { self.fail_with(e); } - return; + None } // These messages shouldn't be sent outside of a handshake. Message::Version { .. } => { self.fail_with(PeerError::DuplicateHandshake); - return; + None } Message::Verack { .. } => { self.fail_with(PeerError::DuplicateHandshake); - return; + None } // These messages should already be handled as a response if they // could be a response, so if we see them here, they were either // sent unsolicited, or they were sent in response to a canceled request // that we've already forgotten about. Message::Reject { .. } => { - tracing::debug!("got reject message unsolicited or from canceled request"); - return; + debug!(%msg, "got reject message unsolicited or from canceled request"); + None } Message::NotFound { .. 
} => { - tracing::debug!("got notfound message unsolicited or from canceled request"); - return; + debug!(%msg, "got notfound message unsolicited or from canceled request"); + None } Message::Pong(_) => { - tracing::debug!("got pong message unsolicited or from canceled request"); - return; + debug!(%msg, "got pong message unsolicited or from canceled request"); + None } Message::Block(_) => { - tracing::debug!("got block message unsolicited or from canceled request"); - return; + debug!(%msg, "got block message unsolicited or from canceled request"); + None } Message::Headers(_) => { - tracing::debug!("got headers message unsolicited or from canceled request"); - return; + debug!(%msg, "got headers message unsolicited or from canceled request"); + None } // These messages should never be sent by peers. Message::FilterLoad { .. } @@ -998,45 +1007,45 @@ where // // Since we can't verify their source, Zebra needs to ignore unexpected messages, // because closing the connection could cause a denial of service or eclipse attack. - debug!("got BIP111 message without advertising NODE_BLOOM"); - return; + debug!(%msg, "got BIP111 message without advertising NODE_BLOOM"); + None } // Zebra crawls the network proactively, to prevent // peers from inserting data into our address book. Message::Addr(_) => { - trace!("ignoring unsolicited addr message"); - return; + debug!(%msg, "ignoring unsolicited addr message"); + None } - Message::Tx(transaction) => Request::PushTransaction(transaction), - Message::Inv(items) => match &items[..] { + Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())), + Message::Inv(ref items) => match &items[..] { // We don't expect to be advertised multiple blocks at a time, // so we ignore any advertisements of multiple blocks. - [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash), + [InventoryHash::Block(hash)] => Some(Request::AdvertiseBlock(*hash)), // Some peers advertise invs with mixed item types. 
// But we're just interested in the transaction invs. // // TODO: split mixed invs into multiple requests, // but skip runs of multiple blocks. - tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => { - Request::AdvertiseTransactionIds(transaction_ids(&items).collect()) - } + tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => Some( + Request::AdvertiseTransactionIds(transaction_ids(items).collect()), + ), // Log detailed messages for ignored inv advertisement messages. [] => { - debug!("ignoring empty inv"); - return; + debug!(%msg, "ignoring empty inv"); + None } [InventoryHash::Block(_), InventoryHash::Block(_), ..] => { - debug!("ignoring inv with multiple blocks"); - return; + debug!(%msg, "ignoring inv with multiple blocks"); + None } _ => { - debug!("ignoring inv with no transactions"); - return; + debug!(%msg, "ignoring inv with no transactions"); + None } }, - Message::GetData(items) => match &items[..] { + Message::GetData(ref items) => match &items[..] { // Some peers advertise invs with mixed item types. // So we suspect they might do the same with getdata. // @@ -1050,31 +1059,47 @@ where .iter() .any(|item| matches!(item, InventoryHash::Block(_))) => { - Request::BlocksByHash(block_hashes(&items).collect()) + Some(Request::BlocksByHash(block_hashes(items).collect())) } tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => { - Request::TransactionsById(transaction_ids(&items).collect()) + Some(Request::TransactionsById(transaction_ids(items).collect())) } // Log detailed messages for ignored getdata request messages. 
[] => { - debug!("ignoring empty getdata"); - return; + debug!(%msg, "ignoring empty getdata"); + None } _ => { - debug!("ignoring getdata with no blocks or transactions"); - return; + debug!(%msg, "ignoring getdata with no blocks or transactions"); + None } }, - Message::GetAddr => Request::Peers, - Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop }, - Message::GetHeaders { known_blocks, stop } => { - Request::FindHeaders { known_blocks, stop } - } - Message::Mempool => Request::MempoolTransactionIds, + Message::GetAddr => Some(Request::Peers), + Message::GetBlocks { + ref known_blocks, + stop, + } => Some(Request::FindBlocks { + known_blocks: known_blocks.clone(), + stop, + }), + Message::GetHeaders { + ref known_blocks, + stop, + } => Some(Request::FindHeaders { + known_blocks: known_blocks.clone(), + stop, + }), + Message::Mempool => Some(Request::MempoolTransactionIds), }; - self.drive_peer_request(req).await + if let Some(req) = req { + self.drive_peer_request(req).await; + None + } else { + // return the unused message + Some(msg) + } } /// Given a `req` originating from the peer, drive it to completion and send diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 30c39897..512e1d8d 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -11,7 +11,12 @@ use std::{ use chrono::{TimeZone, Utc}; use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt}; -use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; +use tokio::{ + net::TcpStream, + sync::broadcast, + task::JoinError, + time::{timeout, Instant}, +}; use tokio_stream::wrappers::IntervalStream; use tokio_util::codec::Framed; use tower::Service; @@ -978,8 +983,16 @@ async fn send_periodic_heartbeats( ) { use futures::future::Either; - let mut interval_stream = - IntervalStream::new(tokio::time::interval(constants::HEARTBEAT_INTERVAL)); + // Don't send the 
first heartbeat immediately - we've just completed the handshake! + let mut interval = tokio::time::interval_at( + Instant::now() + constants::HEARTBEAT_INTERVAL, + constants::HEARTBEAT_INTERVAL, + ); + // If the heartbeat is delayed, also delay all future heartbeats. + // (Shorter heartbeat intervals just add load, without any benefit.) + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + let mut interval_stream = IntervalStream::new(interval); loop { let shutdown_rx_ref = Pin::new(&mut shutdown_rx); diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 04b44211..7406280e 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -650,6 +650,12 @@ where // - use the `select!` macro for all actions, because the `select` function // is biased towards the first ready future + info!( + crawl_new_peer_interval = ?config.crawl_new_peer_interval, + outbound_connections = ?active_outbound_connections.update_count(), + "starting the peer address crawler", + ); + let mut handshakes = FuturesUnordered::new(); // returns None when empty. 
// Keeping an unresolved future in the pool means the stream diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 1d33d93d..48f96baf 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -777,7 +777,7 @@ where self.last_peer_log = Some(Instant::now()); - let address_metrics = self.address_metrics.borrow(); + let address_metrics = *self.address_metrics.borrow(); if unready_services_len == 0 { warn!( ?address_metrics, @@ -804,7 +804,7 @@ where // Security: make sure we haven't exceeded the connection limit if num_peers > self.peerset_total_connection_limit { - let address_metrics = self.address_metrics.borrow(); + let address_metrics = *self.address_metrics.borrow(); panic!( "unexpectedly exceeded configured peer set connection limit: \n\ peers: {:?}, ready: {:?}, unready: {:?}, \n\ diff --git a/zebra-network/src/protocol/external/message.rs b/zebra-network/src/protocol/external/message.rs index e606a018..a3674666 100644 --- a/zebra-network/src/protocol/external/message.rs +++ b/zebra-network/src/protocol/external/message.rs @@ -388,8 +388,10 @@ impl fmt::Display for Message { user_agent, ), Message::Verack => "verack".to_string(), + Message::Ping(_) => "ping".to_string(), Message::Pong(_) => "pong".to_string(), + Message::Reject { message, reason, @@ -401,25 +403,39 @@ impl fmt::Display for Message { reason, if data.is_some() { "Some" } else { "None" }, ), + Message::GetAddr => "getaddr".to_string(), Message::Addr(addrs) => format!("addr {{ addrs: {} }}", addrs.len()), + Message::GetBlocks { known_blocks, stop } => format!( "getblocks {{ known_blocks: {}, stop: {} }}", known_blocks.len(), if stop.is_some() { "Some" } else { "None" }, ), Message::Inv(invs) => format!("inv {{ invs: {} }}", invs.len()), + Message::GetHeaders { known_blocks, stop } => format!( "getheaders {{ known_blocks: {}, stop: {} }}", known_blocks.len(), if stop.is_some() { "Some" } else { "None" }, ), Message::Headers(headers) 
=> format!("headers {{ headers: {} }}", headers.len()), + Message::GetData(invs) => format!("getdata {{ invs: {} }}", invs.len()), - Message::Block(_) => "block".to_string(), + Message::Block(block) => format!( + "block {{ height: {}, hash: {} }}", + block + .coinbase_height() + .as_ref() + .map(|h| h.0.to_string()) + .unwrap_or_else(|| "None".into()), + block.hash(), + ), Message::Tx(_) => "tx".to_string(), Message::NotFound(invs) => format!("notfound {{ invs: {} }}", invs.len()), + Message::Mempool => "mempool".to_string(), + Message::FilterLoad { .. } => "filterload".to_string(), Message::FilterAdd { .. } => "filteradd".to_string(), Message::FilterClear => "filterclear".to_string(), diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index ac14ef13..df0f5639 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -79,7 +79,21 @@ impl fmt::Display for Response { Response::Peers(peers) => format!("Peers {{ peers: {} }}", peers.len()), + // Display heights for single-block responses (which Zebra requests and expects) + Response::Blocks(blocks) if blocks.len() == 1 => { + let block = blocks.first().expect("len is 1"); + format!( + "Block {{ height: {}, hash: {} }}", + block + .coinbase_height() + .as_ref() + .map(|h| h.0.to_string()) + .unwrap_or_else(|| "None".into()), + block.hash(), + ) + } Response::Blocks(blocks) => format!("Blocks {{ blocks: {} }}", blocks.len()), + Response::BlockHashes(hashes) => format!("BlockHashes {{ hashes: {} }}", hashes.len()), Response::BlockHeaders(headers) => { format!("BlockHeaders {{ headers: {} }}", headers.len())