diff --git a/Cargo.lock b/Cargo.lock
index 100a5317..f4058381 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2449,6 +2449,7 @@ dependencies = [
  "tracing-log",
  "tracing-subscriber",
  "zebra-chain",
+ "zebra-consensus",
  "zebra-network",
  "zebra-state",
 ]
diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index af65fbec..590a4367 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -34,6 +34,7 @@ zebra-state = { path = "../zebra-state" }
 tracing-subscriber = { version = "0.2.6", features = ["tracing-log"] }
 tracing-error = "0.1.2"
 color-eyre = "0.5"
+zebra-consensus = { path = "../zebra-consensus/" }

 [dev-dependencies]
 abscissa_core = { version = "0.5", features = ["testing"] }
diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs
index 401558be..06e062e2 100644
--- a/zebrad/src/commands/start.rs
+++ b/zebrad/src/commands/start.rs
@@ -30,7 +30,7 @@ use zebra_chain::block::BlockHeaderHash;
 mod sync;

 // genesis
-static GENESIS: BlockHeaderHash = BlockHeaderHash([
+const GENESIS: BlockHeaderHash = BlockHeaderHash([
     8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38, 177,
     29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0,
 ]);
@@ -58,13 +58,13 @@ impl StartCmd {
         let config = app_config().network.clone();
         let state = zebra_state::on_disk::init(zebra_state::Config::default());
         let (peer_set, _address_book) = zebra_network::init(config, node).await;
+        let verifier = zebra_consensus::verify::init(state.clone());

         let mut syncer = sync::Syncer {
             peer_set,
             state,
+            verifier,
             block_requests: FuturesUnordered::new(),
-            downloading: HashSet::new(),
-            downloaded: HashSet::new(),
             fanout: 4,
             prospective_tips: HashSet::new(),
         };
diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs
index 1ead8f39..4b61f19b 100644
--- a/zebrad/src/commands/start/sync.rs
+++ b/zebrad/src/commands/start/sync.rs
@@ -1,34 +1,36 @@
 use color_eyre::eyre::{eyre, Report};
 use futures::stream::{FuturesUnordered, StreamExt};
-use std::{collections::HashSet, iter, time::Duration};
+use std::{collections::HashSet, iter, sync::Arc, time::Duration};
 use tokio::time::delay_for;
 use tower::{Service, ServiceExt};
-use tracing_futures::Instrument;

-use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
+use zebra_chain::{
+    block::{Block, BlockHeaderHash},
+    types::BlockHeight,
+};
 use zebra_network as zn;
 use zebra_state as zs;

-pub struct Syncer<ZN, ZS>
+pub struct Syncer<ZN, ZS, ZV>
 where
     ZN: Service<zn::Request>,
 {
     pub peer_set: ZN,
-    // TODO(jlusby): add validator
     pub state: ZS,
+    pub verifier: ZV,
     pub prospective_tips: HashSet<BlockHeaderHash>,
     pub block_requests: FuturesUnordered<ZN::Future>,
-    pub downloading: HashSet<BlockHeaderHash>,
-    pub downloaded: HashSet<BlockHeaderHash>,
     pub fanout: NumReq,
 }

-impl<ZN, ZS> Syncer<ZN, ZS>
+impl<ZN, ZS, ZV> Syncer<ZN, ZS, ZV>
 where
     ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
     ZN::Future: Send,
     ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
     ZS::Future: Send,
+    ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
+    ZV::Future: Send,
 {
     pub async fn run(&mut self) -> Result<(), Report> {
         loop {
@@ -41,9 +43,6 @@ where
             while !self.prospective_tips.is_empty() {
                 info!("extending prospective tips");
                 self.extend_tips().await?;
-
-                // TODO(jlusby): move this to a background task and check it for errors after each step.
-                self.process_blocks().await?;
             }

             delay_for(Duration::from_secs(15)).await;
@@ -161,12 +160,7 @@ where
                     })
                     .await;
                 match res.map_err::<Report, _>(|e| eyre!(e)) {
-                    Ok(zn::Response::BlockHeaderHashes(hashes)) => {
-                        if hashes.is_empty() {
-                            tracing::debug!("skipping empty response");
-                            continue;
-                        }
-
+                    Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
                         // ExtendTips Step 3
                         //
                         // For each response, check whether the first hash in the
@@ -174,22 +168,26 @@ where
                         // It indicates that the remote peer does not have any blocks
                         // following the prospective tip.
                         // TODO(jlusby): reject both main and test net genesis blocks
-                        if hashes[0] == super::GENESIS {
-                            tracing::debug!("skipping response that does not extend the tip");
-                            continue;
+                        match hashes.first() {
+                            Some(&super::GENESIS) => {
+                                tracing::debug!("skipping response that does not extend the tip");
+                                continue;
+                            }
+                            None => {
+                                tracing::debug!("skipping empty response");
+                                continue;
+                            }
+                            Some(_) => {}
                         }

+                        let new_tip = hashes.pop().expect("expected: hashes must have len > 0");
+
                         // ExtendTips Step 4
                         //
                         // Combine the last elements of the remaining responses into
                         // a set, and add this set to the set of prospective tips.
-                        let new_tip = *hashes.last().expect("already checked is_empty");
                         let _ = self.prospective_tips.insert(new_tip);

-                        // ExtendTips Step 5
-                        //
-                        // Combine all elements of the remaining responses into a
-                        // set, and queue download and verification of those blocks
                         download_set.extend(hashes);
                     }
                     Ok(r) => tracing::error!("unexpected response {:?}", r),
@@ -198,100 +196,77 @@ where
             }
         }

-        self.request_blocks(download_set.into_iter().collect())
-            .await?;
+        // ExtendTips Step ??
+        //
+        // Remove tips that are already included behind one of the other
+        // returned tips
+        self.prospective_tips
+            .retain(|tip| !download_set.contains(tip));
+
+        // ExtendTips Step 5
+        //
+        // Combine all elements of the remaining responses into a
+        // set, and queue download and verification of those blocks
+        self.request_blocks(
+            download_set
+                .into_iter()
+                .chain(self.prospective_tips.iter().cloned())
+                .collect(),
+        )
+        .await?;

         Ok(())
     }

     /// Queue downloads for each block that isn't currently known to our node
-    async fn request_blocks(&mut self, mut hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
-        hashes.retain(|hash| !self.known_block(hash));
-
+    async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
         for chunk in hashes.chunks(10usize) {
-            self.queue_download(chunk).await?;
-        }
+            let set = chunk.iter().cloned().collect();

-        Ok(())
-    }
+            let request = self
+                .peer_set
+                .ready_and()
+                .await
+                .map_err(|e| eyre!(e))?
+                .call(zn::Request::BlocksByHash(set));

-    /// Drive block downloading futures to completion and dispatch downloaded
-    /// blocks to the validator
-    async fn process_blocks(&mut self) -> Result<(), Report> {
-        info!(in_flight = self.block_requests.len(), "processing blocks");
+            let verifier = self.verifier.clone();

-        while let Some(res) = self.block_requests.next().await {
-            match res.map_err::<Report, _>(|e| eyre!(e)) {
-                Ok(zn::Response::Blocks(blocks)) => {
-                    info!(count = blocks.len(), "received blocks");
-                    for block in blocks {
-                        let hash = block.as_ref().into();
-                        assert!(
-                            self.downloading.remove(&hash),
-                            "all received blocks should be explicitly requested and received once"
-                        );
-                        let _ = self.downloaded.insert(hash);
-                        self.validate_block(block).await?;
+            let _ = tokio::spawn(async move {
+                let result_fut = async move {
+                    let mut handles = FuturesUnordered::new();
+                    let resp = request.await?;
+
+                    if let zn::Response::Blocks(blocks) = resp {
+                        debug!(count = blocks.len(), "received blocks");
+
+                        for block in blocks {
+                            let mut verifier = verifier.clone();
+                            let handle = tokio::spawn(async move {
+                                verifier.ready_and().await?.call(block).await
+                            });
+                            handles.push(handle);
+                        }
+                    } else {
+                        debug!(?resp, "unexpected response");
                     }
+
+                    while let Some(res) = handles.next().await {
+                        let _hash = res??;
+                    }
+
+                    Ok::<_, Error>(())
+                };
+
+                match result_fut.await {
+                    Ok(()) => {}
+                    Err(e) => error!("{:?}", e),
                 }
-                Ok(_) => continue,
-                Err(e) => {
-                    error!("{:?}", e);
-                }
-            }
+            });
         }

         Ok(())
     }
-
-    /// Validate a downloaded block using the validator service, inserting the
-    /// block into the state if successful
-    #[tracing::instrument(skip(self))]
-    async fn validate_block(
-        &mut self,
-        block: std::sync::Arc<zebra_chain::block::Block>,
-    ) -> Result<(), Report> {
-        let fut = self
-            .state
-            .ready_and()
-            .await
-            .map_err(|e| eyre!(e))?
-            .call(zs::Request::AddBlock { block });
-
-        let _handle = tokio::spawn(
-            async move {
-                match fut.await.map_err::<Report, _>(|e| eyre!(e)) {
-                    Ok(_) => {}
-                    Err(report) => error!("{:?}", report),
-                }
-            }
-            .in_current_span(),
-        );
-
-        Ok(())
-    }
-
-    /// Returns true if the block is being downloaded or has been downloaded
-    fn known_block(&self, hash: &BlockHeaderHash) -> bool {
-        self.downloading.contains(hash) || self.downloaded.contains(hash)
-    }
-
-    /// Queue a future to download a set of blocks from the network
-    async fn queue_download(&mut self, chunk: &[BlockHeaderHash]) -> Result<(), Report> {
-        let set = chunk.iter().cloned().collect();
-
-        let request = self
-            .peer_set
-            .ready_and()
-            .await
-            .map_err(|e| eyre!(e))?
-            .call(zn::Request::BlocksByHash(set));
-
-        self.downloading.extend(chunk);
-        self.block_requests.push(request);
-
-        Ok(())
-    }
 }

 /// Get the heights of the blocks for constructing a block_locator list