Localize scan_substrate as substrate::scan_task

This commit is contained in:
Luke Parker 2023-10-13 22:31:26 -04:00
parent 4143fe9f47
commit 67951c4971
No known key found for this signature in database
3 changed files with 93 additions and 90 deletions

View file

@ -22,7 +22,6 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS
use message_queue::{Service, client::MessageQueue}; use message_queue::{Service, client::MessageQueue};
use futures::stream::StreamExt;
use tokio::{ use tokio::{
sync::{RwLock, mpsc, broadcast}, sync::{RwLock, mpsc, broadcast},
time::sleep, time::sleep,
@ -108,89 +107,6 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
.unwrap(); .unwrap();
} }
pub async fn scan_substrate<D: Db, Pro: Processors>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
) {
log::info!("scanning substrate");
let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();
let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
loop {
match serai.newly_finalized_block().await {
Ok(sub) => return sub,
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
let mut substrate_block_notifier = new_substrate_block_notifier().await;
loop {
// await the next block, yet if our notifier had an error, re-create it
{
let Ok(next_block) =
tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await
else {
// Timed out, which may be because Serai isn't finalizing or may be some issue with the
// notifier
if serai.get_latest_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{
log::info!("serai hasn't finalized a block in the last 60s...");
} else {
substrate_block_notifier = new_substrate_block_notifier().await;
}
continue;
};
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier().await;
continue;
}
}
match substrate::handle_new_blocks(
&mut db,
&key,
|db: &mut D, spec: TributarySpec| {
log::info!("creating new tributary for {:?}", spec.set());
// Save it to the database
let mut txn = db.txn();
MainDb::<D>::add_active_tributary(&mut txn, &spec);
txn.commit();
// If we reboot before this is read, the fact it was saved to the database means it'll be
// handled on reboot
new_tributary_spec.send(spec).unwrap();
},
&processors,
&serai,
&mut next_substrate_block,
)
.await
{
Ok(()) => {}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
pub async fn heartbeat_tributaries<D: Db, P: P2p>( pub async fn heartbeat_tributaries<D: Db, P: P2p>(
p2p: P, p2p: P,
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
@ -1032,7 +948,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
} }
// Handle new Substrate blocks // Handle new Substrate blocks
tokio::spawn(scan_substrate( tokio::spawn(crate::substrate::scan_task(
raw_db.clone(), raw_db.clone(),
key.clone(), key.clone(),
processors.clone(), processors.clone(),
@ -1147,7 +1063,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
// Handle new blocks for each Tributary // Handle new blocks for each Tributary
{ {
let raw_db = raw_db.clone(); let raw_db = raw_db.clone();
tokio::spawn(tributary::scanner::scan_tributaries( tokio::spawn(tributary::scanner::scan_tributaries_task(
raw_db, raw_db,
key.clone(), key.clone(),
recognized_id, recognized_id,

View file

@ -1,5 +1,8 @@
use core::{ops::Deref, time::Duration}; use core::{ops::Deref, time::Duration};
use std::collections::{HashSet, HashMap}; use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing; use zeroize::Zeroizing;
@ -20,7 +23,8 @@ use serai_db::DbTxn;
use processor_messages::SubstrateContext; use processor_messages::SubstrateContext;
use tokio::time::sleep; use futures::stream::StreamExt;
use tokio::{sync::mpsc, time::sleep};
use crate::{ use crate::{
Db, Db,
@ -313,7 +317,7 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
Ok(()) Ok(())
} }
pub async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>( async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Processors>(
db: &mut SubstrateDb<D>, db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT, create_new_tributary: CNT,
@ -407,3 +411,86 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
true true
} }
pub async fn scan_task<D: Db, Pro: Processors>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
) {
log::info!("scanning substrate");
let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();
let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
loop {
match serai.newly_finalized_block().await {
Ok(sub) => return sub,
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
let mut substrate_block_notifier = new_substrate_block_notifier().await;
loop {
// await the next block, yet if our notifier had an error, re-create it
{
let Ok(next_block) =
tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await
else {
// Timed out, which may be because Serai isn't finalizing or may be some issue with the
// notifier
if serai.get_latest_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{
log::info!("serai hasn't finalized a block in the last 60s...");
} else {
substrate_block_notifier = new_substrate_block_notifier().await;
}
continue;
};
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier().await;
continue;
}
}
match handle_new_blocks(
&mut db,
&key,
|db: &mut D, spec: TributarySpec| {
log::info!("creating new tributary for {:?}", spec.set());
// Save it to the database
let mut txn = db.txn();
crate::MainDb::<D>::add_active_tributary(&mut txn, &spec);
txn.commit();
// If we reboot before this is read, the fact it was saved to the database means it'll be
// handled on reboot
new_tributary_spec.send(spec).unwrap();
},
&processors,
&serai,
&mut next_substrate_block,
)
.await
{
Ok(()) => {}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}

View file

@ -163,7 +163,7 @@ pub(crate) async fn handle_new_blocks<
} }
} }
pub(crate) async fn scan_tributaries< pub(crate) async fn scan_tributaries_task<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
P: P2p, P: P2p,