diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 7f1f1e56..786f10b1 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -7,7 +7,7 @@ use std::{ use futures::{ future::{FutureExt, TryFutureExt}, - stream::TryStreamExt, + stream::{Stream, TryStreamExt}, }; use tokio::sync::oneshot; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 05ba567d..be312865 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -79,11 +79,11 @@ where impl Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Clone + 'static, ZN::Future: Send, ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, ZV::Future: Send, - ZS: Service + Send + 'static, + ZS: Service + Send + Clone + 'static, ZS::Future: Send, { /// Initialize a new download stream with the provided `network` and @@ -104,95 +104,61 @@ where /// Queue a block for download and verification. /// - /// This method waits for the network to become ready, and returns an error - /// only if the network service fails. It returns immediately after queuing - /// the request. + /// Returns true if the block was newly queued, and false if it was already queued. #[instrument(skip(self))] - pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result { + pub fn download_and_verify(&mut self, hash: block::Hash) -> bool { if self.cancel_handles.contains_key(&hash) { - return Ok(false); + return false; } - // Check if the block is already in the state. - let block_state_req = self - .state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zs::Request::Block(zs::HashOrHeight::from(hash))) - .await; - - let block_in_chain = match block_state_req { - Ok(zs::Response::Block(block)) => block, - _ => None, - }; - - // Block already in state, get out. - if block_in_chain.is_some() { - return Ok(false); - } - - // We construct the block requests sequentially, waiting for the peer - // set to be ready to process each request. This ensures that we start - // block downloads in the order we want them (though they may resolve - // out of order), and it means that we respect backpressure. Otherwise, - // if we waited for readiness and did the service call in the spawned - // tasks, all of the spawned tasks would race each other waiting for the - // network to become ready. - tracing::debug!("waiting to request block"); - let block_req = self - .network - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zn::Request::BlocksByHash(std::iter::once(hash).collect())); - tracing::debug!("requested block"); - // This oneshot is used to signal cancellation to the download task. let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); + let mut state = self.state.clone(); + let mut network = self.network.clone(); let mut verifier = self.verifier.clone(); - let task = tokio::spawn( - async move { - let rsp = tokio::select! { - _ = &mut cancel_rx => { - tracing::trace!("task cancelled prior to download completion"); - metrics::counter!("gossip.cancelled.download.count", 1); - return Err("canceled block_fetch_verify".into()) - } - rsp = block_req => rsp?, - }; - let block = if let zn::Response::Blocks(blocks) = rsp { - blocks - .into_iter() - .next() - .expect("successful response has the block in it") - } else { - unreachable!("wrong response to block request"); - }; - metrics::counter!("gossip.downloaded.block.count", 1); + let fut = async move { + // Check if the block is already in the state. + match state.oneshot(zs::Request::Depth(hash.into())).await { + Ok(zs::Response::Depth(None)) => Ok(()), + Ok(zs::Response::Depth(Some(_))) => Err("already present".into()), + Err(e) => Err(e), + }?; - let rsp = verifier.ready_and().await?.call(block); - let verification = tokio::select! { - _ = &mut cancel_rx => { - tracing::trace!("task cancelled prior to verification"); - metrics::counter!("gossip.cancelled.verify.count", 1); - return Err("canceled block_fetch_verify".into()) - } - verification = rsp => verification, - }; - if verification.is_ok() { - metrics::counter!("sync.verified.block.count", 1); + let block = if let zn::Response::Blocks(blocks) = network + .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect())) + .await? + { + blocks + .into_iter() + .next() + .expect("successful response has the block in it") + } else { + unreachable!("wrong response to block request"); + }; + metrics::counter!("gossip.downloaded.block.count", 1); + + verifier.oneshot(block).await + } + .map_ok(|hash| { + tracing::info!(?hash, "verified advertised block"); + metrics::counter!("gossip.verified.block.count", 1); + }) + // Tack the hash onto the error so we can remove the cancel handle + // on failure as well as on success. + .map_err(move |e| (e, hash)) + .in_current_span(); + + let task = tokio::spawn(async move { + tokio::select! { + _ = &mut cancel_rx => { + tracing::trace!("task cancelled prior to completion"); + metrics::counter!("gossip.cancelled.count", 1); } - - verification + verification = fut => verification, } - .in_current_span() - // Tack the hash onto the error so we can remove the cancel handle - // on failure as well as on success. - .map_err(move |e| (e, hash)), - ); + }); self.pending.push(task); // XXX replace with expect_none when stable @@ -201,6 +167,6 @@ where "blocks are only queued once" ); - Ok(true) + true } }