Upgrade tokio, futures, hyper to released versions.

This commit is contained in:
Henry de Valence 2019-12-13 14:25:14 -08:00 committed by Deirdre Connolly
parent 4315235d52
commit 2965187b91
18 changed files with 366 additions and 581 deletions

796
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,7 +9,3 @@ members = [
"zebra-client", "zebra-client",
"zebrad", "zebrad",
] ]
[patch.crates-io]
# Required because we pull tower-load from git
tower = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x" }

View File

@ -2,7 +2,7 @@
use std::{fmt, io}; use std::{fmt, io};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use hex; use hex;

View File

@ -10,7 +10,7 @@ edition = "2018"
[dependencies] [dependencies]
bitflags = "1.2" bitflags = "1.2"
byteorder = "1.3" byteorder = "1.3"
bytes = "0.4" bytes = "0.5"
chrono = "0.4" chrono = "0.4"
hex = "0.4" hex = "0.4"
# indexmap has rayon support for parallel iteration, # indexmap has rayon support for parallel iteration,
@ -21,13 +21,14 @@ rand = "0.7"
serde = { version = "1", features = ["serde_derive"] } serde = { version = "1", features = ["serde_derive"] }
thiserror = "1" thiserror = "1"
tokio = "=0.2.0-alpha.6" tokio = { version = "0.2", features = ["net", "time", "stream"] }
futures-preview = "=0.3.0-alpha.19" tokio-util = { version = "0.2", features = ["codec"] }
futures = "0.3"
tracing = "0.1" tracing = "0.1"
tracing-futures = { version = "0.1", features = ["tokio-alpha"], default-features = false } tracing-futures = "0.2"
tower = "=0.3.0-alpha.2" tower = { git = "https://github.com/tower-rs/tower" }
tower-load = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x"} tower-load = { git = "https://github.com/tower-rs/tower" }
zebra-chain = { path = "../zebra-chain" } zebra-chain = { path = "../zebra-chain" }

View File

@ -1,4 +1,5 @@
use std::{ use std::{
future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -7,7 +8,6 @@ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future, ready, future, ready,
}; };
use tokio::prelude::*;
use tower::Service; use tower::Service;
use crate::protocol::internal::{Request, Response}; use crate::protocol::internal::{Request, Response};

View File

@ -1,10 +1,12 @@
use std::{ use std::{
future::Future,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::{net::TcpStream, prelude::*}; use futures::prelude::*;
use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt}; use tower::{discover::Change, Service, ServiceExt};
use crate::{BoxedStdError, Request, Response}; use crate::{BoxedStdError, Request, Response};

View File

@ -1,5 +1,6 @@
use std::{ use std::{
collections::HashSet, collections::HashSet,
future::Future,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
@ -7,8 +8,9 @@ use std::{
}; };
use chrono::Utc; use chrono::Utc;
use futures::channel::mpsc; use futures::{channel::mpsc, prelude::*};
use tokio::{codec::Framed, net::TcpStream, prelude::*, timer::Interval}; use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tower::Service; use tower::Service;
use tracing::{span, Level}; use tracing::{span, Level};
use tracing_futures::Instrument; use tracing_futures::Instrument;
@ -235,10 +237,10 @@ where
let mut server_tx = server_tx; let mut server_tx = server_tx;
let mut interval_stream = Interval::new_interval(constants::HEARTBEAT_INTERVAL); let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop { loop {
interval_stream.next().await; interval_stream.tick().await;
// We discard the server handle because our // We discard the server handle because our
// heartbeat `Ping`s are a special case, and we // heartbeat `Ping`s are a special case, and we

View File

@ -3,12 +3,10 @@ use std::sync::Arc;
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future::{self, Either}, future::{self, Either},
prelude::*,
stream::Stream, stream::Stream,
}; };
use tokio::{ use tokio::time::{delay_for, Delay};
prelude::*,
timer::{delay_for, Delay},
};
use tower::Service; use tower::Service;
use zebra_chain::{serialization::SerializationError, transaction::TransactionHash}; use zebra_chain::{serialization::SerializationError, transaction::TransactionHash};

View File

@ -239,9 +239,8 @@ where
} }
} }
use tokio::timer::Interval;
let mut connect_signal = futures::stream::select( let mut connect_signal = futures::stream::select(
Interval::new_interval(new_peer_interval).map(|_| ()), tokio::time::interval(new_peer_interval).map(|_| ()),
demand_signal, demand_signal,
); );
while let Some(()) = connect_signal.next().await { while let Some(()) = connect_signal.next().await {

View File

@ -1,6 +1,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::Debug, fmt::Debug,
future::Future,
marker::PhantomData, marker::PhantomData,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -8,10 +9,10 @@ use std::{
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
prelude::*,
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use indexmap::IndexMap; use indexmap::IndexMap;
use tokio::prelude::*;
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
Service, Service,

View File

@ -1,13 +1,13 @@
// Adapted from tower-balance // Adapted from tower-balance
use std::{ use std::{
future::Future,
marker::PhantomData, marker::PhantomData,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::{channel::oneshot, ready}; use futures::{channel::oneshot, ready};
use tokio::prelude::*;
use tower::Service; use tower::Service;
/// A Future that becomes satisfied when an `S`-typed service is ready. /// A Future that becomes satisfied when an `S`-typed service is ready.

View File

@ -6,7 +6,7 @@ use std::io::{Cursor, Read, Write};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::BytesMut; use bytes::BytesMut;
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use tokio::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use zebra_chain::{ use zebra_chain::{
block::{Block, BlockHeader, BlockHeaderHash}, block::{Block, BlockHeader, BlockHeaderHash},
@ -641,6 +641,7 @@ impl Codec {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures::prelude::*;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
#[test] #[test]
@ -649,7 +650,7 @@ mod tests {
let services = PeerServices::NODE_NETWORK; let services = PeerServices::NODE_NETWORK;
let timestamp = Utc.timestamp(1568000000, 0); let timestamp = Utc.timestamp(1568000000, 0);
let rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let v = Message::Version { let v = Message::Version {
version: crate::constants::CURRENT_VERSION, version: crate::constants::CURRENT_VERSION,
@ -669,8 +670,8 @@ mod tests {
relay: true, relay: true,
}; };
use tokio::codec::{FramedRead, FramedWrite};
use tokio::prelude::*; use tokio::prelude::*;
use tokio_util::codec::{FramedRead, FramedWrite};
let v_bytes = rt.block_on(async { let v_bytes = rt.block_on(async {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
{ {
@ -695,7 +696,7 @@ mod tests {
#[test] #[test]
fn filterload_message_round_trip() { fn filterload_message_round_trip() {
let rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let v = Message::FilterLoad { let v = Message::FilterLoad {
filter: Filter(vec![0; 35999]), filter: Filter(vec![0; 35999]),
@ -704,8 +705,8 @@ mod tests {
flags: 0, flags: 0,
}; };
use tokio::codec::{FramedRead, FramedWrite};
use tokio::prelude::*; use tokio::prelude::*;
use tokio_util::codec::{FramedRead, FramedWrite};
let v_bytes = rt.block_on(async { let v_bytes = rt.block_on(async {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
{ {
@ -730,7 +731,7 @@ mod tests {
#[test] #[test]
fn filterload_message_too_large_round_trip() { fn filterload_message_too_large_round_trip() {
let rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let v = Message::FilterLoad { let v = Message::FilterLoad {
filter: Filter(vec![0; 40000]), filter: Filter(vec![0; 40000]),
@ -739,8 +740,8 @@ mod tests {
flags: 0, flags: 0,
}; };
use tokio::codec::{FramedRead, FramedWrite};
use tokio::prelude::*; use tokio::prelude::*;
use tokio_util::codec::{FramedRead, FramedWrite};
let v_bytes = rt.block_on(async { let v_bytes = rt.block_on(async {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
{ {

View File

@ -2,8 +2,7 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use futures::channel::mpsc; use futures::{channel::mpsc, prelude::*};
use tokio::prelude::*;
use crate::{types::MetaAddr, AddressBook}; use crate::{types::MetaAddr, AddressBook};

View File

@ -15,18 +15,17 @@ lazy_static = "1"
serde = { version = "1", features = ["serde_derive"] } serde = { version = "1", features = ["serde_derive"] }
toml = "0.5" toml = "0.5"
tokio = "=0.2.0-alpha.6" tokio = { version = "0.2", features = ["time", "rt-threaded", "stream"] }
futures-preview = "=0.3.0-alpha.19" futures = "0.3"
tracing = "0.1" tracing = "0.1"
tracing-futures = { version = "0.1", features = ["tokio-alpha"], default-features = false } tracing-futures = "0.2"
tracing-subscriber = "0.1" tracing-subscriber = "0.1"
tracing-log = "0.1" tracing-log = "0.1"
hyper = "=0.13.0-alpha.4" hyper = "0.13.0"
tower = "=0.3.0-alpha.2" tower = { git = "https://github.com/tower-rs/tower" }
tower-load = { git = "https://github.com/tower-rs/tower", branch = "v0.3.x"}
zebra-chain = { path = "../zebra-chain" } zebra-chain = { path = "../zebra-chain" }
zebra-network = { path = "../zebra-network" } zebra-network = { path = "../zebra-network" }

View File

@ -4,6 +4,8 @@ use crate::prelude::*;
use abscissa_core::{Command, Options, Runnable}; use abscissa_core::{Command, Options, Runnable};
use futures::prelude::*;
/// `connect` subcommand /// `connect` subcommand
#[derive(Command, Debug, Options)] #[derive(Command, Debug, Options)]
pub struct ConnectCmd { pub struct ConnectCmd {
@ -21,10 +23,10 @@ impl Runnable for ConnectCmd {
info!(connect.addr = ?self.addr); info!(connect.addr = ?self.addr);
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
let _ = app_reader() let _ = app_writer()
.state() .state_mut()
.components .components
.get_downcast_ref::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.block_on(self.connect()); .block_on(self.connect());
@ -95,7 +97,7 @@ impl ConnectCmd {
tail.extend_from_slice(&addrs[addrs.len() - 5..]); tail.extend_from_slice(&addrs[addrs.len() - 5..]);
info!(addrs.first = ?head, addrs.last = ?tail); info!(addrs.first = ?head, addrs.last = ?tail);
let eternity = tokio::future::pending::<()>(); let eternity = future::pending::<()>();
eternity.await; eternity.await;
Ok(()) Ok(())

View File

@ -7,14 +7,13 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use abscissa_core::{Command, Options, Runnable};
use futures::channel::oneshot; use futures::{channel::oneshot, prelude::*};
use tower::{buffer::Buffer, Service, ServiceExt}; use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::{span, Level};
use zebra_network::{AddressBook, BoxedStdError, Request, Response}; use zebra_network::{AddressBook, BoxedStdError, Request, Response};
use crate::{config::ZebradConfig, prelude::*}; use crate::prelude::*;
/// Whether our `SeedService` is poll_ready or not. /// Whether our `SeedService` is poll_ready or not.
#[derive(Debug)] #[derive(Debug)]
@ -110,10 +109,10 @@ impl Runnable for SeedCmd {
fn run(&self) { fn run(&self) {
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
let _ = app_reader() let _ = app_writer()
.state() .state_mut()
.components .components
.get_downcast_ref::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.block_on(self.seed()); .block_on(self.seed());
@ -145,12 +144,11 @@ impl SeedCmd {
#[cfg(dos)] #[cfg(dos)]
use std::time::Duration; use std::time::Duration;
use tokio::timer::Interval;
#[cfg(dos)] #[cfg(dos)]
// Fire GetPeers requests at ourselves, for testing. // Fire GetPeers requests at ourselves, for testing.
tokio::spawn(async move { tokio::spawn(async move {
let mut interval_stream = Interval::new_interval(Duration::from_secs(1)); let mut interval_stream = tokio::time::interval(Duration::from_secs(1));
loop { loop {
interval_stream.next().await; interval_stream.next().await;
@ -159,7 +157,7 @@ impl SeedCmd {
} }
}); });
let eternity = tokio::future::pending::<()>(); let eternity = future::pending::<()>();
eternity.await; eternity.await;
Ok(()) Ok(())

View File

@ -5,8 +5,11 @@
use crate::prelude::*; use crate::prelude::*;
use crate::config::ZebradConfig; use crate::config::ZebradConfig;
use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use futures::prelude::*;
/// `start` subcommand /// `start` subcommand
/// ///
/// The `Options` proc macro generates an option parser based on the struct /// The `Options` proc macro generates an option parser based on the struct
@ -37,13 +40,13 @@ impl Runnable for StartCmd {
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
app_reader() app_writer()
.state() .state_mut()
.components .components
.get_downcast_ref::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.block_on(tokio::future::pending::<()>()); .block_on(future::pending::<()>());
} }
} }

View File

@ -81,17 +81,19 @@ impl TracingEndpoint {
.parse() .parse()
.expect("Hardcoded address should be parseable"); .expect("Hardcoded address should be parseable");
let server = match Server::try_bind(&addr) { tokio_component.rt.spawn(async move {
Ok(s) => s, // try_bind uses the tokio runtime, so we
Err(e) => { // need to construct it inside the task.
error!("Could not open tracing endpoint listener"); let server = match Server::try_bind(&addr) {
error!("Error: {}", e); Ok(s) => s,
return Ok(()); Err(e) => {
error!("Could not open tracing endpoint listener");
error!("Error: {}", e);
return;
}
} }
} .serve(service);
.serve(service);
tokio_component.rt.spawn(async {
if let Err(e) = server.await { if let Err(e) = server.await {
error!("Server error: {}", e); error!("Server error: {}", e);
} }
@ -101,11 +103,10 @@ impl TracingEndpoint {
} }
} }
fn reload_filter_from_chunk<S: Subscriber>( fn reload_filter_from_bytes<S: Subscriber>(
handle: Handle<EnvFilter, S>, handle: Handle<EnvFilter, S>,
chunk: hyper::Chunk, bytes: hyper::body::Bytes,
) -> Result<(), String> { ) -> Result<(), String> {
let bytes = chunk.into_bytes();
let body = std::str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?; let body = std::str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
trace!(request.body = ?body); trace!(request.body = ?body);
let filter = body.parse::<EnvFilter>().map_err(|e| format!("{}", e))?; let filter = body.parse::<EnvFilter>().map_err(|e| format!("{}", e))?;
@ -137,13 +138,8 @@ curl -X POST localhost:3000/filter -d "zebrad=trace"
)), )),
(&Method::POST, "/filter") => { (&Method::POST, "/filter") => {
// Combine all HTTP request chunks into one // Combine all HTTP request chunks into one
//let whole_chunk = req.into_body().try_concat().await?; let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
// XXX try_concat extension trait is not applying for some reason, match reload_filter_from_bytes(handle, body_bytes) {
// just pull one chunk
let mut body = req.into_body();
let maybe_chunk = body.next().await;
let whole_chunk = maybe_chunk.unwrap()?;
match reload_filter_from_chunk(handle, whole_chunk) {
Err(e) => Response::builder() Err(e) => Response::builder()
.status(StatusCode::BAD_REQUEST) .status(StatusCode::BAD_REQUEST)
.body(Body::from(e)) .body(Body::from(e))