From 71fe4c4c735d9095fc0b08afc80ec6a11fd61aca Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 21 Jul 2022 20:14:58 -0300 Subject: [PATCH] try to do deserialization of transaction in a rayon thread (#4801) * try to do deserialization of transaction in a rayon thread * Try tokio::task::block_in_place instead * fix tests * add deserialize block into rayon pool * fill some docs Co-authored-by: teor --- Cargo.lock | 1 + zebra-network/Cargo.toml | 1 + zebra-network/src/protocol/external/codec.rs | 67 ++++++++++++++++--- .../components/inbound/tests/real_peer_set.rs | 4 +- 4 files changed, 62 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 978dc716..b320b67e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6408,6 +6408,7 @@ dependencies = [ "proptest", "proptest-derive", "rand 0.8.5", + "rayon", "regex", "serde", "static_assertions", diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index b1bba785..cb3206c8 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -24,6 +24,7 @@ lazy_static = "1.4.0" ordered-map = "0.4.2" pin-project = "1.0.10" rand = { version = "0.8.5", package = "rand" } +rayon = "1.5.3" regex = "1.5.6" serde = { version = "1.0.137", features = ["serde_derive"] } thiserror = "1.0.31" diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index d249eca0..89fc68ff 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -570,8 +570,9 @@ impl Codec { Ok(Message::GetAddr) } - fn read_block(&self, reader: R) -> Result { - Ok(Message::Block(Block::zcash_deserialize(reader)?.into())) + fn read_block(&self, reader: R) -> Result { + let result = Self::deserialize_block_spawning(reader); + Ok(Message::Block(result?.into())) } fn read_getblocks(&self, mut reader: R) -> Result { @@ -625,8 +626,9 @@ impl Codec { Ok(Message::NotFound(Vec::zcash_deserialize(reader)?)) } - fn read_tx(&self, reader: R) -> Result { - Ok(Message::Tx(Transaction::zcash_deserialize(reader)?.into())) + fn read_tx(&self, reader: R) -> Result { + let result = Self::deserialize_transaction_spawning(reader); + Ok(Message::Tx(result?.into())) } fn read_mempool(&self, mut _reader: R) -> Result { @@ -674,6 +676,52 @@ impl Codec { fn read_filterclear(&self, mut _reader: R) -> Result { Ok(Message::FilterClear) } + + /// Given the reader, deserialize the transaction in the rayon thread pool. + #[allow(clippy::unwrap_in_result)] + fn deserialize_transaction_spawning( + reader: R, + ) -> Result { + let mut result = None; + + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // Since we use `block_in_place()`, other futures running on the connection task will be blocked: + // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + // + // We can't use `spawn_blocking()` because: + // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data) + // - There is no way to check the blocking task's future for panics + tokio::task::block_in_place(|| { + rayon::in_place_scope_fifo(|s| { + s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader))) + }) + }); + + result.expect("scope has already finished") + } + + /// Given the reader, deserialize the block in the rayon thread pool. + #[allow(clippy::unwrap_in_result)] + fn deserialize_block_spawning(reader: R) -> Result { + let mut result = None; + + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // Since we use `block_in_place()`, other futures running on the connection task will be blocked: + // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + // + // We can't use `spawn_blocking()` because: + // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data) + // - There is no way to check the blocking task's future for panics + tokio::task::block_in_place(|| { + rayon::in_place_scope_fifo(|s| { + s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader))) + }) + }); + + result.expect("scope has already finished") + } } // XXX replace these interior unit tests with exterior integration tests + proptest @@ -943,7 +991,8 @@ mod tests { fn max_msg_size_round_trip() { use zebra_chain::serialization::ZcashDeserializeInto; - let rt = zebra_test::init_async(); + //let rt = zebra_test::init_async(); + zebra_test::init(); // make tests with a Tx message let tx: Transaction = zebra_test::vectors::DUMMY_TX1 @@ -957,7 +1006,7 @@ mod tests { let size = 85; // reducing the max size to body size - 1 - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut bytes = Vec::new(); { let mut fw = FramedWrite::new( @@ -971,7 +1020,7 @@ mod tests { }); // send again with the msg body size as max size - let msg_bytes = rt.block_on(async { + let msg_bytes = zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut bytes = Vec::new(); { let mut fw = FramedWrite::new( @@ -986,7 +1035,7 @@ mod tests { }); // receive with a reduced max size - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut fr = FramedRead::new( Cursor::new(&msg_bytes), Codec::builder().with_max_body_len(size - 1).finish(), @@ -998,7 +1047,7 @@ mod tests { }); // receive again with the tx size as max size - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut fr = FramedRead::new( Cursor::new(&msg_bytes), Codec::builder().with_max_body_len(size).finish(), diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index dc5ef009..cef025d0 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -336,7 +336,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { /// /// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection, /// with an unrelated transaction test responder. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> { // We respond with an unrelated transaction, so the peer gives up on the request. let unrelated_response: Transaction = @@ -486,7 +486,7 @@ async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError /// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer. /// /// The requests are coming from the full stack to the isolated peer. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> { // We repeatedly respond with the same transaction, so the peer gives up on the second response. let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;