Have Coordinator MainDb take a mutable borrow

This commit is contained in:
Luke Parker 2023-04-26 00:10:06 -04:00
parent 7824b6cb8b
commit 6032af6692
No known key found for this signature in database
3 changed files with 27 additions and 28 deletions

View file

@ -3,9 +3,9 @@ pub use serai_db::*;
use crate::tributary::TributarySpec; use crate::tributary::TributarySpec;
#[derive(Debug)] #[derive(Debug)]
pub struct MainDb<D: Db>(pub D); pub struct MainDb<'a, D: Db>(&'a mut D);
impl<D: Db> MainDb<D> { impl<'a, D: Db> MainDb<'a, D> {
pub fn new(db: D) -> Self { pub fn new(db: &'a mut D) -> Self {
Self(db) Self(db)
} }

View file

@ -1,4 +1,3 @@
#![allow(dead_code)]
#![allow(unused_variables)] #![allow(unused_variables)]
#![allow(unreachable_code)] #![allow(unreachable_code)]
#![allow(clippy::diverging_sub_expression)] #![allow(clippy::diverging_sub_expression)]
@ -46,16 +45,6 @@ lazy_static::lazy_static! {
static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new()); static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
} }
// Specifies a new tributary
async fn create_new_tributary<D: Db>(db: D, spec: TributarySpec) {
// Save it to the database
MainDb(db).add_active_tributary(&spec);
// Add it to the queue
// If we reboot before this is read from the queue, the fact it was saved to the database
// means it'll be handled on reboot
NEW_TRIBUTARIES.write().await.push_back(spec);
}
pub struct ActiveTributary<D: Db, P: P2p> { pub struct ActiveTributary<D: Db, P: P2p> {
pub spec: TributarySpec, pub spec: TributarySpec,
pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>, pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
@ -104,7 +93,17 @@ pub async fn scan_substrate<D: Db, Pro: Processor>(
match substrate::handle_new_blocks( match substrate::handle_new_blocks(
&mut db, &mut db,
&key, &key,
create_new_tributary, |db: &mut D, spec: TributarySpec| {
// Save it to the database
MainDb::new(db).add_active_tributary(&spec);
// Add it to the queue
// If we reboot before this is read from the queue, the fact it was saved to the database
// means it'll be handled on reboot
async {
NEW_TRIBUTARIES.write().await.push_back(spec);
}
},
&processor, &processor,
&serai, &serai,
&mut last_substrate_block, &mut last_substrate_block,
@ -409,6 +408,7 @@ pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
log::warn!("we've already published this transaction. this should only appear on reboot"); log::warn!("we've already published this transaction. this should only appear on reboot");
} else { } else {
// We should've created a valid transaction // We should've created a valid transaction
// TODO: Delay SignPreprocess/BatchPreprocess until associated ID is valid
assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
} }
@ -418,7 +418,7 @@ pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
} }
pub async fn run<D: Db, Pro: Processor, P: P2p>( pub async fn run<D: Db, Pro: Processor, P: P2p>(
raw_db: D, mut raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
p2p: P, p2p: P,
processor: Pro, processor: Pro,
@ -434,8 +434,7 @@ pub async fn run<D: Db, Pro: Processor, P: P2p>(
let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new())); let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));
// Reload active tributaries from the database // Reload active tributaries from the database
// TODO: Can MainDb take a borrow? for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
for spec in MainDb(raw_db.clone()).active_tributaries().1 {
let _ = add_tributary( let _ = add_tributary(
raw_db.clone(), raw_db.clone(),
key.clone(), key.clone(),

View file

@ -41,12 +41,12 @@ async fn in_set(
async fn handle_new_set< async fn handle_new_set<
D: Db, D: Db,
Fut: Future<Output = ()>, Fut: Future<Output = ()>,
ANT: Clone + Fn(D, TributarySpec) -> Fut, CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor, Pro: Processor,
>( >(
db: &D, db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT, create_new_tributary: CNT,
processor: &Pro, processor: &Pro,
serai: &Serai, serai: &Serai,
block: &Block, block: &Block,
@ -56,7 +56,7 @@ async fn handle_new_set<
let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist"); let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist");
let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data); let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data);
add_new_tributary(db.clone(), spec.clone()); create_new_tributary(db, spec.clone());
// Trigger a DKG // Trigger a DKG
// TODO: Check how the processor handles this being fired multiple times // TODO: Check how the processor handles this being fired multiple times
@ -210,12 +210,12 @@ async fn handle_batch_and_burns<Pro: Processor>(
async fn handle_block< async fn handle_block<
D: Db, D: Db,
Fut: Future<Output = ()>, Fut: Future<Output = ()>,
ANT: Clone + Fn(D, TributarySpec) -> Fut, CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor, Pro: Processor,
>( >(
db: &mut SubstrateDb<D>, db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT, create_new_tributary: CNT,
processor: &Pro, processor: &Pro,
serai: &Serai, serai: &Serai,
block: Block, block: Block,
@ -233,7 +233,7 @@ async fn handle_block<
// stable) // stable)
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) { if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::NewSet { set } = new_set { if let ValidatorSetsEvent::NewSet { set } = new_set {
handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set) handle_new_set(&mut db.0, key, create_new_tributary.clone(), processor, serai, &block, set)
.await?; .await?;
} else { } else {
panic!("NewSet event wasn't NewSet: {new_set:?}"); panic!("NewSet event wasn't NewSet: {new_set:?}");
@ -278,12 +278,12 @@ async fn handle_block<
pub async fn handle_new_blocks< pub async fn handle_new_blocks<
D: Db, D: Db,
Fut: Future<Output = ()>, Fut: Future<Output = ()>,
ANT: Clone + Fn(D, TributarySpec) -> Fut, CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor, Pro: Processor,
>( >(
db: &mut SubstrateDb<D>, db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT, create_new_tributary: CNT,
processor: &Pro, processor: &Pro,
serai: &Serai, serai: &Serai,
last_block: &mut u64, last_block: &mut u64,
@ -300,7 +300,7 @@ pub async fn handle_new_blocks<
handle_block( handle_block(
db, db,
key, key,
add_new_tributary.clone(), create_new_tributary.clone(),
processor, processor,
serai, serai,
if b == latest_number { if b == latest_number {