fix errors
This commit is contained in:
parent
e645e3bf0c
commit
527f4d39ed
|
|
@ -123,7 +123,8 @@ impl Service<zn::Request> for Inbound {
|
||||||
|
|
||||||
// Clean up completed download tasks
|
// Clean up completed download tasks
|
||||||
if let Some(downloads) = self.downloads.as_mut() {
|
if let Some(downloads) = self.downloads.as_mut() {
|
||||||
while let Poll::Ready(Some(_)) = Pin::new(downloads).poll_next(cx) {}
|
let downloads = Pin::new(downloads);
|
||||||
|
while let Poll::Ready(Some(_)) = downloads.poll_next(cx) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now report readiness based on readiness of the inner services, if they're available.
|
// Now report readiness based on readiness of the inner services, if they're available.
|
||||||
|
|
@ -200,24 +201,10 @@ impl Service<zn::Request> for Inbound {
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::AdvertiseBlock(hash) => {
|
zn::Request::AdvertiseBlock(hash) => {
|
||||||
// this sucks
|
if let Some(downloads) = self.downloads.as_mut() {
|
||||||
let mut downloads = self.downloads.take().unwrap();
|
downloads.download_and_verify(hash);
|
||||||
self.downloads = Some(Downloads::new(
|
|
||||||
self.outbound.as_ref().unwrap().clone(),
|
|
||||||
self.verifier.clone(),
|
|
||||||
self.state.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
async move {
|
|
||||||
if downloads.download_and_verify(hash).await? {
|
|
||||||
tracing::info!(?hash, "queued download and verification of gossiped block");
|
|
||||||
} else {
|
|
||||||
tracing::debug!(?hash, "gossiped block already queued or verified");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(zn::Response::Nil)
|
|
||||||
}
|
}
|
||||||
.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
zn::Request::MempoolTransactions => {
|
zn::Request::MempoolTransactions => {
|
||||||
debug!("ignoring unimplemented request");
|
debug!("ignoring unimplemented request");
|
||||||
|
|
|
||||||
|
|
@ -108,21 +108,23 @@ where
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
|
pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
|
||||||
if self.cancel_handles.contains_key(&hash) {
|
if self.cancel_handles.contains_key(&hash) {
|
||||||
|
tracing::debug!("hash already queued for download");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This oneshot is used to signal cancellation to the download task.
|
// This oneshot is used to signal cancellation to the download task.
|
||||||
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
|
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
|
||||||
|
|
||||||
let mut state = self.state.clone();
|
let state = self.state.clone();
|
||||||
let mut network = self.network.clone();
|
let network = self.network.clone();
|
||||||
let mut verifier = self.verifier.clone();
|
let verifier = self.verifier.clone();
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
// Check if the block is already in the state.
|
// Check if the block is already in the state.
|
||||||
match state.oneshot(zs::Request::Depth(hash.into())).await {
|
match state.oneshot(zs::Request::Depth(hash.into())).await {
|
||||||
Ok(zs::Response::Depth(None)) => Ok(()),
|
Ok(zs::Response::Depth(None)) => Ok(()),
|
||||||
Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
|
Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
|
||||||
|
Ok(_) => unreachable!("wrong response"),
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
|
|
@ -142,8 +144,8 @@ where
|
||||||
verifier.oneshot(block).await
|
verifier.oneshot(block).await
|
||||||
}
|
}
|
||||||
.map_ok(|hash| {
|
.map_ok(|hash| {
|
||||||
tracing::info!(?hash, "verified advertised block");
|
|
||||||
metrics::counter!("gossip.verified.block.count", 1);
|
metrics::counter!("gossip.verified.block.count", 1);
|
||||||
|
hash
|
||||||
})
|
})
|
||||||
// Tack the hash onto the error so we can remove the cancel handle
|
// Tack the hash onto the error so we can remove the cancel handle
|
||||||
// on failure as well as on success.
|
// on failure as well as on success.
|
||||||
|
|
@ -155,6 +157,7 @@ where
|
||||||
_ = &mut cancel_rx => {
|
_ = &mut cancel_rx => {
|
||||||
tracing::trace!("task cancelled prior to completion");
|
tracing::trace!("task cancelled prior to completion");
|
||||||
metrics::counter!("gossip.cancelled.count", 1);
|
metrics::counter!("gossip.cancelled.count", 1);
|
||||||
|
Err(("canceled".into(), hash))
|
||||||
}
|
}
|
||||||
verification = fut => verification,
|
verification = fut => verification,
|
||||||
}
|
}
|
||||||
|
|
@ -167,6 +170,7 @@ where
|
||||||
"blocks are only queued once"
|
"blocks are only queued once"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
tracing::debug!("queued hash for download");
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue