From 6c787dd1885a415cfc2e0f8941b365225a5e3b13 Mon Sep 17 00:00:00 2001 From: teor Date: Sat, 15 Jan 2022 05:34:59 +1000 Subject: [PATCH] T1. Fix isolated connection bugs, improve tests, upgrade dependencies (#3302) * Make handshakes generic over AsyncRead + AsyncWrite * Simplify connect_isolated using ServiceExt::map_err and BoxError * Move isolated network tests to their own module * Improve isolated TCP connection tests * Add an in-memory connection test that uses AsyncReadWrite * Support connect_isolated on testnet * Add a wrapper function for isolated TCP connections to an IP address * Run test tasks for a while, and clean up after them * Upgrade Zebra dependencies to be compatible with arti, but don't add arti yet * Fix deny.toml Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Conrado Gouvea --- Cargo.lock | 332 ++++++++++-------- deny.toml | 4 +- zebra-network/src/isolated.rs | 159 +++------ zebra-network/src/isolated/tests.rs | 3 + zebra-network/src/isolated/tests/vectors.rs | 220 ++++++++++++ zebra-network/src/lib.rs | 2 +- zebra-network/src/peer/connector.rs | 31 +- zebra-network/src/peer/handshake.rs | 149 +++++--- zebra-network/src/peer_set/initialize.rs | 7 +- .../src/peer_set/initialize/tests/vectors.rs | 126 +++---- 10 files changed, 658 insertions(+), 375 deletions(-) create mode 100644 zebra-network/src/isolated/tests.rs create mode 100644 zebra-network/src/isolated/tests/vectors.rs diff --git a/Cargo.lock b/Cargo.lock index de8d3e01..6cfe991b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,9 +38,9 @@ checksum = "74f5722bc48763cb9d81d8427ca05b6aa2842f6632cf8e4c0a29eef9baececcc" dependencies = [ "darling", "ident_case", - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "synstructure", ] @@ -86,7 +86,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.3", "once_cell", "version_check 0.9.2", ] @@ -167,9 +167,9 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -291,8 +291,8 @@ dependencies = [ "lazy_static", "lazycell", "peeking_take_while", - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "regex", "rustc-hash", "shlex 0.1.1", @@ -313,8 +313,8 @@ dependencies = [ "lazycell", "log", "peeking_take_while", - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "regex", "rustc-hash", "shlex 1.0.0", @@ -467,12 +467,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - [[package]] name = "bytes" version = "1.1.0" @@ -871,7 +865,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" dependencies = [ "bstr", "csv-core", - "itoa 0.4.6", + "itoa 0.4.8", "ryu", "serde", ] @@ -917,10 +911,10 @@ checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" dependencies = [ "fnv", "ident_case", - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "strsim 0.9.3", - "syn 1.0.60", + "syn 1.0.83", ] [[package]] @@ -930,8 +924,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" dependencies = [ "darling_core", - "quote 1.0.7", - "syn 1.0.60", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -998,9 +992,9 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "278ef1934318d524612205f69df005eea30ec10edf7913e500b5a527fce55bc0" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -1054,9 +1048,9 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e94aa31f7c0dc764f57896dc615ddd76fc13b0d5dca7eb6cc5e018a5a09ec06" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -1223,9 +1217,9 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -1303,13 +1297,13 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi 0.10.0+wasi-snapshot-preview1", ] [[package]] @@ -1319,9 +1313,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24b328c01a4d71d2d8173daa93562a73ab0fe85616876f02500f53d82948c504" dependencies = [ "proc-macro-error", - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -1376,9 +1370,9 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90454ce4de40b7ca6a8968b5ef367bdab48413962588d0d2b1638d60090c35d7" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -1387,7 +1381,7 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" dependencies = [ - "bytes 1.1.0", + "bytes", "fnv", "futures-core", "futures-sink", @@ -1490,13 +1484,13 @@ dependencies = [ [[package]] name = "http" -version = "0.2.1" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ - "bytes 0.5.6", + "bytes", "fnv", - "itoa 0.4.6", + "itoa 0.4.8", ] [[package]] @@ -1505,7 +1499,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" dependencies = [ - "bytes 1.1.0", + "bytes", "http", "pin-project-lite", ] @@ -1534,7 +1528,7 @@ version = "0.14.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -1543,7 +1537,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa 0.4.6", + "itoa 0.4.8", "pin-project-lite", "socket2 0.4.2", "tokio", @@ -1554,17 +1548,15 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.22.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ - "futures-util", + "http", "hyper", - "log", - "rustls", + "rustls 0.20.2", "tokio", "tokio-rustls", - "webpki", ] [[package]] @@ -1660,9 +1652,9 @@ dependencies = [ [[package]] name = "itoa" -version = "0.4.6" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "itoa" @@ -1841,9 +1833,9 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" [[package]] name = "memchr" -version = "2.3.4" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] name = "memoffset" @@ -1906,10 +1898,10 @@ checksum = "caa72e4a3d157986dd2565c82ecbddcc23941513669a3766b938f6b72eb87f3f" dependencies = [ "lazy_static", "proc-macro-hack", - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "regex", - "syn 1.0.60", + "syn 1.0.83", ] [[package]] @@ -1959,8 +1951,8 @@ checksum = "d5f7db7a675c4b46b8842105b9371d6151e95fbbecd9b0e54dc2ea814397d2cc" dependencies = [ "lazy_static", "log", - "rustls", - "webpki", + "rustls 0.19.1", + "webpki 0.21.2", "webpki-roots 0.18.0", ] @@ -1998,7 +1990,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" dependencies = [ - "smallvec 1.5.0", + "smallvec 1.7.0", ] [[package]] @@ -2060,7 +2052,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" dependencies = [ "arrayvec 0.4.12", - "itoa 0.4.6", + "itoa 0.4.8", ] [[package]] @@ -2213,7 +2205,7 @@ dependencies = [ "instant", "libc", "redox_syscall 0.1.57", - "smallvec 1.5.0", + "smallvec 1.7.0", "winapi", ] @@ -2289,9 +2281,9 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -2300,9 +2292,9 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -2375,9 +2367,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "version_check 0.9.2", ] @@ -2387,8 +2379,8 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "version_check 0.9.2", ] @@ -2409,9 +2401,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.24" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1" dependencies = [ "unicode-xid 0.2.1", ] @@ -2496,9 +2488,9 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "608c156fd8e97febc07dc9c2e2c80bf74cfc6ef26893eae3daf8bc2bc94a4b7f" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -2512,11 +2504,11 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.7" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05" dependencies = [ - "proc-macro2 1.0.24", + "proc-macro2 1.0.34", ] [[package]] @@ -2595,7 +2587,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.3", ] [[package]] @@ -2712,7 +2704,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.3", "redox_syscall 0.2.10", ] @@ -2754,12 +2746,12 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.6" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280" +checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" dependencies = [ "base64 0.13.0", - "bytes 1.1.0", + "bytes", "encoding_rs", "futures-core", "futures-util", @@ -2774,7 +2766,8 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rustls", + "rustls 0.20.2", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", @@ -2784,7 +2777,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots 0.21.1", + "webpki-roots 0.22.1", "winreg", ] @@ -2881,8 +2874,29 @@ dependencies = [ "base64 0.13.0", "log", "ring", - "sct", - "webpki", + "sct 0.6.0", + "webpki 0.21.2", +] + +[[package]] +name = "rustls" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" +dependencies = [ + "log", + "ring", + "sct 0.7.0", + "webpki 0.22.0", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64 0.13.0", ] [[package]] @@ -2940,6 +2954,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secp256k1" version = "0.21.2" @@ -3107,9 +3131,9 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -3130,7 +3154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" dependencies = [ "form_urlencoded", - "itoa 0.4.6", + "itoa 0.4.8", "ryu", "serde", ] @@ -3212,9 +3236,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.5.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" [[package]] name = "socket2" @@ -3255,9 +3279,9 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bdfb59103e43a0f99a346b57860d50f2138a7008d08acd964e9ac0fef3ae9a5" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -3315,9 +3339,9 @@ checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ "heck", "proc-macro-error", - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -3339,12 +3363,12 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.60" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c700597eca8a5a762beb35753ef6b94df201c81cca676604f547495a0d7f0081" +checksum = "23a1dfb999630e338648c83e91c59a4e9fb7620f520c3194b6b89e276f2f1959" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", + "proc-macro2 1.0.34", + "quote 1.0.10", "unicode-xid 0.2.1", ] @@ -3354,9 +3378,9 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "unicode-xid 0.2.1", ] @@ -3413,9 +3437,9 @@ version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -3469,7 +3493,7 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838" dependencies = [ - "bytes 1.1.0", + "bytes", "libc", "memchr", "mio", @@ -3489,20 +3513,20 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] name = "tokio-rustls" -version = "0.22.0" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" dependencies = [ - "rustls", + "rustls 0.20.2", "tokio", - "webpki", + "webpki 0.22.0", ] [[package]] @@ -3524,7 +3548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" dependencies = [ "async-stream", - "bytes 1.1.0", + "bytes", "futures-core", "tokio", "tokio-stream", @@ -3536,7 +3560,7 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-core", "futures-sink", "log", @@ -3648,9 +3672,9 @@ version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", ] [[package]] @@ -3755,7 +3779,7 @@ dependencies = [ "serde", "serde_json", "sharded-slab", - "smallvec 1.5.0", + "smallvec 1.7.0", "thread_local", "tracing", "tracing-core", @@ -3771,9 +3795,9 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] name = "typenum" -version = "1.12.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33" +checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" [[package]] name = "uint" @@ -3978,9 +4002,9 @@ dependencies = [ "bumpalo", "lazy_static", "log", - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "wasm-bindgen-shared", ] @@ -4002,7 +4026,7 @@ version = "0.2.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a6ac8995ead1f084a8dea1e65f194d0973800c7f571f6edd70adf06ecf77084" dependencies = [ - "quote 1.0.7", + "quote 1.0.10", "wasm-bindgen-macro-support", ] @@ -4012,9 +4036,9 @@ version = "0.2.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a48c72f299d80557c7c62e37e7225369ecc0c963964059509fbafe917c7549" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4045,22 +4069,32 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91cd5736df7f12a964a5067a12c62fa38e1bd8080aff1f80bc29be7c80d19ab4" dependencies = [ - "webpki", + "webpki 0.21.2", ] [[package]] name = "webpki-roots" -version = "0.21.1" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +checksum = "c475786c6f47219345717a043a37ec04cb4bc185e28853adcc4fa0a947eba630" dependencies = [ - "webpki", + "webpki 0.22.0", ] [[package]] @@ -4346,7 +4380,7 @@ version = "1.0.0-beta.3" dependencies = [ "bitflags", "byteorder", - "bytes 1.1.0", + "bytes", "chrono", "futures", "hex", @@ -4522,8 +4556,8 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3f369ddb18862aba61aa49bf31e74d29f0f162dec753063200e1dc084345d16" dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "proc-macro2 1.0.34", + "quote 1.0.10", + "syn 1.0.83", "synstructure", ] diff --git a/deny.toml b/deny.toml index e602aede..421d3ad9 100644 --- a/deny.toml +++ b/deny.toml @@ -52,9 +52,6 @@ skip-tree = [ # ticket #2998: hdrhistogram dependencies { name = "hdrhistogram", version = "=6.3.4" }, - # ticket #2999: http dependencies - { name = "bytes", version = "=0.5.6" }, - # ticket #3061: reqwest and minreq dependencies { name = "webpki-roots", version = "=0.18.0" }, @@ -70,6 +67,7 @@ skip-tree = [ # wait for lots of crates in the cryptographic ecosystem to upgrade { name = "rand", version = "=0.7.3" }, + { name = "rustls", version = "=0.19.1" }, # wait for lots of crates in the tokio ecosystem to upgrade { name = "socket2", version = "=0.3.16" }, diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index b5e99d85..73396a1c 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -1,19 +1,15 @@ -//! Code for creating isolated connections to specific peers. +//! Creating isolated connections to specific peers. -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use std::{future::Future, net::SocketAddr}; -use futures::future::{FutureExt, TryFutureExt}; -use tokio::net::TcpStream; +use futures::future::TryFutureExt; +use tokio::io::{AsyncRead, AsyncWrite}; use tower::{ util::{BoxService, Oneshot}, - Service, + ServiceExt, }; -use zebra_chain::chain_tip::NoChainTip; +use zebra_chain::{chain_tip::NoChainTip, parameters::Network}; use crate::{ peer::{self, ConnectedAddr, HandshakeRequest}, @@ -21,42 +17,50 @@ use crate::{ BoxError, Config, Request, Response, }; -/// Use the provided TCP connection to create a Zcash connection completely -/// isolated from all other node state. +#[cfg(test)] +mod tests; + +/// Creates a Zcash peer connection using the provided data stream. +/// This connection is completely isolated from all other node state. /// -/// The connection pool returned by `init` should be used for all requests that +/// The connection pool returned by [`init`](zebra_network::init) +/// should be used for all requests that /// don't require isolated state or use of an existing TCP connection. However, /// this low-level API is useful for custom network crawlers or Tor connections. /// /// In addition to being completely isolated from all other node state, this /// method also aims to be minimally distinguishable from other clients. /// +/// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300) +/// /// Note that this method does not implement any timeout behavior, so callers may /// want to layer it with a timeout as appropriate for their application. /// /// # Inputs /// -/// - `conn`: an existing TCP connection to use. Passing an existing TCP -/// connection allows this method to be used with clearnet or Tor transports. +/// - `network`: the Zcash [`Network`] used for this connection. +/// +/// - `data_stream`: an existing data stream. This can be a non-anonymised TCP connection, +/// or a Tor client [`DataStream`]. /// /// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string. -/// -/// # Bug -/// -/// `connect_isolated` only works on `Mainnet`, see #1687. -pub fn connect_isolated( - conn: TcpStream, +pub fn connect_isolated( + network: Network, + data_stream: AsyncReadWrite, user_agent: String, -) -> impl Future< - Output = Result< - BoxService>, - Box, - >, -> { +) -> impl Future, BoxError>> +where + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let config = Config { + network, + ..Config::default() + }; + let handshake = peer::Handshake::builder() - .with_config(Config::default()) + .with_config(config) .with_inbound_service(tower::service_fn(|_req| async move { - Ok::>(Response::Nil) + Ok::(Response::Nil) })) .with_user_agent(user_agent) .with_latest_chain_tip(NoChainTip) @@ -70,88 +74,29 @@ pub fn connect_isolated( Oneshot::new( handshake, HandshakeRequest { - tcp_stream: conn, + data_stream, connected_addr, connection_tracker, }, ) - .map_ok(|client| BoxService::new(Wrapper(client))) + .map_ok(|client| BoxService::new(client.map_err(Into::into))) } -// This can be deleted when a new version of Tower with map_err is released. -struct Wrapper(peer::Client); - -impl Service for Wrapper { - type Response = Response; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.0.call(req).map_err(Into::into).boxed() - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[tokio::test] - async fn connect_isolated_sends_minimally_distinguished_version_message() { - use std::net::SocketAddr; - - use futures::stream::StreamExt; - use tokio_util::codec::Framed; - - use crate::{ - protocol::external::{AddrInVersion, Codec, Message}, - types::PeerServices, - }; - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listen_addr = listener.local_addr().unwrap(); - - let fixed_isolated_addr: SocketAddr = "0.0.0.0:8233".parse().unwrap(); - - let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap(); - - tokio::spawn(connect_isolated(conn, "".to_string())); - - let (conn, _) = listener.accept().await.unwrap(); - - let mut stream = Framed::new(conn, Codec::builder().finish()); - if let Message::Version { - services, - timestamp, - address_from, - user_agent, - start_height, - relay, - .. - } = stream - .next() - .await - .expect("stream item") - .expect("item is Ok(msg)") - { - // Check that the version message sent by connect_isolated - // has the fields specified in the Stolon RFC. - assert_eq!(services, PeerServices::empty()); - assert_eq!(timestamp.timestamp() % (5 * 60), 0); - assert_eq!( - address_from, - AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()), - ); - assert_eq!(user_agent, ""); - assert_eq!(start_height.0, 0); - assert!(!relay); - } else { - panic!("handshake did not send version message"); - } - } +/// Creates a direct TCP Zcash peer connection to `addr`. +/// This connection is completely isolated from all other node state. +/// +/// See [`connect_isolated`] for details. +/// +/// # Privacy +/// +/// Transactions sent over this connection can be linked to the sending and receiving IP address +/// by passive internet observers. +pub fn connect_isolated_tcp_direct( + network: Network, + addr: SocketAddr, + user_agent: String, +) -> impl Future, BoxError>> { + tokio::net::TcpStream::connect(addr) + .err_into() + .and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent)) } diff --git a/zebra-network/src/isolated/tests.rs b/zebra-network/src/isolated/tests.rs new file mode 100644 index 00000000..37d10880 --- /dev/null +++ b/zebra-network/src/isolated/tests.rs @@ -0,0 +1,3 @@ +//! Tests for isolated Zebra connections. + +mod vectors; diff --git a/zebra-network/src/isolated/tests/vectors.rs b/zebra-network/src/isolated/tests/vectors.rs new file mode 100644 index 00000000..d2f9061b --- /dev/null +++ b/zebra-network/src/isolated/tests/vectors.rs @@ -0,0 +1,220 @@ +//! Fixed test vectors for isolated Zebra connections. + +use std::{net::SocketAddr, task::Poll, time::Duration}; + +use futures::stream::StreamExt; +use tokio_util::codec::Framed; + +use crate::{ + constants::CURRENT_NETWORK_PROTOCOL_VERSION, + protocol::external::{AddrInVersion, Codec, Message}, + types::PeerServices, +}; + +use super::super::*; + +use Network::*; + +/// Test that `connect_isolated` sends a version message with minimal distinguishing features, +/// when sent over TCP. +#[tokio::test] +async fn connect_isolated_sends_anonymised_version_message_tcp() { + zebra_test::init(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + connect_isolated_sends_anonymised_version_message_tcp_net(Mainnet).await; + connect_isolated_sends_anonymised_version_message_tcp_net(Testnet).await; +} + +async fn connect_isolated_sends_anonymised_version_message_tcp_net(network: Network) { + // These tests might fail on machines with no configured IPv4 addresses. + // (Localhost should be enough.) + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listen_addr = listener.local_addr().unwrap(); + + // Connection errors are detected using the JoinHandle. + // (They might also make the test hang.) + let mut outbound_join_handle = tokio::spawn(connect_isolated_tcp_direct( + network, + listen_addr, + "".to_string(), + )); + + let (inbound_conn, _) = listener.accept().await.unwrap(); + + let mut inbound_stream = + Framed::new(inbound_conn, Codec::builder().for_network(network).finish()); + + // We don't need to send any bytes to get a version message. + if let Message::Version { + version, + services, + timestamp, + address_recv, + address_from, + nonce: _, + user_agent, + start_height, + relay, + } = inbound_stream + .next() + .await + .expect("stream item") + .expect("item is Ok(msg)") + { + // Check that the version message sent by connect_isolated + // anonymises all the fields that it possibly can. + // + // The version field needs to be accurate, because it controls protocol features. + // The nonce must be randomised for security. + // + // SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300) + + let mut fixed_isolated_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + fixed_isolated_addr.set_port(network.default_port()); + + // Required fields should be accurate and match most other peers. + // (We can't test nonce randomness here.) + assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION); + assert_eq!(timestamp.timestamp() % (5 * 60), 0); + + // Other fields should be empty or zeroed. + assert_eq!(services, PeerServices::empty()); + assert_eq!( + address_recv, + // Since we're connecting to the peer, we expect it to have the node flag. + // + // SECURITY TODO: should this just be zeroed anyway? (#3300) + AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK), + ); + assert_eq!( + address_from, + AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()), + ); + assert_eq!(user_agent, ""); + assert_eq!(start_height.0, 0); + assert!(!relay); + } else { + panic!("handshake did not send version message"); + } + + // Let the spawned task run for a short time. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Make sure that the isolated connection did not: + // - panic, or + // - return a service. + // + // This test doesn't send a version message on `inbound_conn`, + // so providing a service is incorrect behaviour. + // + // A timeout error would be acceptable, + // but a TCP connection error indicates a potential test setup issue. + // So we fail on them both, because we expect this test to complete before the timeout. + let outbound_result = futures::poll!(&mut outbound_join_handle); + assert!(matches!(outbound_result, Poll::Pending)); + + outbound_join_handle.abort(); +} + +/// Test that `connect_isolated` sends a version message with minimal distinguishing features, +/// when sent in-memory. +/// +/// This test also: +/// - checks `AsyncReadWrite` support, and +/// - runs even if network tests are disabled. +#[tokio::test] +async fn connect_isolated_sends_anonymised_version_message_mem() { + zebra_test::init(); + + connect_isolated_sends_anonymised_version_message_mem_net(Mainnet).await; + connect_isolated_sends_anonymised_version_message_mem_net(Testnet).await; +} + +async fn connect_isolated_sends_anonymised_version_message_mem_net(network: Network) { + // We expect version messages to be ~100 bytes + let (inbound_stream, outbound_stream) = tokio::io::duplex(1024); + + let mut outbound_join_handle = + tokio::spawn(connect_isolated(network, outbound_stream, "".to_string())); + + let mut inbound_stream = Framed::new( + inbound_stream, + Codec::builder().for_network(network).finish(), + ); + + // We don't need to send any bytes to get a version message. + if let Message::Version { + version, + services, + timestamp, + address_recv, + address_from, + nonce: _, + user_agent, + start_height, + relay, + } = inbound_stream + .next() + .await + .expect("stream item") + .expect("item is Ok(msg)") + { + // Check that the version message sent by connect_isolated + // anonymises all the fields that it possibly can. + // + // The version field needs to be accurate, because it controls protocol features. + // The nonce must be randomised for security. + // + // SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300) + + let mut fixed_isolated_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + fixed_isolated_addr.set_port(network.default_port()); + + // Required fields should be accurate and match most other peers. + // (We can't test nonce randomness here.) + assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION); + assert_eq!(timestamp.timestamp() % (5 * 60), 0); + + // Other fields should be empty or zeroed. + assert_eq!(services, PeerServices::empty()); + assert_eq!( + address_recv, + // Since we're connecting to the peer, we expect it to have the node flag. + // + // SECURITY TODO: should this just be zeroed anyway? (#3300) + AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK), + ); + assert_eq!( + address_from, + AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()), + ); + assert_eq!(user_agent, ""); + assert_eq!(start_height.0, 0); + assert!(!relay); + } else { + panic!("handshake did not send version message"); + } + + // Let the spawned task run for a short time. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Make sure that the isolated connection did not: + // - panic, or + // - return a service. + // + // This test doesn't send a version message on `inbound_conn`, + // so providing a service is incorrect behaviour. + // (But a timeout error would be acceptable.) + let outbound_result = futures::poll!(&mut outbound_join_handle); + assert!(matches!( + outbound_result, + Poll::Pending | Poll::Ready(Ok(Err(_))) + )); + + outbound_join_handle.abort(); +} diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index d9867251..eb81d168 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -70,7 +70,7 @@ mod protocol; pub use crate::{ address_book::AddressBook, config::Config, - isolated::connect_isolated, + isolated::{connect_isolated, connect_isolated_tcp_direct}, meta_addr::PeerAddrState, peer::{HandshakeError, PeerError, SharedPeerError}, peer_set::init, diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 9fd08b44..52355b37 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -21,11 +21,21 @@ use crate::{ /// A wrapper around [`peer::Handshake`] that opens a TCP connection before /// forwarding to the inner handshake service. Writing this as its own /// [`tower::Service`] lets us apply unified timeout policies, etc. -pub struct Connector { - handshaker: Handshake, +pub struct Connector +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, +{ + handshaker: Handshake, } -impl Clone for Connector { +impl Clone for Connector +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, +{ fn clone(&self) -> Self { Connector { handshaker: self.handshaker.clone(), @@ -33,8 +43,13 @@ impl Clone for Connector { } } -impl Connector { - pub fn new(handshaker: Handshake) -> Self { +impl Connector +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, +{ + pub fn new(handshaker: Handshake) -> Self { Connector { handshaker } } } @@ -77,11 +92,11 @@ where let connector_span = info_span!("connector", peer = ?connected_addr); async move { - let stream = TcpStream::connect(addr).await?; + let tcp_stream = TcpStream::connect(addr).await?; hs.ready().await?; let client = hs - .call(HandshakeRequest { - tcp_stream: stream, + .call(HandshakeRequest:: { + data_stream: tcp_stream, connected_addr, connection_tracker, }) diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 43d76159..68281703 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -3,6 +3,7 @@ use std::{ collections::HashSet, fmt, future::Future, + marker::PhantomData, net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -12,7 +13,7 @@ use std::{ use chrono::{TimeZone, Utc}; use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt}; use tokio::{ - net::TcpStream, + io::{AsyncRead, AsyncWrite}, sync::broadcast, task::JoinError, time::{timeout, Instant}, @@ -53,18 +54,51 @@ use crate::{ /// To avoid hangs, each handshake (or its connector) should be: /// - launched in a separate task, and /// - wrapped in a timeout. -#[derive(Clone)] -pub struct Handshake { +pub struct Handshake +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ config: Config, - inbound_service: S, - address_book_updater: tokio::sync::mpsc::Sender, - inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, - nonces: Arc>>, user_agent: String, our_services: PeerServices, relay: bool, - parent_span: Span, + + inbound_service: S, + address_book_updater: tokio::sync::mpsc::Sender, + inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, minimum_peer_version: MinimumPeerVersion, + nonces: Arc>>, + + parent_span: Span, + + _phantom_data: PhantomData, +} + +impl Clone for Handshake +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + user_agent: self.user_agent.clone(), + our_services: self.our_services, + relay: self.relay, + inbound_service: self.inbound_service.clone(), + address_book_updater: self.address_book_updater.clone(), + inv_collector: self.inv_collector.clone(), + minimum_peer_version: self.minimum_peer_version.clone(), + nonces: self.nonces.clone(), + parent_span: self.parent_span.clone(), + _phantom_data: self._phantom_data, + } + } } /// The peer address that we are handshaking with. @@ -306,22 +340,32 @@ impl fmt::Debug for ConnectedAddr { } /// A builder for `Handshake`. -pub struct Builder { - config: Option, - inbound_service: Option, - address_book_updater: Option>, - our_services: Option, - user_agent: Option, - relay: Option, - inv_collector: Option>, - latest_chain_tip: C, -} - -impl Builder +pub struct Builder where S: Service + Clone + Send + 'static, S::Future: Send, - C: ChainTip, + C: ChainTip + Clone + Send + 'static, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + config: Option, + our_services: Option, + user_agent: Option, + relay: Option, + + inbound_service: Option, + address_book_updater: Option>, + inv_collector: Option>, + latest_chain_tip: C, + + _phantom_data: PhantomData, +} + +impl Builder +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, { /// Provide a config. Mandatory. pub fn with_config(mut self, config: Config) -> Self { @@ -381,9 +425,16 @@ where /// constant over network upgrade activations. /// /// Use [`NoChainTip`] to explicitly provide no chain tip. - pub fn with_latest_chain_tip(self, latest_chain_tip: NewC) -> Builder { + pub fn with_latest_chain_tip( + self, + latest_chain_tip: NewC, + ) -> Builder + where + NewC: ChainTip + Clone + Send + 'static, + { Builder { latest_chain_tip, + // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self` config: self.config, inbound_service: self.inbound_service, @@ -392,6 +443,7 @@ where user_agent: self.user_agent, relay: self.relay, inv_collector: self.inv_collector, + _phantom_data: self._phantom_data, } } @@ -406,7 +458,7 @@ where /// Consume this builder and produce a [`Handshake`]. /// /// Returns an error only if any mandatory field was unset. - pub fn finish(self) -> Result, &'static str> { + pub fn finish(self) -> Result, &'static str> { let config = self.config.ok_or("did not specify config")?; let inbound_service = self .inbound_service @@ -430,38 +482,41 @@ where Ok(Handshake { config, - inbound_service, - inv_collector, - address_book_updater, - nonces, user_agent, our_services, relay, - parent_span: Span::current(), + inbound_service, + address_book_updater, + inv_collector, minimum_peer_version, + nonces, + parent_span: Span::current(), + _phantom_data: self._phantom_data, }) } } -impl Handshake +impl Handshake where S: Service + Clone + Send + 'static, S::Future: Send, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, { /// Create a builder that configures a [`Handshake`] service. - pub fn builder() -> Builder { + pub fn builder() -> Builder { // We don't derive `Default` because the derive inserts a `where S: // Default` bound even though `Option` implements `Default` even if // `S` does not. Builder { config: None, + our_services: None, + user_agent: None, + relay: None, inbound_service: None, address_book_updater: None, - user_agent: None, - our_services: None, - relay: None, inv_collector: None, latest_chain_tip: NoChainTip, + _phantom_data: PhantomData::default(), } } } @@ -472,8 +527,8 @@ where /// We split `Handshake` into its components before calling this function, /// to avoid infectious `Sync` bounds on the returned future. #[allow(clippy::too_many_arguments)] -pub async fn negotiate_version( - peer_conn: &mut Framed, +pub async fn negotiate_version( + peer_conn: &mut Framed, connected_addr: &ConnectedAddr, config: Config, nonces: Arc>>, @@ -481,7 +536,10 @@ pub async fn negotiate_version( our_services: PeerServices, relay: bool, mut minimum_peer_version: MinimumPeerVersion, -) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { +) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> +where + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ // Create a random nonce for this connection let local_nonce = Nonce::default(); // # Correctness @@ -671,9 +729,12 @@ pub async fn negotiate_version( /// A handshake request. /// Contains the information needed to handshake with the peer. -pub struct HandshakeRequest { - /// The TCP connection to the peer. - pub tcp_stream: TcpStream, +pub struct HandshakeRequest +where + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + /// The tokio [`TcpStream`] or Tor [`DataStream`] to the peer. + pub data_stream: AsyncReadWrite, /// The address of the peer, and other related information. pub connected_addr: ConnectedAddr, @@ -684,11 +745,13 @@ pub struct HandshakeRequest { pub connection_tracker: ConnectionTracker, } -impl Service for Handshake +impl Service> + for Handshake where S: Service + Clone + Send + 'static, S::Future: Send, C: ChainTip + Clone + Send + 'static, + AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Response = Client; type Error = BoxError; @@ -699,9 +762,9 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, req: HandshakeRequest) -> Self::Future { + fn call(&mut self, req: HandshakeRequest) -> Self::Future { let HandshakeRequest { - tcp_stream, + data_stream, connected_addr, connection_tracker, } = req; @@ -735,7 +798,7 @@ where // As a defence-in-depth against hangs, every send() or next() on peer_conn // should be wrapped in a timeout. let mut peer_conn = Framed::new( - tcp_stream, + data_stream, Codec::builder() .for_network(config.network) .with_metrics_addr_label(connected_addr.get_transient_addr_label()) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 74e8a5a2..5a992db4 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -13,7 +13,7 @@ use futures::{ }; use rand::seq::SliceRandom; use tokio::{ - net::TcpListener, + net::{TcpListener, TcpStream}, sync::broadcast, time::{sleep, Instant}, }; @@ -482,7 +482,8 @@ async fn accept_inbound_connections( peerset_tx: futures::channel::mpsc::Sender, ) -> Result<(), BoxError> where - S: Service + Clone, + S: Service, Response = peer::Client, Error = BoxError> + + Clone, S::Future: Send + 'static, { let mut active_inbound_connections = ActiveConnectionCounter::new_counter(); @@ -534,7 +535,7 @@ where // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call(HandshakeRequest { - tcp_stream, + data_stream: tcp_stream, connected_addr, connection_tracker, }); diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index cd33a20e..f1a1e66f 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -725,24 +725,25 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { return; } - let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let success_disconnect_inbound_handshaker = + service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + data_stream: tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let (fake_client, _harness) = ClientTestHarness::build().finish(); + let (fake_client, _harness) = ClientTestHarness::build().finish(); - // Actually close the connection. - std::mem::drop(connection_tracker); - std::mem::drop(tcp_stream); + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); - // Give the crawler time to get the message. - tokio::task::yield_now().await; + // Give the crawler time to get the message. + tokio::task::yield_now().await; - Ok(fake_client) - }); + Ok(fake_client) + }); let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await; @@ -791,25 +792,26 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); - let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { - let peer_tracker_tx = peer_tracker_tx.clone(); - async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let success_stay_open_inbound_handshaker = + service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + data_stream: tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let (fake_client, _harness) = ClientTestHarness::build().finish(); + let (fake_client, _harness) = ClientTestHarness::build().finish(); - // Make the connection staying open. - peer_tracker_tx - .unbounded_send((tcp_stream, connection_tracker)) - .expect("unexpected error sending to unbounded channel"); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); - Ok(fake_client) - } - }); + Ok(fake_client) + } + }); let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await; @@ -913,24 +915,25 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { return; } - let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let success_disconnect_inbound_handshaker = + service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + data_stream: tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let (fake_client, _harness) = ClientTestHarness::build().finish(); + let (fake_client, _harness) = ClientTestHarness::build().finish(); - // Actually close the connection. - std::mem::drop(connection_tracker); - std::mem::drop(tcp_stream); + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); - // Give the crawler time to get the message. - tokio::task::yield_now().await; + // Give the crawler time to get the message. + tokio::task::yield_now().await; - Ok(fake_client) - }); + Ok(fake_client) + }); let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await; @@ -979,25 +982,26 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); - let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { - let peer_tracker_tx = peer_tracker_tx.clone(); - async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let success_stay_open_inbound_handshaker = + service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + data_stream: tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let (fake_client, _harness) = ClientTestHarness::build().finish(); + let (fake_client, _harness) = ClientTestHarness::build().finish(); - // Make the connection staying open. - peer_tracker_tx - .unbounded_send((tcp_stream, connection_tracker)) - .expect("unexpected error sending to unbounded channel"); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); - Ok(fake_client) - } - }); + Ok(fake_client) + } + }); let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await; @@ -1353,7 +1357,7 @@ async fn spawn_inbound_listener_with_peer_limit( listen_handshaker: S, ) -> (Config, mpsc::Receiver) where - S: Service + S: Service, Response = peer::Client, Error = BoxError> + Clone + Send + 'static,