Restore PeerSet code in connect stub.

This was commented out because making the PeerConnector take a TcpStream
meant that the PeerConnector futures couldn't be constructed in the same
way as before, but now that the PeerConnector is Buffer'able, we can
just clone a buffered copy.
This commit is contained in:
Henry de Valence 2019-10-15 17:07:59 -07:00
parent ff27334e81
commit 17b93523bd
2 changed files with 22 additions and 7 deletions

View File

@ -77,14 +77,19 @@ impl ConnectCmd {
let config = app_config().network.clone();
let collector = TimestampCollector::new();
let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector);
let mut pc = Buffer::new(
PeerConnector::new(config, Network::Mainnet, node, &collector),
1,
);
let tcp_stream = TcpStream::connect(self.addr).await?;
pc.ready()
.await?;
.await
.map_err(failure::Error::from_boxed_compat)?;
let mut client = pc
.call((tcp_stream, self.addr))
.await?;
.await
.map_err(failure::Error::from_boxed_compat)?;
client.ready().await?;
@ -97,7 +102,6 @@ impl ConnectCmd {
"got addresses from first connected peer"
);
/*
use failure::Error;
use futures::{
future,
@ -113,8 +117,13 @@ impl ConnectCmd {
addrs
.into_iter()
.map(|meta| {
let svc_fut = pc.call(meta.addr);
async move { Ok::<_, Error>(Change::Insert(meta.addr, svc_fut.await?)) }
let mut pc = pc.clone();
async move {
let stream = TcpStream::connect(meta.addr).await?;
pc.ready().await?;
let client = pc.call((stream, meta.addr)).await?;
Ok::<_, BoxedStdError>(Change::Insert(meta.addr, client))
}
})
.collect::<FuturesUnordered<_>>()
// Discard any errored connections...
@ -156,7 +165,6 @@ impl ConnectCmd {
// empty loop ensures we don't exit the application,
// and this is throwaway code
}
*/
Ok(())
}

View File

@ -10,3 +10,10 @@ pub use abscissa_core::{Application, Command, Runnable};
// These are disabled because we use tracing.
// Logging macros
//pub use abscissa_core::log::{debug, error, info, log, log_enabled, trace, warn};
/// Type alias to make working with tower traits easier.
///
/// Note: the 'static lifetime bound means that the *type* cannot have any
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub(crate) type BoxedStdError = Box<dyn std::error::Error + Send + Sync + 'static>;