Clean out create_new_tributary

It made sense when the task was in main.rs. Now that it isn't, it's a pointless
indirection.
This commit is contained in:
Luke Parker 2023-10-14 16:09:24 -04:00
parent 863a7842ca
commit 5897efd7c7
No known key found for this signature in database

View file

@ -47,10 +47,10 @@ async fn in_set(
Ok(Some(participants.iter().any(|participant| participant.0 == key))) Ok(Some(participants.iter().any(|participant| participant.0 == key)))
} }
async fn handle_new_set<D: Db, CNT: Clone + Fn(&mut D, TributarySpec)>( async fn handle_new_set<D: Db>(
db: &mut D, txn: &mut D::Transaction<'_>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT, new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
serai: &Serai, serai: &Serai,
block: &Block, block: &Block,
set: ValidatorSet, set: ValidatorSet,
@ -107,7 +107,18 @@ async fn handle_new_set<D: Db, CNT: Clone + Fn(&mut D, TributarySpec)>(
let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY; let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY;
let spec = TributarySpec::new(block.hash(), time, set, set_data); let spec = TributarySpec::new(block.hash(), time, set, set_data);
create_new_tributary(db, spec.clone());
log::info!("creating new tributary for {:?}", spec.set());
// Save it to the database now, not on the channel receiver's side, so this is safe against
// reboots
// If this txn finishes, and we reboot, then this'll be reloaded from active Tributaries
// If this txn doesn't finish, this will be re-fired
// If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the
// prior fired event may have not been received yet
crate::MainDb::<D>::add_active_tributary(txn, &spec);
new_tributary_spec.send(spec).unwrap();
} else { } else {
log::info!("not present in set {:?}", set); log::info!("not present in set {:?}", set);
} }
@ -245,11 +256,10 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
// Handle a specific Substrate block, returning an error when it fails to get data // Handle a specific Substrate block, returning an error when it fails to get data
// (not blocking / holding) // (not blocking / holding)
#[allow(clippy::needless_pass_by_ref_mut)] // False positive? async fn handle_block<D: Db, Pro: Processors>(
async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>(
db: &mut SubstrateDb<D>, db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT, new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
processors: &Pro, processors: &Pro,
serai: &Serai, serai: &Serai,
block: Block, block: Block,
@ -277,8 +287,8 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) { if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
log::info!("found fresh new set event {:?}", new_set); log::info!("found fresh new set event {:?}", new_set);
handle_new_set(&mut db.0, key, create_new_tributary.clone(), serai, &block, set).await?;
let mut txn = db.0.txn(); let mut txn = db.0.txn();
handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id); SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
txn.commit(); txn.commit();
} }
@ -322,10 +332,10 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
Ok(()) Ok(())
} }
async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>( async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>, db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT, new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
processors: &Pro, processors: &Pro,
serai: &Serai, serai: &Serai,
next_block: &mut u64, next_block: &mut u64,
@ -343,7 +353,7 @@ async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: P
handle_block( handle_block(
db, db,
key, key,
create_new_tributary.clone(), new_tributary_spec,
processors, processors,
serai, serai,
if b == latest_number { if b == latest_number {
@ -420,29 +430,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
match handle_new_blocks( match handle_new_blocks(
&mut db, &mut db,
&key, &key,
|db: &mut D, spec: TributarySpec| { &new_tributary_spec,
log::info!("creating new tributary for {:?}", spec.set());
// Check it isn't already present in the DB due to rescanning this block upon reboot
for existing_spec in crate::MainDb::<D>::active_tributaries(db).1 {
if spec.set() == existing_spec.set() {
log::warn!(
"already created tributary {:?}, this should only happen on reboot",
spec.set()
);
return;
}
}
// Save it to the database
let mut txn = db.txn();
crate::MainDb::<D>::add_active_tributary(&mut txn, &spec);
txn.commit();
// If we reboot before this is read, the fact it was saved to the database means it'll be
// handled on reboot
new_tributary_spec.send(spec).unwrap();
},
&processors, &processors,
&serai, &serai,
&mut next_substrate_block, &mut next_substrate_block,