feat(elasticsearch): Export block data to elasticsearch database (#6274)
* add initial ES support * hide elasticsearch behind feture, add config * make the builds and clippy happy * move ES code to a function * change database name * fix database name and panic * increase close to tip interval * update deny for elastic * remove a block clone * try to fix builds * fix builds 2 * refactor some imports and unwraps * typo * change argument of elasticsearch function * move elastic call to the end of `commit_finalized_direct` * get height from block
This commit is contained in:
parent
47cf0f475f
commit
4dedffebbc
127
Cargo.lock
127
Cargo.lock
|
|
@ -164,6 +164,19 @@ version = "0.7.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
|
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-compression"
|
||||||
|
version = "0.3.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
|
||||||
|
dependencies = [
|
||||||
|
"flate2",
|
||||||
|
"futures-core",
|
||||||
|
"memchr",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-stream"
|
name = "async-stream"
|
||||||
version = "0.3.2"
|
version = "0.3.2"
|
||||||
|
|
@ -273,6 +286,12 @@ dependencies = [
|
||||||
"rustc-demangle",
|
"rustc-demangle",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "base64"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64"
|
name = "base64"
|
||||||
version = "0.13.0"
|
version = "0.13.0"
|
||||||
|
|
@ -1041,6 +1060,16 @@ dependencies = [
|
||||||
"darling_macro 0.10.2",
|
"darling_macro 0.10.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "darling"
|
||||||
|
version = "0.13.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
|
||||||
|
dependencies = [
|
||||||
|
"darling_core 0.13.4",
|
||||||
|
"darling_macro 0.13.4",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "darling"
|
name = "darling"
|
||||||
version = "0.14.1"
|
version = "0.14.1"
|
||||||
|
|
@ -1065,6 +1094,20 @@ dependencies = [
|
||||||
"syn 1.0.104",
|
"syn 1.0.104",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "darling_core"
|
||||||
|
version = "0.13.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
|
||||||
|
dependencies = [
|
||||||
|
"fnv",
|
||||||
|
"ident_case",
|
||||||
|
"proc-macro2 1.0.47",
|
||||||
|
"quote 1.0.20",
|
||||||
|
"strsim 0.10.0",
|
||||||
|
"syn 1.0.104",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "darling_core"
|
name = "darling_core"
|
||||||
version = "0.14.1"
|
version = "0.14.1"
|
||||||
|
|
@ -1090,6 +1133,17 @@ dependencies = [
|
||||||
"syn 1.0.104",
|
"syn 1.0.104",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "darling_macro"
|
||||||
|
version = "0.13.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
|
||||||
|
dependencies = [
|
||||||
|
"darling_core 0.13.4",
|
||||||
|
"quote 1.0.20",
|
||||||
|
"syn 1.0.104",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "darling_macro"
|
name = "darling_macro"
|
||||||
version = "0.14.1"
|
version = "0.14.1"
|
||||||
|
|
@ -1179,6 +1233,12 @@ dependencies = [
|
||||||
"syn 1.0.104",
|
"syn 1.0.104",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dyn-clone"
|
||||||
|
version = "1.0.11"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68b0cf012f1230e43cd00ebb729c6bb58707ecfa8ad08b52ef3a4ccd2697fc30"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ed25519-zebra"
|
name = "ed25519-zebra"
|
||||||
version = "3.1.0"
|
version = "3.1.0"
|
||||||
|
|
@ -1200,6 +1260,26 @@ version = "1.6.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "elasticsearch"
|
||||||
|
version = "8.5.0-alpha.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.11.0",
|
||||||
|
"bytes",
|
||||||
|
"dyn-clone",
|
||||||
|
"lazy_static",
|
||||||
|
"percent-encoding",
|
||||||
|
"reqwest",
|
||||||
|
"rustc_version 0.2.3",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_with 1.14.0",
|
||||||
|
"url",
|
||||||
|
"void",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "encode_unicode"
|
name = "encode_unicode"
|
||||||
version = "0.3.6"
|
version = "0.3.6"
|
||||||
|
|
@ -3472,6 +3552,7 @@ version = "0.11.14"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9"
|
checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-compression",
|
||||||
"base64 0.21.0",
|
"base64 0.21.0",
|
||||||
"bytes",
|
"bytes",
|
||||||
"encoding_rs",
|
"encoding_rs",
|
||||||
|
|
@ -3499,6 +3580,7 @@ dependencies = [
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-native-tls",
|
"tokio-native-tls",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
|
"tokio-util 0.7.7",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"url",
|
"url",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
|
|
@ -3589,6 +3671,15 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"
|
checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc_version"
|
||||||
|
version = "0.2.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
|
||||||
|
dependencies = [
|
||||||
|
"semver 0.9.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc_version"
|
name = "rustc_version"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
|
@ -3805,7 +3896,7 @@ dependencies = [
|
||||||
"hostname",
|
"hostname",
|
||||||
"libc",
|
"libc",
|
||||||
"os_info",
|
"os_info",
|
||||||
"rustc_version",
|
"rustc_version 0.4.0",
|
||||||
"sentry-core",
|
"sentry-core",
|
||||||
"uname",
|
"uname",
|
||||||
]
|
]
|
||||||
|
|
@ -3913,6 +4004,16 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_with"
|
||||||
|
version = "1.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"serde_with_macros 1.5.2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_with"
|
name = "serde_with"
|
||||||
version = "2.3.0"
|
version = "2.3.0"
|
||||||
|
|
@ -3925,10 +4026,22 @@ dependencies = [
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_with_macros",
|
"serde_with_macros 2.3.0",
|
||||||
"time 0.3.17",
|
"time 0.3.17",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_with_macros"
|
||||||
|
version = "1.5.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
|
||||||
|
dependencies = [
|
||||||
|
"darling 0.13.4",
|
||||||
|
"proc-macro2 1.0.47",
|
||||||
|
"quote 1.0.20",
|
||||||
|
"syn 1.0.104",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_with_macros"
|
name = "serde_with_macros"
|
||||||
version = "2.3.0"
|
version = "2.3.0"
|
||||||
|
|
@ -4939,6 +5052,12 @@ version = "0.9.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "void"
|
||||||
|
version = "1.0.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wait-timeout"
|
name = "wait-timeout"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
|
|
@ -5529,7 +5648,7 @@ dependencies = [
|
||||||
"secp256k1",
|
"secp256k1",
|
||||||
"serde",
|
"serde",
|
||||||
"serde-big-array",
|
"serde-big-array",
|
||||||
"serde_with",
|
"serde_with 2.3.0",
|
||||||
"sha2",
|
"sha2",
|
||||||
"spandoc",
|
"spandoc",
|
||||||
"static_assertions",
|
"static_assertions",
|
||||||
|
|
@ -5690,6 +5809,7 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"color-eyre",
|
"color-eyre",
|
||||||
"dirs",
|
"dirs",
|
||||||
|
"elasticsearch",
|
||||||
"futures",
|
"futures",
|
||||||
"halo2_proofs",
|
"halo2_proofs",
|
||||||
"hex",
|
"hex",
|
||||||
|
|
@ -5708,6 +5828,7 @@ dependencies = [
|
||||||
"rlimit",
|
"rlimit",
|
||||||
"rocksdb",
|
"rocksdb",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"spandoc",
|
"spandoc",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
|
|
||||||
17
deny.toml
17
deny.toml
|
|
@ -94,6 +94,23 @@ skip-tree = [
|
||||||
{ name = "darling", version = "=0.10.2" },
|
{ name = "darling", version = "=0.10.2" },
|
||||||
{ name = "semver", version = "=0.9.0" },
|
{ name = "semver", version = "=0.9.0" },
|
||||||
{ name = "tracing-subscriber", version = "=0.1.6" },
|
{ name = "tracing-subscriber", version = "=0.1.6" },
|
||||||
|
|
||||||
|
# Elasticsearch dependencies
|
||||||
|
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "base64", version = "=0.11.0" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "darling", version = "=0.13.4" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "darling_core", version = "=0.13.4" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "darling_macro", version = "=0.13.4" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "rustc_version", version = "=0.2.3" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "serde_with", version = "=1.14.0" },
|
||||||
|
# wait for elasticsearch to update.
|
||||||
|
{ name = "serde_with_macros", version = "=1.5.2" },
|
||||||
]
|
]
|
||||||
|
|
||||||
# This section is considered when running `cargo deny check sources`.
|
# This section is considered when running `cargo deny check sources`.
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,12 @@ proptest-impl = [
|
||||||
"zebra-chain/proptest-impl"
|
"zebra-chain/proptest-impl"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Experimental elasticsearch support
|
||||||
|
elasticsearch = [
|
||||||
|
"dep:elasticsearch",
|
||||||
|
"dep:serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bincode = "1.3.3"
|
bincode = "1.3.3"
|
||||||
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
|
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
|
||||||
|
|
@ -45,6 +51,10 @@ tokio = { version = "1.26.0", features = ["sync", "tracing"] }
|
||||||
tower = { version = "0.4.13", features = ["buffer", "util"] }
|
tower = { version = "0.4.13", features = ["buffer", "util"] }
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
|
|
||||||
|
# elasticsearch specific dependencies.
|
||||||
|
elasticsearch = { version = "8.5.0-alpha.1", package = "elasticsearch", optional = true }
|
||||||
|
serde_json = { version = "1.0.93", package = "serde_json", optional = true }
|
||||||
|
|
||||||
zebra-chain = { path = "../zebra-chain" }
|
zebra-chain = { path = "../zebra-chain" }
|
||||||
zebra-test = { path = "../zebra-test/", optional = true }
|
zebra-test = { path = "../zebra-test/", optional = true }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,18 @@ pub struct Config {
|
||||||
/// no check for old database versions will be made and nothing will be
|
/// no check for old database versions will be made and nothing will be
|
||||||
/// deleted.
|
/// deleted.
|
||||||
pub delete_old_database: bool,
|
pub delete_old_database: bool,
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// The elasticsearch database url.
|
||||||
|
pub elasticsearch_url: String,
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// The elasticsearch database username.
|
||||||
|
pub elasticsearch_username: String,
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// The elasticsearch database password.
|
||||||
|
pub elasticsearch_password: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn gen_temp_path(prefix: &str) -> PathBuf {
|
fn gen_temp_path(prefix: &str) -> PathBuf {
|
||||||
|
|
@ -123,6 +135,12 @@ impl Default for Config {
|
||||||
ephemeral: false,
|
ephemeral: false,
|
||||||
debug_stop_at_height: None,
|
debug_stop_at_height: None,
|
||||||
delete_old_database: true,
|
delete_old_database: true,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
elasticsearch_url: "https://localhost:9200".to_string(),
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
elasticsearch_username: "elastic".to_string(),
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
elasticsearch_password: "".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,15 @@ use std::{
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
use elasticsearch::{
|
||||||
|
auth::Credentials::Basic,
|
||||||
|
cert::CertificateValidation,
|
||||||
|
http::transport::{SingleNodeConnectionPool, TransportBuilder},
|
||||||
|
http::Url,
|
||||||
|
Elasticsearch,
|
||||||
|
};
|
||||||
|
|
||||||
use futures::future::FutureExt;
|
use futures::future::FutureExt;
|
||||||
use tokio::sync::{oneshot, watch};
|
use tokio::sync::{oneshot, watch};
|
||||||
use tower::{util::BoxService, Service, ServiceExt};
|
use tower::{util::BoxService, Service, ServiceExt};
|
||||||
|
|
@ -308,7 +317,28 @@ impl StateService {
|
||||||
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
|
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
|
||||||
let timer = CodeTimer::start();
|
let timer = CodeTimer::start();
|
||||||
|
|
||||||
let finalized_state = FinalizedState::new(&config, network);
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
let finalized_state = {
|
||||||
|
let conn_pool = SingleNodeConnectionPool::new(
|
||||||
|
Url::parse(config.elasticsearch_url.as_str())
|
||||||
|
.expect("configured elasticsearch url is invalid"),
|
||||||
|
);
|
||||||
|
let transport = TransportBuilder::new(conn_pool)
|
||||||
|
.cert_validation(CertificateValidation::None)
|
||||||
|
.auth(Basic(
|
||||||
|
config.clone().elasticsearch_username,
|
||||||
|
config.clone().elasticsearch_password,
|
||||||
|
))
|
||||||
|
.build()
|
||||||
|
.expect("elasticsearch transport builder should not fail");
|
||||||
|
let elastic_db = Some(Elasticsearch::new(transport));
|
||||||
|
|
||||||
|
FinalizedState::new(&config, network, elastic_db)
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(feature = "elasticsearch"))]
|
||||||
|
let finalized_state = { FinalizedState::new(&config, network) };
|
||||||
|
|
||||||
timer.finish(module_path!(), line!(), "opening finalized state database");
|
timer.finish(module_path!(), line!(), "opening finalized state database");
|
||||||
|
|
||||||
let timer = CodeTimer::start();
|
let timer = CodeTimer::start();
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ pub use disk_db::{DiskWriteBatch, WriteDisk};
|
||||||
///
|
///
|
||||||
/// This is different from `NonFinalizedState::clone()`,
|
/// This is different from `NonFinalizedState::clone()`,
|
||||||
/// which returns an independent copy of the chains.
|
/// which returns an independent copy of the chains.
|
||||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct FinalizedState {
|
pub struct FinalizedState {
|
||||||
// Configuration
|
// Configuration
|
||||||
//
|
//
|
||||||
|
|
@ -78,14 +78,36 @@ pub struct FinalizedState {
|
||||||
/// so this database object can be freely cloned.
|
/// so this database object can be freely cloned.
|
||||||
/// The last instance that is dropped will close the underlying database.
|
/// The last instance that is dropped will close the underlying database.
|
||||||
pub db: ZebraDb,
|
pub db: ZebraDb,
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// The elasticsearch handle.
|
||||||
|
pub elastic_db: Option<elasticsearch::Elasticsearch>,
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// A collection of blocks to be sent to elasticsearch as a bulk.
|
||||||
|
pub elastic_blocks: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FinalizedState {
|
impl FinalizedState {
|
||||||
/// Returns an on-disk database instance for `config` and `network`.
|
/// Returns an on-disk database instance for `config` and `network`.
|
||||||
/// If there is no existing database, creates a new database on disk.
|
/// If there is no existing database, creates a new database on disk.
|
||||||
pub fn new(config: &Config, network: Network) -> Self {
|
pub fn new(
|
||||||
|
config: &Config,
|
||||||
|
network: Network,
|
||||||
|
#[cfg(feature = "elasticsearch")] elastic_db: Option<elasticsearch::Elasticsearch>,
|
||||||
|
) -> Self {
|
||||||
let db = ZebraDb::new(config, network);
|
let db = ZebraDb::new(config, network);
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
let new_state = Self {
|
||||||
|
network,
|
||||||
|
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
|
||||||
|
db,
|
||||||
|
elastic_db,
|
||||||
|
elastic_blocks: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(feature = "elasticsearch"))]
|
||||||
let new_state = Self {
|
let new_state = Self {
|
||||||
network,
|
network,
|
||||||
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
|
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
|
||||||
|
|
@ -290,6 +312,9 @@ impl FinalizedState {
|
||||||
let finalized_height = finalized.height;
|
let finalized_height = finalized.height;
|
||||||
let finalized_hash = finalized.hash;
|
let finalized_hash = finalized.hash;
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
let finalized_block = finalized.block.clone();
|
||||||
|
|
||||||
let result = self.db.write_block(
|
let result = self.db.write_block(
|
||||||
finalized,
|
finalized,
|
||||||
history_tree,
|
history_tree,
|
||||||
|
|
@ -298,29 +323,102 @@ impl FinalizedState {
|
||||||
source,
|
source,
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: move the stop height check to the syncer (#3442)
|
if result.is_ok() {
|
||||||
if result.is_ok() && self.is_at_stop_height(finalized_height) {
|
// Save blocks to elasticsearch if the feature is enabled.
|
||||||
tracing::info!(
|
#[cfg(feature = "elasticsearch")]
|
||||||
height = ?finalized_height,
|
self.elasticsearch(&finalized_block);
|
||||||
hash = ?finalized_hash,
|
|
||||||
block_source = ?source,
|
|
||||||
"stopping at configured height, flushing database to disk"
|
|
||||||
);
|
|
||||||
|
|
||||||
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
|
// TODO: move the stop height check to the syncer (#3442)
|
||||||
self.db.shutdown(true);
|
if self.is_at_stop_height(finalized_height) {
|
||||||
|
tracing::info!(
|
||||||
|
height = ?finalized_height,
|
||||||
|
hash = ?finalized_hash,
|
||||||
|
block_source = ?source,
|
||||||
|
"stopping at configured height, flushing database to disk"
|
||||||
|
);
|
||||||
|
|
||||||
// Drops tracing log output that's hasn't already been written to stdout
|
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
|
||||||
// since this exits before calling drop on the WorkerGuard for the logger thread.
|
self.db.shutdown(true);
|
||||||
// This is okay for now because this is test-only code
|
|
||||||
//
|
// Drops tracing log output that's hasn't already been written to stdout
|
||||||
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
|
// since this exits before calling drop on the WorkerGuard for the logger thread.
|
||||||
Self::exit_process();
|
// This is okay for now because this is test-only code
|
||||||
|
//
|
||||||
|
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
|
||||||
|
Self::exit_process();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
/// Store finalized blocks into an elasticsearch database.
|
||||||
|
///
|
||||||
|
/// We use the elasticsearch bulk api to index multiple blocks at a time while we are
|
||||||
|
/// synchronizing the chain, when we get close to tip we index blocks one by one.
|
||||||
|
pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
|
||||||
|
if let Some(client) = self.elastic_db.clone() {
|
||||||
|
let block_time = block.header.time.timestamp();
|
||||||
|
let local_time = chrono::Utc::now().timestamp();
|
||||||
|
|
||||||
|
const AWAY_FROM_TIP_BULK_SIZE: usize = 800;
|
||||||
|
const CLOSE_TO_TIP_BULK_SIZE: usize = 2;
|
||||||
|
const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours
|
||||||
|
|
||||||
|
// If we are close to the tip index one block per bulk call.
|
||||||
|
let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;
|
||||||
|
if local_time - block_time < CLOSE_TO_TIP_SECONDS {
|
||||||
|
blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the operation line.
|
||||||
|
let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
|
||||||
|
self.elastic_blocks.push(
|
||||||
|
serde_json::json!({
|
||||||
|
"index": {
|
||||||
|
"_id": height_number.to_string().as_str()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Insert the block itself.
|
||||||
|
self.elastic_blocks
|
||||||
|
.push(serde_json::json!(block).to_string());
|
||||||
|
|
||||||
|
// We are in bulk time, insert to ES all we have.
|
||||||
|
if self.elastic_blocks.len() >= blocks_size_to_dump {
|
||||||
|
let rt = tokio::runtime::Runtime::new()
|
||||||
|
.expect("runtime creation for elasticsearch should not fail.");
|
||||||
|
let blocks = self.elastic_blocks.clone();
|
||||||
|
let network = self.network;
|
||||||
|
|
||||||
|
rt.block_on(async move {
|
||||||
|
let response = client
|
||||||
|
.bulk(elasticsearch::BulkParts::Index(
|
||||||
|
format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
|
||||||
|
))
|
||||||
|
.body(blocks)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("ES Request should never fail");
|
||||||
|
|
||||||
|
// Make sure no errors ever.
|
||||||
|
let response_body = response
|
||||||
|
.json::<serde_json::Value>()
|
||||||
|
.await
|
||||||
|
.expect("ES response parsing to a json_body should never fail");
|
||||||
|
let errors = response_body["errors"].as_bool().unwrap_or(true);
|
||||||
|
assert!(!errors, "{}", format!("ES error: {response_body}"));
|
||||||
|
});
|
||||||
|
|
||||||
|
// clean the block storage.
|
||||||
|
self.elastic_blocks.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Stop the process if `block_height` is greater than or equal to the
|
/// Stop the process if `block_height` is greater than or equal to the
|
||||||
/// configured stop height.
|
/// configured stop height.
|
||||||
fn is_at_stop_height(&self, block_height: block::Height) -> bool {
|
fn is_at_stop_height(&self, block_height: block::Height) -> bool {
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,12 @@ fn test_raw_rocksdb_column_families_with_network(network: Network) {
|
||||||
let mut net_suffix = network.to_string();
|
let mut net_suffix = network.to_string();
|
||||||
net_suffix.make_ascii_lowercase();
|
net_suffix.make_ascii_lowercase();
|
||||||
|
|
||||||
let mut state = FinalizedState::new(&Config::ephemeral(), network);
|
let mut state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
// Snapshot the column family names
|
// Snapshot the column family names
|
||||||
let mut cf_names = state.db.list_cf().expect("empty database is valid");
|
let mut cf_names = state.db.list_cf().expect("empty database is valid");
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ fn blocks_with_v5_transactions() -> Result<()> {
|
||||||
.and_then(|v| v.parse().ok())
|
.and_then(|v| v.parse().ok())
|
||||||
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|
||||||
|((chain, count, network, _history_tree) in PreparedChain::default())| {
|
|((chain, count, network, _history_tree) in PreparedChain::default())| {
|
||||||
let mut state = FinalizedState::new(&Config::ephemeral(), network);
|
let mut state = FinalizedState::new(&Config::ephemeral(), network, #[cfg(feature = "elasticsearch")] None);
|
||||||
let mut height = Height(0);
|
let mut height = Height(0);
|
||||||
// use `count` to minimize test failures, so they are easier to diagnose
|
// use `count` to minimize test failures, so they are easier to diagnose
|
||||||
for block in chain.iter().take(count) {
|
for block in chain.iter().take(count) {
|
||||||
|
|
@ -65,7 +65,7 @@ fn all_upgrades_and_wrong_commitments_with_fake_activation_heights() -> Result<(
|
||||||
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|
||||||
|((chain, _count, network, _history_tree) in PreparedChain::default().with_valid_commitments().no_shrink())| {
|
|((chain, _count, network, _history_tree) in PreparedChain::default().with_valid_commitments().no_shrink())| {
|
||||||
|
|
||||||
let mut state = FinalizedState::new(&Config::ephemeral(), network);
|
let mut state = FinalizedState::new(&Config::ephemeral(), network, #[cfg(feature = "elasticsearch")] None);
|
||||||
let mut height = Height(0);
|
let mut height = Height(0);
|
||||||
let heartwood_height = NetworkUpgrade::Heartwood.activation_height(network).unwrap();
|
let heartwood_height = NetworkUpgrade::Heartwood.activation_height(network).unwrap();
|
||||||
let heartwood_height_plus1 = (heartwood_height + 1).unwrap();
|
let heartwood_height_plus1 = (heartwood_height + 1).unwrap();
|
||||||
|
|
|
||||||
|
|
@ -165,7 +165,12 @@ fn test_block_and_transaction_data_with_network(network: Network) {
|
||||||
let mut net_suffix = network.to_string();
|
let mut net_suffix = network.to_string();
|
||||||
net_suffix.make_ascii_lowercase();
|
net_suffix.make_ascii_lowercase();
|
||||||
|
|
||||||
let mut state = FinalizedState::new(&Config::ephemeral(), network);
|
let mut state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
// Assert that empty databases are the same, regardless of the network.
|
// Assert that empty databases are the same, regardless of the network.
|
||||||
let mut settings = insta::Settings::clone_current();
|
let mut settings = insta::Settings::clone_current();
|
||||||
|
|
|
||||||
|
|
@ -77,7 +77,12 @@ fn test_block_db_round_trip_with(
|
||||||
) {
|
) {
|
||||||
let _init_guard = zebra_test::init();
|
let _init_guard = zebra_test::init();
|
||||||
|
|
||||||
let state = FinalizedState::new(&Config::ephemeral(), network);
|
let state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
// Check that each block round-trips to the database
|
// Check that each block round-trips to the database
|
||||||
for original_block in block_test_cases.into_iter() {
|
for original_block in block_test_cases.into_iter() {
|
||||||
|
|
|
||||||
|
|
@ -478,7 +478,7 @@ fn rejection_restores_internal_state_genesis() -> Result<()> {
|
||||||
}
|
}
|
||||||
))| {
|
))| {
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(&Config::ephemeral(), network, #[cfg(feature = "elasticsearch")] None);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
|
||||||
|
|
@ -156,7 +156,12 @@ fn best_chain_wins_for_network(network: Network) -> Result<()> {
|
||||||
let expected_hash = block2.hash();
|
let expected_hash = block2.hash();
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
state.commit_new_chain(block2.prepare(), &finalized_state)?;
|
state.commit_new_chain(block2.prepare(), &finalized_state)?;
|
||||||
state.commit_new_chain(child.prepare(), &finalized_state)?;
|
state.commit_new_chain(child.prepare(), &finalized_state)?;
|
||||||
|
|
@ -194,7 +199,12 @@ fn finalize_pops_from_best_chain_for_network(network: Network) -> Result<()> {
|
||||||
let child = block1.make_fake_child().set_work(1);
|
let child = block1.make_fake_child().set_work(1);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
@ -247,7 +257,12 @@ fn commit_block_extending_best_chain_doesnt_drop_worst_chains_for_network(
|
||||||
let child2 = block2.make_fake_child().set_work(1);
|
let child2 = block2.make_fake_child().set_work(1);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
@ -294,7 +309,12 @@ fn shorter_chain_can_be_best_chain_for_network(network: Network) -> Result<()> {
|
||||||
let short_chain_block = block1.make_fake_child().set_work(3);
|
let short_chain_block = block1.make_fake_child().set_work(3);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
@ -341,7 +361,12 @@ fn longer_chain_with_more_work_wins_for_network(network: Network) -> Result<()>
|
||||||
let short_chain_block = block1.make_fake_child().set_work(3);
|
let short_chain_block = block1.make_fake_child().set_work(3);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
@ -386,7 +411,12 @@ fn equal_length_goes_to_more_work_for_network(network: Network) -> Result<()> {
|
||||||
let expected_hash = more_work_child.hash();
|
let expected_hash = more_work_child.hash();
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
|
||||||
finalized_state.set_finalized_value_pool(fake_value_pool);
|
finalized_state.set_finalized_value_pool(fake_value_pool);
|
||||||
|
|
@ -430,7 +460,12 @@ fn history_tree_is_updated_for_network_upgrade(
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
state
|
state
|
||||||
.commit_new_chain(prev_block.clone().prepare(), &finalized_state)
|
.commit_new_chain(prev_block.clone().prepare(), &finalized_state)
|
||||||
|
|
@ -526,7 +561,12 @@ fn commitment_is_validated_for_network_upgrade(network: Network, network_upgrade
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut state = NonFinalizedState::new(network);
|
let mut state = NonFinalizedState::new(network);
|
||||||
let finalized_state = FinalizedState::new(&Config::ephemeral(), network);
|
let finalized_state = FinalizedState::new(
|
||||||
|
&Config::ephemeral(),
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
state
|
state
|
||||||
.commit_new_chain(prev_block.clone().prepare(), &finalized_state)
|
.commit_new_chain(prev_block.clone().prepare(), &finalized_state)
|
||||||
|
|
|
||||||
|
|
@ -92,7 +92,12 @@ pub(crate) fn new_state_with_mainnet_genesis() -> (FinalizedState, NonFinalizedS
|
||||||
let config = Config::ephemeral();
|
let config = Config::ephemeral();
|
||||||
let network = Mainnet;
|
let network = Mainnet;
|
||||||
|
|
||||||
let mut finalized_state = FinalizedState::new(&config, network);
|
let mut finalized_state = FinalizedState::new(
|
||||||
|
&config,
|
||||||
|
network,
|
||||||
|
#[cfg(feature = "elasticsearch")]
|
||||||
|
None,
|
||||||
|
);
|
||||||
let non_finalized_state = NonFinalizedState::new(network);
|
let non_finalized_state = NonFinalizedState::new(network);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,10 @@ getblocktemplate-rpcs = [
|
||||||
"zebra-chain/getblocktemplate-rpcs",
|
"zebra-chain/getblocktemplate-rpcs",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
elasticsearch = [
|
||||||
|
"zebra-state/elasticsearch",
|
||||||
|
]
|
||||||
|
|
||||||
sentry = ["dep:sentry"]
|
sentry = ["dep:sentry"]
|
||||||
flamegraph = ["tracing-flame", "inferno"]
|
flamegraph = ["tracing-flame", "inferno"]
|
||||||
journald = ["tracing-journald"]
|
journald = ["tracing-journald"]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue