T2. Add isolated Tor connection API, but don't enable it by default (#3303)
* Add arti as a zebra-network dependency * Add a method for isolated anonymised Tor connections to a specific hostname * Add tests for isolated tor connections * Use a shared tor client instance for all isolated connections * Silence a spurious tor warning in tests * Make tor support optional, activate it via a new "tor" feature * Extra Cargo.lock changes * fastmod AsyncReadWrite PeerTransport zebra* * Remove unnecessary PeerTransport generics * Refactor common test code into a function * Don't drop the stream until the end of the test Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
parent
a1f4cec0de
commit
499ae89c80
File diff suppressed because it is too large
Load Diff
|
|
@ -68,6 +68,9 @@ skip-tree = [
|
||||||
|
|
||||||
# wait for lots of crates in the tokio ecosystem to upgrade
|
# wait for lots of crates in the tokio ecosystem to upgrade
|
||||||
{ name = "socket2", version = "=0.3.16" },
|
{ name = "socket2", version = "=0.3.16" },
|
||||||
|
|
||||||
|
# wait for arti to stabilise
|
||||||
|
{ name = "arti-client" },
|
||||||
]
|
]
|
||||||
|
|
||||||
# This section is considered when running `cargo deny check sources`.
|
# This section is considered when running `cargo deny check sources`.
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,10 @@ edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
tor = ["arti-client", "tor-rtcompat"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bitflags = "1.2"
|
bitflags = "1.2"
|
||||||
byteorder = "1.4"
|
byteorder = "1.4"
|
||||||
|
|
@ -32,6 +36,10 @@ tracing = "0.1"
|
||||||
tracing-futures = "0.2"
|
tracing-futures = "0.2"
|
||||||
tracing-error = { version = "0.1.2", features = ["traced-error"] }
|
tracing-error = { version = "0.1.2", features = ["traced-error"] }
|
||||||
|
|
||||||
|
# tor dependencies
|
||||||
|
arti-client = { version = "0.0.2", optional = true }
|
||||||
|
tor-rtcompat = { version = "0.0.2", optional = true }
|
||||||
|
|
||||||
zebra-chain = { path = "../zebra-chain" }
|
zebra-chain = { path = "../zebra-chain" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
||||||
|
|
@ -101,14 +101,19 @@ impl Config {
|
||||||
self.peerset_outbound_connection_limit() + self.peerset_inbound_connection_limit()
|
self.peerset_outbound_connection_limit() + self.peerset_inbound_connection_limit()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the initial seed peers based on the configured network.
|
/// Returns the initial seed peer hostnames for the configured network.
|
||||||
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
|
pub fn initial_peer_hostnames(&self) -> &HashSet<String> {
|
||||||
match self.network {
|
match self.network {
|
||||||
Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
|
Network::Mainnet => &self.initial_mainnet_peers,
|
||||||
Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
|
Network::Testnet => &self.initial_testnet_peers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resolve initial seed peer IP addresses, based on the configured network.
|
||||||
|
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
|
||||||
|
Config::resolve_peers(self.initial_peer_hostnames()).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Concurrently resolves `peers` into zero or more IP addresses, with a
|
/// Concurrently resolves `peers` into zero or more IP addresses, with a
|
||||||
/// timeout of a few seconds on each DNS request.
|
/// timeout of a few seconds on each DNS request.
|
||||||
///
|
///
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,9 @@ use crate::{
|
||||||
BoxError, Config, Request, Response,
|
BoxError, Config, Request, Response,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "tor")]
|
||||||
|
pub(crate) mod tor;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
|
|
@ -44,13 +47,13 @@ mod tests;
|
||||||
/// or a Tor client [`DataStream`].
|
/// or a Tor client [`DataStream`].
|
||||||
///
|
///
|
||||||
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
|
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
|
||||||
pub fn connect_isolated<AsyncReadWrite>(
|
pub fn connect_isolated<PeerTransport>(
|
||||||
network: Network,
|
network: Network,
|
||||||
data_stream: AsyncReadWrite,
|
data_stream: PeerTransport,
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
|
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
|
||||||
where
|
where
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
let config = Config {
|
let config = Config {
|
||||||
network,
|
network,
|
||||||
|
|
@ -91,6 +94,8 @@ where
|
||||||
///
|
///
|
||||||
/// Transactions sent over this connection can be linked to the sending and receiving IP address
|
/// Transactions sent over this connection can be linked to the sending and receiving IP address
|
||||||
/// by passive internet observers.
|
/// by passive internet observers.
|
||||||
|
///
|
||||||
|
/// Prefer [`connect_isolated_run_tor`](tor::connect_isolated_run_tor) if available.
|
||||||
pub fn connect_isolated_tcp_direct(
|
pub fn connect_isolated_tcp_direct(
|
||||||
network: Network,
|
network: Network,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
|
|
||||||
|
|
@ -49,58 +49,7 @@ async fn connect_isolated_sends_anonymised_version_message_tcp_net(network: Netw
|
||||||
let mut inbound_stream =
|
let mut inbound_stream =
|
||||||
Framed::new(inbound_conn, Codec::builder().for_network(network).finish());
|
Framed::new(inbound_conn, Codec::builder().for_network(network).finish());
|
||||||
|
|
||||||
// We don't need to send any bytes to get a version message.
|
check_version_message(network, &mut inbound_stream).await;
|
||||||
if let Message::Version {
|
|
||||||
version,
|
|
||||||
services,
|
|
||||||
timestamp,
|
|
||||||
address_recv,
|
|
||||||
address_from,
|
|
||||||
nonce: _,
|
|
||||||
user_agent,
|
|
||||||
start_height,
|
|
||||||
relay,
|
|
||||||
} = inbound_stream
|
|
||||||
.next()
|
|
||||||
.await
|
|
||||||
.expect("stream item")
|
|
||||||
.expect("item is Ok(msg)")
|
|
||||||
{
|
|
||||||
// Check that the version message sent by connect_isolated
|
|
||||||
// anonymises all the fields that it possibly can.
|
|
||||||
//
|
|
||||||
// The version field needs to be accurate, because it controls protocol features.
|
|
||||||
// The nonce must be randomised for security.
|
|
||||||
//
|
|
||||||
// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
|
|
||||||
|
|
||||||
let mut fixed_isolated_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
|
|
||||||
fixed_isolated_addr.set_port(network.default_port());
|
|
||||||
|
|
||||||
// Required fields should be accurate and match most other peers.
|
|
||||||
// (We can't test nonce randomness here.)
|
|
||||||
assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION);
|
|
||||||
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
|
|
||||||
|
|
||||||
// Other fields should be empty or zeroed.
|
|
||||||
assert_eq!(services, PeerServices::empty());
|
|
||||||
assert_eq!(
|
|
||||||
address_recv,
|
|
||||||
// Since we're connecting to the peer, we expect it to have the node flag.
|
|
||||||
//
|
|
||||||
// SECURITY TODO: should this just be zeroed anyway? (#3300)
|
|
||||||
AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK),
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
address_from,
|
|
||||||
AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
|
|
||||||
);
|
|
||||||
assert_eq!(user_agent, "");
|
|
||||||
assert_eq!(start_height.0, 0);
|
|
||||||
assert!(!relay);
|
|
||||||
} else {
|
|
||||||
panic!("handshake did not send version message");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Let the spawned task run for a short time.
|
// Let the spawned task run for a short time.
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
|
@ -125,7 +74,7 @@ async fn connect_isolated_sends_anonymised_version_message_tcp_net(network: Netw
|
||||||
/// when sent in-memory.
|
/// when sent in-memory.
|
||||||
///
|
///
|
||||||
/// This test also:
|
/// This test also:
|
||||||
/// - checks `AsyncReadWrite` support, and
|
/// - checks `PeerTransport` support, and
|
||||||
/// - runs even if network tests are disabled.
|
/// - runs even if network tests are disabled.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn connect_isolated_sends_anonymised_version_message_mem() {
|
async fn connect_isolated_sends_anonymised_version_message_mem() {
|
||||||
|
|
@ -147,6 +96,36 @@ async fn connect_isolated_sends_anonymised_version_message_mem_net(network: Netw
|
||||||
Codec::builder().for_network(network).finish(),
|
Codec::builder().for_network(network).finish(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
check_version_message(network, &mut inbound_stream).await;
|
||||||
|
|
||||||
|
// Let the spawned task run for a short time.
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// Make sure that the isolated connection did not:
|
||||||
|
// - panic, or
|
||||||
|
// - return a service.
|
||||||
|
//
|
||||||
|
// This test doesn't send a version message on `inbound_conn`,
|
||||||
|
// so providing a service is incorrect behaviour.
|
||||||
|
// (But a timeout error would be acceptable.)
|
||||||
|
let outbound_result = futures::poll!(&mut outbound_join_handle);
|
||||||
|
assert!(matches!(
|
||||||
|
outbound_result,
|
||||||
|
Poll::Pending | Poll::Ready(Ok(Err(_)))
|
||||||
|
));
|
||||||
|
|
||||||
|
outbound_join_handle.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait to receive a version message on `inbound_stream`,
|
||||||
|
/// then check that it is correctly anonymised.
|
||||||
|
#[track_caller]
|
||||||
|
async fn check_version_message<PeerTransport>(
|
||||||
|
network: Network,
|
||||||
|
inbound_stream: &mut Framed<PeerTransport, Codec>,
|
||||||
|
) where
|
||||||
|
PeerTransport: AsyncRead + Unpin,
|
||||||
|
{
|
||||||
// We don't need to send any bytes to get a version message.
|
// We don't need to send any bytes to get a version message.
|
||||||
if let Message::Version {
|
if let Message::Version {
|
||||||
version,
|
version,
|
||||||
|
|
@ -199,22 +178,4 @@ async fn connect_isolated_sends_anonymised_version_message_mem_net(network: Netw
|
||||||
} else {
|
} else {
|
||||||
panic!("handshake did not send version message");
|
panic!("handshake did not send version message");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let the spawned task run for a short time.
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
|
|
||||||
// Make sure that the isolated connection did not:
|
|
||||||
// - panic, or
|
|
||||||
// - return a service.
|
|
||||||
//
|
|
||||||
// This test doesn't send a version message on `inbound_conn`,
|
|
||||||
// so providing a service is incorrect behaviour.
|
|
||||||
// (But a timeout error would be acceptable.)
|
|
||||||
let outbound_result = futures::poll!(&mut outbound_join_handle);
|
|
||||||
assert!(matches!(
|
|
||||||
outbound_result,
|
|
||||||
Poll::Pending | Poll::Ready(Ok(Err(_)))
|
|
||||||
));
|
|
||||||
|
|
||||||
outbound_join_handle.abort();
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
//! Uses tor to create isolated and anonymised connections to specific peers.
|
||||||
|
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use arti_client::{TorAddr, TorClient, TorClientConfig};
|
||||||
|
use tor_rtcompat::tokio::TokioRuntimeHandle;
|
||||||
|
use tower::util::BoxService;
|
||||||
|
|
||||||
|
use zebra_chain::parameters::Network;
|
||||||
|
|
||||||
|
use crate::{connect_isolated, BoxError, Request, Response};
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
/// The shared isolated [`TorClient`] instance.
|
||||||
|
///
|
||||||
|
/// TODO: turn this into a tower service that takes a hostname, and returns an `arti_client::DataStream`
|
||||||
|
/// (or a task that updates a watch channel when it's done?)
|
||||||
|
pub static ref SHARED_TOR_CLIENT: Arc<Mutex<Option<TorClient<TokioRuntimeHandle>>>> =
|
||||||
|
Arc::new(Mutex::new(None));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a Zcash peer connection to `hostname` via Tor.
|
||||||
|
/// This connection is completely isolated from all other node state.
|
||||||
|
///
|
||||||
|
/// See [`connect_isolated`] for details.
|
||||||
|
///
|
||||||
|
/// # Privacy
|
||||||
|
///
|
||||||
|
/// The sender IP address is anonymised using Tor.
|
||||||
|
/// But transactions sent over this connection can still be linked to the receiving IP address
|
||||||
|
/// by passive internet observers.
|
||||||
|
/// This happens because the Zcash network protocol uses unencrypted TCP connections.
|
||||||
|
///
|
||||||
|
/// `hostname` should be a DNS name for the Tor exit to look up, or a hard-coded IP address.
|
||||||
|
/// If the application does a local DNS lookup on a hostname, and passes the IP address to this function,
|
||||||
|
/// passive internet observers can link the hostname to the sender's IP address.
|
||||||
|
///
|
||||||
|
/// For details, see
|
||||||
|
/// [`TorAddr`](https://tpo.pages.torproject.net/core/doc/rust/arti_client/struct.TorAddr.html).
|
||||||
|
pub async fn connect_isolated_tor(
|
||||||
|
network: Network,
|
||||||
|
hostname: String,
|
||||||
|
user_agent: String,
|
||||||
|
) -> Result<BoxService<Request, Response, BoxError>, BoxError> {
|
||||||
|
let addr = TorAddr::from(hostname)?;
|
||||||
|
|
||||||
|
// Initialize or clone the shared tor client instance
|
||||||
|
let tor_client = match cloned_tor_client() {
|
||||||
|
Some(tor_client) => tor_client,
|
||||||
|
None => new_tor_client().await?,
|
||||||
|
};
|
||||||
|
|
||||||
|
let tor_stream = tor_client.connect(addr, None).await?;
|
||||||
|
|
||||||
|
connect_isolated(network, tor_stream, user_agent).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a new tor client instance, and updates [`SHARED_TOR_CLIENT`].
|
||||||
|
///
|
||||||
|
/// If there is a bootstrap error, [`SHARED_TOR_CLIENT`] is not modified.
|
||||||
|
async fn new_tor_client() -> Result<TorClient<TokioRuntimeHandle>, BoxError> {
|
||||||
|
let runtime = tokio::runtime::Handle::current();
|
||||||
|
let runtime = TokioRuntimeHandle::new(runtime);
|
||||||
|
let tor_client = TorClient::bootstrap(runtime, TorClientConfig::default()).await?;
|
||||||
|
|
||||||
|
// # Correctness
|
||||||
|
//
|
||||||
|
// It is ok for multiple tasks to race, because all tor clients have identical configs.
|
||||||
|
// And all connections are isolated, regardless of whether they use a new or cloned client.
|
||||||
|
// (Any replaced clients will be dropped.)
|
||||||
|
let mut shared_tor_client = SHARED_TOR_CLIENT
|
||||||
|
.lock()
|
||||||
|
.expect("panic in shared tor client mutex guard");
|
||||||
|
*shared_tor_client = Some(tor_client.isolated_client());
|
||||||
|
|
||||||
|
Ok(tor_client)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an isolated tor client instance by cloning [`SHARED_TOR_CLIENT`].
|
||||||
|
///
|
||||||
|
/// If [`new_tor_client`] has not run successfully yet, returns `None`.
|
||||||
|
fn cloned_tor_client() -> Option<TorClient<TokioRuntimeHandle>> {
|
||||||
|
SHARED_TOR_CLIENT
|
||||||
|
.lock()
|
||||||
|
.expect("panic in shared tor client mutex guard")
|
||||||
|
.as_ref()
|
||||||
|
.map(TorClient::isolated_client)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
//! Tests for isolated Tor connections.
|
||||||
|
|
||||||
|
mod vectors;
|
||||||
|
|
@ -0,0 +1,92 @@
|
||||||
|
//! Fixed test vectors for isolated Zebra connections.
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crate::Config;
|
||||||
|
|
||||||
|
use super::super::*;
|
||||||
|
|
||||||
|
use futures::stream::FuturesUnordered;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use Network::*;
|
||||||
|
|
||||||
|
/// The maximum allowed test runtime.
|
||||||
|
const MAX_TEST_DURATION: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
|
/// Test that `connect_isolated` doesn't panic when used over Tor.
|
||||||
|
///
|
||||||
|
/// (We can't connect to ourselves over Tor, so there's not much more we can do here.)
|
||||||
|
#[tokio::test]
|
||||||
|
async fn connect_isolated_run_tor_once() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// These tests might take a long time on machines where Tor is censored.
|
||||||
|
|
||||||
|
// Pick a mainnet seeder hostname, it doesn't matter which one.
|
||||||
|
let config = Config::default();
|
||||||
|
let seeder_hostname = config
|
||||||
|
.initial_peer_hostnames()
|
||||||
|
.iter()
|
||||||
|
.next()
|
||||||
|
.unwrap()
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
connect_isolated_run_tor_once_with(Mainnet, seeder_hostname).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that `connect_isolated` can use multiple isolated Tor connections at the same time.
|
||||||
|
///
|
||||||
|
/// Use the multi-threaded runtime to test concurrent Tor instances.
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn connect_isolated_run_tor_multi() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// These tests might take a long time on machines where Tor is censored.
|
||||||
|
|
||||||
|
let mut isolated_conns = FuturesUnordered::new();
|
||||||
|
|
||||||
|
// Use all the seeder hostnames for each network
|
||||||
|
for network in [Mainnet, Testnet] {
|
||||||
|
let config = Config {
|
||||||
|
network,
|
||||||
|
..Config::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
for seeder_hostname in config.initial_peer_hostnames().iter().cloned() {
|
||||||
|
let conn = connect_isolated_run_tor_once_with(network, seeder_hostname);
|
||||||
|
isolated_conns.push(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all the connections to complete (or timeout)
|
||||||
|
while let Some(()) = isolated_conns.next().await {}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect_isolated_run_tor_once_with(network: Network, hostname: String) {
|
||||||
|
// Connection errors are detected and ignored using the JoinHandle.
|
||||||
|
// (They might also make the test hang.)
|
||||||
|
let mut outbound_join_handle =
|
||||||
|
tokio::spawn(connect_isolated_tor(network, hostname, "".to_string()));
|
||||||
|
|
||||||
|
// Let the spawned task run for a long time.
|
||||||
|
let outbound_join_handle_timeout =
|
||||||
|
tokio::time::timeout(MAX_TEST_DURATION, &mut outbound_join_handle);
|
||||||
|
|
||||||
|
// Make sure that the isolated connection did not panic.
|
||||||
|
//
|
||||||
|
// We can't control network reliability in the test, so the only bad outcome is a panic.
|
||||||
|
// We make the test pass if there are network errors, if we get a valid running service,
|
||||||
|
// or if we are still waiting for Tor or the handshake.
|
||||||
|
let outbound_result = outbound_join_handle_timeout.await;
|
||||||
|
assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_),));
|
||||||
|
|
||||||
|
outbound_join_handle.abort();
|
||||||
|
}
|
||||||
|
|
@ -147,6 +147,9 @@ mod peer_set;
|
||||||
mod policies;
|
mod policies;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
|
|
||||||
|
#[cfg(feature = "tor")]
|
||||||
|
pub use crate::isolated::tor::connect_isolated_tor;
|
||||||
|
|
||||||
pub use crate::{
|
pub use crate::{
|
||||||
address_book::AddressBook,
|
address_book::AddressBook,
|
||||||
config::Config,
|
config::Config,
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ where
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
handshaker: Handshake<S, TcpStream, C>,
|
handshaker: Handshake<S, C>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, C> Clone for Connector<S, C>
|
impl<S, C> Clone for Connector<S, C>
|
||||||
|
|
@ -49,7 +49,7 @@ where
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
pub fn new(handshaker: Handshake<S, TcpStream, C>) -> Self {
|
pub fn new(handshaker: Handshake<S, C>) -> Self {
|
||||||
Connector { handshaker }
|
Connector { handshaker }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -87,15 +87,14 @@ where
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
}: OutboundConnectorRequest = req;
|
}: OutboundConnectorRequest = req;
|
||||||
|
|
||||||
let mut hs = self.handshaker.clone();
|
let hs = self.handshaker.clone();
|
||||||
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
|
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
|
||||||
let connector_span = info_span!("connector", peer = ?connected_addr);
|
let connector_span = info_span!("connector", peer = ?connected_addr);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let tcp_stream = TcpStream::connect(addr).await?;
|
let tcp_stream = TcpStream::connect(addr).await?;
|
||||||
hs.ready().await?;
|
|
||||||
let client = hs
|
let client = hs
|
||||||
.call(HandshakeRequest::<TcpStream> {
|
.oneshot(HandshakeRequest::<TcpStream> {
|
||||||
data_stream: tcp_stream,
|
data_stream: tcp_stream,
|
||||||
connected_addr,
|
connected_addr,
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,10 @@
|
||||||
|
//! Initial [`Handshake`s] with Zebra peers over a `PeerTransport`.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
cmp::min,
|
cmp::min,
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
fmt,
|
fmt,
|
||||||
future::Future,
|
future::Future,
|
||||||
marker::PhantomData,
|
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
|
@ -54,12 +55,11 @@ use crate::{
|
||||||
/// To avoid hangs, each handshake (or its connector) should be:
|
/// To avoid hangs, each handshake (or its connector) should be:
|
||||||
/// - launched in a separate task, and
|
/// - launched in a separate task, and
|
||||||
/// - wrapped in a timeout.
|
/// - wrapped in a timeout.
|
||||||
pub struct Handshake<S, AsyncReadWrite, C = NoChainTip>
|
pub struct Handshake<S, C = NoChainTip>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
{
|
||||||
config: Config,
|
config: Config,
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
|
|
@ -73,16 +73,13 @@ where
|
||||||
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
|
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
|
||||||
|
|
||||||
parent_span: Span,
|
parent_span: Span,
|
||||||
|
|
||||||
_phantom_data: PhantomData<AsyncReadWrite>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, AsyncReadWrite, C> Clone for Handshake<S, AsyncReadWrite, C>
|
impl<S, C> Clone for Handshake<S, C>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|
@ -96,7 +93,6 @@ where
|
||||||
minimum_peer_version: self.minimum_peer_version.clone(),
|
minimum_peer_version: self.minimum_peer_version.clone(),
|
||||||
nonces: self.nonces.clone(),
|
nonces: self.nonces.clone(),
|
||||||
parent_span: self.parent_span.clone(),
|
parent_span: self.parent_span.clone(),
|
||||||
_phantom_data: self._phantom_data,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -340,12 +336,11 @@ impl fmt::Debug for ConnectedAddr {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A builder for `Handshake`.
|
/// A builder for `Handshake`.
|
||||||
pub struct Builder<S, AsyncReadWrite, C = NoChainTip>
|
pub struct Builder<S, C = NoChainTip>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
{
|
||||||
config: Option<Config>,
|
config: Option<Config>,
|
||||||
our_services: Option<PeerServices>,
|
our_services: Option<PeerServices>,
|
||||||
|
|
@ -356,16 +351,13 @@ where
|
||||||
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
|
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
|
||||||
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
|
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
|
||||||
latest_chain_tip: C,
|
latest_chain_tip: C,
|
||||||
|
|
||||||
_phantom_data: PhantomData<AsyncReadWrite>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, AsyncReadWrite, C> Builder<S, AsyncReadWrite, C>
|
impl<S, C> Builder<S, C>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
{
|
||||||
/// Provide a config. Mandatory.
|
/// Provide a config. Mandatory.
|
||||||
pub fn with_config(mut self, config: Config) -> Self {
|
pub fn with_config(mut self, config: Config) -> Self {
|
||||||
|
|
@ -425,10 +417,7 @@ where
|
||||||
/// constant over network upgrade activations.
|
/// constant over network upgrade activations.
|
||||||
///
|
///
|
||||||
/// Use [`NoChainTip`] to explicitly provide no chain tip.
|
/// Use [`NoChainTip`] to explicitly provide no chain tip.
|
||||||
pub fn with_latest_chain_tip<NewC>(
|
pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
|
||||||
self,
|
|
||||||
latest_chain_tip: NewC,
|
|
||||||
) -> Builder<S, AsyncReadWrite, NewC>
|
|
||||||
where
|
where
|
||||||
NewC: ChainTip + Clone + Send + 'static,
|
NewC: ChainTip + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
|
|
@ -443,7 +432,6 @@ where
|
||||||
user_agent: self.user_agent,
|
user_agent: self.user_agent,
|
||||||
relay: self.relay,
|
relay: self.relay,
|
||||||
inv_collector: self.inv_collector,
|
inv_collector: self.inv_collector,
|
||||||
_phantom_data: self._phantom_data,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -458,7 +446,7 @@ where
|
||||||
/// Consume this builder and produce a [`Handshake`].
|
/// Consume this builder and produce a [`Handshake`].
|
||||||
///
|
///
|
||||||
/// Returns an error only if any mandatory field was unset.
|
/// Returns an error only if any mandatory field was unset.
|
||||||
pub fn finish(self) -> Result<Handshake<S, AsyncReadWrite, C>, &'static str> {
|
pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
|
||||||
let config = self.config.ok_or("did not specify config")?;
|
let config = self.config.ok_or("did not specify config")?;
|
||||||
let inbound_service = self
|
let inbound_service = self
|
||||||
.inbound_service
|
.inbound_service
|
||||||
|
|
@ -491,19 +479,17 @@ where
|
||||||
minimum_peer_version,
|
minimum_peer_version,
|
||||||
nonces,
|
nonces,
|
||||||
parent_span: Span::current(),
|
parent_span: Span::current(),
|
||||||
_phantom_data: self._phantom_data,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, AsyncReadWrite> Handshake<S, AsyncReadWrite, NoChainTip>
|
impl<S> Handshake<S, NoChainTip>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
{
|
||||||
/// Create a builder that configures a [`Handshake`] service.
|
/// Create a builder that configures a [`Handshake`] service.
|
||||||
pub fn builder() -> Builder<S, AsyncReadWrite, NoChainTip> {
|
pub fn builder() -> Builder<S, NoChainTip> {
|
||||||
// We don't derive `Default` because the derive inserts a `where S:
|
// We don't derive `Default` because the derive inserts a `where S:
|
||||||
// Default` bound even though `Option<S>` implements `Default` even if
|
// Default` bound even though `Option<S>` implements `Default` even if
|
||||||
// `S` does not.
|
// `S` does not.
|
||||||
|
|
@ -516,7 +502,6 @@ where
|
||||||
address_book_updater: None,
|
address_book_updater: None,
|
||||||
inv_collector: None,
|
inv_collector: None,
|
||||||
latest_chain_tip: NoChainTip,
|
latest_chain_tip: NoChainTip,
|
||||||
_phantom_data: PhantomData::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -527,8 +512,8 @@ where
|
||||||
/// We split `Handshake` into its components before calling this function,
|
/// We split `Handshake` into its components before calling this function,
|
||||||
/// to avoid infectious `Sync` bounds on the returned future.
|
/// to avoid infectious `Sync` bounds on the returned future.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn negotiate_version<AsyncReadWrite>(
|
pub async fn negotiate_version<PeerTransport>(
|
||||||
peer_conn: &mut Framed<AsyncReadWrite, Codec>,
|
peer_conn: &mut Framed<PeerTransport, Codec>,
|
||||||
connected_addr: &ConnectedAddr,
|
connected_addr: &ConnectedAddr,
|
||||||
config: Config,
|
config: Config,
|
||||||
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
|
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
|
||||||
|
|
@ -538,7 +523,7 @@ pub async fn negotiate_version<AsyncReadWrite>(
|
||||||
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
|
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
|
||||||
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError>
|
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError>
|
||||||
where
|
where
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
// Create a random nonce for this connection
|
// Create a random nonce for this connection
|
||||||
let local_nonce = Nonce::default();
|
let local_nonce = Nonce::default();
|
||||||
|
|
@ -729,12 +714,12 @@ where
|
||||||
|
|
||||||
/// A handshake request.
|
/// A handshake request.
|
||||||
/// Contains the information needed to handshake with the peer.
|
/// Contains the information needed to handshake with the peer.
|
||||||
pub struct HandshakeRequest<AsyncReadWrite>
|
pub struct HandshakeRequest<PeerTransport>
|
||||||
where
|
where
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
/// The tokio [`TcpStream`] or Tor [`DataStream`] to the peer.
|
/// The tokio [`TcpStream`] or Tor [`DataStream`] to the peer.
|
||||||
pub data_stream: AsyncReadWrite,
|
pub data_stream: PeerTransport,
|
||||||
|
|
||||||
/// The address of the peer, and other related information.
|
/// The address of the peer, and other related information.
|
||||||
pub connected_addr: ConnectedAddr,
|
pub connected_addr: ConnectedAddr,
|
||||||
|
|
@ -745,13 +730,12 @@ where
|
||||||
pub connection_tracker: ConnectionTracker,
|
pub connection_tracker: ConnectionTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, AsyncReadWrite, C> Service<HandshakeRequest<AsyncReadWrite>>
|
impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
|
||||||
for Handshake<S, AsyncReadWrite, C>
|
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
C: ChainTip + Clone + Send + 'static,
|
C: ChainTip + Clone + Send + 'static,
|
||||||
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
type Response = Client;
|
type Response = Client;
|
||||||
type Error = BoxError;
|
type Error = BoxError;
|
||||||
|
|
@ -762,7 +746,7 @@ where
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: HandshakeRequest<AsyncReadWrite>) -> Self::Future {
|
fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
|
||||||
let HandshakeRequest {
|
let HandshakeRequest {
|
||||||
data_stream,
|
data_stream,
|
||||||
connected_addr,
|
connected_addr,
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,7 @@ pub fn init() {
|
||||||
.add_directive("zebra_network=error".parse().unwrap())
|
.add_directive("zebra_network=error".parse().unwrap())
|
||||||
.add_directive("zebra_state=error".parse().unwrap())
|
.add_directive("zebra_state=error".parse().unwrap())
|
||||||
.add_directive("zebrad=error".parse().unwrap())
|
.add_directive("zebrad=error".parse().unwrap())
|
||||||
|
.add_directive("tor_circmgr=error".parse().unwrap())
|
||||||
});
|
});
|
||||||
|
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue