//! Internal mining in Zebra. //! //! # TODO //! - pause mining if we have no peers, like `zcashd` does, //! and add a developer config that mines regardless of how many peers we have. //! //! - move common code into zebra-chain or zebra-node-services and remove the RPC dependency. use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration}; use color_eyre::Report; use futures::{stream::FuturesUnordered, StreamExt}; use thread_priority::{ThreadBuilder, ThreadPriority}; use tokio::{select, sync::watch, task::JoinHandle, time::sleep}; use tower::Service; use tracing::{Instrument, Span}; use zebra_chain::{ block::{self, Block}, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, diagnostic::task::WaitForPanics, serialization::{AtLeastOne, ZcashSerialize}, shutdown::is_shutting_down, work::equihash::{Solution, SolverCancelled}, }; use zebra_network::AddressBookPeers; use zebra_node_services::mempool; use zebra_rpc::{ config::mining::Config, methods::{ get_block_template_rpcs::{ get_block_template::{ self, proposal::TimeSource, proposal_block_from_template, GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*, }, types::hex_data::HexData, }, GetBlockTemplateRpc, GetBlockTemplateRpcImpl, }, }; use zebra_state::WatchReceiver; /// The amount of time we wait between block template retries. pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20); /// A rate-limit for block template refreshes. pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2); /// How long we wait after mining a block, before expecting a new template. /// /// This should be slightly longer than `BLOCK_TEMPLATE_REFRESH_LIMIT` to allow for template /// generation. pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3); /// Initialize the miner based on its config, and spawn a task for it. /// /// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured /// mining thread. /// /// See [`run_mining_solver()`] for more details. pub fn spawn_init( config: &Config, rpc: GetBlockTemplateRpcImpl, ) -> JoinHandle> // TODO: simplify or avoid repeating these generics (how?) where Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, > + Clone + Send + Sync + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, > + Clone + Send + Sync + 'static, >::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { let config = config.clone(); // TODO: spawn an entirely new executor here, so mining is isolated from higher priority tasks. tokio::spawn(init(config, rpc).in_current_span()) } /// Initialize the miner based on its config. /// /// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured /// mining thread. /// /// See [`run_mining_solver()`] for more details. pub async fn init( _config: Config, rpc: GetBlockTemplateRpcImpl, ) -> Result<(), Report> where Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, > + Clone + Send + Sync + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, > + Clone + Send + Sync + 'static, >::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { // TODO: change this to `config.internal_miner_threads` when internal miner feature is added back. // https://github.com/ZcashFoundation/zebra/issues/8183 let configured_threads = 1; // If we can't detect the number of cores, use the configured number. let available_threads = available_parallelism() .map(usize::from) .unwrap_or(configured_threads); // Use the minimum of the configured and available threads. let solver_count = min(configured_threads, available_threads); info!( ?solver_count, "launching mining tasks with parallel solvers" ); let (template_sender, template_receiver) = watch::channel(None); let template_receiver = WatchReceiver::new(template_receiver); // Spawn these tasks, to avoid blocked cooperative futures, and improve shutdown responsiveness. // This is particularly important when there are a large number of solver threads. let mut abort_handles = Vec::new(); let template_generator = tokio::task::spawn( generate_block_templates(rpc.clone(), template_sender).in_current_span(), ); abort_handles.push(template_generator.abort_handle()); let template_generator = template_generator.wait_for_panics(); let mut mining_solvers = FuturesUnordered::new(); for solver_id in 0..solver_count { // Assume there are less than 256 cores. If there are more, only run 256 tasks. let solver_id = min(solver_id, usize::from(u8::MAX)) .try_into() .expect("just limited to u8::MAX"); let solver = tokio::task::spawn( run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(), ); abort_handles.push(solver.abort_handle()); mining_solvers.push(solver.wait_for_panics()); } // These tasks run forever unless there is a fatal error or shutdown. // When that happens, the first task to error returns, and the other JoinHandle futures are // cancelled. let first_result; select! { result = template_generator => { first_result = result; } result = mining_solvers.next() => { first_result = result .expect("stream never terminates because there is at least one solver task"); } } // But the spawned async tasks keep running, so we need to abort them here. for abort_handle in abort_handles { abort_handle.abort(); } // Any spawned blocking threads will keep running. When this task returns and drops the // `template_sender`, it cancels all the spawned miner threads. This works because we've // aborted the `template_generator` task, which owns the `template_sender`. (And it doesn't // spawn any blocking threads.) first_result } /// Generates block templates using `rpc`, and sends them to mining threads using `template_sender`. #[instrument(skip(rpc, template_sender))] pub async fn generate_block_templates< Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook, >( rpc: GetBlockTemplateRpcImpl, template_sender: watch::Sender>>, ) -> Result<(), Report> where Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, > + Clone + Send + Sync + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, > + Clone + Send + Sync + 'static, >::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { // Pass the correct arguments, even if Zebra currently ignores them. let mut parameters = get_block_template::JsonParameters { mode: Template, data: None, capabilities: vec![LongPoll, CoinbaseTxn], long_poll_id: None, _work_id: None, }; // Shut down the task when all the template receivers are dropped, or Zebra shuts down. while !template_sender.is_closed() && !is_shutting_down() { let template = rpc.get_block_template(Some(parameters.clone())).await; // Wait for the chain to sync so we get a valid template. let Ok(template) = template else { debug!( ?BLOCK_TEMPLATE_WAIT_TIME, "waiting for a valid block template", ); // Skip the wait if we got an error because we are shutting down. if !is_shutting_down() { sleep(BLOCK_TEMPLATE_WAIT_TIME).await; } continue; }; // Convert from RPC GetBlockTemplate to Block let template = template .try_into_template() .expect("invalid RPC response: proposal in response to a template request"); info!( height = ?template.height, transactions = ?template.transactions.len(), "mining with an updated block template", ); // Tell the next get_block_template() call to wait until the template has changed. parameters.long_poll_id = Some(template.long_poll_id); let block = proposal_block_from_template(&template, TimeSource::CurTime) .expect("unexpected invalid block template"); // If the template has actually changed, send an updated template. template_sender.send_if_modified(|old_block| { if old_block.as_ref().map(|b| *b.header) == Some(*block.header) { return false; } *old_block = Some(Arc::new(block)); true }); // If the blockchain is changing rapidly, limit how often we'll update the template. // But if we're shutting down, do that immediately. if !template_sender.is_closed() && !is_shutting_down() { sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await; } } Ok(()) } /// Runs a single mining thread that gets blocks from the `template_receiver`, calculates equihash /// solutions with nonces based on `solver_id`, and submits valid blocks to Zebra's block validator. /// /// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running. /// It can run for minutes or hours if the network difficulty is high. Mining uses a thread with /// low CPU priority. #[instrument(skip(template_receiver, rpc))] pub async fn run_mining_solver( solver_id: u8, mut template_receiver: WatchReceiver>>, rpc: GetBlockTemplateRpcImpl, ) -> Result<(), Report> where Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, > + Clone + Send + Sync + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, Error = zebra_state::BoxError, > + Clone + Send + Sync + 'static, >::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { // Shut down the task when the template sender is dropped, or Zebra shuts down. while template_receiver.has_changed().is_ok() && !is_shutting_down() { // Get the latest block template, and mark the current value as seen. // We mark the value first to avoid missed updates. template_receiver.mark_as_seen(); let template = template_receiver.cloned_watch_data(); let Some(template) = template else { if solver_id == 0 { info!( ?solver_id, ?BLOCK_TEMPLATE_WAIT_TIME, "solver waiting for initial block template" ); } else { debug!( ?solver_id, ?BLOCK_TEMPLATE_WAIT_TIME, "solver waiting for initial block template" ); } // Skip the wait if we didn't get a template because we are shutting down. if !is_shutting_down() { sleep(BLOCK_TEMPLATE_WAIT_TIME).await; } continue; }; let height = template.coinbase_height().expect("template is valid"); // Set up the cancellation conditions for the miner. let mut cancel_receiver = template_receiver.clone(); let old_header = *template.header; let cancel_fn = move || match cancel_receiver.has_changed() { // Guard against get_block_template() providing an identical header. This could happen // if something irrelevant to the block data changes, the time was within 1 second, or // there is a spurious channel change. Ok(has_changed) => { cancel_receiver.mark_as_seen(); // We only need to check header equality, because the block data is bound to the // header. if has_changed && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header) { Err(SolverCancelled) } else { Ok(()) } } // If the sender was dropped, we're likely shutting down, so cancel the solver. Err(_sender_dropped) => Err(SolverCancelled), }; // Mine at least one block using the equihash solver. let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else { // If the solver was cancelled, we're either shutting down, or we have a new template. if solver_id == 0 { info!( ?height, ?solver_id, new_template = ?template_receiver.has_changed(), shutting_down = ?is_shutting_down(), "solver cancelled: getting a new block template or shutting down" ); } else { debug!( ?height, ?solver_id, new_template = ?template_receiver.has_changed(), shutting_down = ?is_shutting_down(), "solver cancelled: getting a new block template or shutting down" ); } // If the blockchain is changing rapidly, limit how often we'll update the template. // But if we're shutting down, do that immediately. if template_receiver.has_changed().is_ok() && !is_shutting_down() { sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await; } continue; }; // Submit the newly mined blocks to the verifiers. // // TODO: if there is a new template (`cancel_fn().is_err()`), and // GetBlockTemplate.submit_old is false, return immediately, and skip submitting the // blocks. let mut any_success = false; for block in blocks { let data = block .zcash_serialize_to_vec() .expect("serializing to Vec never fails"); match rpc.submit_block(HexData(data), None).await { Ok(success) => { info!( ?height, hash = ?block.hash(), ?solver_id, ?success, "successfully mined a new block", ); any_success = true; } Err(error) => info!( ?height, hash = ?block.hash(), ?solver_id, ?error, "validating a newly mined block failed, trying again", ), } } // Start re-mining quickly after a failed solution. // If there's a new template, we'll use it, otherwise the existing one is ok. if !any_success { // If the blockchain is changing rapidly, limit how often we'll update the template. // But if we're shutting down, do that immediately. if template_receiver.has_changed().is_ok() && !is_shutting_down() { sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await; } continue; } // Wait for the new block to verify, and the RPC task to pick up a new template. // But don't wait too long, we could have mined on a fork. tokio::select! { shutdown_result = template_receiver.changed() => shutdown_result?, _ = sleep(BLOCK_MINING_WAIT_TIME) => {} } } Ok(()) } /// Mines one or more blocks based on `template`. Calculates equihash solutions, checks difficulty, /// and returns as soon as it has at least one block. Uses a different nonce range for each /// `solver_id`. /// /// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`. /// /// See [`run_mining_solver()`] for more details. pub async fn mine_a_block( solver_id: u8, template: Arc, cancel_fn: F, ) -> Result, SolverCancelled> where F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static, { // TODO: Replace with Arc::unwrap_or_clone() when it stabilises: // https://github.com/rust-lang/rust/issues/93610 let mut header = *template.header; // Use a different nonce for each solver thread. // Change both the first and last bytes, so we don't have to care if the nonces are incremented in // big-endian or little-endian order. And we can see the thread that mined a block from the nonce. *header.nonce.first_mut().unwrap() = solver_id; *header.nonce.last_mut().unwrap() = solver_id; // Mine one or more blocks using the solver, in a low-priority blocking thread. let span = Span::current(); let solved_headers = tokio::task::spawn_blocking(move || span.in_scope(move || { let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| { if let Err(error) = priority_result { info!(?error, "could not set miner to run at a low priority: running at default priority"); } Solution::solve(header, cancel_fn) }).expect("unable to spawn miner thread"); miner_thread_handle.wait_for_panics() })) .wait_for_panics() .await?; // Modify the template into solved blocks. // TODO: Replace with Arc::unwrap_or_clone() when it stabilises let block = (*template).clone(); let solved_blocks: Vec = solved_headers .into_iter() .map(|header| { let mut block = block.clone(); block.header = Arc::new(header); block }) .collect(); Ok(solved_blocks .try_into() .expect("a 1:1 mapping of AtLeastOne produces at least one block")) }