zebra-state: replace sled with rocksdb (#1325)

## Motivation

Prior to this PR we used `sled` as our database for storing persistent chain data on disk between boots. We picked sled over rocksdb to minimize our C++ dependencies, despite sled being the less mature codebase. The theory was that if it worked well enough we'd prefer a pure Rust codebase, but if we ever ran into problems we could easily swap it out for rocksdb.

Well, we ran into problems. Sled's memory usage was particularly high and it appeared to be leaking memory. On top of that, its write performance was poor enough that we became bottlenecked on sled rather than on the network.

## Solution

This PR replaces `sled` with `rocksdb`. Out of the box we've seen a 10x improvement in memory usage, no more leaks, and much better write performance. With this change, writing chain data to disk is no longer a limiting factor in how quickly we can sync the chain.
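For reviewers who haven't used the `rocksdb` crate before, here is a minimal, self-contained sketch of the access pattern the new code adopts (column families declared up front, batched writes, pinned reads). The path `demo_db` and the example key/value are illustrative, not part of this PR:

```rust
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    // Column families play the role that sled "trees" used to play.
    let cfs = vec![
        ColumnFamilyDescriptor::new("hash_by_height", Options::default()),
        ColumnFamilyDescriptor::new("height_by_hash", Options::default()),
    ];
    let db = DB::open_cf_descriptors(&opts, "demo_db", cfs).unwrap();

    // All writes for a block go into a single atomic WriteBatch.
    let hash_by_height = db.cf_handle("hash_by_height").unwrap();
    let mut batch = WriteBatch::default();
    batch.put_cf(hash_by_height, 42u32.to_be_bytes(), [0xab_u8; 32]);
    db.write(batch).unwrap();

    // Reads borrow the stored bytes via get_pinned_cf, avoiding a copy.
    let value = db
        .get_pinned_cf(hash_by_height, 42u32.to_be_bytes())
        .unwrap();
    assert!(value.is_some());
}
```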

The code in this pull request has:
  - [x] Documentation Comments
  - [x] Unit Tests and Property Tests

## Review

@hdevalence
Jane Lusby 2020-11-18 18:05:06 -08:00 committed by GitHub
parent 65a605520f
commit 4c9bb87df2
10 changed files with 284 additions and 506 deletions

Cargo.lock (generated)

@@ -463,15 +463,6 @@ dependencies = [
  "bitflags",
 ]
 
-[[package]]
-name = "cloudabi"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467"
-dependencies = [
- "bitflags",
-]
-
 [[package]]
 name = "color-backtrace"
 version = "0.3.0"
@@ -939,16 +930,6 @@ dependencies = [
  "percent-encoding",
 ]
 
-[[package]]
-name = "fs2"
-version = "0.4.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
-dependencies = [
- "libc",
- "winapi 0.3.9",
-]
-
 [[package]]
 name = "fuchsia-cprng"
 version = "0.1.1"
@@ -1072,15 +1053,6 @@ dependencies = [
  "slab",
 ]
 
-[[package]]
-name = "fxhash"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
-dependencies = [
- "byteorder",
-]
-
 [[package]]
 name = "generational-arena"
 version = "0.2.8"
@@ -1390,15 +1362,6 @@ dependencies = [
  "str_stack",
 ]
 
-[[package]]
-name = "instant"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb1fc4429a33e1f80d41dc9fea4d108a88bec1de8053878898ae448a0b52f613"
-dependencies = [
- "cfg-if 1.0.0",
-]
-
 [[package]]
 name = "iovec"
 version = "0.1.4"
@@ -1476,6 +1439,18 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "librocksdb-sys"
+version = "6.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb5b56f651c204634b936be2f92dbb42c36867e00ff7fe2405591f3b9fa66f09"
+dependencies = [
+ "bindgen",
+ "cc",
+ "glob",
+ "libc",
+]
+
 [[package]]
 name = "linked-hash-map"
 version = "0.5.3"
@@ -1491,15 +1466,6 @@ dependencies = [
  "scopeguard",
 ]
 
-[[package]]
-name = "lock_api"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c"
-dependencies = [
- "scopeguard",
-]
-
 [[package]]
 name = "log"
 version = "0.4.11"
@@ -1648,7 +1614,7 @@ dependencies = [
  "metrics-observer-prometheus",
  "metrics-observer-yaml",
  "metrics-util",
- "parking_lot 0.10.2",
+ "parking_lot",
  "quanta",
 ]
@@ -1878,19 +1844,8 @@ version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
 dependencies = [
- "lock_api 0.3.4",
- "parking_lot_core 0.7.2",
-]
-
-[[package]]
-name = "parking_lot"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733"
-dependencies = [
- "instant",
- "lock_api 0.4.1",
- "parking_lot_core 0.8.0",
+ "lock_api",
+ "parking_lot_core",
 ]
 
 [[package]]
@@ -1900,22 +1855,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
 dependencies = [
  "cfg-if 0.1.10",
- "cloudabi 0.0.3",
- "libc",
- "redox_syscall",
- "smallvec 1.4.2",
- "winapi 0.3.9",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
-dependencies = [
- "cfg-if 0.1.10",
- "cloudabi 0.1.0",
- "instant",
+ "cloudabi",
  "libc",
  "redox_syscall",
  "smallvec 1.4.2",
@@ -2367,6 +2307,16 @@ dependencies = [
  "opaque-debug 0.2.3",
 ]
 
+[[package]]
+name = "rocksdb"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23d83c02c429044d58474eaf5ae31e062d0de894e21125b47437ec0edc1397e6"
+dependencies = [
+ "libc",
+ "librocksdb-sys",
+]
+
 [[package]]
 name = "rust-argon2"
 version = "0.8.2"
@@ -2598,22 +2548,6 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
 
-[[package]]
-name = "sled"
-version = "0.34.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d0132f3e393bcb7390c60bb45769498cf4550bcb7a21d7f95c02b69f6362cdc"
-dependencies = [
- "crc32fast",
- "crossbeam-epoch 0.9.0",
- "crossbeam-utils 0.8.0",
- "fs2",
- "fxhash",
- "libc",
- "log",
- "parking_lot 0.11.0",
-]
-
 [[package]]
 name = "smallvec"
 version = "0.6.13"
@@ -3480,8 +3414,8 @@ dependencies = [
  "once_cell",
  "primitive-types",
  "proptest",
+ "rocksdb",
  "serde",
- "sled",
  "spandoc",
  "tempdir",
  "thiserror",

Dockerfile

@@ -3,7 +3,7 @@ FROM rust:buster as builder
 
 RUN apt-get update && \
     apt-get install -y --no-install-recommends \
-    make cmake g++ gcc llvm libclang-dev
+    make cmake g++ gcc llvm libclang-dev clang
 
 RUN mkdir /zebra
 WORKDIR /zebra

book/src/dev/rfcs/0004-asynchronous-script-verification.md

@@ -97,11 +97,11 @@ with a timeout error. Currently, we outsource script verification to
 
 Implementing the state request correctly requires considering two sets of behaviors:
 
 1. behaviors related to the state's external API (a `Buffer`ed `tower::Service`);
-2. behaviors related to the state's internal implementation (using `sled`).
+2. behaviors related to the state's internal implementation (using `rocksdb`).
 
 Making this distinction helps us to ensure we don't accidentally leak
 "internal" behaviors into "external" behaviors, which would violate
-encapsulation and make it more difficult to replace `sled`.
+encapsulation and make it more difficult to replace `rocksdb`.
 
 In the first category, our state is presented to the rest of the application
 as a `Buffer`ed `tower::Service`. The `Buffer` wrapper allows shared access
@@ -116,19 +116,12 @@ This means that our external API ensures that the state service sees a
 linearized sequence of state requests, although the exact ordering is
 unpredictable when there are multiple senders making requests.
 
-In the second category, the Sled API presents itself synchronously, but
-database and tree handles are clonable and can be moved between threads. All
-that's required to process some request asynchronously is to clone the
-appropriate handle, move it into an async block, and make the call as part of
-the future. (We might want to use Tokio's blocking API for this, but that's a
-side detail).
-
-Because the state service has exclusive access to the sled database, and the
+Because the state service has exclusive access to the rocksdb database, and the
 state service sees a linearized sequence of state requests, we have an easy
-way to opt in to asynchronous database access. We can perform sled operations
+way to opt in to asynchronous database access. We can perform rocksdb operations
 synchronously in the `Service::call`, waiting for them to complete, and be
-sure that all future requests will see the resulting sled state. Or, we can
-perform sled operations asynchronously in the future returned by
+sure that all future requests will see the resulting rocksdb state. Or, we can
+perform rocksdb operations asynchronously in the future returned by
 `Service::call`.
 
 If we perform all *writes* synchronously and allow reads to be either
@@ -139,7 +132,7 @@ time the request was processed, or a later state.
 Now, returning to the UTXO lookup problem, we can map out the possible states
 with this restriction in mind. This description assumes that UTXO storage is
 split into disjoint sets, one in-memory (e.g., blocks after the reorg limit)
-and the other in sled (e.g., blocks after the reorg limit). The details of
+and the other in rocksdb (e.g., blocks after the reorg limit). The details of
 this storage are not important for this design, only that the two sets are
 disjoint.
@@ -147,14 +140,14 @@ When the state service processes a `Request::AwaitUtxo(OutPoint)` referencing
 some UTXO `u`, there are three disjoint possibilities:
 
 1. `u` is already contained in an in-memory block storage;
-2. `u` is already contained in the sled UTXO set;
+2. `u` is already contained in the rocksdb UTXO set;
 3. `u` is not yet known to the state service.
 
 In case 3, we need to queue `u` and scan all *future* blocks to see whether
 they contain `u`. However, if we have a mechanism to queue `u`, we can
 perform check 2 asynchronously, because restricting to synchronous writes
 means that any async read will return the current or later state. If `u` was
-in the sled UTXO set when the request was processed, the only way that an
+in the rocksdb UTXO set when the request was processed, the only way that an
 async read would not return `u` is if the UTXO were spent, in which case the
 service is not required to return a response.
@@ -184,12 +177,12 @@ The state service should maintain an `Arc<Mutex<PendingUtxos>>`, used as follows:
 
 1. In `Service::call(Request::AwaitUtxo(u))`, the service should:
    - call `PendingUtxos::queue(u)` to get a future `f` to return to the caller;
-   - spawn a task that does a sled lookup for `u`, calling `PendingUtxos::respond(u, output)` if present;
+   - spawn a task that does a rocksdb lookup for `u`, calling `PendingUtxos::respond(u, output)` if present;
    - check the in-memory storage for `u`, calling `PendingUtxos::respond(u, output)` if present;
    - return `f` to the caller (it may already be ready).
 
    The common case is that `u` references an old UTXO, so spawning the lookup
    task first means that we don't wait to check in-memory storage for `u`
-   before starting the sled lookup.
+   before starting the rocksdb lookup.
 
 2. In `Service::call(Request::CommitBlock(block, ..))`, the service should:
    - call `PendingUtxos::check_block(block.as_ref())`;
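The queue-then-respond flow described in this RFC boils down to a map from outpoints to channels. As a rough, self-contained sketch of that shape (using tokio's broadcast channel; the `OutPoint` and `Output` aliases here are illustrative stand-ins, not the real zebra-chain types):

```rust
use std::collections::HashMap;
use tokio::sync::broadcast;

// Illustrative stand-ins for the zebra-chain types.
type OutPoint = [u8; 36];
type Output = Vec<u8>;

#[derive(Default)]
struct PendingUtxos(HashMap<OutPoint, broadcast::Sender<Output>>);

impl PendingUtxos {
    /// Return a future that resolves once `outpoint` has been observed.
    fn queue(&mut self, outpoint: OutPoint) -> impl std::future::Future<Output = Option<Output>> {
        let mut receiver = self
            .0
            .entry(outpoint)
            .or_insert_with(|| broadcast::channel(1).0)
            .subscribe();
        async move { receiver.recv().await.ok() }
    }

    /// Resolve every pending `queue` future for `outpoint`, if any.
    fn respond(&mut self, outpoint: &OutPoint, output: Output) {
        if let Some(sender) = self.0.remove(outpoint) {
            let _ = sender.send(output);
        }
    }
}
```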

book/src/dev/rfcs/0005-state-updates.md

@@ -156,7 +156,7 @@ state data at the finality boundary provided by the reorg limit.
 
 State data from blocks *above* the reorg limit (*non-finalized state*) is
 stored in-memory and handles multiple chains. State data from blocks *below*
-the reorg limit (*finalized state*) is stored persistently using `sled` and
+the reorg limit (*finalized state*) is stored persistently using `rocksdb` and
 only tracks a single chain. This allows a simplification of our state
 handling, because only finalized data is persistent and the logic for
 finalized data handles less invariants.
@@ -169,7 +169,7 @@ Another downside of this design is that we do not achieve exactly the same
 behavior as `zcashd` in the event of a 51% attack: `zcashd` limits *each* chain
 reorganization to 100 blocks, but permits multiple reorgs, while Zebra limits
 *all* chain reorgs to 100 blocks. In the event of a successful 51% attack on
-Zcash, this could be resolved by wiping the Sled state and re-syncing the new
+Zcash, this could be resolved by wiping the rocksdb state and re-syncing the new
 chain, but in this scenario there are worse problems.
 
 ## Service Interface
@@ -180,11 +180,11 @@ Determining what guarantees the state service can and should provide to the
 rest of the application requires considering two sets of behaviors:
 
 1. behaviors related to the state's external API (a `Buffer`ed `tower::Service`);
-2. behaviors related to the state's internal implementation (using `sled`).
+2. behaviors related to the state's internal implementation (using `rocksdb`).
 
 Making this distinction helps us to ensure we don't accidentally leak
 "internal" behaviors into "external" behaviors, which would violate
-encapsulation and make it more difficult to replace `sled`.
+encapsulation and make it more difficult to replace `rocksdb`.
 
 In the first category, our state is presented to the rest of the application
 as a `Buffer`ed `tower::Service`. The `Buffer` wrapper allows shared access
@@ -199,19 +199,12 @@ This means that our external API ensures that the state service sees a
 linearized sequence of state requests, although the exact ordering is
 unpredictable when there are multiple senders making requests.
 
-In the second category, the Sled API presents itself synchronously, but
-database and tree handles are cloneable and can be moved between threads. All
-that's required to process some request asynchronously is to clone the
-appropriate handle, move it into an async block, and make the call as part of
-the future. (We might want to use Tokio's blocking API for this, but this is
-an implementation detail).
-
-Because the state service has exclusive access to the sled database, and the
+Because the state service has exclusive access to the rocksdb database, and the
 state service sees a linearized sequence of state requests, we have an easy
-way to opt in to asynchronous database access. We can perform sled operations
+way to opt in to asynchronous database access. We can perform rocksdb operations
 synchronously in the `Service::call`, waiting for them to complete, and be
-sure that all future requests will see the resulting sled state. Or, we can
-perform sled operations asynchronously in the future returned by
+sure that all future requests will see the resulting rocksdb state. Or, we can
+perform rocksdb operations asynchronously in the future returned by
 `Service::call`.
 
 If we perform all *writes* synchronously and allow reads to be either
@@ -221,10 +214,10 @@ time the request was processed, or a later state.
 
 ### Summary
 
-- **Sled reads** may be done synchronously (in `call`) or asynchronously (in
+- **rocksdb reads** may be done synchronously (in `call`) or asynchronously (in
   the `Future`), depending on the context;
 
-- **Sled writes** must be done synchronously (in `call`)
+- **rocksdb writes** must be done synchronously (in `call`)
 
 ## In-memory data structures
 [in-memory]: #in-memory
@@ -580,22 +573,22 @@ New `non-finalized` blocks are commited as follows:
 
    - Remove the lowest height block from the non-finalized state with
      `self.mem.finalize();`
 
    - Commit that block to the finalized state with
-     `self.sled.commit_finalized_direct(finalized);`
+     `self.disk.commit_finalized_direct(finalized);`
 
 8. Prune orphaned blocks from `self.queued_blocks` with
    `self.queued_blocks.prune_by_height(finalized_height);`
 
 9. Return the receiver for the block's channel
 
-## Sled data structures
-[sled]: #sled
+## rocksdb data structures
+[rocksdb]: #rocksdb
 
-Sled provides a persistent, thread-safe `BTreeMap<&[u8], &[u8]>`. Each map is
+rocksdb provides a persistent, thread-safe `BTreeMap<&[u8], &[u8]>`. Each map is
 a distinct "tree". Keys are sorted using lex order on byte strings, so
 integer values should be stored using big-endian encoding (so that the lex
 order on byte strings is the numeric ordering).
 
-We use the following Sled trees:
+We use the following rocksdb column families:
 
 | Tree                 | Keys                  | Values                              |
 |----------------------|-----------------------|-------------------------------------|
@@ -613,16 +606,16 @@ Zcash structures are encoded using `ZcashSerialize`/`ZcashDeserialize`.
 
 **Note:** We do not store the cumulative work for the finalized chain, because the finalized work is equal for all non-finalized chains. So the additional non-finalized work can be used to calculate the relative chain order, and choose the best chain.
 
-### Notes on Sled trees
+### Notes on rocksdb column families
 
-- The `hash_by_height` and `height_by_hash` trees provide a bijection between
-  block heights and block hashes. (Since the Sled state only stores finalized
+- The `hash_by_height` and `height_by_hash` column families provide a bijection between
+  block heights and block hashes. (Since the rocksdb state only stores finalized
   state, they are actually a bijection).
 
-- The `block_by_height` tree provides a bijection between block heights and block
-  data. There is no corresponding `height_by_block` tree: instead, hash the block,
-  and use `height_by_hash`. (Since the Sled state only stores finalized state,
-  they are actually a bijection).
+- The `block_by_height` column family provides a bijection between block
+  heights and block data. There is no corresponding `height_by_block` column
+  family: instead, hash the block, and use `height_by_hash`. (Since the
+  rocksdb state only stores finalized state, they are actually a bijection).
 
 - Blocks are stored by height, not by hash. This has the downside that looking
   up a block by hash requires an extra level of indirection. The upside is
@@ -630,7 +623,7 @@ Zcash structures are encoded using `ZcashSerialize`/`ZcashDeserialize`.
   common access patterns, such as helping a client sync the chain or doing
   analysis, access blocks in (potentially sparse) height order. In addition,
   the fact that we commit blocks in order means we're writing only to the end
-  of the Sled tree, which may help save space.
+  of the rocksdb column family, which may help save space.
 
 - Transaction references are stored as a `(height, index)` pair referencing the
   height of the transaction's parent block and the transaction's index in that
@@ -645,7 +638,7 @@ commit any queued children. (Although the checkpointer generates verified
 blocks in order when it completes a checkpoint, the blocks are committed in the
 response futures, so they may arrive out of order).
 
-Committing a block to the sled state should be implemented as a wrapper around
+Committing a block to the rocksdb state should be implemented as a wrapper around
 a function also called by [`Request::CommitBlock`](#request-commit-block),
 which should:
@@ -754,7 +747,7 @@ CommitFinalizedBlock {
 }
 ```
 
-Commits a finalized block to the sled state, skipping contextual validation.
+Commits a finalized block to the rocksdb state, skipping contextual validation.
 This is exposed for use in checkpointing, which produces in-order finalized
 blocks. Returns `Response::Added(block::Hash)` with the hash of the
 committed block if successful.
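The big-endian key encoding this RFC calls for is what makes the end-of-column-family tip lookup in the new code correct. The ordering argument can be checked in two lines:

```rust
fn main() {
    // Lexicographic order on big-endian bytes matches numeric order:
    // 255 -> [0, 0, 0, 255] sorts before 256 -> [0, 0, 1, 0].
    assert!(255u32.to_be_bytes() < 256u32.to_be_bytes());

    // Little-endian bytes would sort heights incorrectly:
    // 256 -> [0, 1, 0, 0] sorts before 255 -> [255, 0, 0, 0].
    assert!(256u32.to_le_bytes() < 255u32.to_le_bytes());
}
```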

zebra-state/Cargo.toml

@@ -15,7 +15,6 @@ dirs = "3.0.1"
 hex = "0.4.2"
 lazy_static = "1.4.0"
 serde = { version = "1", features = ["serde_derive"] }
-sled = "0.34.5"
 
 futures = "0.3.7"
 metrics = "0.12"
@@ -25,6 +24,7 @@ tracing-error = "0.1.2"
 thiserror = "1.0.22"
 tokio = { version = "0.2.22", features = ["sync"] }
 displaydoc = "0.1.7"
+rocksdb = "0.15.0"
 
 [dev-dependencies]
 zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }

zebra-state/src/config.rs

@@ -27,18 +27,6 @@ pub struct Config {
     /// | Other | `std::env::current_dir()/cache` | |
     pub cache_dir: PathBuf,
 
-    /// Controls the size of the database cache, in bytes.
-    ///
-    /// This corresponds to `sled`'s [`cache_capacity`][cc] parameter.
-    /// Note that the behavior of this parameter is [somewhat
-    /// unintuitive][gh], measuring the on-disk size of the cached data,
-    /// not the in-memory size, which may be much larger, especially for
-    /// smaller keys and values.
-    ///
-    /// [cc]: https://docs.rs/sled/0.34.4/sled/struct.Config.html#method.cache_capacity
-    /// [gh]: https://github.com/spacejam/sled/issues/986#issuecomment-592950100
-    pub memory_cache_bytes: u64,
-
     /// Whether to use an ephemeral database.
     ///
     /// Ephemeral databases are stored in memory on Linux, and in a temporary directory on other OSes.
@@ -54,30 +42,68 @@ pub struct Config {
     pub debug_stop_at_height: Option<u32>,
 }
 
+fn gen_temp_path() -> PathBuf {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::time::SystemTime;
+
+    static SALT_COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+    let seed = SALT_COUNTER.fetch_add(1, Ordering::SeqCst) as u128;
+    let now = SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos()
+        << 48;
+
+    #[cfg(not(miri))]
+    let pid = u128::from(std::process::id());
+
+    #[cfg(miri)]
+    let pid = 0;
+
+    let salt = (pid << 16) + now + seed;
+
+    if cfg!(target_os = "linux") {
+        // use shared memory for temporary linux files
+        format!("/dev/shm/pagecache.tmp.{}", salt).into()
+    } else {
+        std::env::temp_dir().join(format!("pagecache.tmp.{}", salt))
+    }
+}
+
 impl Config {
-    /// Generate the appropriate `sled::Config` for `network`, based on the
-    /// provided `zebra_state::Config`.
-    pub(crate) fn sled_config(&self, network: Network) -> sled::Config {
+    pub(crate) fn open_db(&self, network: Network) -> rocksdb::DB {
         let net_dir = match network {
             Network::Mainnet => "mainnet",
             Network::Testnet => "testnet",
         };
 
-        let config = sled::Config::default()
-            .cache_capacity(self.memory_cache_bytes)
-            .mode(sled::Mode::LowSpace);
+        let mut opts = rocksdb::Options::default();
+
+        let cfs = vec![
+            rocksdb::ColumnFamilyDescriptor::new("hash_by_height", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("height_by_hash", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("block_by_height", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", opts.clone()),
+            rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", opts.clone()),
+        ];
+
+        opts.create_if_missing(true);
+        opts.create_missing_column_families(true);
 
-        if self.ephemeral {
-            config.temporary(self.ephemeral)
+        let path = if self.ephemeral {
+            gen_temp_path()
         } else {
-            let path = self
-                .cache_dir
+            self.cache_dir
                 .join("state")
-                .join(format!("v{}", crate::constants::SLED_FORMAT_VERSION))
-                .join(net_dir);
+                .join(format!("v{}", crate::constants::DATABASE_FORMAT_VERSION))
+                .join(net_dir)
+        };
 
-            config.path(path)
-        }
+        rocksdb::DB::open_cf_descriptors(&opts, path, cfs).unwrap()
     }
 
     /// Construct a config for an ephemeral in memory database
@@ -96,7 +122,6 @@ impl Default for Config {
         Self {
             cache_dir,
-            memory_cache_bytes: 50_000_000,
             ephemeral: false,
             debug_stop_at_height: None,
         }

zebra-state/src/constants.rs

@@ -11,4 +11,4 @@ pub const MIN_TRASPARENT_COINBASE_MATURITY: u32 = 100;
 /// coinbase transactions.
 pub const MAX_BLOCK_REORG_HEIGHT: u32 = MIN_TRASPARENT_COINBASE_MATURITY - 1;
 
-pub const SLED_FORMAT_VERSION: u32 = 1;
+pub const DATABASE_FORMAT_VERSION: u32 = 2;

zebra-state/src/service/finalized_state.rs

@@ -1,4 +1,4 @@
-//! The primary implementation of the `zebra_state::Service` built upon sled
+//! The primary implementation of the `zebra_state::Service` built upon rocksdb
 
 mod disk_format;
@@ -17,55 +17,27 @@ use self::disk_format::{DiskDeserialize, DiskSerialize, FromDisk, IntoDisk, Tran
 
 use super::QueuedBlock;
 
-/// The finalized part of the chain state, stored in sled.
-///
-/// This structure has two categories of methods:
-///
-/// - *synchronous* methods that perform writes to the sled state;
-/// - *asynchronous* methods that perform reads.
-///
-/// For more on this distinction, see RFC5. The synchronous methods are
-/// implemented as ordinary methods on the [`FinalizedState`]. The asynchronous
-/// methods are not implemented using `async fn`, but using normal methods that
-/// return `impl Future<Output = ...>`. This allows them to move data (e.g.,
-/// clones of handles for [`sled::Tree`]s) into the futures they return.
-///
-/// This means that the returned futures have a `'static` lifetime and don't
-/// borrow any resources from the [`FinalizedState`], and the actual database work is
-/// performed asynchronously when the returned future is polled, not while it is
-/// created. This is analogous to the way [`tower::Service::call`] works.
+/// The finalized part of the chain state, stored in the db.
 pub struct FinalizedState {
     /// Queued blocks that arrived out of order, indexed by their parent block hash.
     queued_by_prev_hash: HashMap<block::Hash, QueuedBlock>,
     max_queued_height: i64,
 
-    hash_by_height: sled::Tree,
-    height_by_hash: sled::Tree,
-    block_by_height: sled::Tree,
-    tx_by_hash: sled::Tree,
-    utxo_by_outpoint: sled::Tree,
-    sprout_nullifiers: sled::Tree,
-    sapling_nullifiers: sled::Tree,
-    // sprout_anchors: sled::Tree,
-    // sapling_anchors: sled::Tree,
+    db: rocksdb::DB,
+    ephemeral: bool,
 
     /// Commit blocks to the finalized state up to this height, then exit Zebra.
     debug_stop_at_height: Option<block::Height>,
 }
 
 impl FinalizedState {
     pub fn new(config: &Config, network: Network) -> Self {
-        let db = config.sled_config(network).open().unwrap();
+        let db = config.open_db(network);
+
         let new_state = Self {
             queued_by_prev_hash: HashMap::new(),
             max_queued_height: -1,
-            hash_by_height: db.open_tree(b"hash_by_height").unwrap(),
-            height_by_hash: db.open_tree(b"height_by_hash").unwrap(),
-            block_by_height: db.open_tree(b"block_by_height").unwrap(),
-            tx_by_hash: db.open_tree(b"tx_by_hash").unwrap(),
-            utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(),
-            sprout_nullifiers: db.open_tree(b"sprout_nullifiers").unwrap(),
-            sapling_nullifiers: db.open_tree(b"sapling_nullifiers").unwrap(),
+            db,
+            ephemeral: config.ephemeral,
             debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
         };
@@ -101,30 +73,8 @@ impl FinalizedState {
         new_state
     }
 
-    /// Synchronously flushes all dirty IO buffers and calls fsync.
-    ///
-    /// Returns the number of bytes flushed during this call.
-    /// See sled's `Tree.flush` for more details.
-    pub fn flush(&self) -> sled::Result<usize> {
-        let mut total_flushed = 0;
-
-        total_flushed += self.hash_by_height.flush()?;
-        total_flushed += self.height_by_hash.flush()?;
-        total_flushed += self.block_by_height.flush()?;
-        total_flushed += self.tx_by_hash.flush()?;
-        total_flushed += self.utxo_by_outpoint.flush()?;
-        total_flushed += self.sprout_nullifiers.flush()?;
-        total_flushed += self.sapling_nullifiers.flush()?;
-
-        Ok(total_flushed)
-    }
-
-    /// If `block_height` is greater than or equal to the configured stop height,
-    /// stop the process.
-    ///
-    /// Flushes sled trees before exiting.
-    ///
-    /// `called_from` and `block_hash` are used for assertions and logging.
+    /// Stop the process if `block_height` is greater than or equal to the
+    /// configured stop height.
     fn is_at_stop_height(&self, block_height: block::Height) -> bool {
         let debug_stop_at_height = match self.debug_stop_at_height {
             Some(debug_stop_at_height) => debug_stop_at_height,
@@ -181,10 +131,16 @@ impl FinalizedState {
         self.tip().map(|(height, _)| height)
     }
 
+    fn is_empty(&self, cf: &rocksdb::ColumnFamily) -> bool {
+        // use iterator to check if it's empty
+        !self
+            .db
+            .iterator_cf(cf, rocksdb::IteratorMode::Start)
+            .valid()
+    }
+
     /// Immediately commit `block` to the finalized state.
     pub fn commit_finalized_direct(&mut self, block: Arc<Block>) -> Result<block::Hash, BoxError> {
-        use sled::Transactional;
-
         let height = block
             .coinbase_height()
             .expect("finalized blocks are valid and have a coinbase height");
@@ -192,8 +148,16 @@ impl FinalizedState {
 
         block_precommit_metrics(&hash, height, &block);
 
+        let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
+        let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
+        let block_by_height = self.db.cf_handle("block_by_height").unwrap();
+        let tx_by_hash = self.db.cf_handle("tx_by_hash").unwrap();
+        let utxo_by_outpoint = self.db.cf_handle("utxo_by_outpoint").unwrap();
+        let sprout_nullifiers = self.db.cf_handle("sprout_nullifiers").unwrap();
+        let sapling_nullifiers = self.db.cf_handle("sapling_nullifiers").unwrap();
+
         // Assert that callers (including unit tests) get the chain order correct
-        if self.block_by_height.is_empty() {
+        if self.is_empty(hash_by_height) {
             assert_eq!(
                 block::Hash([0; 32]),
                 block.header.previous_block_hash,
@@ -220,94 +184,73 @@ impl FinalizedState {
             );
         }
 
-        let result = (
-            &self.hash_by_height,
-            &self.height_by_hash,
-            &self.block_by_height,
-            &self.utxo_by_outpoint,
-            &self.tx_by_hash,
-            &self.sprout_nullifiers,
-            &self.sapling_nullifiers,
-        )
-            .transaction(
-                move |(
-                    hash_by_height,
-                    height_by_hash,
-                    block_by_height,
-                    utxo_by_outpoint,
-                    tx_by_hash,
-                    sprout_nullifiers,
-                    sapling_nullifiers,
-                )| {
-                    // Index the block
-                    hash_by_height.zs_insert(height, hash)?;
-                    height_by_hash.zs_insert(hash, height)?;
-                    block_by_height.zs_insert(height, &block)?;
-
-                    // TODO: sprout and sapling anchors (per block)
-
-                    // Consensus-critical bug in zcashd: transactions in the
-                    // genesis block are ignored.
-                    if block.header.previous_block_hash == block::Hash([0; 32]) {
-                        return Ok(hash);
-                    }
-
-                    // Index each transaction
-                    for (transaction_index, transaction) in block.transactions.iter().enumerate() {
-                        let transaction_hash = transaction.hash();
-                        let transaction_location = TransactionLocation {
-                            height,
-                            index: transaction_index
-                                .try_into()
-                                .expect("no more than 4 billion transactions per block"),
-                        };
-                        tx_by_hash.zs_insert(transaction_hash, transaction_location)?;
-
-                        // Mark all transparent inputs as spent
-                        for input in transaction.inputs() {
-                            match input {
-                                transparent::Input::PrevOut { outpoint, .. } => {
-                                    utxo_by_outpoint.remove(outpoint.as_bytes())?;
-                                }
-                                // Coinbase inputs represent new coins,
-                                // so there are no UTXOs to mark as spent.
-                                transparent::Input::Coinbase { .. } => {}
-                            }
-                        }
-
-                        // Index all new transparent outputs
-                        for (index, output) in transaction.outputs().iter().enumerate() {
-                            let outpoint = transparent::OutPoint {
-                                hash: transaction_hash,
-                                index: index as _,
-                            };
-                            utxo_by_outpoint.zs_insert(outpoint, output)?;
-                        }
-
-                        // Mark sprout and sapling nullifiers as spent
-                        for sprout_nullifier in transaction.sprout_nullifiers() {
-                            sprout_nullifiers.zs_insert(sprout_nullifier, ())?;
-                        }
-
-                        for sapling_nullifier in transaction.sapling_nullifiers() {
-                            sapling_nullifiers.zs_insert(sapling_nullifier, ())?;
-                        }
-                    }
-
-                    // for some reason type inference fails here
-                    Ok::<_, sled::transaction::ConflictableTransactionError>(hash)
-                },
-            );
+        // We use a closure so we can use an early return for control flow in
+        // the genesis case
+        let prepare_commit = || -> rocksdb::WriteBatch {
+            let mut batch = rocksdb::WriteBatch::default();
+
+            // Index the block
+            batch.zs_insert(hash_by_height, height, hash);
+            batch.zs_insert(height_by_hash, hash, height);
+            batch.zs_insert(block_by_height, height, &block);
+
+            // TODO: sprout and sapling anchors (per block)
+
+            // Consensus-critical bug in zcashd: transactions in the
+            // genesis block are ignored.
+            if block.header.previous_block_hash == block::Hash([0; 32]) {
+                return batch;
+            }
+
+            // Index each transaction
+            for (transaction_index, transaction) in block.transactions.iter().enumerate() {
+                let transaction_hash = transaction.hash();
+                let transaction_location = TransactionLocation {
+                    height,
+                    index: transaction_index
+                        .try_into()
+                        .expect("no more than 4 billion transactions per block"),
+                };
+                batch.zs_insert(tx_by_hash, transaction_hash, transaction_location);
+
+                // Mark all transparent inputs as spent
+                for input in transaction.inputs() {
+                    match input {
+                        transparent::Input::PrevOut { outpoint, .. } => {
+                            batch.delete_cf(utxo_by_outpoint, outpoint.as_bytes());
+                        }
+                        // Coinbase inputs represent new coins,
+                        // so there are no UTXOs to mark as spent.
+                        transparent::Input::Coinbase { .. } => {}
+                    }
+                }
+
+                // Index all new transparent outputs
+                for (index, output) in transaction.outputs().iter().enumerate() {
+                    let outpoint = transparent::OutPoint {
+                        hash: transaction_hash,
+                        index: index as _,
+                    };
+                    batch.zs_insert(utxo_by_outpoint, outpoint, output);
+                }
+
+                // Mark sprout and sapling nullifiers as spent
+                for sprout_nullifier in transaction.sprout_nullifiers() {
+                    batch.zs_insert(sprout_nullifiers, sprout_nullifier, ());
+                }
+
+                for sapling_nullifier in transaction.sapling_nullifiers() {
+                    batch.zs_insert(sapling_nullifiers, sapling_nullifier, ());
+                }
+            }
+
+            batch
+        };
 
-        if result.is_ok() && self.is_at_stop_height(height) {
-            if let Err(e) = self.flush() {
-                tracing::error!(
-                    ?e,
-                    ?height,
-                    ?hash,
-                    "error flushing sled state before stopping"
-                );
-            }
-
+        let batch = prepare_commit();
+        let result = self.db.write(batch).map(|()| hash);
+
+        if result.is_ok() && self.is_at_stop_height(height) {
             tracing::info!(?height, ?hash, "stopping at configured height");
             std::process::exit(0);
@@ -330,15 +273,13 @@ impl FinalizedState {
 
     /// Returns the tip height and hash if there is one.
     pub fn tip(&self) -> Option<(block::Height, block::Hash)> {
-        self.hash_by_height
-            .iter()
-            .rev()
+        let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
+        self.db
+            .iterator_cf(hash_by_height, rocksdb::IteratorMode::End)
             .next()
-            .transpose()
-            .expect("expected that sled errors would not occur")
             .map(|(height_bytes, hash_bytes)| {
-                let height = block::Height::from_ivec(height_bytes);
-                let hash = block::Hash::from_ivec(hash_bytes);
+                let height = block::Height::from_bytes(height_bytes);
+                let hash = block::Hash::from_bytes(hash_bytes);
 
                 (height, hash)
             })
@@ -346,31 +287,37 @@ impl FinalizedState {
 
     /// Returns the height of the given block if it exists.
     pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
-        self.height_by_hash.zs_get(&hash)
+        let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
+        self.db.zs_get(&height_by_hash, &hash)
     }
 
     /// Returns the given block if it exists.
     pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
-        let height = hash_or_height.height_or_else(|hash| self.height_by_hash.zs_get(&hash))?;
+        let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
+        let block_by_height = self.db.cf_handle("block_by_height").unwrap();
+        let height = hash_or_height.height_or_else(|hash| self.db.zs_get(height_by_hash, &hash))?;
 
-        self.block_by_height.zs_get(&height)
+        self.db.zs_get(block_by_height, &height)
     }
 
     /// Returns the `transparent::Output` pointed to by the given
     /// `transparent::OutPoint` if it is present.
     pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Output> {
-        self.utxo_by_outpoint.zs_get(outpoint)
+        let utxo_by_outpoint = self.db.cf_handle("utxo_by_outpoint").unwrap();
+        self.db.zs_get(utxo_by_outpoint, outpoint)
     }
 
     /// Returns the finalized hash for a given `block::Height` if it is present.
     pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
-        self.hash_by_height.zs_get(&height)
+        let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
+        self.db.zs_get(hash_by_height, &height)
     }
 
     /// Returns the given transaction if it exists.
     pub fn transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
-        self.tx_by_hash
-            .zs_get(&hash)
+        let tx_by_hash = self.db.cf_handle("tx_by_hash").unwrap();
+        self.db
+            .zs_get(tx_by_hash, &hash)
             .map(|TransactionLocation { index, height }| {
                 let block = self
                     .block(height.into())
@@ -381,6 +328,20 @@ impl FinalizedState {
     }
 }
 
+// Drop isn't guaranteed to run, such as when we panic, or if someone stored
+// their FinalizedState in a static, but it should be fine if we don't clean
+// this up since the files are placed in the os temp dir and should be cleaned
+// up automatically eventually.
+impl Drop for FinalizedState {
+    fn drop(&mut self) {
+        if self.ephemeral {
+            let path = self.db.path();
+            tracing::debug!("removing temporary database files {:?}", path);
+            let _res = std::fs::remove_dir_all(path);
+        }
+    }
+}
+
 fn block_precommit_metrics(hash: &block::Hash, height: block::Height, block: &Block) {
     let transaction_count = block.transactions.len();
     let transparent_prevout_count = block

zebra-state/src/service/finalized_state/disk_format.rs

@@ -1,4 +1,4 @@
-//! Module defining exactly how to move types in and out of sled
+//! Module defining exactly how to move types in and out of rocksdb
 
 use std::{convert::TryInto, fmt::Debug, sync::Arc};
 
 use zebra_chain::{
@@ -25,9 +25,6 @@ pub trait IntoDisk {
     // function to convert the current type to its disk format in `zs_get()`
     // without necessarily allocating a new IVec
     fn as_bytes(&self) -> Self::Bytes;
-
-    // function to convert the current type into its disk format
-    fn into_ivec(&self) -> sled::IVec;
 }
 
 impl<'a, T> IntoDisk for &'a T
@@ -39,10 +36,6 @@ where
     fn as_bytes(&self) -> Self::Bytes {
         T::as_bytes(*self)
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        T::into_ivec(*self)
-    }
 }
 
 impl<T> IntoDisk for Arc<T>
@@ -54,10 +47,6 @@ where
     fn as_bytes(&self) -> Self::Bytes {
         T::as_bytes(&*self)
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        T::into_ivec(&*self)
-    }
 }
 
 /// Helper type for retrieving types from the disk with the correct format.
@@ -69,15 +58,15 @@ pub trait FromDisk: Sized {
     /// # Panics
     ///
     /// - if the input data doesn't deserialize correctly
-    fn from_ivec(bytes: sled::IVec) -> Self;
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self;
 }
 
 impl<T> FromDisk for Arc<T>
 where
     T: FromDisk,
 {
-    fn from_ivec(bytes: sled::IVec) -> Self {
-        Arc::new(T::from_ivec(bytes))
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
+        Arc::new(T::from_bytes(bytes))
     }
 }
@@ -88,16 +77,12 @@ impl IntoDisk for Block {
         self.zcash_serialize_to_vec()
             .expect("serialization to vec doesn't fail")
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().into()
-    }
 }
 
 impl FromDisk for Block {
-    fn from_ivec(bytes: sled::IVec) -> Self {
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
         Block::zcash_deserialize(bytes.as_ref())
-            .expect("deserialization format should match the serialization format used by IntoSled")
+            .expect("deserialization format should match the serialization format used by IntoDisk")
     }
 }
@@ -115,24 +100,21 @@ impl IntoDisk for TransactionLocation {
         bytes
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl FromDisk for TransactionLocation {
-    fn from_ivec(sled_bytes: sled::IVec) -> Self {
+    fn from_bytes(disk_bytes: impl AsRef<[u8]>) -> Self {
+        let disk_bytes = disk_bytes.as_ref();
         let height = {
             let mut bytes = [0; 4];
-            bytes.copy_from_slice(&sled_bytes[0..4]);
+            bytes.copy_from_slice(&disk_bytes[0..4]);
             let height = u32::from_be_bytes(bytes);
             block::Height(height)
         };
 
         let index = {
             let mut bytes = [0; 4];
-            bytes.copy_from_slice(&sled_bytes[4..8]);
+            bytes.copy_from_slice(&disk_bytes[4..8]);
             u32::from_be_bytes(bytes)
         };
@@ -146,10 +128,6 @@ impl IntoDisk for transaction::Hash {
     fn as_bytes(&self) -> Self::Bytes {
         self.0
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl IntoDisk for block::Hash {
@@ -158,13 +136,10 @@ impl IntoDisk for block::Hash {
     fn as_bytes(&self) -> Self::Bytes {
         self.0
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl FromDisk for block::Hash {
-    fn from_ivec(bytes: sled::IVec) -> Self {
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
         let array = bytes.as_ref().try_into().unwrap();
         Self(array)
     }
@@ -176,10 +151,6 @@ impl IntoDisk for sprout::Nullifier {
     fn as_bytes(&self) -> Self::Bytes {
         self.0
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl IntoDisk for sapling::Nullifier {
@@ -188,10 +159,6 @@ impl IntoDisk for sapling::Nullifier {
     fn as_bytes(&self) -> Self::Bytes {
         self.0
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl IntoDisk for () {
@@ -200,10 +167,6 @@ impl IntoDisk for () {
     fn as_bytes(&self) -> Self::Bytes {
         []
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        sled::IVec::default()
-    }
 }
 
 impl IntoDisk for block::Height {
@@ -212,13 +175,10 @@ impl IntoDisk for block::Height {
     fn as_bytes(&self) -> Self::Bytes {
         self.0.to_be_bytes()
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().as_ref().into()
-    }
 }
 
 impl FromDisk for block::Height {
-    fn from_ivec(bytes: sled::IVec) -> Self {
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
         let array = bytes.as_ref().try_into().unwrap();
         block::Height(u32::from_be_bytes(array))
     }
@@ -231,16 +191,12 @@ impl IntoDisk for transparent::Output {
         self.zcash_serialize_to_vec()
             .expect("serialization to vec doesn't fail")
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().into()
-    }
 }
 
 impl FromDisk for transparent::Output {
-    fn from_ivec(bytes: sled::IVec) -> Self {
-        Self::zcash_deserialize(&*bytes)
-            .expect("deserialization format should match the serialization format used by IntoSled")
+    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
+        Self::zcash_deserialize(bytes.as_ref())
+            .expect("deserialization format should match the serialization format used by IntoDisk")
     }
 }
@@ -251,78 +207,57 @@ impl IntoDisk for transparent::OutPoint {
         self.zcash_serialize_to_vec()
             .expect("serialization to vec doesn't fail")
     }
-
-    fn into_ivec(&self) -> sled::IVec {
-        self.as_bytes().into()
-    }
 }
 
-/// Helper trait for inserting (Key, Value) pairs into sled with a consistently
+/// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently
 /// defined format
 pub trait DiskSerialize {
-    /// Serialize and insert the given key and value into a sled tree.
-    fn zs_insert<K, V>(
-        &self,
-        key: K,
-        value: V,
-    ) -> Result<(), sled::transaction::UnabortableTransactionError>
+    /// Serialize and insert the given key and value into a rocksdb column family.
+    fn zs_insert<K, V>(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V)
     where
         K: IntoDisk + Debug,
         V: IntoDisk;
 }
 
-impl DiskSerialize for sled::transaction::TransactionalTree {
-    fn zs_insert<K, V>(
-        &self,
-        key: K,
-        value: V,
-    ) -> Result<(), sled::transaction::UnabortableTransactionError>
+impl DiskSerialize for rocksdb::WriteBatch {
+    fn zs_insert<K, V>(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V)
     where
         K: IntoDisk + Debug,
         V: IntoDisk,
     {
-        use std::any::type_name;
-
-        let key_bytes = key.into_ivec();
-        let value_bytes = value.into_ivec();
-        let previous = self.insert(key_bytes, value_bytes)?;
-
-        assert!(
-            previous.is_none(),
-            "duplicate key: previous value for key {:?} was not none when inserting into ({}, {}) sled Tree",
-            key,
-            type_name::<K>(),
-            type_name::<V>()
-        );
-
-        Ok(())
+        let key_bytes = key.as_bytes();
+        let value_bytes = value.as_bytes();
+        self.put_cf(cf, key_bytes, value_bytes);
     }
 }
 
-/// Helper trait for retrieving values from sled trees with a consistently
+/// Helper trait for retrieving values from rocksdb column familys with a consistently
 /// defined format
 pub trait DiskDeserialize {
     /// Serialize the given key and use that to get and deserialize the
-    /// corresponding value from a sled tree, if it is present.
-    fn zs_get<K, V>(&self, key: &K) -> Option<V>
+    /// corresponding value from a rocksdb column family, if it is present.
+    fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
    where
         K: IntoDisk,
         V: FromDisk;
 }
 
-impl DiskDeserialize for sled::Tree {
-    fn zs_get<K, V>(&self, key: &K) -> Option<V>
+impl DiskDeserialize for rocksdb::DB {
+    fn zs_get<K, V>(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option<V>
     where
         K: IntoDisk,
         V: FromDisk,
     {
         let key_bytes = key.as_bytes();
+
+        // We use `get_pinned_cf` to avoid taking ownership of the serialized
+        // format because we're going to deserialize it anyways, which avoids an
+        // extra copy
         let value_bytes = self
-            .get(key_bytes)
-            .expect("expected that sled errors would not occur");
+            .get_pinned_cf(cf, key_bytes)
+            .expect("expected that disk errors would not occur");
 
-        value_bytes.map(V::from_ivec)
+        value_bytes.map(V::from_bytes)
     }
 }
@@ -347,8 +282,8 @@ mod tests {
     where
         T: IntoDisk + FromDisk,
     {
-        let bytes = input.into_ivec();
-        T::from_ivec(bytes)
+        let bytes = input.as_bytes();
+        T::from_bytes(bytes)
     }
 
     fn assert_round_trip<T>(input: T)
@@ -364,8 +299,8 @@ mod tests {
     where
         T: IntoDisk + FromDisk,
     {
-        let bytes = input.into_ivec();
-        T::from_ivec(bytes)
+        let bytes = input.as_bytes();
+        T::from_bytes(bytes)
    }
 
     fn assert_round_trip_ref<T>(input: &T)
@@ -381,8 +316,8 @@ mod tests {
     where
         T: IntoDisk + FromDisk,
     {
-        let bytes = input.into_ivec();
-        T::from_ivec(bytes)
+        let bytes = input.as_bytes();
+        T::from_bytes(bytes)
     }
 
     fn assert_round_trip_arc<T>(input: Arc<T>)
@@ -394,9 +329,9 @@
         assert_eq!(*before, after);
     }
 
-    /// The round trip test covers types that are used as value field in a sled
-    /// Tree. Only these types are ever deserialized, and so they're the only
-    /// ones that implement both `IntoSled` and `FromSled`.
+    /// The round trip test covers types that are used as value field in a rocksdb
+    /// column family. Only these types are ever deserialized, and so they're the only
+    /// ones that implement both `IntoDisk` and `FromDisk`.
     fn assert_value_properties<T>(input: T)
     where
         T: IntoDisk + FromDisk + Clone + PartialEq + std::fmt::Debug,
@@ -406,21 +341,6 @@
         assert_round_trip(input);
     }
 
-    /// This test asserts that types that are used as sled keys behave correctly.
-    /// Any type that implements `IntoIVec` can be used as a sled key. The value
-    /// is serialized via `IntoSled::into_ivec` when the `key`, `value` pair is
-    /// inserted into the sled tree. The `as_bytes` impl on the other hand is
-    /// called for most other operations when comparing a key against existing
-    /// keys in the sled database, such as `contains`.
-    fn assert_as_bytes_matches_ivec<T>(input: T)
-    where
-        T: IntoDisk + Clone,
-    {
-        let before = input.clone();
-        let ivec = input.into_ivec();
-        assert_eq!(before.as_bytes().as_ref(), ivec.as_ref());
-    }
-
     #[test]
     fn roundtrip_transaction_location() {
         zebra_test::init();
@@ -452,52 +372,4 @@ mod tests {
         proptest!(|(val in any::<transparent::Output>())| assert_value_properties(val));
     }
-
-    #[test]
-    fn key_matches_ivec_transaction_location() {
-        zebra_test::init();
-        proptest!(|(val in any::<TransactionLocation>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_trans_hash() {
-        zebra_test::init();
-        proptest!(|(val in any::<transaction::Hash>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_block_hash() {
-        zebra_test::init();
-        proptest!(|(val in any::<block::Hash>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_sprout_nullifier() {
-        zebra_test::init();
-        proptest!(|(val in any::<sprout::Nullifier>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_sapling_nullifier() {
-        zebra_test::init();
-        proptest!(|(val in any::<sapling::Nullifier>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_block_height() {
-        zebra_test::init();
-        proptest!(|(val in any::<block::Height>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_transparent_output() {
-        zebra_test::init();
-        proptest!(|(val in any::<transparent::Output>())| assert_as_bytes_matches_ivec(val));
-    }
-
-    #[test]
-    fn key_matches_ivec_transparent_outpoint() {
-        zebra_test::init();
-        proptest!(|(val in any::<transparent::OutPoint>())| assert_as_bytes_matches_ivec(val));
-    }
 }
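To make the `IntoDisk`/`FromDisk` contract above concrete, here is a standalone, hedged re-statement of the round-trip property for a single value type (the real traits and impls are the ones in this diff; this copy exists only so the property can be run in isolation):

```rust
use std::convert::TryInto;

// Standalone copies of the trait contract from disk_format.rs.
trait IntoDisk {
    type Bytes: AsRef<[u8]>;
    fn as_bytes(&self) -> Self::Bytes;
}

trait FromDisk: Sized {
    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self;
}

struct Height(u32);

impl IntoDisk for Height {
    type Bytes = [u8; 4];
    // Big-endian so rocksdb's lexicographic key order matches numeric order.
    fn as_bytes(&self) -> Self::Bytes {
        self.0.to_be_bytes()
    }
}

impl FromDisk for Height {
    fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
        let array = bytes.as_ref().try_into().unwrap();
        Height(u32::from_be_bytes(array))
    }
}

fn main() {
    let before = Height(419_200);
    let after = Height::from_bytes(before.as_bytes());
    assert_eq!(before.0, after.0);
}
```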

zebrad/tests/acceptance.rs

@@ -313,7 +313,7 @@ fn persistent_mode() -> Result<()> {
     // Make sure the command was killed
     output.assert_was_killed()?;
 
-    // Check that we have persistent sled database
+    // Check that we have persistent rocksdb database
     let cache_dir = testdir.path().join("state");
     assert_with_context!(cache_dir.read_dir()?.count() > 0, &output);
 
@@ -536,8 +536,8 @@ fn restart_stop_at_height() -> Result<()> {
         SMALL_CHECKPOINT_TIMEOUT,
         None,
     )?;
-    // if stopping corrupts the sled database, zebrad might hang here
-    // if stopping does not sync the sled database, the logs will contain OnCommit
+    // if stopping corrupts the rocksdb database, zebrad might hang here
+    // if stopping does not sync the rocksdb database, the logs will contain OnCommit
     sync_until(
         Height(0),
         Mainnet,