deps: use Tower 0.4 from git instead of 0.3.1.

This addresses at least three pain points:

- we were affected by bugs that were already fixed in git, but not in
  the released crate;
- we can use service combinators to transform requests and responses;
- we can use the hedge middleware.

The version in git is still marked as 0.3.1 but these changes will be
part of tower 0.4: https://github.com/tower-rs/tower/issues/431
This commit is contained in:
Henry de Valence 2020-09-21 14:00:20 -07:00
parent 16cc095484
commit 6dd7318d3b
10 changed files with 26 additions and 124 deletions

108
Cargo.lock generated
View File

@ -2730,19 +2730,15 @@ dependencies = [
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.3.1" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/tower-rs/tower?rev=ad348d8#ad348d8ee5106f21b87155d2a0e8e18b90bd6b73"
checksum = "fd3169017c090b7a28fce80abaad0ab4f5566423677c9331bb320af7e49cfe62"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"tower-buffer", "futures-util",
"tower-discover", "pin-project",
"tokio",
"tower-layer", "tower-layer",
"tower-limit",
"tower-load-shed",
"tower-retry",
"tower-service", "tower-service",
"tower-timeout", "tracing",
"tower-util",
] ]
[[package]] [[package]]
@ -2763,31 +2759,6 @@ dependencies = [
"zebra-test", "zebra-test",
] ]
[[package]]
name = "tower-buffer"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4887dc2a65d464c8b9b66e0e4d51c2fd6cf5b3373afc72805b0a60bce00446a"
dependencies = [
"futures-core",
"pin-project",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-discover"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6b5000c3c54d269cc695dff28136bb33d08cbf1df2c48129e143ab65bf3c2a"
dependencies = [
"futures-core",
"pin-project",
"tower-service",
]
[[package]] [[package]]
name = "tower-fallback" name = "tower-fallback"
version = "0.1.0" version = "0.1.0"
@ -2803,61 +2774,7 @@ dependencies = [
[[package]] [[package]]
name = "tower-layer" name = "tower-layer"
version = "0.3.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/tower-rs/tower?rev=ad348d8#ad348d8ee5106f21b87155d2a0e8e18b90bd6b73"
checksum = "a35d656f2638b288b33495d1053ea74c40dc05ec0b92084dd71ca5566c4ed1dc"
[[package]]
name = "tower-limit"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92c3040c5dbed68abffaa0d4517ac1a454cd741044f33ab0eefab6b8d1361404"
dependencies = [
"futures-core",
"pin-project",
"tokio",
"tower-layer",
"tower-load",
"tower-service",
]
[[package]]
name = "tower-load"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cc79fc3afd07492b7966d7efa7c6c50f8ed58d768a6075dd7ae6591c5d2017b"
dependencies = [
"futures-core",
"log",
"pin-project",
"tokio",
"tower-discover",
"tower-service",
]
[[package]]
name = "tower-load-shed"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f021e23900173dc315feb4b6922510dae3e79c689b74c089112066c11f0ae4e"
dependencies = [
"futures-core",
"pin-project",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6727956aaa2f8957d4d9232b308fe8e4e65d99db30f42b225646e86c9b6a952"
dependencies = [
"futures-core",
"pin-project",
"tokio",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "tower-service" name = "tower-service"
@ -2865,18 +2782,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
[[package]]
name = "tower-timeout"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "127b8924b357be938823eaaec0608c482d40add25609481027b96198b2e4b31e"
dependencies = [
"pin-project",
"tokio",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "tower-util" name = "tower-util"
version = "0.3.1" version = "0.3.1"
@ -3320,7 +3225,6 @@ dependencies = [
"tokio", "tokio",
"tokio-util 0.2.0", "tokio-util 0.2.0",
"tower", "tower",
"tower-load",
"tracing", "tracing",
"tracing-error", "tracing-error",
"tracing-futures", "tracing-futures",

View File

@ -19,3 +19,6 @@ panic = "abort"
[profile.release] [profile.release]
panic = "abort" panic = "abort"
[patch.crates-io]
tower = { git = "https://github.com/tower-rs/tower", rev = "ad348d8" }

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] } tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] }
tower = "0.3" tower = { version = "0.3", features = ["util", "buffer"] }
futures-core = "0.3.5" futures-core = "0.3.5"
pin-project = "0.4.23" pin-project = "0.4.23"
tracing = "0.1.19" tracing = "0.1.19"

View File

@ -18,7 +18,7 @@ futures-util = "0.3.5"
metrics = "0.12" metrics = "0.12"
thiserror = "1.0.20" thiserror = "1.0.20"
tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] } tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] }
tower = "0.3" tower = { version = "0.3", features = ["timeout", "util", "buffer"] }
tower-util = "0.3" tower-util = "0.3"
tracing = "0.1.19" tracing = "0.1.19"
tracing-futures = "0.2.4" tracing-futures = "0.2.4"

View File

@ -15,10 +15,9 @@ use once_cell::sync::Lazy;
use rand::thread_rng; use rand::thread_rng;
use redjubjub::{batch, *}; use redjubjub::{batch, *};
use tokio::sync::broadcast::{channel, RecvError, Sender}; use tokio::sync::broadcast::{channel, RecvError, Sender};
use tower::Service; use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl}; use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback; use tower_fallback::Fallback;
use tower_util::ServiceFn;
/// Global batch verification context for RedJubjub signatures. /// Global batch verification context for RedJubjub signatures.
/// ///

View File

@ -24,8 +24,7 @@ thiserror = "1"
futures = "0.3" futures = "0.3"
tokio = { version = "0.2.22", features = ["net", "time", "stream", "tracing", "macros"] } tokio = { version = "0.2.22", features = ["net", "time", "stream", "tracing", "macros"] }
tokio-util = { version = "0.2", features = ["codec"] } tokio-util = { version = "0.2", features = ["codec"] }
tower = "0.3" tower = { version = "0.3", features = ["retry", "discover", "load", "load-shed", "timeout", "util", "buffer"] }
tower-load = "0.3"
metrics = "0.12" metrics = "0.12"
tracing = "0.1" tracing = "0.1"

View File

@ -19,13 +19,9 @@ use tokio::{
sync::broadcast, sync::broadcast,
}; };
use tower::{ use tower::{
buffer::Buffer, buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
discover::{Change, ServiceStream}, util::BoxService, Service, ServiceExt,
layer::Layer,
util::BoxService,
Service, ServiceExt,
}; };
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
use crate::{ use crate::{
constants, peer, timestamp_collector::TimestampCollector, AddressBook, BoxError, Config, constants, peer, timestamp_collector::TimestampCollector, AddressBook, BoxError, Config,
@ -106,14 +102,12 @@ where
// Connect the rx end to a PeerSet, wrapping new peers in load instruments. // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
let peer_set = PeerSet::new( let peer_set = PeerSet::new(
PeakEwmaDiscover::new( PeakEwmaDiscover::new(
ServiceStream::new( // Discover interprets an error as stream termination,
// ServiceStream interprets an error as stream termination, // so discard any errored connections...
// so discard any errored connections... peerset_rx.filter(|result| future::ready(result.is_ok())),
peerset_rx.filter(|result| future::ready(result.is_ok())),
),
constants::EWMA_DEFAULT_RTT, constants::EWMA_DEFAULT_RTT,
constants::EWMA_DECAY_TIME, constants::EWMA_DECAY_TIME,
NoInstrument, tower::load::CompleteOnResponse::default(),
), ),
demand_tx.clone(), demand_tx.clone(),
handle_rx, handle_rx,

View File

@ -20,9 +20,9 @@ use tokio::sync::{broadcast, oneshot::error::TryRecvError};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
load::Load,
Service, Service,
}; };
use tower_load::Load;
use crate::{ use crate::{
protocol::{ protocol::{
@ -131,7 +131,10 @@ where
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> { fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
use futures::ready; use futures::ready;
loop { loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx)).map_err(Into::into)? { match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.ok_or("discovery stream closed")?
.map_err(Into::into)?
{
Change::Remove(key) => { Change::Remove(key) => {
trace!(?key, "got Change::Remove from Discover"); trace!(?key, "got Change::Remove from Discover");
self.remove(&key); self.remove(&key);

View File

@ -19,7 +19,7 @@ sled = "0.34.4"
futures = "0.3.5" futures = "0.3.5"
metrics = "0.12" metrics = "0.12"
tower = "0.3.1" tower = { version = "0.3.1", features = ["buffer", "util"] }
tracing = "0.1" tracing = "0.1"
tracing-error = "0.1.2" tracing-error = "0.1.2"
thiserror = "1.0.20" thiserror = "1.0.20"

View File

@ -10,7 +10,7 @@ edition = "2018"
[dependencies] [dependencies]
hex = "0.4.2" hex = "0.4.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
tower = "0.3.1" tower = { version = "0.3.1", features = ["util"] }
futures = "0.3.5" futures = "0.3.5"
color-eyre = "0.5.3" color-eyre = "0.5.3"
tracing = "0.1.19" tracing = "0.1.19"