diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index 610c0064..7fabe6e4 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -23,6 +23,7 @@ tower = "0.3.1" tracing = "0.1" tracing-error = "0.1.2" thiserror = "1.0.20" +tokio = { version = "0.2.22", features = ["sync"] } [dev-dependencies] zebra-test = { path = "../zebra-test/" } diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 559bbc87..ebcea597 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -20,6 +20,7 @@ mod util; mod tests; use memory_state::MemoryState; +use service::QueuedBlock; use sled_state::SledState; pub use config::Config; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 51369764..36d1d2bf 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1,15 +1,29 @@ use std::{ future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, }; use futures::future::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; use tower::{buffer::Buffer, util::BoxService, Service}; -use zebra_chain::parameters::Network; +use zebra_chain::{ + block::{self, Block}, + parameters::Network, +}; use crate::{BoxError, Config, HashOrHeight, MemoryState, Request, Response, SledState}; +// todo: put this somewhere +pub struct QueuedBlock { + pub block: Arc, + // TODO: add these parameters when we can compute anchors. + // sprout_anchor: sprout::tree::Root, + // sapling_anchor: sapling::tree::Root, + pub rsp_tx: oneshot::Sender>, +} + struct StateService { /// Holds data relating to finalized chain state. sled: SledState, @@ -39,12 +53,18 @@ impl Service for StateService { match req { Request::CommitBlock { block } => unimplemented!(), Request::CommitFinalizedBlock { block } => { - let rsp = self - .sled - .commit_finalized(block) - .map(|hash| Response::Committed(hash)); + let (rsp_tx, rsp_rx) = oneshot::channel(); - async move { rsp }.boxed() + self.sled.queue(QueuedBlock { block, rsp_tx }); + self.sled.process_queue(); + + async move { + rsp_rx + .await + .expect("sender oneshot is not dropped") + .map(|hash| Response::Committed(hash)) + } + .boxed() } Request::Depth(hash) => { // todo: handle in memory and sled diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index fb2cb510..848b4f45 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -1,16 +1,19 @@ //! The primary implementation of the `zebra_state::Service` built upon sled -use crate::Config; -use std::{convert::TryInto, future::Future, sync::Arc}; + +use std::{collections::HashMap, convert::TryInto, future::Future}; + use zebra_chain::serialization::ZcashSerialize; use zebra_chain::{ - block::{self, Block}, + block::{self}, parameters::Network, }; -use crate::BoxError; +use crate::{BoxError, Config, QueuedBlock}; -#[derive(Clone)] pub struct SledState { + /// Queued blocks that arrived out of order, indexed by their parent block hash. + queued_by_prev_hash: HashMap, + hash_by_height: sled::Tree, height_by_hash: sled::Tree, block_by_height: sled::Tree, @@ -27,6 +30,7 @@ impl SledState { let db = config.sled_config(network).open().unwrap(); Self { + queued_by_prev_hash: HashMap::new(), hash_by_height: db.open_tree(b"hash_by_height").unwrap(), height_by_hash: db.open_tree(b"height_by_hash").unwrap(), block_by_height: db.open_tree(b"block_by_height").unwrap(), @@ -37,9 +41,33 @@ impl SledState { } } + /// Queue a finalized block to be committed to the state. + pub fn queue(&mut self, queued_block: QueuedBlock) { + let prev_hash = queued_block.block.header.previous_block_hash; + self.queued_by_prev_hash.insert(prev_hash, queued_block); + } + + pub fn process_queue(&mut self) { + // Cloning means the closure doesn't hold a borrow of &self, + // conflicting with mutable access in the loop below. + let hash_by_height = self.hash_by_height.clone(); + let tip_hash = || { + read_tip(&hash_by_height) + .expect("inability to look up tip is unrecoverable") + .map(|(_height, hash)| hash) + .unwrap_or(block::Hash([0; 32])) + }; + + while let Some(queued_block) = self.queued_by_prev_hash.remove(&tip_hash()) { + self.commit_finalized(queued_block) + } + } + /// Commit a finalized block to the state. It's the caller's responsibility /// to ensure that blocks are committed in order. - pub fn commit_finalized(&self, block: Arc) -> Result { + fn commit_finalized(&mut self, queued_block: QueuedBlock) { + let QueuedBlock { block, rsp_tx } = queued_block; + // The only valid block without a coinbase height is the genesis // block. By this point the block has been validated, so if // there's no coinbase height, it must be the genesis block. @@ -48,7 +76,7 @@ impl SledState { let hash = block.hash(); use sled::Transactional; - ( + let transaction_result = ( &self.hash_by_height, &self.height_by_hash, &self.block_by_height, @@ -68,9 +96,9 @@ impl SledState { // for some reason type inference fails here Ok::<_, sled::transaction::ConflictableTransactionError>(()) - })?; + }); - Ok(hash) + let _ = rsp_tx.send(transaction_result.map(|_| hash).map_err(Into::into)); } // TODO: this impl works only during checkpointing, it needs to be rewritten @@ -101,20 +129,7 @@ impl SledState { &self, ) -> impl Future, BoxError>> { let hash_by_height = self.hash_by_height.clone(); - async move { - Ok(hash_by_height - .iter() - .rev() - .next() - .transpose()? - .map(|(height_bytes, hash_bytes)| { - let height = block::Height(u32::from_be_bytes( - height_bytes.as_ref().try_into().unwrap(), - )); - let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap()); - (height, hash) - })) - } + async move { read_tip(&hash_by_height) } } pub fn depth(&self, hash: block::Hash) -> impl Future, BoxError>> { @@ -137,3 +152,19 @@ impl SledState { } } } + +// Split into a helper function to be called synchronously or asynchronously. +fn read_tip(hash_by_height: &sled::Tree) -> Result, BoxError> { + Ok(hash_by_height + .iter() + .rev() + .next() + .transpose()? + .map(|(height_bytes, hash_bytes)| { + let height = block::Height(u32::from_be_bytes( + height_bytes.as_ref().try_into().unwrap(), + )); + let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap()); + (height, hash) + })) +}