diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 77a12d8a..ed74a7c3 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -15,7 +15,7 @@ use serai_client::{ pub use serai_db::*; use ::tributary::ReadWrite; -use crate::tributary::{TributarySpec, Transaction}; +use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType}; #[derive(Debug)] pub struct MainDb<D: Db>(PhantomData<D>); @@ -106,24 +106,30 @@ impl<D: Db> MainDb<D> { res } - fn first_preprocess_key(network: NetworkId, id: [u8; 32]) -> Vec<u8> { - Self::main_key(b"first_preprocess", (network, id).encode()) + fn first_preprocess_key(network: NetworkId, id_type: RecognizedIdType, id: [u8; 32]) -> Vec<u8> { + Self::main_key(b"first_preprocess", (network, id_type, id).encode()) } pub fn save_first_preprocess( txn: &mut D::Transaction<'_>, network: NetworkId, + id_type: RecognizedIdType, id: [u8; 32], preprocess: Vec<u8>, ) { - let key = Self::first_preprocess_key(network, id); + let key = Self::first_preprocess_key(network, id_type, id); if let Some(existing) = txn.get(&key) { assert_eq!(existing, preprocess, "saved a distinct first preprocess"); return; } txn.put(key, preprocess); } - pub fn first_preprocess<G: Get>(getter: &G, network: NetworkId, id: [u8; 32]) -> Option<Vec<u8>> { - getter.get(Self::first_preprocess_key(network, id)) + pub fn first_preprocess<G: Get>( + getter: &G, + network: NetworkId, + id_type: RecognizedIdType, + id: [u8; 32], + ) -> Option<Vec<u8>> { + getter.get(Self::first_preprocess_key(network, id_type, id)) } fn last_received_batch_key(network: NetworkId) -> Vec<u8> { diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 4576d4d1..80d3712e 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -476,7 +476,13 @@ async fn handle_processor_message<D: Db, P: P2p>( ProcessorMessage::Sign(msg) => match msg { sign::ProcessorMessage::Preprocess { id, preprocess } => { if id.attempt == 0 { - MainDb::<D>::save_first_preprocess(&mut txn, network, id.id, preprocess); + MainDb::<D>::save_first_preprocess( + &mut txn, + network, + RecognizedIdType::Plan, + id.id, + preprocess, + ); vec![] } else { @@ -527,7 +533,13 @@ async fn handle_processor_message<D: Db, P: P2p>( // If this is the first attempt instance, wait until we synchronize around the batch // first if id.attempt == 0 { - MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess); + MainDb::<D>::save_first_preprocess( + &mut txn, + spec.set().network, + RecognizedIdType::Batch, + id.id, + preprocess, + ); // If this is the new key's first Batch, only create this TX once we verify all // all prior published `Batch`s @@ -860,10 +872,10 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>( // received/saved, creating a race between Tributary ack and the availability of all // Preprocesses // This waits until the necessary preprocess is available 0, - // TODO: Incorporate RecognizedIdType here? - let get_preprocess = |raw_db, id| async move { + let get_preprocess = |raw_db, id_type, id| async move { loop { - let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id) else { + let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id_type, id) + else { sleep(Duration::from_millis(100)).await; continue; }; @@ -875,14 +887,14 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>( RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData { plan: id, attempt: 0, - data: get_preprocess(&raw_db, id).await, + data: get_preprocess(&raw_db, id_type, id).await, signed: Transaction::empty_signed(), }), RecognizedIdType::Plan => Transaction::SignPreprocess(SignData { plan: id, attempt: 0, - data: get_preprocess(&raw_db, id).await, + data: get_preprocess(&raw_db, id_type, id).await, signed: Transaction::empty_signed(), }), }; @@ -909,7 +921,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>( }; // This is safe to perform multiple times and solely needs atomicity with regards to // itself - // TODO: Should this not take a TXN accordingly? It's best practice to take a txn, yet + // TODO: Should this not take a txn accordingly? It's best practice to take a txn, yet // taking a txn fails to declare its achieved independence let mut txn = raw_db.txn(); publish_signed_transaction(&mut txn, tributary, tx).await; diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 25bb3f2f..9fb7a80f 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -7,6 +7,7 @@ use ciphersuite::{Ciphersuite, Ristretto}; use tokio::sync::broadcast; +use scale::{Encode, Decode}; use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai}; use tributary::{ @@ -27,7 +28,7 @@ use crate::{ P2p, }; -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] pub enum RecognizedIdType { Batch, Plan,