Set the new tips to be the last, not first, hash.
This commit is contained in:
parent
70241d3cad
commit
18eb212d8e
|
|
@ -6,9 +6,24 @@ use tower::{Service, ServiceExt};
|
||||||
use tracing_futures::Instrument;
|
use tracing_futures::Instrument;
|
||||||
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
|
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
|
||||||
|
|
||||||
|
use zebra_network as zn;
|
||||||
|
use zebra_state as zs;
|
||||||
|
|
||||||
|
/// Get the heights of the blocks for constructing a block_locator list
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn block_locator_heights(tip_height: BlockHeight) -> impl Iterator<Item = BlockHeight> {
|
||||||
|
iter::successors(Some(1u32), |h| h.checked_mul(2))
|
||||||
|
.flat_map(move |step| tip_height.0.checked_sub(step))
|
||||||
|
.map(BlockHeight)
|
||||||
|
.chain(iter::once(BlockHeight(0)))
|
||||||
|
}
|
||||||
|
|
||||||
|
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
type NumReq = u32;
|
||||||
|
|
||||||
pub struct Syncer<ZN, ZS>
|
pub struct Syncer<ZN, ZS>
|
||||||
where
|
where
|
||||||
ZN: Service<zebra_network::Request>,
|
ZN: Service<zn::Request>,
|
||||||
{
|
{
|
||||||
pub peer_set: ZN,
|
pub peer_set: ZN,
|
||||||
// TODO(jlusby): add validator
|
// TODO(jlusby): add validator
|
||||||
|
|
@ -22,15 +37,9 @@ where
|
||||||
|
|
||||||
impl<ZN, ZS> Syncer<ZN, ZS>
|
impl<ZN, ZS> Syncer<ZN, ZS>
|
||||||
where
|
where
|
||||||
ZN: Service<zebra_network::Request, Response = zebra_network::Response, Error = Error>
|
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
|
||||||
+ Send
|
|
||||||
+ Clone
|
|
||||||
+ 'static,
|
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
ZS: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
|
ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
|
||||||
+ Send
|
|
||||||
+ Clone
|
|
||||||
+ 'static,
|
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
{
|
{
|
||||||
pub async fn run(&mut self) -> Result<(), Report> {
|
pub async fn run(&mut self) -> Result<(), Report> {
|
||||||
|
|
@ -71,7 +80,7 @@ where
|
||||||
// fanout parameter, to get resp1, ..., respF
|
// fanout parameter, to get resp1, ..., respF
|
||||||
for _ in 0..self.fanout {
|
for _ in 0..self.fanout {
|
||||||
let req = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
|
let req = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
|
||||||
zebra_network::Request::FindBlocks {
|
zn::Request::FindBlocks {
|
||||||
known_blocks: block_locator.clone(),
|
known_blocks: block_locator.clone(),
|
||||||
stop: None,
|
stop: None,
|
||||||
},
|
},
|
||||||
|
|
@ -82,62 +91,51 @@ where
|
||||||
let mut download_set = HashSet::new();
|
let mut download_set = HashSet::new();
|
||||||
while let Some(res) = tip_futs.next().await {
|
while let Some(res) = tip_futs.next().await {
|
||||||
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
||||||
Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => {
|
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
|
||||||
info!(
|
if hashes.is_empty() {
|
||||||
new_hashes = hashes.len(),
|
tracing::debug!("skipping empty response");
|
||||||
in_flight = self.block_requests.len(),
|
|
||||||
downloaded = self.downloaded.len(),
|
|
||||||
"requested more hashes"
|
|
||||||
);
|
|
||||||
|
|
||||||
// TODO(jlusby): reject both main and test net genesis blocks
|
|
||||||
if hashes.last() == Some(&super::GENESIS) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut hashes = hashes.into_iter().peekable();
|
|
||||||
let new_tip = if let Some(tip) = hashes.next() {
|
|
||||||
tip
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
// ObtainTips Step 3
|
// ObtainTips Step 3
|
||||||
//
|
//
|
||||||
// For each response, starting from the beginning of the
|
// For each response, starting from the beginning of the
|
||||||
// list, prune any block hashes already included in the
|
// list, prune any block hashes already included in the
|
||||||
// state, stopping at the first unknown hash to get resp1',
|
// state, stopping at the first unknown hash to get resp1',
|
||||||
// ..., respF'. (These lists may be empty).
|
// ..., respF'. (These lists may be empty).
|
||||||
while let Some(&next) = hashes.peek() {
|
let mut first_unknown = 0;
|
||||||
let resp = self
|
for (i, &hash) in hashes.iter().enumerate() {
|
||||||
|
let depth = self
|
||||||
.state
|
.state
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| eyre!(e))?
|
.map_err(|e| eyre!(e))?
|
||||||
.call(zebra_state::Request::GetDepth { hash: next })
|
.call(zebra_state::Request::GetDepth { hash })
|
||||||
.await
|
.await
|
||||||
.map_err(|e| eyre!(e))?;
|
.map_err(|e| eyre!(e))?;
|
||||||
|
if let zs::Response::Depth(None) = depth {
|
||||||
let should_download = matches!(resp, zebra_state::Response::Depth(None));
|
first_unknown = i;
|
||||||
|
|
||||||
if should_download {
|
|
||||||
download_set.extend(hashes);
|
|
||||||
break;
|
break;
|
||||||
} else {
|
|
||||||
let _ = hashes.next();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if first_unknown == hashes.len() {
|
||||||
|
tracing::debug!("no new hashes, even though we gave our tip?");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let unknown_hashes = &hashes[first_unknown..];
|
||||||
|
download_set.extend(unknown_hashes);
|
||||||
|
|
||||||
// ObtainTips Step 4
|
// ObtainTips Step 4
|
||||||
//
|
//
|
||||||
// Combine the last elements of each list into a set; this
|
// Combine the last elements of each list into a set; this
|
||||||
// is the set of prospective tips.
|
// is the set of prospective tips.
|
||||||
|
let new_tip = *unknown_hashes
|
||||||
|
.last()
|
||||||
|
.expect("already checked first_unknown < hashes.len()");
|
||||||
let _ = self.prospective_tips.insert(new_tip);
|
let _ = self.prospective_tips.insert(new_tip);
|
||||||
}
|
}
|
||||||
Ok(_) => {}
|
Ok(r) => tracing::error!("unexpected response {:?}", r),
|
||||||
Err(e) => {
|
Err(e) => tracing::error!("{:?}", e),
|
||||||
error!("{:?}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,19 +167,17 @@ where
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| eyre!(e))?
|
.map_err(|e| eyre!(e))?
|
||||||
.call(zebra_network::Request::FindBlocks {
|
.call(zn::Request::FindBlocks {
|
||||||
known_blocks: vec![tip],
|
known_blocks: vec![tip],
|
||||||
stop: None,
|
stop: None,
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
||||||
Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => {
|
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
|
||||||
info!(
|
if hashes.is_empty() {
|
||||||
new_hashes = hashes.len(),
|
tracing::debug!("skipping empty response");
|
||||||
in_flight = self.block_requests.len(),
|
continue;
|
||||||
downloaded = self.downloaded.len(),
|
}
|
||||||
"requested more hashes"
|
|
||||||
);
|
|
||||||
|
|
||||||
// ExtendTips Step 3
|
// ExtendTips Step 3
|
||||||
//
|
//
|
||||||
|
|
@ -189,21 +185,17 @@ where
|
||||||
// response is the genesis block; if so, discard the response.
|
// response is the genesis block; if so, discard the response.
|
||||||
// It indicates that the remote peer does not have any blocks
|
// It indicates that the remote peer does not have any blocks
|
||||||
// following the prospective tip.
|
// following the prospective tip.
|
||||||
if hashes.last() == Some(&super::GENESIS) {
|
// TODO(jlusby): reject both main and test net genesis blocks
|
||||||
|
if hashes[0] == super::GENESIS {
|
||||||
|
tracing::debug!("skipping response that does not extend the tip");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut hashes = hashes.into_iter();
|
|
||||||
let new_tip = if let Some(tip) = hashes.next() {
|
|
||||||
tip
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
// ExtendTips Step 4
|
// ExtendTips Step 4
|
||||||
//
|
//
|
||||||
// Combine the last elements of the remaining responses into
|
// Combine the last elements of the remaining responses into
|
||||||
// a set, and add this set to the set of prospective tips.
|
// a set, and add this set to the set of prospective tips.
|
||||||
|
let new_tip = *hashes.last().expect("already checked is_empty");
|
||||||
let _ = self.prospective_tips.insert(new_tip);
|
let _ = self.prospective_tips.insert(new_tip);
|
||||||
|
|
||||||
// ExtendTips Step 5
|
// ExtendTips Step 5
|
||||||
|
|
@ -212,10 +204,8 @@ where
|
||||||
// set, and queue download and verification of those blocks
|
// set, and queue download and verification of those blocks
|
||||||
download_set.extend(hashes);
|
download_set.extend(hashes);
|
||||||
}
|
}
|
||||||
Ok(_) => {}
|
Ok(r) => tracing::error!("unexpected response {:?}", r),
|
||||||
Err(e) => {
|
Err(e) => tracing::error!("{:?}", e),
|
||||||
error!("{:?}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -244,7 +234,7 @@ where
|
||||||
|
|
||||||
while let Some(res) = self.block_requests.next().await {
|
while let Some(res) = self.block_requests.next().await {
|
||||||
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
||||||
Ok(zebra_network::Response::Blocks(blocks)) => {
|
Ok(zn::Response::Blocks(blocks)) => {
|
||||||
info!(count = blocks.len(), "received blocks");
|
info!(count = blocks.len(), "received blocks");
|
||||||
for block in blocks {
|
for block in blocks {
|
||||||
let hash = block.as_ref().into();
|
let hash = block.as_ref().into();
|
||||||
|
|
@ -278,7 +268,7 @@ where
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| eyre!(e))?
|
.map_err(|e| eyre!(e))?
|
||||||
.call(zebra_state::Request::AddBlock { block });
|
.call(zs::Request::AddBlock { block });
|
||||||
|
|
||||||
let _handle = tokio::spawn(
|
let _handle = tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
|
|
@ -307,7 +297,7 @@ where
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| eyre!(e))?
|
.map_err(|e| eyre!(e))?
|
||||||
.call(zebra_network::Request::BlocksByHash(set));
|
.call(zn::Request::BlocksByHash(set));
|
||||||
|
|
||||||
self.downloading.extend(chunk);
|
self.downloading.extend(chunk);
|
||||||
self.block_requests.push(request);
|
self.block_requests.push(request);
|
||||||
|
|
@ -315,15 +305,3 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the heights of the blocks for constructing a block_locator list
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn block_locator_heights(tip_height: BlockHeight) -> impl Iterator<Item = BlockHeight> {
|
|
||||||
iter::successors(Some(1u32), |h| h.checked_mul(2))
|
|
||||||
.flat_map(move |step| tip_height.0.checked_sub(step))
|
|
||||||
.map(BlockHeight)
|
|
||||||
.chain(iter::once(BlockHeight(0)))
|
|
||||||
}
|
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|
||||||
type NumReq = u32;
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue