diff --git a/.gitignore b/.gitignore index d2bfd814..d57c34d5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ *\# # Vscode detrius .vscode/ +.zebra-state/ diff --git a/zebra-state/src/in_memory.rs b/zebra-state/src/in_memory.rs index 7f991c05..06c9b251 100644 --- a/zebra-state/src/in_memory.rs +++ b/zebra-state/src/in_memory.rs @@ -12,6 +12,7 @@ use std::{ task::{Context, Poll}, }; use tower::{buffer::Buffer, Service}; +use zebra_chain::block::BlockHeaderHash; mod block_index; @@ -20,7 +21,11 @@ struct InMemoryState { index: block_index::BlockIndex, } -type Error = Box; +impl InMemoryState { + fn contains(&mut self, _hash: BlockHeaderHash) -> Result, Error> { + todo!() + } +} impl Service for InMemoryState { type Response = Response; @@ -60,6 +65,16 @@ impl Service for InMemoryState { async move { result }.boxed() } + Request::GetDepth { hash } => { + let res = self.contains(hash); + + async move { + let depth = res?; + + Ok(Response::Depth(depth)) + } + .boxed() + } } } } @@ -76,3 +91,5 @@ pub fn init() -> impl Service< + 'static { Buffer::new(InMemoryState::default(), 1) } + +type Error = Box; diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index f83ad73f..19d88fc4 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -58,12 +58,17 @@ pub enum Request { }, /// Get the block that is the tip of the current chain GetTip, + /// Ask the state if the given hash is part of the current best chain + GetDepth { + /// The hash to check against the current chain + hash: BlockHeaderHash, + }, } #[derive(Debug, PartialEq)] /// A state response pub enum Response { - /// A response to a `AddBlock` request indicating a block was successfully + /// The response to a `AddBlock` request indicating a block was successfully /// added to the state Added { /// The hash of the block that was added @@ -79,6 +84,12 @@ pub enum Response { /// The hash of the block at the tip of the current chain hash: BlockHeaderHash, }, + /// The response to a `Contains` request indicating that the given has is in + /// the current best chain + Depth( + /// The number of blocks above the given block in the current best chain + Option, + ), } #[cfg(test)] diff --git a/zebra-state/src/on_disk.rs b/zebra-state/src/on_disk.rs index 1a534944..85dcc233 100644 --- a/zebra-state/src/on_disk.rs +++ b/zebra-state/src/on_disk.rs @@ -75,19 +75,23 @@ impl SledState { } } - pub(super) fn get_tip(&self) -> Result, Error> { + pub(super) fn get_tip(&self) -> Result>, Error> { let tree = self.storage.open_tree(b"by_height")?; let last_entry = tree.iter().values().next_back(); match last_entry { - Some(Ok(bytes)) => { - let block = Arc::::zcash_deserialize(bytes.as_ref())?; - Ok(Some(block.as_ref().into())) - } + Some(Ok(bytes)) => Ok(Some(ZcashDeserialize::zcash_deserialize(bytes.as_ref())?)), Some(Err(e)) => Err(e)?, None => Ok(None), } } + + fn contains(&self, hash: &BlockHeaderHash) -> Result { + let by_hash = self.storage.open_tree(b"by_hash")?; + let key = &hash.0; + + Ok(by_hash.contains_key(key)?) + } } impl Default for SledState { @@ -129,15 +133,55 @@ impl Service for SledState { async move { storage .get_tip()? + .map(|block| block.as_ref().into()) .map(|hash| Response::Tip { hash }) .ok_or_else(|| "zebra-state contains no blocks".into()) } .boxed() } + Request::GetDepth { hash } => { + let storage = self.clone(); + + async move { + if !storage.contains(&hash)? { + return Ok(Response::Depth(None)); + } + + let block = storage + .get(hash)? + .expect("block must be present if contains returned true"); + let tip = storage + .get_tip()? + .expect("storage must have a tip if it contains the previous block"); + + let depth = + tip.coinbase_height().unwrap().0 - block.coinbase_height().unwrap().0; + + Ok(Response::Depth(Some(depth))) + } + .boxed() + } } } } +/// An alternate repr for `BlockHeight` that implements `AsRef<[u8]>` for usage +/// with sled +struct BytesHeight(u32, [u8; 4]); + +impl From for BytesHeight { + fn from(height: BlockHeight) -> Self { + let bytes = height.0.to_be_bytes(); + Self(height.0, bytes) + } +} + +impl AsRef<[u8]> for BytesHeight { + fn as_ref(&self) -> &[u8] { + &self.1[..] + } +} + pub(super) enum BlockQuery { ByHash(BlockHeaderHash), ByHeight(BlockHeight), diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 4c218fe0..401558be 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -22,14 +22,12 @@ use crate::config::ZebradConfig; use crate::{components::tokio::TokioComponent, prelude::*}; use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::Report; -use color_eyre::eyre::{eyre, WrapErr}; -use futures::{ - prelude::*, - stream::{FuturesUnordered, StreamExt}, -}; -use std::collections::BTreeSet; -use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; -use zebra_chain::{block::BlockHeaderHash, types::BlockHeight}; +use futures::stream::FuturesUnordered; +use std::collections::HashSet; +use tower::{buffer::Buffer, service_fn}; +use zebra_chain::block::BlockHeaderHash; + +mod sync; // genesis static GENESIS: BlockHeaderHash = BlockHeaderHash([ @@ -57,26 +55,21 @@ impl StartCmd { }), 1, ); - let config = app_config().network.clone(); let state = zebra_state::on_disk::init(zebra_state::Config::default()); let (peer_set, _address_book) = zebra_network::init(config, node).await; - let retry_peer_set = tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone()); - let mut downloaded_block_heights = BTreeSet::::new(); - downloaded_block_heights.insert(BlockHeight(0)); - - let mut connect = Core { - retry_peer_set, + let mut syncer = sync::Syncer { peer_set, state, - tip: GENESIS, block_requests: FuturesUnordered::new(), - requested_block_heights: 0, - downloaded_block_heights, + downloading: HashSet::new(), + downloaded: HashSet::new(), + fanout: 4, + prospective_tips: HashSet::new(), }; - connect.run().await + syncer.run().await } } @@ -117,129 +110,3 @@ impl config::Override for StartCmd { Ok(config) } } - -struct Core -where - ZN: Service, -{ - retry_peer_set: tower::retry::Retry, - peer_set: ZN, - state: ZS, - tip: BlockHeaderHash, - block_requests: FuturesUnordered, - requested_block_heights: usize, - downloaded_block_heights: BTreeSet, -} - -impl Core -where - ZN: Service - + Send - + Clone - + 'static, - ZN::Future: Send, - ZS: Service - + Send - + Clone - + 'static, - ZS::Future: Send, -{ - async fn run(&mut self) -> Result<(), Report> { - // TODO(jlusby): Replace with real state service - - while self.requested_block_heights < 700_000 { - let hashes = self.next_hashes().await?; - self.tip = *hashes.last().unwrap(); - - // Request the corresponding blocks in chunks - self.request_blocks(hashes).await?; - - // Allow at most 300 block requests in flight. - self.drain_requests(300).await?; - } - - self.drain_requests(0).await?; - - let eternity = future::pending::<()>(); - eternity.await; - - Ok(()) - } - - async fn next_hashes(&mut self) -> Result, Report> { - // Request the next 500 hashes. - self.retry_peer_set - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_network::Request::FindBlocks { - known_blocks: vec![self.tip], - stop: None, - }) - .await - .map_err(|e| eyre!(e)) - .wrap_err("request failed, TODO implement retry") - .map(|response| match response { - zebra_network::Response::BlockHeaderHashes(hashes) => hashes, - _ => unreachable!("FindBlocks always gets a BlockHeaderHashes response"), - }) - .map(|hashes| { - info!( - new_hashes = hashes.len(), - requested = self.requested_block_heights, - in_flight = self.block_requests.len(), - downloaded = self.downloaded_block_heights.len(), - highest = self.downloaded_block_heights.iter().next_back().unwrap().0, - "requested more hashes" - ); - self.requested_block_heights += hashes.len(); - hashes - }) - } - - async fn request_blocks(&mut self, hashes: Vec) -> Result<(), Report> { - for chunk in hashes.chunks(10usize) { - let request = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( - zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()), - ); - - self.block_requests.push(request); - } - - Ok(()) - } - - async fn drain_requests(&mut self, request_goal: usize) -> Result<(), Report> { - while self.block_requests.len() > request_goal { - match self - .block_requests - .next() - .await - .expect("expected: block_requests is never empty") - .map_err::(|e| eyre!(e)) - { - Ok(zebra_network::Response::Blocks(blocks)) => { - for block in blocks { - self.downloaded_block_heights - .insert(block.coinbase_height().unwrap()); - self.state - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::AddBlock { block }) - .await - .map_err(|e| eyre!(e))?; - } - } - Ok(_) => continue, - Err(e) => { - error!("{:?}", e); - } - } - } - - Ok(()) - } -} - -type Error = Box; diff --git a/zebrad/src/commands/start/sync.rs b/zebrad/src/commands/start/sync.rs new file mode 100644 index 00000000..3d444b17 --- /dev/null +++ b/zebrad/src/commands/start/sync.rs @@ -0,0 +1,329 @@ +use color_eyre::eyre::{eyre, Report}; +use futures::stream::{FuturesUnordered, StreamExt}; +use std::{collections::HashSet, iter, time::Duration}; +use tokio::time::delay_for; +use tower::{Service, ServiceExt}; +use tracing_futures::Instrument; +use zebra_chain::{block::BlockHeaderHash, types::BlockHeight}; + +pub struct Syncer +where + ZN: Service, +{ + pub peer_set: ZN, + // TODO(jlusby): add validator + pub state: ZS, + pub prospective_tips: HashSet, + pub block_requests: FuturesUnordered, + pub downloading: HashSet, + pub downloaded: HashSet, + pub fanout: NumReq, +} + +impl Syncer +where + ZN: Service + + Send + + Clone + + 'static, + ZN::Future: Send, + ZS: Service + + Send + + Clone + + 'static, + ZS::Future: Send, +{ + pub async fn run(&mut self) -> Result<(), Report> { + loop { + info!("populating prospective tips list"); + self.obtain_tips().await?; + + // ObtainTips Step 6 + // + // If there are any prospective tips, call ExtendTips. Continue this step until there are no more prospective tips. + while !self.prospective_tips.is_empty() { + info!("extending prospective tips"); + self.extend_tips().await?; + + // TODO(jlusby): move this to a background task and check it for errors after each step. + self.process_blocks().await?; + } + + delay_for(Duration::from_secs(15)).await; + } + } + + /// Given a block_locator list fan out request for subsequent hashes to + /// multiple peers + async fn obtain_tips(&mut self) -> Result<(), Report> { + // ObtainTips Step 1 + // + // Query the current state to construct the sequence of hashes: handled by + // the caller + // + // TODO(jlusby): get the block_locator from the state + let block_locator = vec![super::GENESIS]; + let mut tip_futs = FuturesUnordered::new(); + + // ObtainTips Step 2 + // + // Make a FindBlocksByHash request to the network F times, where F is a + // fanout parameter, to get resp1, ..., respF + for _ in 0..self.fanout { + let req = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call( + zebra_network::Request::FindBlocks { + known_blocks: block_locator.clone(), + stop: None, + }, + ); + tip_futs.push(req); + } + + let mut download_set = HashSet::new(); + while let Some(res) = tip_futs.next().await { + match res.map_err::(|e| eyre!(e)) { + Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => { + info!( + new_hashes = hashes.len(), + in_flight = self.block_requests.len(), + downloaded = self.downloaded.len(), + "requested more hashes" + ); + + // TODO(jlusby): reject both main and test net genesis blocks + if hashes.last() == Some(&super::GENESIS) { + continue; + } + + let mut hashes = hashes.into_iter().peekable(); + let new_tip = if let Some(tip) = hashes.next() { + tip + } else { + continue; + }; + + // ObtainTips Step 3 + // + // For each response, starting from the beginning of the + // list, prune any block hashes already included in the + // state, stopping at the first unknown hash to get resp1', + // ..., respF'. (These lists may be empty). + while let Some(&next) = hashes.peek() { + let resp = self + .state + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::GetDepth { hash: next }) + .await + .map_err(|e| eyre!(e))?; + + let should_download = matches!(resp, zebra_state::Response::Depth(None)); + + if should_download { + download_set.extend(hashes); + break; + } else { + let _ = hashes.next(); + } + } + + // ObtainTips Step 4 + // + // Combine the last elements of each list into a set; this + // is the set of prospective tips. + let _ = self.prospective_tips.insert(new_tip); + } + Ok(_) => {} + Err(e) => { + error!("{:?}", e); + } + } + } + + // ObtainTips Step 5 + // + // Combine all elements of each list into a set, and queue + // download and verification of those blocks. + self.request_blocks(download_set.into_iter().collect()) + .await?; + + Ok(()) + } + + async fn extend_tips(&mut self) -> Result<(), Report> { + // Extend Tips 1 + // + // remove all prospective tips and iterate over them individually + let tips = std::mem::take(&mut self.prospective_tips); + + let mut download_set = HashSet::new(); + for tip in tips { + // ExtendTips Step 2 + // + // Create a FindBlocksByHash request consisting of just the + // prospective tip. Send this request to the network F times + for _ in 0..self.fanout { + let res = self + .peer_set + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_network::Request::FindBlocks { + known_blocks: vec![tip], + stop: None, + }) + .await; + match res.map_err::(|e| eyre!(e)) { + Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => { + info!( + new_hashes = hashes.len(), + in_flight = self.block_requests.len(), + downloaded = self.downloaded.len(), + "requested more hashes" + ); + + // ExtendTips Step 3 + // + // For each response, check whether the first hash in the + // response is the genesis block; if so, discard the response. + // It indicates that the remote peer does not have any blocks + // following the prospective tip. + if hashes.last() == Some(&super::GENESIS) { + continue; + } + + let mut hashes = hashes.into_iter(); + let new_tip = if let Some(tip) = hashes.next() { + tip + } else { + continue; + }; + + // ExtendTips Step 4 + // + // Combine the last elements of the remaining responses into + // a set, and add this set to the set of prospective tips. + let _ = self.prospective_tips.insert(new_tip); + + // ExtendTips Step 5 + // + // Combine all elements of the remaining responses into a + // set, and queue download and verification of those blocks + download_set.extend(hashes); + } + Ok(_) => {} + Err(e) => { + error!("{:?}", e); + } + } + } + } + + self.request_blocks(download_set.into_iter().collect()) + .await?; + + Ok(()) + } + + /// Queue downloads for each block that isn't currently known to our node + async fn request_blocks(&mut self, mut hashes: Vec) -> Result<(), Report> { + hashes.retain(|hash| !self.known_block(hash)); + + for chunk in hashes.chunks(10usize) { + self.queue_download(chunk).await?; + } + + Ok(()) + } + + /// Drive block downloading futures to completion and dispatch downloaded + /// blocks to the validator + async fn process_blocks(&mut self) -> Result<(), Report> { + info!(in_flight = self.block_requests.len(), "processing blocks"); + + while let Some(res) = self.block_requests.next().await { + match res.map_err::(|e| eyre!(e)) { + Ok(zebra_network::Response::Blocks(blocks)) => { + info!(count = blocks.len(), "received blocks"); + for block in blocks { + let hash = block.as_ref().into(); + assert!( + self.downloading.remove(&hash), + "all received blocks should be explicitly requested and received once" + ); + let _ = self.downloaded.insert(hash); + self.validate_block(block).await?; + } + } + Ok(_) => continue, + Err(e) => { + error!("{:?}", e); + } + } + } + + Ok(()) + } + + /// Validate a downloaded block using the validator service, inserting the + /// block into the state if successful + #[tracing::instrument(skip(self))] + async fn validate_block( + &mut self, + block: std::sync::Arc, + ) -> Result<(), Report> { + let fut = self + .state + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::AddBlock { block }); + + let _handle = tokio::spawn( + async move { + match fut.await.map_err::(|e| eyre!(e)) { + Ok(_) => {} + Err(report) => error!("{:?}", report), + } + } + .in_current_span(), + ); + + Ok(()) + } + + /// Returns true if the block is being downloaded or has been downloaded + fn known_block(&self, hash: &BlockHeaderHash) -> bool { + self.downloading.contains(hash) || self.downloaded.contains(hash) + } + + /// Queue a future to download a set of blocks from the network + async fn queue_download(&mut self, chunk: &[BlockHeaderHash]) -> Result<(), Report> { + let set = chunk.iter().cloned().collect(); + + let request = self + .peer_set + .ready_and() + .await + .map_err(|e| eyre!(e))? + .call(zebra_network::Request::BlocksByHash(set)); + + self.downloading.extend(chunk); + self.block_requests.push(request); + + Ok(()) + } +} + +/// Get the heights of the blocks for constructing a block_locator list +#[allow(dead_code)] +pub fn block_locator_heights(tip_height: BlockHeight) -> impl Iterator { + iter::successors(Some(1u32), |h| h.checked_mul(2)) + .flat_map(move |step| tip_height.0.checked_sub(step)) + .map(BlockHeight) + .chain(iter::once(BlockHeight(0))) +} + +type Error = Box; +type NumReq = u32;