Remove lazy_static for proper use of channels

This commit is contained in:
Luke Parker 2023-09-25 18:23:39 -04:00
parent 62a1a45135
commit 77f7794452
No known key found for this signature in database
5 changed files with 43 additions and 47 deletions

1
Cargo.lock generated
View file

@ -8166,7 +8166,6 @@ dependencies = [
"frost-schnorrkel",
"futures",
"hex",
"lazy_static",
"libp2p",
"log",
"modular-frost",

View file

@ -15,7 +15,6 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
async-trait = "0.1"
lazy_static = "1"
zeroize = "^1.5"
rand_core = "0.6"

View file

@ -6,7 +6,7 @@ use core::{ops::Deref, future::Future};
use std::{
sync::Arc,
time::{SystemTime, Duration},
collections::{VecDeque, HashMap},
collections::HashMap,
};
use zeroize::{Zeroize, Zeroizing};
@ -27,7 +27,10 @@ use serai_client::{primitives::NetworkId, Public, Serai};
use message_queue::{Service, client::MessageQueue};
use futures::stream::StreamExt;
use tokio::{sync::RwLock, time::sleep};
use tokio::{
sync::{RwLock, mpsc},
time::sleep,
};
use ::tributary::{
ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader,
@ -54,11 +57,6 @@ mod substrate;
#[cfg(test)]
pub mod tests;
lazy_static::lazy_static! {
// This is a static to satisfy lifetime expectations
static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
}
pub struct ActiveTributary<D: Db, P: P2p> {
pub spec: TributarySpec,
pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
@ -103,6 +101,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_channel: mpsc::UnboundedSender<TributarySpec>,
) {
log::info!("scanning substrate");
@ -162,9 +161,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
// Add it to the queue
// If we reboot before this is read from the queue, the fact it was saved to the database
// means it'll be handled on reboot
async {
NEW_TRIBUTARIES.write().await.push_back(spec);
}
new_tributary_channel.send(spec).unwrap();
},
&processors,
&serai,
@ -181,7 +178,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
}
}
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub async fn scan_tributaries<
D: Db,
Pro: Processors,
@ -196,6 +193,7 @@ pub async fn scan_tributaries<
processors: Pro,
serai: Arc<Serai>,
tributaries: Arc<RwLock<Tributaries<D, P>>>,
mut new_tributary_channel: mpsc::UnboundedReceiver<TributarySpec>,
) {
log::info!("scanning tributaries");
@ -209,11 +207,10 @@ pub async fn scan_tributaries<
loop {
// The following handle_new_blocks function may take an arbitrary amount of time
// Accordingly, it may take a long time to acquire a write lock on the tributaries table
// By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately,
// meaning the Substrate scanner won't become blocked on this
// By definition of new_tributary_channel, we allow tributaries to be 'added' almost
// immediately, meaning the Substrate scanner won't become blocked on this
{
let mut new_tributaries = NEW_TRIBUTARIES.write().await;
while let Some(spec) = new_tributaries.pop_front() {
while let Ok(spec) = new_tributary_channel.try_recv() {
let reader = add_tributary(
raw_db.clone(),
key.clone(),
@ -225,6 +222,7 @@ pub async fn scan_tributaries<
.await;
// Trigger a DKG for the newly added Tributary
// TODO: This needs to be moved into add_tributary, or else we may never emit GenerateKey
let set = spec.set();
processors
.send(
@ -799,8 +797,16 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
) {
let serai = Arc::new(serai);
let (new_tributary_channel_send, new_tributary_channel_recv) = mpsc::unbounded_channel();
// Handle new Substrate blocks
tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone()));
tokio::spawn(scan_substrate(
raw_db.clone(),
key.clone(),
processors.clone(),
serai.clone(),
new_tributary_channel_send,
));
// Handle the Tributaries
@ -883,6 +889,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
processors.clone(),
serai.clone(),
tributaries.clone(),
new_tributary_channel_recv,
));
}

View file

@ -149,10 +149,6 @@ struct Behavior {
mdns: libp2p::mdns::tokio::Behaviour,
}
lazy_static::lazy_static! {
static ref TIME_OF_LAST_P2P_MESSAGE: Mutex<Instant> = Mutex::new(Instant::now());
}
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct LibP2p(
@ -246,10 +242,16 @@ impl LibP2p {
let (receive_send, receive_recv) = mpsc::unbounded_channel();
tokio::spawn({
let mut time_of_last_p2p_message = Instant::now();
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
async fn broadcast_raw(p2p: &mut Swarm<Behavior>, msg: Vec<u8>) {
async fn broadcast_raw(
p2p: &mut Swarm<Behavior>,
time_of_last_p2p_message: &mut Instant,
msg: Vec<u8>,
) {
// Update the time of last message
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
*time_of_last_p2p_message = Instant::now();
match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
@ -267,8 +269,7 @@ impl LibP2p {
async move {
// Run this task ad-infinitum
loop {
let time_since_last =
Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await);
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
tokio::select! {
biased;
@ -276,6 +277,7 @@ impl LibP2p {
msg = broadcast_recv.recv() => {
broadcast_raw(
&mut swarm,
&mut time_of_last_p2p_message,
msg.expect("broadcast_recv closed. are we shutting down?")
).await;
}
@ -324,7 +326,11 @@ impl LibP2p {
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
_ = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
broadcast_raw(&mut swarm, P2pMessageKind::KeepAlive.serialize()).await;
broadcast_raw(
&mut swarm,
&mut time_of_last_p2p_message,
P2pMessageKind::KeepAlive.serialize()
).await;
}
}
}

View file

@ -1,4 +1,4 @@
use core::{ops::Deref, time::Duration, future::Future};
use core::{ops::Deref, time::Duration};
use std::collections::{HashSet, HashMap};
use zeroize::Zeroizing;
@ -43,12 +43,7 @@ async fn in_set(
Ok(Some(data.participants.iter().any(|(participant, _)| participant.0 == key)))
}
async fn handle_new_set<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processors,
>(
async fn handle_new_set<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,
@ -84,7 +79,7 @@ async fn handle_new_set<
let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY;
let spec = TributarySpec::new(block.hash(), time, set, set_data);
create_new_tributary(db, spec.clone()).await;
create_new_tributary(db, spec.clone());
} else {
log::info!("not present in set {:?}", set);
}
@ -215,12 +210,7 @@ async fn handle_batch_and_burns<Pro: Processors>(
// Handle a specific Substrate block, returning an error when it fails to get data
// (not blocking / holding)
#[allow(clippy::needless_pass_by_ref_mut)] // False positive?
async fn handle_block<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processors,
>(
async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,
@ -295,12 +285,7 @@ async fn handle_block<
Ok(())
}
pub async fn handle_new_blocks<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processors,
>(
pub async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,