Improve handling of tasks in coordinator, one per Tributary scanner

This commit is contained in:
Luke Parker 2023-09-25 20:27:44 -04:00
parent 9f3840d1cf
commit 60491a091f
No known key found for this signature in database
3 changed files with 111 additions and 104 deletions

View file

@@ -99,7 +99,7 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
spec.n(), spec.n(),
spec spec
.i(Ristretto::generator() * key.deref()) .i(Ristretto::generator() * key.deref())
.expect("adding a tribuary for a set we aren't in set for"), .expect("adding a tributary for a set we aren't in set for"),
) )
.unwrap(), .unwrap(),
}, },
@@ -195,13 +195,22 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
} }
} }
pub(crate) trait RIDTrait<FRid>:
Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid
{
}
impl<FRid, F: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid>
RIDTrait<FRid> for F
{
}
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub async fn scan_tributaries< pub(crate) async fn scan_tributaries<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
P: P2p, P: P2p,
FRid: Future<Output = ()>, FRid: Send + Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, RID: 'static + Send + Sync + RIDTrait<FRid>,
>( >(
raw_db: D, raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -213,64 +222,74 @@ pub async fn scan_tributaries<
) { ) {
log::info!("scanning tributaries"); log::info!("scanning tributaries");
// Handle new Tributary blocks
let mut tributary_readers = vec![];
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
loop { loop {
while let Ok(ActiveTributary { spec, tributary }) = { match new_tributary.recv().await {
match new_tributary.try_recv() { Ok(ActiveTributary { spec, tributary }) => {
Ok(tributary) => Ok(tributary), // For each Tributary, spawn a dedicated scanner task
Err(broadcast::error::TryRecvError::Empty) => Err(()), tokio::spawn({
Err(broadcast::error::TryRecvError::Lagged(_)) => { let raw_db = raw_db.clone();
panic!("scan_tributaries lagged to handle new_tributary") let key = key.clone();
} let recognized_id = recognized_id.clone();
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), let p2p = p2p.clone();
} let processors = processors.clone();
} {
tributary_readers.push((spec, tributary.reader()));
}
for (spec, reader) in &tributary_readers {
tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>(
&mut tributary_db,
&key,
recognized_id.clone(),
&processors,
|set, tx| {
let serai = serai.clone(); let serai = serai.clone();
async move { async move {
let spec = &spec;
let reader = tributary.reader();
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
loop { loop {
match serai.publish(&tx).await { tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>(
Ok(_) => { &mut tributary_db,
log::info!("set key pair for {set:?}"); &key,
break; recognized_id.clone(),
} &processors,
// This is assumed to be some ephemeral error due to the assumed fault-free |set, tx| {
// creation let serai = serai.clone();
// TODO2: Differentiate connection errors from invariants async move {
Err(e) => { loop {
// Check if this failed because the keys were already set by someone else match serai.publish(&tx).await {
if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { Ok(_) => {
log::info!("another coordinator set key pair for {:?}", set); log::info!("set key pair for {set:?}");
break; break;
} }
// This is assumed to be some ephemeral error due to the assumed fault-free
// creation
// TODO2: Differentiate connection errors from invariants
Err(e) => {
// Check if this failed because the keys were already set by someone else
if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) {
log::info!("another coordinator set key pair for {:?}", set);
break;
}
log::error!("couldn't connect to Serai node to publish set_keys TX: {:?}", e); log::error!(
tokio::time::sleep(Duration::from_secs(10)).await; "couldn't connect to Serai node to publish set_keys TX: {:?}",
} e
} );
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
},
spec,
&reader,
)
.await;
// Sleep for half the block time
// TODO: Define a notification system for when a new block occurs
sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into()))
.await;
} }
} }
}, });
spec, }
reader, Err(broadcast::error::RecvError::Lagged(_)) => {
) panic!("scan_tributaries lagged to handle new_tributary")
.await; }
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
} }
// Sleep for half the block time
// TODO2: Define a notification system for when a new block occurs
sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await;
} }
} }
@@ -288,7 +307,7 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
Ok(tributary) => Ok(tributary), Ok(tributary) => Ok(tributary),
Err(broadcast::error::TryRecvError::Empty) => Err(()), Err(broadcast::error::TryRecvError::Empty) => Err(()),
Err(broadcast::error::TryRecvError::Lagged(_)) => { Err(broadcast::error::TryRecvError::Lagged(_)) => {
panic!("heartbeat lagged to handle new_tributary") panic!("heartbeat_tributaries lagged to handle new_tributary")
} }
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
} }
@@ -328,30 +347,27 @@ pub async fn handle_p2p<D: Db, P: P2p>(
p2p: P, p2p: P,
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) { ) {
// TODO: Merge this into the below loop. We don't need an extra task here
let tributaries = Arc::new(RwLock::new(HashMap::new())); let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone();
async move {
loop {
match new_tributary.recv().await {
Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("handle_p2p lagged to handle new_tributary")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
}
}
}
});
loop { loop {
while let Ok(tributary) = {
match new_tributary.try_recv() {
Ok(tributary) => Ok(tributary),
Err(broadcast::error::TryRecvError::Empty) => Err(()),
Err(broadcast::error::TryRecvError::Lagged(_)) => {
panic!("handle_p2p lagged to handle new_tributary")
}
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
}
} {
// TODO: Because the below maintains a read lock, this will never process until all prior P2P
// messages were handled. That's a notable latency spike
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
let mut msg = p2p.receive().await; let mut msg = p2p.receive().await;
// Spawn a dedicated task to handle this message, ensuring any singularly latent message // Spawn a dedicated task to handle this message, ensuring any singularly latent message
// doesn't hold everything up // doesn't hold everything up
// TODO2: Move to one task per tributary (or two. One for Tendermint, one for Tributary) // TODO: Move to one task per tributary (or two. One for Tendermint, one for Tributary)
tokio::spawn({ tokio::spawn({
let p2p = p2p.clone(); let p2p = p2p.clone();
let tributaries = tributaries.clone(); let tributaries = tributaries.clone();
@@ -494,26 +510,21 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
) { ) {
let pub_key = Ristretto::generator() * key.deref(); let pub_key = Ristretto::generator() * key.deref();
// TODO: Merge this into the below loop. We don't need an extra task here let mut tributaries = HashMap::new();
let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone();
async move {
loop {
match new_tributary.recv().await {
Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("handle_processors lagged to handle new_tributary")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
}
}
}
});
loop { loop {
while let Ok(tributary) = {
match new_tributary.try_recv() {
Ok(tributary) => Ok(tributary),
Err(broadcast::error::TryRecvError::Empty) => Err(()),
Err(broadcast::error::TryRecvError::Lagged(_)) => {
panic!("handle_processors lagged to handle new_tributary")
}
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
}
} {
tributaries.insert(tributary.spec.genesis(), tributary);
}
// TODO: Dispatch this message to a task dedicated to handling this processor, preventing one // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
// processor from holding up all the others. This would require a peek method be added to the // processor from holding up all the others. This would require a peek method be added to the
// message-queue (to view multiple future messages at once) // message-queue (to view multiple future messages at once)
@@ -525,7 +536,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// (which means there's a lack of multisig rotation) // (which means there's a lack of multisig rotation)
let spec = { let spec = {
let mut spec = None; let mut spec = None;
for tributary in tributaries.read().await.values() { for tributary in tributaries.values() {
if tributary.spec.set().network == msg.network { if tributary.spec.set().network == msg.network {
spec = Some(tributary.spec.clone()); spec = Some(tributary.spec.clone());
break; break;
@@ -763,7 +774,6 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// If this created a transaction, publish it // If this created a transaction, publish it
if let Some(mut tx) = tx { if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {
// TODO: This can happen since Substrate tells the Processor to generate commitments // TODO: This can happen since Substrate tells the Processor to generate commitments
// at the same time it tells the Tributary to be created // at the same time it tells the Tributary to be created

View file

@@ -17,7 +17,6 @@ use frost_schnorrkel::Schnorrkel;
use serai_client::{ use serai_client::{
Signature, Signature,
primitives::NetworkId,
validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message}, validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message},
subxt::utils::Encoded, subxt::utils::Encoded,
Serai, Serai,
@@ -225,13 +224,13 @@ pub fn generated_key_pair<D: Db>(
DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair) DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
} }
pub async fn handle_application_tx< pub(crate) async fn handle_application_tx<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, RID: crate::RIDTrait<FRid>,
>( >(
tx: Transaction, tx: Transaction,
spec: &TributarySpec, spec: &TributarySpec,

View file

@@ -4,9 +4,7 @@ use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto}; use ciphersuite::{Ciphersuite, Ristretto};
use serai_client::{ use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded};
primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded,
};
use tributary::{ use tributary::{
Transaction as TributaryTransaction, Block, TributaryReader, Transaction as TributaryTransaction, Block, TributaryReader,
@@ -40,7 +38,7 @@ async fn handle_block<
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, RID: crate::RIDTrait<FRid>,
P: P2p, P: P2p,
>( >(
db: &mut TributaryDb<D>, db: &mut TributaryDb<D>,
@@ -101,13 +99,13 @@ async fn handle_block<
// TODO2: Trigger any necessary re-attempts // TODO2: Trigger any necessary re-attempts
} }
pub async fn handle_new_blocks< pub(crate) async fn handle_new_blocks<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, RID: crate::RIDTrait<FRid>,
P: P2p, P: P2p,
>( >(
db: &mut TributaryDb<D>, db: &mut TributaryDb<D>,