sync: create requests sequentially to respect backpressure.
This seems like a better design on principle but also appears to give a much nicer sawtooth pattern of queued blocks in the checkpointer and a much smoother pattern of block requests.
This commit is contained in:
parent
0cd8b7e7bd
commit
b59cfc49b7
|
|
@ -390,16 +390,24 @@ where
|
||||||
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
|
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
|
||||||
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
|
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
|
||||||
for hash in hashes.into_iter() {
|
for hash in hashes.into_iter() {
|
||||||
let mut retry_peer_set = self.block_network.clone();
|
// We construct the block download requests sequentially, waiting
|
||||||
let mut verifier = self.verifier.clone();
|
// for the peer set to be ready to process each request. This
|
||||||
|
// ensures that we start block downloads in the order we want them
|
||||||
|
// (though they may resolve out of order), and it means that we
|
||||||
|
// respect backpressure. Otherwise, if we waited for readiness and
|
||||||
|
// did the service call in the spawned tasks, all of the spawned
|
||||||
|
// tasks would race each other waiting for the network to become
|
||||||
|
// ready.
|
||||||
|
let block_req = self
|
||||||
|
.block_network
|
||||||
|
.ready_and()
|
||||||
|
.await
|
||||||
|
.map_err(|e| eyre!(e))?
|
||||||
|
.call(zn::Request::BlocksByHash(iter::once(hash).collect()));
|
||||||
let span = tracing::info_span!("block_fetch_verify", ?hash);
|
let span = tracing::info_span!("block_fetch_verify", ?hash);
|
||||||
|
let mut verifier = self.verifier.clone();
|
||||||
let task = tokio::spawn(async move {
|
let task = tokio::spawn(async move {
|
||||||
let block = match retry_peer_set
|
let block = match block_req.await {
|
||||||
.ready_and()
|
|
||||||
.await?
|
|
||||||
.call(zn::Request::BlocksByHash(iter::once(hash).collect()))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(zn::Response::Blocks(blocks)) => blocks
|
Ok(zn::Response::Blocks(blocks)) => blocks
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
.next()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue