From add94c1c4537b012751e05f93fd2fb5e2bc3c762 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Thu, 19 Nov 2020 11:54:20 -0800 Subject: [PATCH] deps: move to tokio 0.3, tower 0.4 This change is mostly mechanical, with the exception of the changes to the `tower-batch` middleware. This middleware was adapted from `tower::buffer`, and the `tower::buffer` code was changed to implement its own bounded queue, because Tokio 0.3 removed the `mpsc::Sender::poll_send` method. See https://github.com/tower-rs/tower/commit/ddc64e8d4d4d499adef569e284f317288edd6513 for more context on the Tower changes. To match Tower as closely as possible in order to be able to upstream `tower-batch`, those changes are copied from `tower::Buffer` to `tower-batch`. --- Cargo.lock | 251 ++++++++++++------ Cargo.toml | 2 +- tower-batch/Cargo.toml | 6 +- tower-batch/src/lib.rs | 1 + tower-batch/src/message.rs | 1 + tower-batch/src/semaphore.rs | 78 ++++++ tower-batch/src/service.rs | 79 ++++-- tower-batch/src/worker.rs | 85 +++--- tower-batch/tests/ed25519.rs | 2 +- tower-fallback/Cargo.toml | 4 +- zebra-consensus/Cargo.toml | 6 +- zebra-consensus/src/primitives/redjubjub.rs | 2 +- zebra-network/Cargo.toml | 8 +- zebra-network/src/isolated.rs | 2 +- zebra-network/src/peer/connection.rs | 6 +- zebra-network/src/peer_set/initialize.rs | 2 +- .../src/peer_set/inventory_registry.rs | 31 ++- zebra-network/src/protocol/external/codec.rs | 13 +- zebra-state/Cargo.toml | 6 +- zebra-test/Cargo.toml | 4 +- zebrad/Cargo.toml | 4 +- zebrad/src/components/sync.rs | 6 +- zebrad/src/components/tokio.rs | 3 +- 23 files changed, 401 insertions(+), 201 deletions(-) create mode 100644 tower-batch/src/semaphore.rs diff --git a/Cargo.lock b/Cargo.lock index d7b3f4d5..489294e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,6 +378,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "bytes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" + [[package]] name = "canonical-path" version = "2.0.2" @@ -463,6 +469,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "cloudabi" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" +dependencies = [ + "bitflags", +] + [[package]] name = "color-backtrace" version = "0.3.0" @@ -1046,7 +1061,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project 1.0.1", + "pin-project 1.0.2", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1166,7 +1181,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "futures-sink", @@ -1174,7 +1189,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", + "tokio 0.2.23", "tokio-util 0.3.1", "tracing", "tracing-futures", @@ -1230,7 +1245,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "itoa", ] @@ -1241,7 +1256,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes", + "bytes 0.5.6", "http", ] @@ -1272,7 +1287,7 @@ version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-channel", "futures-core", "futures-util", @@ -1282,9 +1297,9 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.1", + "pin-project 1.0.2", "socket2", - "tokio", + "tokio 0.2.23", "tower-service", "tracing", "want", @@ -1362,6 +1377,15 @@ dependencies = [ "str_stack", ] +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "iovec" version = "0.1.4" @@ -1466,6 +1490,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.11" @@ -1558,7 +1591,7 @@ checksum = "f3fc63816bd5f8bde5eb31ce471f9633adc69ba1c55b44191b4d5fc7e263e8ab" dependencies = [ "log", "metrics-core", - "tokio", + "tokio 0.2.23", ] [[package]] @@ -1614,15 +1647,15 @@ dependencies = [ "metrics-observer-prometheus", "metrics-observer-yaml", "metrics-util", - "parking_lot", + "parking_lot 0.10.2", "quanta", ] [[package]] name = "metrics-util" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d11f8090a8886339f9468a04eeea0711e4cf27538b134014664308041307a1c5" +checksum = "277619f040719a5a23d75724586d5601286e8fa53451cfaaca3b8c627c2c2378" dependencies = [ "crossbeam-epoch 0.8.2", "serde", @@ -1658,26 +1691,16 @@ dependencies = [ ] [[package]] -name = "mio-named-pipes" -version = "0.1.7" +name = "mio" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b" dependencies = [ - "log", - "mio", - "miow 0.3.5", - "winapi 0.3.9", -] - -[[package]] -name = "mio-uds" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" -dependencies = [ - "iovec", "libc", - "mio", + "log", + "miow 0.3.6", + "ntapi", + "winapi 0.3.9", ] [[package]] @@ -1694,9 +1717,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" dependencies = [ "socket2", "winapi 0.3.9", @@ -1739,6 +1762,15 @@ dependencies = [ "version_check 0.9.2", ] +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-format" version = "0.4.0" @@ -1844,8 +1876,19 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api 0.4.2", + "parking_lot_core 0.8.0", ] [[package]] @@ -1855,10 +1898,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ "cfg-if 0.1.10", - "cloudabi", + "cloudabi 0.0.3", "libc", "redox_syscall", - "smallvec 1.4.2", + "smallvec 1.5.0", + "winapi 0.3.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi 0.1.0", + "instant", + "libc", + "redox_syscall", + "smallvec 1.5.0", "winapi 0.3.9", ] @@ -1885,11 +1943,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee41d838744f60d959d7074e3afb6b35c7456d0f61cad38a24e35e6553f73841" +checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" dependencies = [ - "pin-project-internal 1.0.1", + "pin-project-internal 1.0.2", ] [[package]] @@ -1905,9 +1963,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86" +checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.7", @@ -1920,6 +1978,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" +[[package]] +name = "pin-project-lite" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" + [[package]] name = "pin-utils" version = "0.1.0" @@ -2559,9 +2623,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.4.2" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" +checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85" [[package]] name = "socket2" @@ -2717,9 +2781,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" dependencies = [ "winapi-util", ] @@ -2794,18 +2858,33 @@ version = "0.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "iovec", "lazy_static", + "memchr", + "mio 0.6.22", + "pin-project-lite 0.1.11", + "slab", +] + +[[package]] +name = "tokio" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61" +dependencies = [ + "autocfg", + "bytes 0.6.0", + "futures-core", + "lazy_static", "libc", "memchr", - "mio", - "mio-named-pipes", - "mio-uds", + "mio 0.7.6", "num_cpus", - "pin-project-lite", + "parking_lot 0.11.1", + "pin-project-lite 0.2.0", "signal-hook-registry", "slab", "tokio-macros", @@ -2815,41 +2894,41 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "0.2.6" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +checksum = "21d30fdbb5dc2d8f91049691aa1a9d4d4ae422a21c334ce8936e5886d30c5c45" dependencies = [ "proc-macro2 1.0.24", "quote 1.0.7", "syn 1.0.48", ] -[[package]] -name = "tokio-util" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-core", "futures-sink", "log", - "pin-project-lite", - "tokio", + "pin-project-lite 0.1.11", + "tokio 0.2.23", +] + +[[package]] +name = "tokio-util" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73af76301319bcacf00d26d3c75534ef248dcad7ceaf36d93ec902453c3b1706" +dependencies = [ + "bytes 0.6.0", + "futures-core", + "futures-sink", + "log", + "pin-project-lite 0.1.11", + "tokio 0.3.4", ] [[package]] @@ -2863,14 +2942,14 @@ dependencies = [ [[package]] name = "tower" -version = "0.3.1" -source = "git+https://github.com/tower-rs/tower?rev=1a84543#1a845433177b31ebc6daff765ee81468c42d6676" +version = "0.4.0" +source = "git+https://github.com/tower-rs/tower?rev=5e1e07744820028877654c336a3b9fe057bf46f1#5e1e07744820028877654c336a3b9fe057bf46f1" dependencies = [ "futures-core", "futures-util", "hdrhistogram", - "pin-project 0.4.27", - "tokio", + "pin-project 1.0.2", + "tokio 0.3.4", "tower-layer", "tower-service", "tracing", @@ -2886,7 +2965,7 @@ dependencies = [ "futures-core", "pin-project 0.4.27", "rand 0.7.3", - "tokio", + "tokio 0.3.4", "tower", "tower-fallback", "tracing", @@ -2900,7 +2979,7 @@ version = "0.1.0" dependencies = [ "futures-core", "pin-project 0.4.27", - "tokio", + "tokio 0.3.4", "tower", "tracing", "zebra-test", @@ -2909,7 +2988,7 @@ dependencies = [ [[package]] name = "tower-layer" version = "0.3.0" -source = "git+https://github.com/tower-rs/tower?rev=1a84543#1a845433177b31ebc6daff765ee81468c42d6676" +source = "git+https://github.com/tower-rs/tower?rev=5e1e07744820028877654c336a3b9fe057bf46f1#5e1e07744820028877654c336a3b9fe057bf46f1" [[package]] name = "tower-service" @@ -2937,7 +3016,7 @@ checksum = "b0987850db3733619253fe60e17cb59b82d37c7e6c0236bb81e4d6b87c879f27" dependencies = [ "cfg-if 0.1.10", "log", - "pin-project-lite", + "pin-project-lite 0.1.11", "tracing-attributes", "tracing-core", ] @@ -3045,7 +3124,7 @@ dependencies = [ "serde", "serde_json", "sharded-slab", - "smallvec 1.4.2", + "smallvec 1.5.0", "thread_local", "tracing", "tracing-core", @@ -3088,9 +3167,9 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.14" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7f98e67a4d84f730d343392f9bfff7d21e3fca562b9cb7a43b768350beeddc6" +checksum = "a13e63ab62dbe32aeee58d1c5408d35c36c392bba5d9d3142287219721afe606" dependencies = [ "tinyvec", ] @@ -3340,7 +3419,7 @@ dependencies = [ "serde", "spandoc", "thiserror", - "tokio", + "tokio 0.3.4", "tower", "tower-batch", "tower-fallback", @@ -3361,7 +3440,7 @@ version = "3.0.0-alpha.0" dependencies = [ "bitflags", "byteorder", - "bytes", + "bytes 0.6.0", "chrono", "futures", "hex", @@ -3373,8 +3452,8 @@ dependencies = [ "rand 0.7.3", "serde", "thiserror", - "tokio", - "tokio-util 0.2.0", + "tokio 0.3.4", + "tokio-util 0.5.0", "tower", "tracing", "tracing-error", @@ -3419,7 +3498,7 @@ dependencies = [ "spandoc", "tempdir", "thiserror", - "tokio", + "tokio 0.3.4", "tower", "tracing", "tracing-error", @@ -3442,7 +3521,7 @@ dependencies = [ "spandoc", "tempdir", "thiserror", - "tokio", + "tokio 0.3.4", "tower", "tracing", "tracing-error", @@ -3484,7 +3563,7 @@ dependencies = [ "serde", "tempdir", "thiserror", - "tokio", + "tokio 0.3.4", "toml", "tower", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 13fcf98f..1eb6e23e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ panic = "abort" panic = "abort" [patch.crates-io] -tower = { git = "https://github.com/tower-rs/tower", rev = "1a84543" } +tower = { git = "https://github.com/tower-rs/tower", rev = "5e1e07744820028877654c336a3b9fe057bf46f1" } diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml index 8a65493c..c5063db4 100644 --- a/tower-batch/Cargo.toml +++ b/tower-batch/Cargo.toml @@ -6,8 +6,8 @@ license = "MIT" edition = "2018" [dependencies] -tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] } -tower = { version = "0.3", features = ["util", "buffer"] } +tokio = { version = "0.3", features = ["time", "sync", "stream", "tracing"] } +tower = { version = "0.4", features = ["util", "buffer"] } futures-core = "0.3.6" pin-project = "0.4.27" tracing = "0.1.21" @@ -17,7 +17,7 @@ futures = "0.3.7" [dev-dependencies] ed25519-zebra = "2.1.0" rand = "0.7" -tokio = { version = "0.2", features = ["full"]} +tokio = { version = "0.3", features = ["full"]} tracing = "0.1.21" zebra-test = { path = "../zebra-test/" } tower-fallback = { path = "../tower-fallback/" } diff --git a/tower-batch/src/lib.rs b/tower-batch/src/lib.rs index 9ed70a9e..6eea4fb5 100644 --- a/tower-batch/src/lib.rs +++ b/tower-batch/src/lib.rs @@ -89,6 +89,7 @@ pub mod error; pub mod future; mod layer; mod message; +mod semaphore; mod service; mod worker; diff --git a/tower-batch/src/message.rs b/tower-batch/src/message.rs index dc73a6ad..287076f2 100644 --- a/tower-batch/src/message.rs +++ b/tower-batch/src/message.rs @@ -7,6 +7,7 @@ pub(crate) struct Message { pub(crate) request: Request, pub(crate) tx: Tx, pub(crate) span: tracing::Span, + pub(super) _permit: crate::semaphore::Permit, } /// Response sender diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs new file mode 100644 index 00000000..cdab56d0 --- /dev/null +++ b/tower-batch/src/semaphore.rs @@ -0,0 +1,78 @@ +// Copied from tower/src/semaphore.rs +// When/if tower-batch is upstreamed, delete this file +// and use the common tower semaphore implementation + +pub(crate) use self::sync::OwnedSemaphorePermit as Permit; +use futures_core::ready; +use std::{ + fmt, + future::Future, + mem, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::sync; + +#[derive(Debug)] +pub(crate) struct Semaphore { + semaphore: Arc, + state: State, +} + +enum State { + Waiting(Pin + Send + Sync + 'static>>), + Ready(Permit), + Empty, +} + +impl Semaphore { + pub(crate) fn new(permits: usize) -> Self { + Self { + semaphore: Arc::new(sync::Semaphore::new(permits)), + state: State::Empty, + } + } + + pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> { + loop { + self.state = match self.state { + State::Ready(_) => return Poll::Ready(()), + State::Waiting(ref mut fut) => { + let permit = ready!(Pin::new(fut).poll(cx)); + State::Ready(permit) + } + State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())), + }; + } + } + + pub(crate) fn take_permit(&mut self) -> Option { + if let State::Ready(permit) = mem::replace(&mut self.state, State::Empty) { + return Some(permit); + } + None + } +} + +impl Clone for Semaphore { + fn clone(&self) -> Self { + Self { + semaphore: self.semaphore.clone(), + state: State::Empty, + } + } +} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::Waiting(_) => f + .debug_tuple("State::Waiting") + .field(&format_args!("...")) + .finish(), + State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(), + State::Empty => f.debug_tuple("State::Empty").finish(), + } + } +} diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs index 28c4e713..6ddeb215 100644 --- a/tower-batch/src/service.rs +++ b/tower-batch/src/service.rs @@ -5,6 +5,7 @@ use super::{ BatchControl, }; +use crate::semaphore::Semaphore; use futures_core::ready; use std::task::{Context, Poll}; use tokio::sync::{mpsc, oneshot}; @@ -18,7 +19,19 @@ pub struct Batch where T: Service>, { - tx: mpsc::Sender>, + // Note: this actually _is_ bounded, but rather than using Tokio's unbounded + // channel, we use tokio's semaphore separately to implement the bound. + tx: mpsc::UnboundedSender>, + // When the buffer's channel is full, we want to exert backpressure in + // `poll_ready`, so that callers such as load balancers could choose to call + // another service rather than waiting for buffer capacity. + // + // Unfortunately, this can't be done easily using Tokio's bounded MPSC + // channel, because it doesn't expose a polling-based interface, only an + // `async fn ready`, which borrows the sender. Therefore, we implement our + // own bounded MPSC on top of the unbounded channel, using a semaphore to + // limit how many items are in the channel. + semaphore: Semaphore, handle: Handle, } @@ -45,10 +58,16 @@ where Request: Send + 'static, { // XXX(hdevalence): is this bound good - let (tx, rx) = mpsc::channel(1); + let bound = 1; + let (tx, rx) = mpsc::unbounded_channel(); let (handle, worker) = Worker::new(service, rx, max_items, max_latency); tokio::spawn(worker.run()); - Batch { tx, handle } + let semaphore = Semaphore::new(bound); + Batch { + tx, + handle, + semaphore, + } } fn get_worker_error(&self) -> crate::BoxError { @@ -66,40 +85,43 @@ where type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // If the inner service has errored, then we error here. - if ready!(self.tx.poll_ready(cx)).is_err() { - Poll::Ready(Err(self.get_worker_error())) - } else { - Poll::Ready(Ok(())) + // First, check if the worker is still alive. + if self.tx.is_closed() { + // If the inner service has errored, then we error here. + return Poll::Ready(Err(self.get_worker_error())); } + + // Then, poll to acquire a semaphore permit. If we acquire a permit, + // then there's enough buffer capacity to send a new request. Otherwise, + // we need to wait for capacity. + ready!(self.semaphore.poll_acquire(cx)); + + Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { - // TODO: - // ideally we'd poll_ready again here so we don't allocate the oneshot - // if the try_send is about to fail, but sadly we can't call poll_ready - // outside of task context. - let (tx, rx) = oneshot::channel(); + tracing::trace!("sending request to buffer worker"); + let _permit = self + .semaphore + .take_permit() + .expect("buffer full; poll_ready must be called first"); // get the current Span so that we can explicitly propagate it to the worker // if we didn't do this, events on the worker related to this span wouldn't be counted // towards that span since the worker would have no way of entering it. let span = tracing::Span::current(); - tracing::trace!(parent: &span, "sending request to batch worker"); - match self.tx.try_send(Message { request, span, tx }) { - Err(mpsc::error::TrySendError::Closed(_)) => { - ResponseFuture::failed(self.get_worker_error()) - } - Err(mpsc::error::TrySendError::Full(_)) => { - // When `mpsc::Sender::poll_ready` returns `Ready`, a slot - // in the channel is reserved for the handle. Other `Sender` - // handles may not send a message using that slot. This - // guarantees capacity for `request`. - // - // Given this, the only way to hit this code path is if - // `poll_ready` has not been called & `Ready` returned. - panic!("buffer full; poll_ready must be called first"); - } + + // If we've made it here, then a semaphore permit has already been + // acquired, so we can freely allocate a oneshot. + let (tx, rx) = oneshot::channel(); + + match self.tx.send(Message { + request, + span, + tx, + _permit, + }) { + Err(_) => ResponseFuture::failed(self.get_worker_error()), Ok(_) => ResponseFuture::new(rx), } } @@ -113,6 +135,7 @@ where Self { tx: self.tx.clone(), handle: self.handle.clone(), + semaphore: self.semaphore.clone(), } } } diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs index 8d4ab367..ff793951 100644 --- a/tower-batch/src/worker.rs +++ b/tower-batch/src/worker.rs @@ -1,18 +1,20 @@ +use std::sync::{Arc, Mutex}; + +use futures::future::TryFutureExt; +use pin_project::pin_project; +use tokio::{ + stream::StreamExt, + sync::mpsc, + time::{sleep, Sleep}, +}; +use tower::{Service, ServiceExt}; +use tracing_futures::Instrument; + use super::{ error::{Closed, ServiceError}, message::{self, Message}, BatchControl, }; -use futures::future::TryFutureExt; -use pin_project::pin_project; -use std::sync::{Arc, Mutex}; -use tokio::{ - stream::StreamExt, - sync::mpsc, - time::{delay_for, Delay}, -}; -use tower::{Service, ServiceExt}; -use tracing_futures::Instrument; /// Task that handles processing the buffer. This type should not be used /// directly, instead `Buffer` requires an `Executor` that can accept this task. @@ -28,7 +30,7 @@ where T: Service>, T::Error: Into, { - rx: mpsc::Receiver>, + rx: mpsc::UnboundedReceiver>, service: T, failed: Option, handle: Handle, @@ -49,7 +51,7 @@ where { pub(crate) fn new( service: T, - rx: mpsc::Receiver>, + rx: mpsc::UnboundedReceiver>, max_items: usize, max_latency: std::time::Duration, ) -> (Handle, Worker) { @@ -103,12 +105,11 @@ where } pub async fn run(mut self) { - use futures::future::Either::{Left, Right}; // The timer is started when the first entry of a new batch is // submitted, so that the batch latency of all entries is at most // self.max_latency. However, we don't keep the timer running unless // there is a pending request to prevent wakeups on idle services. - let mut timer: Option = None; + let mut timer: Option = None; let mut pending_items = 0usize; loop { match timer { @@ -120,40 +121,42 @@ where // Apply the provided span to request processing .instrument(span) .await; - timer = Some(delay_for(self.max_latency)); + timer = Some(sleep(self.max_latency)); pending_items = 1; } // No more messages, ever. None => return, }, - Some(delay) => { + Some(mut sleep) => { // Wait on either a new message or the batch timer. - match futures::future::select(self.rx.next(), delay).await { - Left((Some(msg), delay)) => { - let span = msg.span; - self.process_req(msg.request, msg.tx) - // Apply the provided span to request processing. - .instrument(span) - .await; - pending_items += 1; - // Check whether we have too many pending items. - if pending_items >= self.max_items { - // XXX(hdevalence): what span should instrument this? - self.flush_service().await; - // Now we have an empty batch. - timer = None; - pending_items = 0; - } else { - // The timer is still running, set it back! - timer = Some(delay); + tokio::select! { + maybe_msg = self.rx.recv() => match maybe_msg { + Some(msg) => { + let span = msg.span; + self.process_req(msg.request, msg.tx) + // Apply the provided span to request processing. + .instrument(span) + .await; + pending_items += 1; + // Check whether we have too many pending items. + if pending_items >= self.max_items { + // XXX(hdevalence): what span should instrument this? + self.flush_service().await; + // Now we have an empty batch. + timer = None; + pending_items = 0; + } else { + // The timer is still running, set it back! + timer = Some(sleep); + } } - } - // No more messages, ever. - Left((None, _delay)) => { - return; - } - // The batch timer elapsed. - Right(((), _next)) => { + None => { + // No more messages, ever. + return; + } + }, + () = &mut sleep => { + // The batch timer elapsed. // XXX(hdevalence): what span should instrument this? self.flush_service().await; timer = None; diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs index a5fb5288..947c0736 100644 --- a/tower-batch/tests/ed25519.rs +++ b/tower-batch/tests/ed25519.rs @@ -10,7 +10,7 @@ use color_eyre::{eyre::eyre, Report}; use ed25519_zebra::*; use futures::stream::{FuturesUnordered, StreamExt}; use rand::thread_rng; -use tokio::sync::broadcast::{channel, RecvError, Sender}; +use tokio::sync::broadcast::{channel, error::RecvError, Sender}; use tower::{Service, ServiceExt}; use tower_batch::{Batch, BatchControl}; use tower_fallback::Fallback; diff --git a/tower-fallback/Cargo.toml b/tower-fallback/Cargo.toml index eb1cb3a9..67f45258 100644 --- a/tower-fallback/Cargo.toml +++ b/tower-fallback/Cargo.toml @@ -6,11 +6,11 @@ license = "MIT" edition = "2018" [dependencies] -tower = "0.3" +tower = "0.4" futures-core = "0.3.6" pin-project = "0.4.27" tracing = "0.1" [dev-dependencies] zebra-test = { path = "../zebra-test/" } -tokio = { version = "0.2", features = ["full"]} +tokio = { version = "0.3", features = ["full"]} diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index eeae83d0..db7facb8 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -19,8 +19,8 @@ futures = "0.3.7" futures-util = "0.3.6" metrics = "0.12" thiserror = "1.0.22" -tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] } -tower = { version = "0.3", features = ["timeout", "util", "buffer"] } +tokio = { version = "0.3", features = ["time", "sync", "stream", "tracing"] } +tower = { version = "0.4", features = ["timeout", "util", "buffer"] } tower-util = "0.3" tracing = "0.1.21" tracing-futures = "0.2.4" @@ -34,7 +34,7 @@ zebra-script = { path = "../zebra-script" } [dev-dependencies] rand = "0.7" spandoc = "0.2" -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } tracing-error = "0.1.2" tracing-subscriber = "0.2.15" diff --git a/zebra-consensus/src/primitives/redjubjub.rs b/zebra-consensus/src/primitives/redjubjub.rs index ac9021f2..472312ae 100644 --- a/zebra-consensus/src/primitives/redjubjub.rs +++ b/zebra-consensus/src/primitives/redjubjub.rs @@ -14,7 +14,7 @@ use futures::future::{ready, Ready}; use once_cell::sync::Lazy; use rand::thread_rng; -use tokio::sync::broadcast::{channel, RecvError, Sender}; +use tokio::sync::broadcast::{channel, error::RecvError, Sender}; use tower::{util::ServiceFn, Service}; use tower_batch::{Batch, BatchControl}; use tower_fallback::Fallback; diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 19bca8f5..3a21a702 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" [dependencies] bitflags = "1.2" byteorder = "1.3" -bytes = "0.5" +bytes = "0.6" chrono = "0.4" hex = "0.4" # indexmap has rayon support for parallel iteration, @@ -22,9 +22,9 @@ serde = { version = "1", features = ["serde_derive"] } thiserror = "1" futures = "0.3" -tokio = { version = "0.2.22", features = ["net", "time", "stream", "tracing", "macros"] } -tokio-util = { version = "0.2", features = ["codec"] } -tower = { version = "0.3", features = ["retry", "discover", "load", "load-shed", "timeout", "util", "buffer"] } +tokio = { version = "0.3", features = ["net", "time", "stream", "tracing", "macros"] } +tokio-util = { version = "0.5", features = ["codec"] } +tower = { version = "0.4", features = ["retry", "discover", "load", "load-shed", "timeout", "util", "buffer"] } metrics = "0.12" tracing = "0.1" diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index 65320b5f..1d6132f2 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -93,7 +93,7 @@ mod tests { use futures::stream::StreamExt; use tokio_util::codec::Framed; - let mut listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let listen_addr = listener.local_addr().unwrap(); let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap(); diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 48932489..014050e4 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -23,7 +23,7 @@ use futures::{ prelude::*, stream::Stream, }; -use tokio::time::{delay_for, Delay}; +use tokio::time::{sleep, Sleep}; use tower::Service; use tracing_futures::Instrument; @@ -221,7 +221,7 @@ pub struct Connection { /// A timeout for a client request. This is stored separately from /// State so that we can move the future out of it independently of /// other state handling. - pub(super) request_timer: Option, + pub(super) request_timer: Option, pub(super) svc: S, pub(super) client_rx: mpsc::Receiver, /// A slot for an error shared between the Connection and the Client that uses it. @@ -553,7 +553,7 @@ where } { Ok(new_state) => { self.state = new_state; - self.request_timer = Some(delay_for(constants::REQUEST_TIMEOUT)); + self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT)); } Err(e) => self.fail_with(e), } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index f679e7b4..ae273e1f 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -211,7 +211,7 @@ where S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone, S::Future: Send + 'static, { - let mut listener = TcpListener::bind(addr).await?; + let listener = TcpListener::bind(addr).await?; let local_addr = listener.local_addr()?; info!("Opened Zcash protocol endpoint at {}", local_addr); loop { diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index da119b9f..5d211e47 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -1,8 +1,7 @@ //! Inventory Registry Implementation //! //! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html -use crate::{protocol::external::InventoryHash, BoxError}; -use futures::Stream; + use std::{ collections::{HashMap, HashSet}, net::SocketAddr, @@ -10,17 +9,20 @@ use std::{ task::{Context, Poll}, time::Duration, }; + +use futures::{Stream, StreamExt}; use tokio::{ sync::broadcast, time::{self, Interval}, }; +use crate::{protocol::external::InventoryHash, BoxError}; + /// An Inventory Registry for tracking recent inventory advertisements by peer. /// /// For more details please refer to the [RFC]. /// /// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html -#[derive(Debug)] pub struct InventoryRegistry { /// Map tracking the inventory advertisements from the current interval /// period @@ -28,18 +30,33 @@ pub struct InventoryRegistry { /// Map tracking inventory advertisements from the previous interval period prev: HashMap>, /// Stream of incoming inventory hashes to register - inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, + inv_stream: Pin< + Box< + dyn Stream> + + Send + + 'static, + >, + >, /// Interval tracking how frequently we should rotate our maps interval: Interval, } +impl std::fmt::Debug for InventoryRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InventoryRegistry") + .field("current", &self.current) + .field("prev", &self.prev) + .finish() + } +} + impl InventoryRegistry { /// Returns an Inventory Registry pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self { Self { current: Default::default(), prev: Default::default(), - inv_stream, + inv_stream: inv_stream.into_stream().boxed(), interval: time::interval(Duration::from_secs(75)), } } @@ -60,7 +77,7 @@ impl InventoryRegistry { /// - rotates HashMaps based on interval events /// - drains the inv_stream channel and registers all advertised inventory pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { - while let Poll::Ready(_) = self.interval.poll_tick(cx) { + while let Poll::Ready(_) = Pin::new(&mut self.interval).poll_next(cx) { self.rotate(); } @@ -80,7 +97,7 @@ impl InventoryRegistry { // rather than propagating it through the peer set's Service::poll_ready // implementation, where reporting a failure means reporting a permanent // failure of the peer set. - use broadcast::RecvError; + use broadcast::error::RecvError; while let Poll::Ready(Some(channel_result)) = Pin::new(&mut self.inv_stream).poll_next(cx) { match channel_result { Ok((hash, addr)) => self.register(hash, addr), diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index ac3c56ad..e8925352 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -104,11 +104,10 @@ impl Builder { // ======== Encoding ========= -impl Encoder for Codec { - type Item = Message; +impl Encoder for Codec { type Error = Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { use Error::Parse; // XXX(HACK): this is inefficient and does an extra allocation. // instead, we should have a size estimator for the message, reserve @@ -587,7 +586,7 @@ mod tests { let services = PeerServices::NODE_NETWORK; let timestamp = Utc.timestamp(1_568_000_000, 0); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); let v = Message::Version { version: crate::constants::CURRENT_VERSION, @@ -634,7 +633,7 @@ mod tests { fn filterload_message_round_trip() { zebra_test::init(); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); let v = Message::FilterLoad { filter: Filter(vec![0; 35999]), @@ -670,7 +669,7 @@ mod tests { fn filterload_message_too_large_round_trip() { zebra_test::init(); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); let v = Message::FilterLoad { filter: Filter(vec![0; 40000]), @@ -706,7 +705,7 @@ mod tests { use zebra_chain::serialization::ZcashDeserializeInto; zebra_test::init(); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); // make tests with a Tx message let tx = zebra_test::vectors::DUMMY_TX1 diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index fa7d8ab0..fb6afc68 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -18,11 +18,11 @@ serde = { version = "1", features = ["serde_derive"] } futures = "0.3.7" metrics = "0.12" -tower = { version = "0.3.1", features = ["buffer", "util"] } +tower = { version = "0.4", features = ["buffer", "util"] } tracing = "0.1" tracing-error = "0.1.2" thiserror = "1.0.22" -tokio = { version = "0.2.22", features = ["sync"] } +tokio = { version = "0.3", features = ["sync"] } displaydoc = "0.1.7" rocksdb = "0.15.0" tempdir = "0.3.7" @@ -34,6 +34,6 @@ zebra-test = { path = "../zebra-test/" } once_cell = "1.5" spandoc = "0.2" tempdir = "0.3.7" -tokio = { version = "0.2.22", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } proptest = "0.10.1" primitive-types = "0.7.3" diff --git a/zebra-test/Cargo.toml b/zebra-test/Cargo.toml index 3394e313..adc40b05 100644 --- a/zebra-test/Cargo.toml +++ b/zebra-test/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" [dependencies] hex = "0.4.2" lazy_static = "1.4.0" -tower = { version = "0.3.1", features = ["util"] } +tower = { version = "0.4", features = ["util"] } futures = "0.3.7" color-eyre = "0.5.7" tracing = "0.1.21" @@ -25,4 +25,4 @@ proptest = "0.10.1" tempdir = "0.3.7" [dev-dependencies] -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index d88d7d29..49b8452a 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -21,8 +21,8 @@ rand = "0.7" hyper = "0.13.9" futures = "0.3" -tokio = { version = "0.2.22", features = ["time", "rt-threaded", "stream", "macros", "tracing", "signal"] } -tower = { version = "0.3", features = ["hedge", "limit"] } +tokio = { version = "0.3", features = ["time", "rt-multi-thread", "stream", "macros", "tracing", "signal"] } +tower = { version = "0.4", features = ["hedge", "limit"] } pin-project = "0.4.23" color-eyre = { version = "0.5.7", features = ["issue-url"] } diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index d758f1c2..1cbf944c 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -5,7 +5,7 @@ use futures::{ future::FutureExt, stream::{FuturesUnordered, StreamExt}, }; -use tokio::time::delay_for; +use tokio::time::sleep; use tower::{ builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout, Service, ServiceExt, @@ -153,7 +153,7 @@ where // due to protocol limitations self.request_genesis().await?; - // Distinguishes a restart from a start, so we don't delay when starting + // Distinguishes a restart from a start, so we don't sleep when starting // the sync process, but we can keep restart logic in one place. let mut started_once = false; @@ -163,7 +163,7 @@ where self.prospective_tips = HashSet::new(); self.downloads.cancel_all(); self.update_metrics(); - delay_for(SYNC_RESTART_TIMEOUT).await; + sleep(SYNC_RESTART_TIMEOUT).await; } else { started_once = true; } diff --git a/zebrad/src/components/tokio.rs b/zebrad/src/components/tokio.rs index c0dc22d0..66131fe6 100644 --- a/zebrad/src/components/tokio.rs +++ b/zebrad/src/components/tokio.rs @@ -22,9 +22,8 @@ impl TokioComponent { pub fn new() -> Result { Ok(Self { rt: Some( - tokio::runtime::Builder::new() + tokio::runtime::Builder::new_multi_thread() .enable_all() - .threaded_scheduler() .build() .unwrap(), ),