Add a Mutex<HashSet<Nonce>> to detect self-conns.

This commit is contained in:
Henry de Valence 2019-10-15 16:10:43 -07:00
parent ed335e68f4
commit db7ac53f3b
4 changed files with 62 additions and 24 deletions

View File

@ -11,5 +11,5 @@ mod server;
pub use client::PeerClient; pub use client::PeerClient;
pub use connector::PeerConnector; pub use connector::PeerConnector;
pub use error::{PeerError, SharedPeerError}; pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use server::PeerServer; pub use server::PeerServer;

View File

@ -1,6 +1,8 @@
use std::{ use std::{
collections::HashSet,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -20,7 +22,7 @@ use crate::{
BoxedStdError, Config, Network, BoxedStdError, Config, Network,
}; };
use super::{error::ErrorSlot, server::ServerState, PeerClient, PeerError, PeerServer}; use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer};
/// A [`Service`] that connects to a remote peer and constructs a client/server pair. /// A [`Service`] that connects to a remote peer and constructs a client/server pair.
pub struct PeerConnector<S> { pub struct PeerConnector<S> {
@ -28,13 +30,13 @@ pub struct PeerConnector<S> {
network: Network, network: Network,
internal_service: S, internal_service: S,
sender: mpsc::Sender<PeerLastSeen>, sender: mpsc::Sender<PeerLastSeen>,
nonces: Arc<Mutex<HashSet<Nonce>>>,
} }
impl<S> PeerConnector<S> impl<S> PeerConnector<S>
where where
S: Service<Request, Response = Response> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
S::Error: Into<BoxedStdError>,
{ {
/// Construct a new `PeerConnector`. /// Construct a new `PeerConnector`.
pub fn new( pub fn new(
@ -54,6 +56,7 @@ where
network, network,
internal_service, internal_service,
sender, sender,
nonces: Arc::new(Mutex::new(HashSet::new())),
} }
} }
} }
@ -62,10 +65,9 @@ impl<S> Service<(TcpStream, SocketAddr)> for PeerConnector<S>
where where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
S::Error: Send + Into<BoxedStdError>,
{ {
type Response = PeerClient; type Response = PeerClient;
type Error = PeerError; type Error = HandshakeError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -83,6 +85,7 @@ where
let internal_service = self.internal_service.clone(); let internal_service = self.internal_service.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();
let user_agent = self.config.user_agent.clone(); let user_agent = self.config.user_agent.clone();
let nonces = self.nonces.clone();
let fut = async move { let fut = async move {
info!("connecting to remote peer"); info!("connecting to remote peer");
@ -90,6 +93,12 @@ where
let mut stream = let mut stream =
Framed::new(tcp_stream, Codec::builder().for_network(network).finish()); Framed::new(tcp_stream, Codec::builder().for_network(network).finish());
let local_nonce = Nonce::default();
nonces
.lock()
.expect("mutex should be unpoisoned")
.insert(local_nonce);
let version = Message::Version { let version = Message::Version {
version: constants::CURRENT_VERSION, version: constants::CURRENT_VERSION,
services: PeerServices::NODE_NETWORK, services: PeerServices::NODE_NETWORK,
@ -99,7 +108,7 @@ where
PeerServices::NODE_NETWORK, PeerServices::NODE_NETWORK,
"127.0.0.1:9000".parse().unwrap(), "127.0.0.1:9000".parse().unwrap(),
), ),
nonce: Nonce::default(), nonce: local_nonce,
user_agent, user_agent,
// XXX eventually the `PeerConnector` will need to have a handle // XXX eventually the `PeerConnector` will need to have a handle
// for a service that gets the current block height. // for a service that gets the current block height.
@ -108,36 +117,48 @@ where
}; };
debug!(?version, "sending initial version message"); debug!(?version, "sending initial version message");
stream.send(version).await?; stream.send(version).await?;
let remote_version = stream let remote_msg = stream
.next() .next()
.await .await
.ok_or_else(|| PeerError::ConnectionClosed)??; .ok_or_else(|| HandshakeError::ConnectionClosed)??;
debug!( // Check that we got a Version and destructure its fields into the local scope.
?remote_version, debug!(?remote_msg, "got message from remote peer");
"got remote peer's version message, sending verack" let remote_nonce = if let Message::Version { nonce, .. } = remote_msg {
); nonce
} else {
return Err(HandshakeError::UnexpectedMessage(remote_msg));
};
// Check for nonce reuse, indicating self-connection.
if {
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned");
let nonce_reuse = locked_nonces.contains(&remote_nonce);
// Regardless of whether we observed nonce reuse, clean up the nonce set.
locked_nonces.remove(&local_nonce);
nonce_reuse
} {
return Err(HandshakeError::NonceReuse);
}
stream.send(Message::Verack).await?; stream.send(Message::Verack).await?;
let remote_verack = stream
let remote_msg = stream
.next() .next()
.await .await
.ok_or_else(|| PeerError::ConnectionClosed)??; .ok_or_else(|| HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!(?remote_verack, "got remote peer's verack"); debug!("got verack from remote peer");
} else {
return Err(HandshakeError::UnexpectedMessage(remote_msg));
}
// XXX here is where we would set the version to the minimum of the // XXX here is where we would set the version to the minimum of the
// two versions, etc. -- actually is it possible to edit the `Codec` // two versions, etc. -- actually is it possible to edit the `Codec`
// after using it to make a framed adapter? // after using it to make a framed adapter?
// XXX do we want to use the random nonces to detect
// self-connections or do a different mechanism? if we use the
// nonces for their intended purpose we need communication between
// PeerConnector and PeerListener.
debug!("constructing PeerClient, spawning PeerServer"); debug!("constructing PeerClient, spawning PeerServer");
let (tx, rx) = mpsc::channel(0); let (tx, rx) = mpsc::channel(0);

View File

@ -51,3 +51,20 @@ impl ErrorSlot {
.map(|e| e.clone()) .map(|e| e.clone())
} }
} }
/// An error during a handshake with a remote peer.
#[derive(Error, Debug)]
pub enum HandshakeError {
/// The remote peer sent an unexpected message during the handshake.
#[error("The remote peer sent an unexpected message: {0:?}")]
UnexpectedMessage(crate::protocol::message::Message),
/// The peer connector detected handshake nonce reuse, possibly indicating self-connection.
#[error("Detected nonce reuse, possible self-connection")]
NonceReuse,
/// The remote peer closed the connection.
#[error("Peer closed connection")]
ConnectionClosed,
/// A serialization error occurred while reading or writing a message.
#[error("Serialization error")]
Serialization(#[from] SerializationError),
}

View File

@ -34,7 +34,7 @@ bitflags! {
} }
/// A nonce used in the networking layer to identify messages. /// A nonce used in the networking layer to identify messages.
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Nonce(pub u64); pub struct Nonce(pub u64);
impl Default for Nonce { impl Default for Nonce {