diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 4c58a75d..beccfc9f 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -21,7 +21,7 @@ use serai_env as env;
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet},
-  Public, Serai,
+  Public, Serai, SeraiInInstructions,
 };
 
 use message_queue::{Service, client::MessageQueue};
@@ -286,7 +286,7 @@ async fn handle_processor_message(
           continue;
         }
 
-        let tx = Serai::execute_batch(batch.clone());
+        let tx = SeraiInInstructions::execute_batch(batch.clone());
         log::debug!("attempting to publish batch {:?} {}", batch.batch.network, batch.batch.id,);
         // This publish may fail if this transaction already exists in the mempool, which is
         // possible, or if this batch was already executed on-chain
@@ -833,7 +833,7 @@ pub async fn run(
             let mut txn = raw_db.txn();
             publish_signed_transaction(&mut txn, tributary, tx).await;
             txn.commit();
-            break
+            break;
           }
         }
       }
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index 6f963650..33e3381f 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -9,14 +9,14 @@ use zeroize::Zeroizing;
 use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
 
 use serai_client::{
-  SeraiError, Block, Serai,
+  SeraiError, Block, Serai, TemporalSerai,
   primitives::{BlockHash, NetworkId},
   validator_sets::{
     primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
     ValidatorSetsEvent,
   },
   in_instructions::InInstructionsEvent,
-  tokens::{primitives::OutInstructionWithBalance, TokensEvent},
+  coins::{primitives::OutInstructionWithBalance, TokensEvent},
 };
 
 use serai_db::DbTxn;
@@ -37,12 +37,10 @@ pub use db::*;
 
 async fn in_set(
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  serai: &Serai,
+  serai: &TemporalSerai<'_>,
   set: ValidatorSet,
-  block_hash: [u8; 32],
 ) -> Result<Option<bool>, SeraiError> {
-  let Some(participants) = serai.get_validator_set_participants(set.network, block_hash).await?
-  else {
+  let Some(participants) = serai.validator_sets().participants(set.network).await? else {
     return Ok(None);
   };
   let key = (Ristretto::generator() * key.deref()).to_bytes();
@@ -57,30 +55,35 @@ async fn handle_new_set(
   block: &Block,
   set: ValidatorSet,
 ) -> Result<(), SeraiError> {
-  if in_set(key, serai, set, block.hash()).await?.expect("NewSet for set which doesn't exist") {
+  if in_set(key, &serai.as_of(block.hash()), set)
+    .await?
+    .expect("NewSet for set which doesn't exist")
+  {
     log::info!("present in set {:?}", set);
 
-    let set_participants = serai
-      .get_validator_set_participants(set.network, block.hash())
-      .await?
-      .expect("NewSet for set which doesn't exist");
+    let set_data = {
+      let serai = serai.as_of(block.hash()).validator_sets();
+      let set_participants =
+        serai.participants(set.network).await?.expect("NewSet for set which doesn't exist");
 
-    let allocation_per_key_share = serai
-      .get_allocation_per_key_share(set.network, block.hash())
-      .await?
-      .expect("NewSet for set which didn't have an allocation per key share")
-      .0;
-
-    let mut set_data = vec![];
-    for participant in set_participants {
-      let allocation = serai
-        .get_allocation(set.network, participant, block.hash())
+      let allocation_per_key_share = serai
+        .allocation_per_key_share(set.network)
         .await?
- .expect("validator selected for set yet didn't have an allocation") + .expect("NewSet for set which didn't have an allocation per key share") .0; - set_data.push((participant, allocation / allocation_per_key_share)); - } - amortize_excess_key_shares(&mut set_data); + + let mut set_data = vec![]; + for participant in set_participants { + let allocation = serai + .allocation(set.network, participant) + .await? + .expect("validator selected for set yet didn't have an allocation") + .0; + set_data.push((participant, allocation / allocation_per_key_share)); + } + amortize_excess_key_shares(&mut set_data); + set_data + }; let time = if let Ok(time) = block.time() { time @@ -88,7 +91,7 @@ async fn handle_new_set( assert_eq!(block.number(), 0); // Use the next block's time loop { - let Ok(Some(res)) = serai.get_block_by_number(1).await else { + let Ok(Some(res)) = serai.block_by_number(1).await else { sleep(Duration::from_secs(5)).await; continue; }; @@ -132,7 +135,9 @@ async fn handle_key_gen( context: SubstrateContext { serai_time: block.time().unwrap() / 1000, network_latest_finalized_block: serai - .get_latest_block_for_network(block.hash(), set.network) + .as_of(block.hash()) + .in_instructions() + .latest_block_for_network(set.network) .await? // The processor treats this as a magic value which will cause it to find a network // block which has a time greater than or equal to the Serai time @@ -153,8 +158,6 @@ async fn handle_batch_and_burns( serai: &Serai, block: &Block, ) -> Result<(), SeraiError> { - let hash = block.hash(); - // Track which networks had events with a Vec in ordr to preserve the insertion order // While that shouldn't be needed, ensuring order never hurts, and may enable design choices // with regards to Processor <-> Coordinator message passing @@ -173,7 +176,8 @@ async fn handle_batch_and_burns( let mut batches = HashMap::>::new(); let mut burns = HashMap::new(); - for batch in serai.get_batch_events(hash).await? { + let serai = serai.as_of(block.hash()); + for batch in serai.in_instructions().batch_events().await? { if let InInstructionsEvent::Batch { network, id, block: network_block, instructions_hash } = batch { @@ -193,7 +197,7 @@ async fn handle_batch_and_burns( } } - for burn in serai.get_burn_events(hash).await? { + for burn in serai.coins().burn_events().await? { if let TokensEvent::Burn { address: _, balance, instruction } = burn { let network = balance.coin.network(); network_had_event(&mut burns, &mut batches, network); @@ -213,7 +217,8 @@ async fn handle_batch_and_burns( } else { // If it's had a batch or a burn, it must have had a block acknowledged serai - .get_latest_block_for_network(hash, network) + .in_instructions() + .latest_block_for_network(network) .await? 
.expect("network had a batch/burn yet never set a latest block") }; @@ -255,7 +260,7 @@ async fn handle_block::handled_event(&db.0, hash, event_id) { log::info!("found fresh key gen event {:?}", key_gen); if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen { @@ -326,7 +331,7 @@ async fn handle_new_blocks Result<(), SeraiError> { // Check if there's been a new Substrate block - let latest = serai.get_latest_block().await?; + let latest = serai.latest_block().await?; let latest_number = latest.number(); if latest_number < *next_block { return Ok(()); @@ -345,7 +350,7 @@ async fn handle_new_blocks( else { // Timed out, which may be because Serai isn't finalizing or may be some issue with the // notifier - if serai.get_latest_block().await.map(|block| block.number()).ok() == + if serai.latest_block().await.map(|block| block.number()).ok() == Some(next_substrate_block.saturating_sub(1)) { log::info!("serai hasn't finalized a block in the last 60s..."); @@ -447,7 +452,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool { // TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB // call, instead of a series of network requests let latest = loop { - let Ok(res) = serai.get_latest_block_hash().await else { + let Ok(res) = serai.latest_block_hash().await else { log::error!( "couldn't get the latest block hash from serai when checking tributary relevancy" ); @@ -458,7 +463,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool { }; let latest_session = loop { - let Ok(res) = serai.get_session(set.network, latest).await else { + let Ok(res) = serai.as_of(latest).validator_sets().session(set.network).await else { log::error!("couldn't get the latest session from serai when checking tributary relevancy"); sleep(Duration::from_secs(5)).await; continue; @@ -477,7 +482,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool { } else { // Since the next session has started, check its handover status let keys = loop { - let Ok(res) = serai.get_keys(set, latest).await else { + let Ok(res) = serai.as_of(latest).validator_sets().keys(set).await else { log::error!( "couldn't get the keys for a session from serai when checking tributary relevancy" ); @@ -506,10 +511,12 @@ pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) - } first = false; - let Ok(latest_block) = serai.get_latest_block().await else { + let Ok(latest_block) = serai.latest_block().await else { continue; }; - let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await else { + let Ok(last) = + serai.as_of(latest_block.hash()).in_instructions().last_batch_for_network(network).await + else { continue; }; break if let Some(last) = last { last + 1 } else { 0 }; diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index 8b4ff9df..7c92611d 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -19,7 +19,7 @@ use serai_client::{ Signature, validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message}, subxt::utils::Encoded, - Serai, + SeraiValidatorSets, }; use tributary::Signed; @@ -413,7 +413,7 @@ pub(crate) async fn handle_application_tx< publish_serai_tx( spec.set(), - Serai::set_validator_set_keys(spec.set().network, key_pair, Signature(sig)), + SeraiValidatorSets::set_keys(spec.set().network, key_pair, Signature(sig)), ) .await; } diff --git a/coordinator/src/tributary/scanner.rs 
index bbe72dd9..fa9c118d 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -216,10 +216,11 @@ pub(crate) async fn scan_tributaries_task<
               // creation
               // TODO2: Differentiate connection errors from invariants
               Err(e) => {
-                if let Ok(latest) = serai.get_latest_block_hash().await {
+                if let Ok(latest) = serai.latest_block_hash().await {
+                  let serai = serai.as_of(latest).validator_sets();
                   // Check if this failed because the keys were already set by someone
                   // else
-                  if matches!(serai.get_keys(spec.set(), latest).await, Ok(Some(_))) {
+                  if matches!(serai.keys(spec.set()).await, Ok(Some(_))) {
                     log::info!("another coordinator set key pair for {:?}", set);
                     break;
                   }
@@ -230,7 +231,7 @@ pub(crate) async fn scan_tributaries_task<
                   // some point did set keys, and we're just operating off very
                   // historical data
                   if let Ok(Some(current_session)) =
-                    serai.get_session(spec.set().network, latest).await
+                    serai.session(spec.set().network).await
                   {
                     if current_session.0 > spec.set().session.0 {
                       log::warn!(
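
Note for reviewers: the read-side pattern this diff migrates to is `serai.as_of(block_hash)`, which returns a `TemporalSerai` pinned to one block, followed by a per-pallet scope (`validator_sets()`, `in_instructions()`, `coins()`). A minimal sketch of that pattern, assuming the `serai_client` API exactly as it appears in the hunks above; the helper `set_has_keys` is hypothetical and not part of this PR:

```rust
use serai_client::{validator_sets::primitives::ValidatorSet, Serai, SeraiError};

// Hypothetical helper: has this validator set already published its keys,
// as of the latest finalized block?
async fn set_has_keys(serai: &Serai, set: ValidatorSet) -> Result<bool, SeraiError> {
  // Pin all reads to a single block hash so they're mutually consistent
  let latest = serai.latest_block_hash().await?;
  let serai = serai.as_of(latest);
  // Scope to the validator-sets pallet, then query it
  Ok(serai.validator_sets().keys(set).await?.is_some())
}
```

Compared to threading a `block_hash` argument into every `get_*` call, scoping once via `as_of` makes it harder to accidentally mix heights across related reads.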
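On the write side, unsigned-transaction constructors move from `Serai` onto per-pallet types. A sketch of the two call sites this diff rewrites, under the assumptions (not confirmed by the diff) that both constructors return the `Encoded` type imported in handle.rs and that the batch type is `in_instructions::primitives::SignedBatch`; both wrapper functions are hypothetical:

```rust
use serai_client::{
  in_instructions::primitives::SignedBatch,
  primitives::NetworkId,
  subxt::utils::Encoded,
  validator_sets::primitives::KeyPair,
  SeraiInInstructions, SeraiValidatorSets, Signature,
};

// Hypothetical wrapper; previously Serai::execute_batch(batch)
fn execute_batch_tx(batch: SignedBatch) -> Encoded {
  SeraiInInstructions::execute_batch(batch)
}

// Hypothetical wrapper; previously Serai::set_validator_set_keys(network, key_pair, sig)
fn set_keys_tx(network: NetworkId, key_pair: KeyPair, sig: Signature) -> Encoded {
  SeraiValidatorSets::set_keys(network, key_pair, sig)
}
```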