diff --git a/Cargo.toml b/Cargo.toml index 98e9659b..db517062 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,6 @@ members = [ "zebrad", ] +[patch.crates-io] +tracing-attributes = { git = "https://github.com/tokio-rs/tracing" } +tracing = { git = "https://github.com/tokio-rs/tracing" } \ No newline at end of file diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 8e70c965..a6bea339 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -11,6 +11,8 @@ rand = "0.7" byteorder = "1.3" chrono = "0.4" failure = "0.1" -tokio = "=0.2.0-alpha.4" +tokio = "=0.2.0-alpha.5" +tracing = { git = "https://github.com/tokio-rs/tracing" } +tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false } -zebra-chain = { path = "../zebra-chain" } +zebra-chain = { path = "../zebra-chain" } \ No newline at end of file diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index 2ef2c802..6c63049e 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -4,6 +4,8 @@ #[macro_use] extern crate failure; +#[macro_use] +extern crate tracing; pub mod message; pub mod types; diff --git a/zebra-network/src/message.rs b/zebra-network/src/message.rs index d7185ac1..af1183b2 100644 --- a/zebra-network/src/message.rs +++ b/zebra-network/src/message.rs @@ -284,6 +284,7 @@ pub enum RejectReason { impl Message { /// Send `self` to the given async writer (e.g., a network stream). + #[instrument(level = "debug", skip(writer))] pub async fn send( &self, mut writer: W, @@ -329,6 +330,7 @@ impl Message { // extension trait, which is only defined for sync Writers. // The header is 4+12+4+4=24 bytes long. + trace!(?command, body_len = body.len()); let mut header = [0u8; 24]; let mut header_writer = Cursor::new(&mut header[..]); header_writer.write_all(&magic.0)?; @@ -343,6 +345,7 @@ impl Message { } /// Receive a message from the given async reader (e.g., a network stream). + #[instrument(level = "debug", skip(reader))] pub async fn recv( mut reader: R, magic: Magic, @@ -366,6 +369,7 @@ impl Message { let command = header_reader.read_12_bytes()?; let body_len = header_reader.read_u32::()? as usize; let checksum = Sha256dChecksum(header_reader.read_4_bytes()?); + trace!(?message_magic, ?command, body_len, ?checksum); ensure!( magic == message_magic, @@ -418,6 +422,7 @@ impl Message { /// contain a checksum of the message body. fn write_body(&self, mut writer: W, _m: Magic, _v: Version) -> Result<(), Error> { use Message::*; + trace!(?self); match *self { Version { ref version, @@ -499,67 +504,99 @@ fn try_read_pong(mut reader: R, _version: Version) -> Result()?))) } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_reject(mut _reader: R, _version: Version) -> Result { + trace!("reject"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_addr(mut _reader: R, _version: Version) -> Result { + trace!("addr"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_getaddr(mut _reader: R, _version: Version) -> Result { + trace!("getaddr"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_block(mut _reader: R, _version: Version) -> Result { + trace!("block"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_getblocks(mut _reader: R, _version: Version) -> Result { + trace!("getblocks"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_headers(mut _reader: R, _version: Version) -> Result { + trace!("headers"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_getheaders(mut _reader: R, _version: Version) -> Result { + trace!("getheaders"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_inv(mut _reader: R, _version: Version) -> Result { + trace!("inv"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_getdata(mut _reader: R, _version: Version) -> Result { + trace!("getdata"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_notfound(mut _reader: R, _version: Version) -> Result { + trace!("notfound"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_tx(mut _reader: R, _version: Version) -> Result { + trace!("tx"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_mempool(mut _reader: R, _version: Version) -> Result { + trace!("mempool"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_filterload(mut _reader: R, _version: Version) -> Result { + trace!("filterload"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_filteradd(mut _reader: R, _version: Version) -> Result { + trace!("filteradd"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_filterclear(mut _reader: R, _version: Version) -> Result { + trace!("filterclear"); bail!("unimplemented message type") } +#[instrument(level = "trace", skip(_reader, _version))] fn try_read_merkleblock(mut _reader: R, _version: Version) -> Result { + trace!("merkleblock"); bail!("unimplemented message type") } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 26b596f1..8c59902f 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -12,10 +12,13 @@ gumdrop = "0.6" lazy_static = "1" serde = { version = "1", features = ["serde_derive"] } toml = "0.5" -tokio = "=0.2.0-alpha.4" -tracing = "0.1" -tracing-subscriber = "0.1" -tracing-log = "=0.0.1-alpha.2" +tokio = "=0.2.0-alpha.5" +# Replace with git to pick up instrument derive changes, revert on release. +#tracing = "0.1" +tracing = { git = "https://github.com/tokio-rs/tracing" } +tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false } +tracing-subscriber = { git = "https://github.com/tokio-rs/tracing" } +tracing-log = { git = "https://github.com/tokio-rs/tracing" } # Can't use published alpha because of conflicts tracking pin-project alphas #hyper = "=0.13.0-alpha.1" hyper = { git = "https://github.com/hyperium/hyper" } @@ -28,4 +31,3 @@ zebra-network = { path = "../zebra-network" } [dev-dependencies.abscissa_core] version = "0.3.0" features = ["testing"] - diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index 255c999a..8f2a6966 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -101,6 +101,30 @@ impl ConnectCmd { .await?; info!(resp_verack = ?resp_verack); + loop { + match Message::recv( + &mut stream, + constants::magics::MAINNET, + constants::CURRENT_VERSION, + ) + .await + { + Ok(msg) => match msg { + Message::Ping(nonce) => { + let pong = Message::Pong(nonce); + pong.send( + &mut stream, + constants::magics::MAINNET, + constants::CURRENT_VERSION, + ) + .await?; + } + _ => warn!("Unknown message"), + }, + Err(e) => error!("{}", e), + }; + } + stream.shutdown(Shutdown::Both)?; Ok(()) diff --git a/zebrad/src/components/tracing.rs b/zebrad/src/components/tracing.rs index c5f15fe0..c427e026 100644 --- a/zebrad/src/components/tracing.rs +++ b/zebrad/src/components/tracing.rs @@ -9,7 +9,7 @@ use hyper::{Body, Request, Response, Server}; use tracing::Subscriber; use tracing_log::LogTracer; -use tracing_subscriber::{EnvFilter, reload::Handle, FmtSubscriber}; +use tracing_subscriber::{reload::Handle, EnvFilter, FmtSubscriber}; /// Abscissa component which runs a tracing filter endpoint. #[derive(Component)]