4. feat(state): support Request::Transaction in ReadStateService (#3865)

* Add request metrics to ReadStateService

Add a field to distinguish StateService and ReadStateService metrics.
Add missing StateService request metrics.

* Refactor state transaction lookup so it can be shared between services

* Implement ReadState Request::Transaction
This commit is contained in:
teor 2022-03-18 04:48:13 +10:00 committed by GitHub
parent b563b2a1c1
commit 88ab6deeac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 35 deletions

View File

@ -464,12 +464,10 @@ impl StateService {
read::block(self.mem.best_chain(), self.disk.db(), hash_or_height) read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
} }
/// Return the transaction identified by `hash` if it exists in the current /// Returns the [`Transaction`] with [`transaction::Hash`],
/// best chain. /// if it exists in the current best chain.
pub fn best_transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> { pub fn best_transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
self.mem read::transaction(self.mem.best_chain(), self.disk.db(), hash)
.best_transaction(hash)
.or_else(|| self.disk.db().transaction(hash))
} }
/// Return the hash for the block at `height` in the current best chain. /// Return the hash for the block at `height` in the current best chain.
@ -739,7 +737,12 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Request) -> Self::Future {
match req { match req {
Request::CommitBlock(prepared) => { Request::CommitBlock(prepared) => {
metrics::counter!("state.requests", 1, "type" => "commit_block"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "commit_block",
);
self.assert_block_can_be_validated(&prepared); self.assert_block_can_be_validated(&prepared);
@ -757,7 +760,12 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
Request::CommitFinalizedBlock(finalized) => { Request::CommitFinalizedBlock(finalized) => {
metrics::counter!("state.requests", 1, "type" => "commit_finalized_block"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "commit_finalized_block",
);
self.pending_utxos.check_against(&finalized.new_outputs); self.pending_utxos.check_against(&finalized.new_outputs);
let rsp_rx = self.queue_and_commit_finalized(finalized); let rsp_rx = self.queue_and_commit_finalized(finalized);
@ -772,32 +780,67 @@ impl Service<Request> for StateService {
.boxed() .boxed()
} }
Request::Depth(hash) => { Request::Depth(hash) => {
metrics::counter!("state.requests", 1, "type" => "depth"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "depth",
);
let rsp = Ok(self.best_depth(hash)).map(Response::Depth); let rsp = Ok(self.best_depth(hash)).map(Response::Depth);
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::Tip => { Request::Tip => {
metrics::counter!("state.requests", 1, "type" => "tip"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "tip",
);
let rsp = Ok(self.best_tip()).map(Response::Tip); let rsp = Ok(self.best_tip()).map(Response::Tip);
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::BlockLocator => { Request::BlockLocator => {
metrics::counter!("state.requests", 1, "type" => "block_locator"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "block_locator",
);
let rsp = Ok(self.block_locator().unwrap_or_default()).map(Response::BlockLocator); let rsp = Ok(self.block_locator().unwrap_or_default()).map(Response::BlockLocator);
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::Transaction(hash) => { Request::Transaction(hash) => {
metrics::counter!("state.requests", 1, "type" => "transaction"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "transaction",
);
let rsp = Ok(self.best_transaction(hash)).map(Response::Transaction); let rsp = Ok(self.best_transaction(hash)).map(Response::Transaction);
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::Block(hash_or_height) => { Request::Block(hash_or_height) => {
metrics::counter!("state.requests", 1, "type" => "block"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "block",
);
let rsp = Ok(self.best_block(hash_or_height)).map(Response::Block); let rsp = Ok(self.best_block(hash_or_height)).map(Response::Block);
async move { rsp }.boxed() async move { rsp }.boxed()
} }
Request::AwaitUtxo(outpoint) => { Request::AwaitUtxo(outpoint) => {
metrics::counter!("state.requests", 1, "type" => "await_utxo"); metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "await_utxo",
);
let fut = self.pending_utxos.queue(outpoint); let fut = self.pending_utxos.queue(outpoint);
@ -808,12 +851,26 @@ impl Service<Request> for StateService {
fut.boxed() fut.boxed()
} }
Request::FindBlockHashes { known_blocks, stop } => { Request::FindBlockHashes { known_blocks, stop } => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "find_block_hashes",
);
const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500; const MAX_FIND_BLOCK_HASHES_RESULTS: usize = 500;
let res = let res =
self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS); self.find_best_chain_hashes(known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS);
async move { Ok(Response::BlockHashes(res)) }.boxed() async move { Ok(Response::BlockHashes(res)) }.boxed()
} }
Request::FindBlockHeaders { known_blocks, stop } => { Request::FindBlockHeaders { known_blocks, stop } => {
metrics::counter!(
"state.requests",
1,
"service" => "state",
"type" => "find_block_headers",
);
const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160; const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160;
// Zcashd will blindly request more block headers as long as it // Zcashd will blindly request more block headers as long as it
// got 160 block headers in response to a previous query, EVEN // got 160 block headers in response to a previous query, EVEN
@ -860,6 +917,13 @@ impl Service<Request> for ReadStateService {
match req { match req {
// Used by get_block RPC. // Used by get_block RPC.
Request::Block(hash_or_height) => { Request::Block(hash_or_height) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "block",
);
let state = self.clone(); let state = self.clone();
async move { async move {
@ -872,11 +936,25 @@ impl Service<Request> for ReadStateService {
.boxed() .boxed()
} }
// TODO: implement for lightwalletd as part of these tickets // For the get_raw_transaction RPC, to be implemented in #3145.
Request::Transaction(hash) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "transaction",
);
// get_raw_transaction (#3145) let state = self.clone();
Request::Transaction(_hash) => {
unimplemented!("ReadStateService doesn't Transaction yet") async move {
let transaction = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transaction(best_chain, &state.db, hash)
});
Ok(Response::Transaction(transaction))
}
.boxed()
} }
// TODO: split the Request enum, then implement these new ReadRequests for lightwalletd // TODO: split the Request enum, then implement these new ReadRequests for lightwalletd

View File

@ -69,7 +69,7 @@ impl ZebraDb {
self.db.zs_get(height_by_hash, &hash) self.db.zs_get(height_by_hash, &hash)
} }
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or /// Returns the [`Block`] with [`block::Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain. /// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> { pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap(); let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
@ -102,7 +102,8 @@ impl ZebraDb {
// Read transaction methods // Read transaction methods
/// Returns the given transaction if it exists. /// Returns the [`Transaction`] with [`transaction::Hash`],
/// if it exists in the finalized chain.
pub fn transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> { pub fn transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
let tx_by_hash = self.db.cf_handle("tx_by_hash").unwrap(); let tx_by_hash = self.db.cf_handle("tx_by_hash").unwrap();
self.db self.db

View File

@ -9,9 +9,7 @@ use zebra_chain::{
history_tree::HistoryTree, history_tree::HistoryTree,
orchard, orchard,
parameters::Network, parameters::Network,
sapling, sprout, sapling, sprout, transparent,
transaction::{self, Transaction},
transparent,
}; };
use crate::{ use crate::{
@ -347,15 +345,6 @@ impl NonFinalizedState {
None None
} }
/// Returns the given transaction if it exists in the best chain.
pub fn best_transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
let best_chain = self.best_chain()?;
best_chain
.tx_by_hash
.get(&hash)
.map(|(height, index)| best_chain.blocks[height].block.transactions[*index].clone())
}
/// Returns `true` if the best chain contains `sprout_nullifier`. /// Returns `true` if the best chain contains `sprout_nullifier`.
#[cfg(test)] #[cfg(test)]
pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool { pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {

View File

@ -5,6 +5,7 @@ use std::{
cmp::Ordering, cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
ops::Deref, ops::Deref,
sync::Arc,
}; };
use mset::MultiSet; use mset::MultiSet;
@ -17,8 +18,9 @@ use zebra_chain::{
orchard, orchard,
parameters::Network, parameters::Network,
primitives::Groth16Proof, primitives::Groth16Proof,
sapling, sprout, transaction, sapling, sprout,
transaction::Transaction::*, transaction::Transaction::*,
transaction::{self, Transaction},
transparent, transparent,
value_balance::ValueBalance, value_balance::ValueBalance,
work::difficulty::PartialCumulativeWork, work::difficulty::PartialCumulativeWork,
@ -318,7 +320,7 @@ impl Chain {
Ok(Some(forked)) Ok(Some(forked))
} }
/// Returns the [`ContextuallyValidBlock`] with [`Hash`](zebra_chain::block::Hash) or /// Returns the [`ContextuallyValidBlock`] with [`block::Hash`] or
/// [`Height`](zebra_chain::block::Height), if it exists in this chain. /// [`Height`](zebra_chain::block::Height), if it exists in this chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> { pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> {
let height = let height =
@ -327,6 +329,13 @@ impl Chain {
self.blocks.get(&height) self.blocks.get(&height)
} }
/// Returns the [`Transaction`] with [`transaction::Hash`], if it exists in this chain.
pub fn transaction(&self, hash: transaction::Hash) -> Option<&Arc<Transaction>> {
self.tx_by_hash
.get(&hash)
.map(|(height, index)| &self.blocks[height].block.transactions[*index])
}
/// Returns the block hash of the tip block. /// Returns the block hash of the tip block.
pub fn non_finalized_tip_hash(&self) -> block::Hash { pub fn non_finalized_tip_hash(&self) -> block::Hash {
self.blocks self.blocks

View File

@ -6,14 +6,17 @@
use std::sync::Arc; use std::sync::Arc;
use zebra_chain::block::Block; use zebra_chain::{
block::Block,
transaction::{self, Transaction},
};
use crate::{ use crate::{
service::{finalized_state::ZebraDb, non_finalized_state::Chain}, service::{finalized_state::ZebraDb, non_finalized_state::Chain},
HashOrHeight, HashOrHeight,
}; };
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or /// Returns the [`Block`] with [`block::Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), /// [`Height`](zebra_chain::block::Height),
/// if it exists in the non-finalized `chain` or finalized `db`. /// if it exists in the non-finalized `chain` or finalized `db`.
pub(crate) fn block<C>( pub(crate) fn block<C>(
@ -38,3 +41,27 @@ where
.map(|contextual| contextual.block.clone()) .map(|contextual| contextual.block.clone())
.or_else(|| db.block(hash_or_height)) .or_else(|| db.block(hash_or_height))
} }
/// Returns the [`Transaction`] with [`transaction::Hash`],
/// if it exists in the non-finalized `chain` or finalized `db`.
pub(crate) fn transaction<C>(
chain: Option<C>,
db: &ZebraDb,
hash: transaction::Hash,
) -> Option<Arc<Transaction>>
where
C: AsRef<Chain>,
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating the latest chain,
// and it can commit additional blocks after we've cloned this `chain` variable.
//
// Since transactions are the same in the finalized and non-finalized state,
// we check the most efficient alternative first.
// (`chain` is always in memory, but `db` stores transactions on disk, with a memory cache.)
chain
.as_ref()
.and_then(|chain| chain.as_ref().transaction(hash).cloned())
.or_else(|| db.transaction(hash))
}