Refactor rate limiting to not store `Sleep` type (#2915)

In newer Tokio versions the `Sleep` type doesn't implement `Unpin`, so
it's a little more complicated to use it. In this case it was easier to
refactor the code to not store the `Sleep` type instead of wrapping it
in a `Pin` type.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-10-21 08:47:04 -03:00 committed by GitHub
parent d2a5af0ea5
commit 192a45ccf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 7 deletions

View File

@ -1,7 +1,7 @@
use std::{cmp::min, mem, sync::Arc, time::Duration}; use std::{cmp::min, sync::Arc};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, timeout, Instant, Sleep}; use tokio::time::{sleep_until, timeout, Instant};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use zebra_chain::serialization::DateTime32; use zebra_chain::serialization::DateTime32;
@ -114,7 +114,7 @@ mod tests;
pub(crate) struct CandidateSet<S> { pub(crate) struct CandidateSet<S> {
pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>, pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
pub(super) peer_service: S, pub(super) peer_service: S,
wait_next_handshake: Sleep, min_next_handshake: Instant,
min_next_crawl: Instant, min_next_crawl: Instant,
} }
@ -131,7 +131,7 @@ where
CandidateSet { CandidateSet {
address_book, address_book,
peer_service, peer_service,
wait_next_handshake: sleep(Duration::from_secs(0)), min_next_handshake: Instant::now(),
min_next_crawl: Instant::now(), min_next_crawl: Instant::now(),
} }
} }
@ -321,9 +321,8 @@ where
}; };
// SECURITY: rate-limit new outbound peer connections // SECURITY: rate-limit new outbound peer connections
(&mut self.wait_next_handshake).await; sleep_until(self.min_next_handshake).await;
let mut sleep = sleep(constants::MIN_PEER_CONNECTION_INTERVAL); self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL;
mem::swap(&mut self.wait_next_handshake, &mut sleep);
Some(reconnect) Some(reconnect)
} }