diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 6f4a860a..8a998445 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -84,8 +84,8 @@ impl StartCmd { .map_err(|_| eyre!("could not send setup data to inbound service"))?; info!("initializing syncer"); - // TODO: use sync_length_receiver to activate the mempool (#2592) - let (syncer, _sync_length_receiver) = + // TODO: use sync_status to activate the mempool (#2592) + let (syncer, _sync_status) = ChainSync::new(&config, peer_set.clone(), state, chain_verifier); select! { diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 333ff3ff..67f04757 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -5,7 +5,7 @@ use futures::{ future::FutureExt, stream::{FuturesUnordered, StreamExt}, }; -use tokio::{sync::watch, time::sleep}; +use tokio::time::sleep; use tower::{ builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout, Service, ServiceExt, @@ -22,12 +22,14 @@ use crate::{config::ZebradConfig, BoxError}; mod downloads; mod recent_sync_lengths; +mod status; #[cfg(test)] mod tests; use downloads::{AlwaysHedge, Downloads}; use recent_sync_lengths::RecentSyncLengths; +pub use status::SyncStatus; /// Controls the number of peers used for each ObtainTips and ExtendTips request. const FANOUT: usize = 4; @@ -222,13 +224,8 @@ where /// - state: the zebra-state that stores the chain /// - verifier: the zebra-consensus verifier that checks the chain /// - /// Also returns a [`watch::Receiver`] endpoint for receiving recent sync lengths. - pub fn new( - config: &ZebradConfig, - peers: ZN, - state: ZS, - verifier: ZV, - ) -> (Self, watch::Receiver>) { + /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip. + pub fn new(config: &ZebradConfig, peers: ZN, state: ZS, verifier: ZV) -> (Self, SyncStatus) { let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT); // The Hedge middleware is the outermost layer, hedging requests // between two retry-wrapped networks. The innermost timeout @@ -264,7 +261,7 @@ where MIN_LOOKAHEAD_LIMIT ); - let (recent_syncs, sync_length_receiver) = RecentSyncLengths::new(); + let (sync_status, recent_syncs) = SyncStatus::new(); let new_syncer = Self { genesis_hash: genesis_hash(config.network.network), @@ -276,7 +273,7 @@ where recent_syncs, }; - (new_syncer, sync_length_receiver) + (new_syncer, sync_status) } #[instrument(skip(self))] diff --git a/zebrad/src/components/sync/status.rs b/zebrad/src/components/sync/status.rs new file mode 100644 index 00000000..64db08a6 --- /dev/null +++ b/zebrad/src/components/sync/status.rs @@ -0,0 +1,49 @@ +// TODO: Remove this attribute once this type is used (#2603). +#![allow(dead_code)] + +use tokio::sync::watch; + +use super::RecentSyncLengths; + +#[cfg(test)] +mod tests; + +/// A helper type to determine if the synchronizer has likely reached the chain tip. +/// +/// This type can be used as a handle, so cloning it is cheap. +#[derive(Clone, Debug)] +pub struct SyncStatus { + latest_sync_length: watch::Receiver>, +} + +impl SyncStatus { + /// Create an instance of [`SyncStatus`]. + /// + /// The status is determined based on the latest counts of synchronized blocks, observed + /// through `latest_sync_length`. + pub fn new() -> (Self, RecentSyncLengths) { + let (recent_sync_lengths, latest_sync_length) = RecentSyncLengths::new(); + let status = SyncStatus { latest_sync_length }; + + (status, recent_sync_lengths) + } + + /// Wait until the synchronization is likely close to the tip. + /// + /// Returns an error if communication with the synchronizer is lost. + pub async fn wait_until_close_to_tip(&mut self) -> Result<(), watch::error::RecvError> { + while !self.is_close_to_tip() { + self.latest_sync_length.changed().await?; + } + + Ok(()) + } + + /// Check if the synchronization is likely close to the chain tip. + pub fn is_close_to_tip(&self) -> bool { + let _sync_lengths = self.latest_sync_length.borrow(); + + // TODO: Determine if the synchronization is actually close to the tip (#2592). + true + } +} diff --git a/zebrad/src/components/sync/status/tests.rs b/zebrad/src/components/sync/status/tests.rs new file mode 100644 index 00000000..203b6798 --- /dev/null +++ b/zebrad/src/components/sync/status/tests.rs @@ -0,0 +1,140 @@ +use std::{env, sync::Arc, time::Duration}; + +use futures::{select, FutureExt}; +use proptest::prelude::*; +use tokio::{sync::Semaphore, time::timeout}; + +use super::{super::RecentSyncLengths, SyncStatus}; + +/// The default number of test cases to run. +const DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES: u32 = 32; + +/// The maximum time one test instance should run. +/// +/// If the test exceeds this time it is considered to have failed. +const MAX_TEST_EXECUTION: Duration = Duration::from_secs(1); + +/// The maximum time to wait for an event to be received. +/// +/// If an event is not received in this time, it is considered that it will never be received. +const EVENT_TIMEOUT: Duration = Duration::from_millis(5); + +proptest! { + #![proptest_config( + proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES)) + )] + + /// Test if the [`SyncStatus`] correctly waits until the chain tip is reached. + /// + /// This is an asynchronous test with two concurrent tasks. The main task mocks chain sync + /// length updates and verifies if the other task was awakened by the update. + #[test] + fn waits_until_close_to_tip(sync_lengths in any::>()) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + runtime.block_on(timeout(MAX_TEST_EXECUTION, root_task(sync_lengths)))??; + + /// The root task that the runtime executes. + /// + /// Spawns the two concurrent tasks, and sets up the synchronization channels between them. + async fn root_task(sync_lengths: Vec) -> Result<(), TestCaseError> { + let update_events = Arc::new(Semaphore::new(0)); + let wake_events = Arc::new(Semaphore::new(0)); + + let (status, recent_sync_lengths) = SyncStatus::new(); + + let mut wait_task_handle = tokio::spawn(wait_task( + status.clone(), + update_events.clone(), + wake_events.clone(), + )) + .fuse(); + + let mut main_task_handle = tokio::spawn(main_task( + sync_lengths, + status, + recent_sync_lengths, + update_events, + wake_events, + )) + .fuse(); + + select! { + result = main_task_handle => result.expect("Failed to wait for main test task"), + result = wait_task_handle => result.expect("Failed to wait for wait test task"), + } + } + + /// The main task. + /// + /// 1. Applies each chain sync length update from the `sync_lengths` parameter. + /// 2. If necessary, notify the other task that an update was applied. This is to avoid + /// having the other task enter an infinite loop while it thinks it has reached the + /// chain tip. + /// 3. Waits to see if the other task sends a wake event, meaning that it awoke because it + /// was notified that it has reached the chain tip. + /// 4. Compares to see if the there was an awake event and if it was expected or not based + /// on whether the [`SyncStatus`] says that it's close to the tip. + async fn main_task( + sync_lengths: Vec, + status: SyncStatus, + mut recent_sync_lengths: RecentSyncLengths, + update_events: Arc, + wake_events: Arc, + ) -> Result<(), TestCaseError> { + let mut needs_update_event = true; + + for length in sync_lengths { + recent_sync_lengths.push_extend_tips_length(length); + + if needs_update_event { + update_events.add_permits(1); + } + + let awoke = match timeout(EVENT_TIMEOUT, wake_events.acquire()).await { + Ok(permit) => { + permit.forget(); + true + } + Err(_) => false, + }; + + needs_update_event = awoke; + + assert_eq!(status.is_close_to_tip(), awoke); + } + + Ok(()) + } + + /// The helper task that repeatedly waits until the chain tip is close. + /// + /// 1. Waits for an update event granting permission to run an iteration. This avoids + /// looping repeatedly while [`SyncStatus`] reports that it is close to the chain tip. + /// 2. Waits until [`SyncStatus`] reports that it is close to the chain tip. + /// 3. Notifies the main task that it awoke, i.e., that the [`SyncStatus`] has finished + /// wating until it was close to the chain tip. + async fn wait_task( + mut status: SyncStatus, + update_events: Arc, + wake_events: Arc, + ) -> Result<(), TestCaseError> { + loop { + update_events.acquire().await.forget(); + + if status.wait_until_close_to_tip().await.is_err() { + return Ok(()); + } + + wake_events.add_permits(1); + } + } + } +}