feat(rpc): Implement `getaddressbalance` RPC (#4138)

* Add `Amount::serialize_as_string` helper method

A helper method that makes it easier to serialize an `Amount` as a
string. This is needed for the response type of the `getaccountbalance`
RPC.

* Implement state service call for address balance

Add `Read{Request,Response}::AddressBalance` variants and implement the
handler that calls the query function.

* Create an `AddressBalance` response type

Only contains the `balance` field which is needed by `lightwalletd`.
That field is serialized as a string, following the RPC specification.

* Implement `get_address_balance` RPC

Query the read-only state service for the information, and wrap it in an
`AddressBalance` response type so that it is serialized correctly.

* Run `rustfmt` inside `proptest!` block

Fix some minor formatting details.

* Test `get_address_balance` with valid addresses

Check that the RPC leads to a query to the mocked state service for a
balance amount.

* Test `get_address_balance` with invalid addresses

An error message should be returned by the RPC.

* Rename metric to `address_balance`

Keep it consistent with how it's named in other places.

Co-authored-by: teor <teor@riseup.net>

* Revert "Add `Amount::serialize_as_string` helper method"

This reverts commit 01b432e3d2ac2313a90d55d06b3fa855c0b71330.

* Serialize amount as an integer

This is different from what the documentation says, but it's what
lightwalletd expects.

* Add reference to RPC documentation

Make sure it is linked to for easy access.

* Create an `AddressStrings` type

To be used as the input for the `get_address_balance` RPC method.

* Use `AddressStrings` in `get_address_balance` RPC

Fix the input parameter so that the list of address strings is placed
inside a JSON map.

* Update property tests to use `AddressStrings`

Make sure the proper input type is created.

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2022-04-20 18:27:00 +00:00 committed by GitHub
parent dff25473aa
commit e5f00c5902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 274 additions and 20 deletions

View File

@ -19,6 +19,7 @@ use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument; use tracing::Instrument;
use zebra_chain::{ use zebra_chain::{
amount::{Amount, NonNegative},
block::{self, Height, SerializedBlock}, block::{self, Height, SerializedBlock},
chain_tip::ChainTip, chain_tip::ChainTip,
parameters::{ConsensusBranchId, Network, NetworkUpgrade}, parameters::{ConsensusBranchId, Network, NetworkUpgrade},
@ -69,6 +70,32 @@ pub trait Rpc {
#[rpc(name = "getblockchaininfo")] #[rpc(name = "getblockchaininfo")]
fn get_blockchain_info(&self) -> Result<GetBlockChainInfo>; fn get_blockchain_info(&self) -> Result<GetBlockChainInfo>;
/// Returns the total balance of a provided `addresses` in an [`AddressBalance`] instance.
///
/// zcashd reference: [`getaddressbalance`](https://zcash.github.io/rpc/getaddressbalance.html)
///
/// # Parameters
///
/// - `address_strings`: (map) A JSON map with a single entry
/// - `addresses`: (array of strings) A list of base-58 encoded addresses.
///
/// # Notes
///
/// zcashd also accepts a single string parameter instead of an array of strings, but Zebra
/// doesn't because lightwalletd always calls this RPC with an array of addresses.
///
/// zcashd also returns the total amount of Zatoshis received by the addresses, but Zebra
/// doesn't because lightwalletd doesn't use that information.
///
/// The RPC documentation says that the returned object has a string `balance` field, but
/// zcashd actually [returns an
/// integer](https://github.com/zcash/lightwalletd/blob/bdaac63f3ee0dbef62bde04f6817a9f90d483b00/common/common.go#L128-L130).
#[rpc(name = "getaddressbalance")]
fn get_address_balance(
&self,
address_strings: AddressStrings,
) -> BoxFuture<Result<AddressBalance>>;
/// Sends the raw bytes of a signed transaction to the local node's mempool, if the transaction is valid. /// Sends the raw bytes of a signed transaction to the local node's mempool, if the transaction is valid.
/// Returns the [`SentTransactionHash`] for the transaction, as a JSON string. /// Returns the [`SentTransactionHash`] for the transaction, as a JSON string.
/// ///
@ -369,6 +396,40 @@ where
Ok(response) Ok(response)
} }
fn get_address_balance(
&self,
address_strings: AddressStrings,
) -> BoxFuture<Result<AddressBalance>> {
let state = self.state.clone();
async move {
let addresses: HashSet<Address> = address_strings
.addresses
.into_iter()
.map(|address| {
address.parse().map_err(|error| {
Error::invalid_params(&format!("invalid address {address:?}: {error}"))
})
})
.collect::<Result<_>>()?;
let request = zebra_state::ReadRequest::AddressBalance(addresses);
let response = state.oneshot(request).await.map_err(|error| Error {
code: ErrorCode::ServerError(0),
message: error.to_string(),
data: None,
})?;
match response {
zebra_state::ReadResponse::AddressBalance(balance) => {
Ok(AddressBalance { balance })
}
_ => unreachable!("Unexpected response from state service: {response:?}"),
}
}
.boxed()
}
fn send_raw_transaction( fn send_raw_transaction(
&self, &self,
raw_transaction_hex: String, raw_transaction_hex: String,
@ -657,6 +718,20 @@ pub struct GetBlockChainInfo {
consensus: TipConsensusBranch, consensus: TipConsensusBranch,
} }
/// A wrapper type with a list of strings of addresses.
///
/// This is used for the input parameter of [`Rpc::get_account_balance`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Deserialize)]
pub struct AddressStrings {
addresses: Vec<String>,
}
/// The transparent balance of a set of addresses.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, serde::Serialize)]
pub struct AddressBalance {
balance: Amount<NonNegative>,
}
/// A hex-encoded [`ConsensusBranchId`] string. /// A hex-encoded [`ConsensusBranchId`] string.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
struct ConsensusBranchIdHex(#[serde(with = "hex")] ConsensusBranchId); struct ConsensusBranchIdHex(#[serde(with = "hex")] ConsensusBranchId);

View File

@ -2,14 +2,15 @@
use std::collections::HashSet; use std::collections::HashSet;
use futures::FutureExt; use futures::{join, FutureExt, TryFutureExt};
use hex::ToHex; use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode}; use jsonrpc_core::{Error, ErrorCode};
use proptest::prelude::*; use proptest::{collection::vec, prelude::*};
use thiserror::Error; use thiserror::Error;
use tower::buffer::Buffer; use tower::buffer::Buffer;
use zebra_chain::{ use zebra_chain::{
amount::{Amount, NonNegative},
block::{Block, Height}, block::{Block, Height},
chain_tip::{mock::MockChainTip, NoChainTip}, chain_tip::{mock::MockChainTip, NoChainTip},
parameters::{ parameters::{
@ -18,13 +19,16 @@ use zebra_chain::{
}, },
serialization::{ZcashDeserialize, ZcashSerialize}, serialization::{ZcashDeserialize, ZcashSerialize},
transaction::{self, Transaction, UnminedTx, UnminedTxId}, transaction::{self, Transaction, UnminedTx, UnminedTxId},
transparent,
}; };
use zebra_node_services::mempool; use zebra_node_services::mempool;
use zebra_state::BoxError; use zebra_state::BoxError;
use zebra_test::mock_service::MockService; use zebra_test::mock_service::MockService;
use super::super::{NetworkUpgradeStatus, Rpc, RpcImpl, SentTransactionHash}; use super::super::{
AddressBalance, AddressStrings, NetworkUpgradeStatus, Rpc, RpcImpl, SentTransactionHash,
};
proptest! { proptest! {
/// Test that when sending a raw transaction, it is received by the mempool service. /// Test that when sending a raw transaction, it is received by the mempool service.
@ -304,8 +308,7 @@ proptest! {
/// Make the mock mempool service return a list of transaction IDs, and check that the RPC call /// Make the mock mempool service return a list of transaction IDs, and check that the RPC call
/// returns those IDs as hexadecimal strings. /// returns those IDs as hexadecimal strings.
#[test] #[test]
fn mempool_transactions_are_sent_to_caller(transaction_ids in any::<HashSet<UnminedTxId>>()) fn mempool_transactions_are_sent_to_caller(transaction_ids in any::<HashSet<UnminedTxId>>()) {
{
let runtime = zebra_test::init_async(); let runtime = zebra_test::init_async();
let _guard = runtime.enter(); let _guard = runtime.enter();
@ -357,7 +360,9 @@ proptest! {
/// Try to call `get_raw_transaction` using a string parameter that has at least one /// Try to call `get_raw_transaction` using a string parameter that has at least one
/// non-hexadecimal character, and check that it fails with an expected error. /// non-hexadecimal character, and check that it fails with an expected error.
#[test] #[test]
fn get_raw_transaction_non_hexadecimal_string_results_in_an_error(non_hex_string in ".*[^0-9A-Fa-f].*") { fn get_raw_transaction_non_hexadecimal_string_results_in_an_error(
non_hex_string in ".*[^0-9A-Fa-f].*",
) {
let runtime = zebra_test::init_async(); let runtime = zebra_test::init_async();
let _guard = runtime.enter(); let _guard = runtime.enter();
@ -409,7 +414,9 @@ proptest! {
/// Try to call `get_raw_transaction` using random bytes that fail to deserialize as a /// Try to call `get_raw_transaction` using random bytes that fail to deserialize as a
/// transaction, and check that it fails with an expected error. /// transaction, and check that it fails with an expected error.
#[test] #[test]
fn get_raw_transaction_invalid_transaction_results_in_an_error(random_bytes in any::<Vec<u8>>()) { fn get_raw_transaction_invalid_transaction_results_in_an_error(
random_bytes in any::<Vec<u8>>(),
) {
let runtime = zebra_test::init_async(); let runtime = zebra_test::init_async();
let _guard = runtime.enter(); let _guard = runtime.enter();
@ -476,7 +483,10 @@ proptest! {
); );
let response = rpc.get_blockchain_info(); let response = rpc.get_blockchain_info();
prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet"); prop_assert_eq!(
&response.err().unwrap().message,
"No Chain tip available yet"
);
// The queue task should continue without errors or panics // The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
@ -529,8 +539,18 @@ proptest! {
prop_assert_eq!(info.best_block_hash.0, block_hash); prop_assert_eq!(info.best_block_hash.0, block_hash);
prop_assert!(info.estimated_height < Height::MAX.0); prop_assert!(info.estimated_height < Height::MAX.0);
prop_assert_eq!(info.consensus.chain_tip.0, NetworkUpgrade::current(network, block_height).branch_id().unwrap()); prop_assert_eq!(
prop_assert_eq!(info.consensus.next_block.0, NetworkUpgrade::current(network, (block_height + 1).unwrap()).branch_id().unwrap()); info.consensus.chain_tip.0,
NetworkUpgrade::current(network, block_height)
.branch_id()
.unwrap()
);
prop_assert_eq!(
info.consensus.next_block.0,
NetworkUpgrade::current(network, (block_height + 1).unwrap())
.branch_id()
.unwrap()
);
for u in info.upgrades { for u in info.upgrades {
let mut status = NetworkUpgradeStatus::Active; let mut status = NetworkUpgradeStatus::Active;
@ -539,10 +559,10 @@ proptest! {
} }
prop_assert_eq!(u.1.status, status); prop_assert_eq!(u.1.status, status);
} }
}, }
Err(_) => { Err(_) => {
unreachable!("Test should never error with the data we are feeding it") unreachable!("Test should never error with the data we are feeding it")
}, }
}; };
// The queue task should continue without errors or panics // The queue task should continue without errors or panics
@ -558,10 +578,133 @@ proptest! {
})?; })?;
} }
/// Test the `get_address_balance` RPC using an arbitrary set of addresses.
#[test]
fn queries_balance_for_valid_addresses(
network in any::<Network>(),
addresses in any::<HashSet<transparent::Address>>(),
balance in any::<Amount<NonNegative>>(),
) {
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// Create a mocked `ChainTip`
let (chain_tip, _mock_chain_tip_sender) = MockChainTip::new();
// Prepare the list of addresses.
let address_strings = AddressStrings {
addresses: addresses
.iter()
.map(|address| address.to_string())
.collect(),
};
tokio::time::pause();
// Start RPC with the mocked `ChainTip`
runtime.block_on(async move {
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
chain_tip,
network,
);
// Build the future to call the RPC
let call = rpc.get_address_balance(address_strings);
// The RPC should perform a state query
let state_query = state
.expect_request(zebra_state::ReadRequest::AddressBalance(addresses))
.map_ok(|responder| {
responder.respond(zebra_state::ReadResponse::AddressBalance(balance))
});
// Await the RPC call and the state query
let (response, state_query_result) = join!(call, state_query);
state_query_result?;
// Check that response contains the expected balance
let received_balance = response?;
prop_assert_eq!(received_balance, AddressBalance { balance });
// Check no further requests were made during this test
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
Ok::<_, TestCaseError>(())
})?;
}
/// Test the `get_address_balance` RPC using an invalid list of addresses.
///
/// An error should be returned.
#[test]
fn does_not_query_balance_for_invalid_addresses(
network in any::<Network>(),
at_least_one_invalid_address in vec(".*", 1..10),
) {
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
prop_assume!(at_least_one_invalid_address
.iter()
.any(|string| string.parse::<transparent::Address>().is_err()));
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// Create a mocked `ChainTip`
let (chain_tip, _mock_chain_tip_sender) = MockChainTip::new();
tokio::time::pause();
// Start RPC with the mocked `ChainTip`
runtime.block_on(async move {
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
chain_tip,
network,
);
let address_strings = AddressStrings {
addresses: at_least_one_invalid_address,
};
// Build the future to call the RPC
let result = rpc.get_address_balance(address_strings).await;
// Check that the invalid addresses lead to an error
prop_assert!(
matches!(
result,
Err(Error {
code: ErrorCode::InvalidParams,
..
})
),
"Result is not a server error: {result:?}"
);
// Check no requests were made during this test
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
Ok::<_, TestCaseError>(())
})?;
}
/// Test the queue functionality using `send_raw_transaction` /// Test the queue functionality using `send_raw_transaction`
#[test] #[test]
fn rpc_queue_main_loop(tx in any::<Transaction>()) fn rpc_queue_main_loop(tx in any::<Transaction>()) {
{
let runtime = zebra_test::init_async(); let runtime = zebra_test::init_async();
let _guard = runtime.enter(); let _guard = runtime.enter();
@ -627,7 +770,8 @@ proptest! {
.respond(response); .respond(response);
// now a retry will be sent to the mempool // now a retry will be sent to the mempool
let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]); let response = mempool::Response::Queued(vec![Ok(())]);
mempool mempool
@ -649,8 +793,7 @@ proptest! {
/// Test we receive all transactions that are sent in a channel /// Test we receive all transactions that are sent in a channel
#[test] #[test]
fn rpc_queue_receives_all_transactions_from_channel(txs in any::<[Transaction; 2]>()) fn rpc_queue_receives_all_transactions_from_channel(txs in any::<[Transaction; 2]>()) {
{
let runtime = zebra_test::init_async(); let runtime = zebra_test::init_async();
let _guard = runtime.enter(); let _guard = runtime.enter();
@ -715,14 +858,17 @@ proptest! {
// we use `expect_request_that` because we can't guarantee the state request order // we use `expect_request_that` because we can't guarantee the state request order
state state
.expect_request_that(|request| matches!(request, zebra_state::ReadRequest::Transaction(_))) .expect_request_that(|request| {
matches!(request, zebra_state::ReadRequest::Transaction(_))
})
.await? .await?
.respond(response); .respond(response);
} }
// each transaction will be retried // each transaction will be retried
for tx in txs.clone() { for tx in txs.clone() {
let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]);
let response = mempool::Response::Queued(vec![Ok(())]); let response = mempool::Response::Queued(vec![Ok(())]);
mempool mempool

View File

@ -1,6 +1,9 @@
//! State [`tower::Service`] request types. //! State [`tower::Service`] request types.
use std::{collections::HashMap, sync::Arc}; use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zebra_chain::{ use zebra_chain::{
amount::NegativeAllowed, amount::NegativeAllowed,
@ -445,4 +448,9 @@ pub enum ReadRequest {
/// Returned txids are in the order they appear in blocks, which ensures that they are topologically sorted /// Returned txids are in the order they appear in blocks, which ensures that they are topologically sorted
/// (i.e. parent txids will appear before child txids). /// (i.e. parent txids will appear before child txids).
TransactionsByAddresses(Vec<transparent::Address>, block::Height, block::Height), TransactionsByAddresses(Vec<transparent::Address>, block::Height, block::Height),
/// Looks up the balance of a set of transparent addresses.
///
/// Returns an [`Amount`] with the total balance of the set of addresses.
AddressBalance(HashSet<transparent::Address>),
} }

View File

@ -3,6 +3,7 @@
use std::sync::Arc; use std::sync::Arc;
use zebra_chain::{ use zebra_chain::{
amount::{Amount, NonNegative},
block::{self, Block}, block::{self, Block},
transaction::{Hash, Transaction}, transaction::{Hash, Transaction},
transparent, transparent,
@ -57,4 +58,7 @@ pub enum ReadResponse {
/// Response to [`ReadRequest::TransactionsByAddresses`] with the obtained transaction ids, /// Response to [`ReadRequest::TransactionsByAddresses`] with the obtained transaction ids,
/// in the order they appear in blocks. /// in the order they appear in blocks.
TransactionIds(Vec<Hash>), TransactionIds(Vec<Hash>),
/// Response to [`ReadRequest::AddressBalance`] with the total balance of the addresses.
AddressBalance(Amount<NonNegative>),
} }

View File

@ -1014,6 +1014,27 @@ impl Service<ReadRequest> for ReadStateService {
} }
.boxed() .boxed()
} }
// For the get_address_balance RPC.
ReadRequest::AddressBalance(addresses) => {
metrics::counter!(
"state.requests",
1,
"service" => "read_state",
"type" => "address_balance",
);
let state = self.clone();
async move {
let balance = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_balance(best_chain, &state.db, addresses)
})?;
Ok(ReadResponse::AddressBalance(balance))
}
.boxed()
}
} }
} }
} }