diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 99eb78d7..b3c00be3 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -16,21 +16,24 @@ use tokio::time::sleep; mod db; pub use db::*; -mod transaction; -pub use transaction::Transaction as TributaryTransaction; +pub mod tributary; mod p2p; pub use p2p::*; +pub mod processor; +use processor::Processor; + mod substrate; #[cfg(test)] mod tests; -async fn run( +async fn run( db: D, key: Zeroizing<::F>, p2p: P, + mut processor: Pro, serai: Serai, ) { let mut db = MainDb::new(db); @@ -39,8 +42,15 @@ async fn run( tokio::spawn(async move { loop { - match substrate::handle_new_blocks(&mut db, &key, &p2p, &serai, &mut last_substrate_block) - .await + match substrate::handle_new_blocks( + &mut db, + &key, + &p2p, + &mut processor, + &serai, + &mut last_substrate_block, + ) + .await { Ok(()) => {} Err(e) => { @@ -63,16 +73,21 @@ async fn run( #[tokio::main] async fn main() { let db = MemDb::new(); // TODO + let key = Zeroizing::new(::F::ZERO); // TODO let p2p = LocalP2p {}; // TODO + + let processor = processor::MemProcessor::new(); // TODO + let serai = || async { loop { let Ok(serai) = Serai::new("ws://127.0.0.1:9944").await else { log::error!("couldn't connect to the Serai node"); + sleep(Duration::from_secs(5)).await; continue }; return serai; } }; - run(db, key, p2p, serai().await).await + run(db, key, p2p, processor, serai().await).await } diff --git a/coordinator/src/processor.rs b/coordinator/src/processor.rs new file mode 100644 index 00000000..5817e290 --- /dev/null +++ b/coordinator/src/processor.rs @@ -0,0 +1,41 @@ +use std::{ + sync::{Arc, RwLock}, + collections::VecDeque, +}; + +use processor_messages::{ProcessorMessage, CoordinatorMessage}; + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct Message { + pub id: u64, + pub msg: ProcessorMessage, +} + +#[async_trait::async_trait] +pub trait Processor: 'static + Send + Sync { + async fn send(&mut self, msg: CoordinatorMessage); + async fn recv(&mut self) -> Message; + async fn ack(&mut self, msg: Message); +} + +// TODO: Move this to tests +pub struct MemProcessor(Arc>>); +impl MemProcessor { + #[allow(clippy::new_without_default)] + pub fn new() -> MemProcessor { + MemProcessor(Arc::new(RwLock::new(VecDeque::new()))) + } +} + +#[async_trait::async_trait] +impl Processor for MemProcessor { + async fn send(&mut self, _: CoordinatorMessage) { + todo!() + } + async fn recv(&mut self) -> Message { + todo!() + } + async fn ack(&mut self, _: Message) { + todo!() + } +} diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index bd927338..d3fcc258 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -3,7 +3,6 @@ use std::collections::{HashSet, HashMap}; use zeroize::Zeroizing; -use transcript::{Transcript, RecommendedTranscript}; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use frost::{Participant, ThresholdParams}; @@ -20,9 +19,9 @@ use serai_client::{ use tributary::Tributary; -use processor_messages::{SubstrateContext, key_gen::KeyGenId}; +use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage}; -use crate::{Db, MainDb, TributaryTransaction, P2p}; +use crate::{Db, MainDb, P2p, processor::Processor}; async fn get_coin_key(serai: &Serai, set: ValidatorSet) -> Result>, SeraiError> { Ok(serai.get_keys(set).await?.map(|keys| keys.1.into_inner())) @@ -46,10 +45,11 @@ async fn in_set( )) } -async fn handle_new_set( +async fn handle_new_set( db: &mut MainDb, key: &Zeroizing<::F>, p2p: &P, + processor: &mut Pro, serai: &Serai, block: &Block, set: ValidatorSet, @@ -69,21 +69,11 @@ async fn handle_new_set( validators.insert(participant, amount.0 / set_data.bond.0); } - // Calculate the genesis for this Tributary - let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis"); - // This locks it to a specific Serai chain - genesis.append_message(b"serai_block", block.hash()); - genesis.append_message(b"session", set.session.0.to_le_bytes()); - genesis.append_message(b"network", set.network.0.to_le_bytes()); - let genesis = genesis.challenge(b"genesis"); - let genesis_ref: &[u8] = genesis.as_ref(); - let genesis = genesis_ref[.. 32].try_into().unwrap(); - // TODO: Do something with this - let tributary = Tributary::<_, TributaryTransaction, _>::new( + let tributary = Tributary::<_, crate::tributary::Transaction, _>::new( // TODO2: Use a DB on a dedicated volume db.0.clone(), - genesis, + crate::tributary::genesis(block.hash(), set), block.time().expect("Serai block didn't have a timestamp set"), key.clone(), validators, @@ -93,22 +83,27 @@ async fn handle_new_set( .unwrap(); // Trigger a DKG - // TODO: Send this to processor. Check how it handles it being fired multiple times + // TODO: Check how the processor handles thi being fired multiple times // We already have a unique event ID based on block, event index (where event index is // the one generated in this handle_block function) // We could use that on this end and the processor end? - let msg = processor_messages::key_gen::CoordinatorMessage::GenerateKey { - id: KeyGenId { set, attempt: 0 }, - params: ThresholdParams::new(t, n, i).unwrap(), - }; + processor + .send(CoordinatorMessage::KeyGen( + processor_messages::key_gen::CoordinatorMessage::GenerateKey { + id: KeyGenId { set, attempt: 0 }, + params: ThresholdParams::new(t, n, i).unwrap(), + }, + )) + .await; } Ok(()) } -async fn handle_key_gen( +async fn handle_key_gen( db: &mut MainDb, key: &Zeroizing<::F>, + processor: &mut Pro, serai: &Serai, block: &Block, set: ValidatorSet, @@ -119,25 +114,30 @@ async fn handle_key_gen( .expect("KeyGen occurred for a set which doesn't exist") .is_some() { - // TODO: Send this to processor. Check how it handles it being fired multiple times - let msg = processor_messages::key_gen::CoordinatorMessage::ConfirmKeyPair { - context: SubstrateContext { - coin_latest_finalized_block: serai - .get_latest_block_for_network(block.hash(), set.network) - .await? - .unwrap_or(BlockHash([0; 32])), // TODO: Have the processor override this - }, - // TODO: Check the DB for which attempt used this key pair - id: KeyGenId { set, attempt: todo!() }, - }; + // TODO: Check how the processor handles thi being fired multiple times + processor + .send(CoordinatorMessage::KeyGen( + processor_messages::key_gen::CoordinatorMessage::ConfirmKeyPair { + context: SubstrateContext { + coin_latest_finalized_block: serai + .get_latest_block_for_network(block.hash(), set.network) + .await? + .unwrap_or(BlockHash([0; 32])), // TODO: Have the processor override this + }, + // TODO: Check the DB for which attempt used this key pair + id: KeyGenId { set, attempt: todo!() }, + }, + )) + .await; } Ok(()) } -async fn handle_batch_and_burns( +async fn handle_batch_and_burns( db: &mut MainDb, key: &Zeroizing<::F>, + processor: &mut Pro, serai: &Serai, block: &Block, ) -> Result<(), SeraiError> { @@ -170,17 +170,21 @@ async fn handle_batch_and_burns( // the last batch will be the latest batch, so its block will be the latest block batch_block.insert(network, network_block); - // TODO: Send this to processor. Check how it handles it being fired multiple times - let msg = processor_messages::coordinator::CoordinatorMessage::BatchSigned { - key: get_coin_key( - serai, - // TODO2 - ValidatorSet { network, session: Session(0) }, - ) - .await? - .expect("ValidatorSet without keys signed a batch"), - block: network_block, - }; + // TODO: Check how the processor handles thi being fired multiple times + processor + .send(CoordinatorMessage::Coordinator( + processor_messages::coordinator::CoordinatorMessage::BatchSigned { + key: get_coin_key( + serai, + // TODO2 + ValidatorSet { network, session: Session(0) }, + ) + .await? + .expect("ValidatorSet without keys signed a batch"), + block: network_block, + }, + )) + .await; } else { panic!("Batch event wasn't Batch: {batch:?}"); } @@ -224,18 +228,22 @@ async fn handle_batch_and_burns( .expect("network had a batch/burn yet never set a latest block") }; - // TODO: Send this to processor. Check how it handles it being fired multiple times - let msg = processor_messages::substrate::CoordinatorMessage::SubstrateBlock { - context: SubstrateContext { coin_latest_finalized_block }, - key: get_coin_key( - serai, - // TODO2 - ValidatorSet { network, session: Session(0) }, - ) - .await? - .expect("batch/burn for network which never set keys"), - burns: burns.remove(&network).unwrap(), - }; + // TODO: Check how the processor handles thi being fired multiple times + processor + .send(CoordinatorMessage::Substrate( + processor_messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { coin_latest_finalized_block }, + key: get_coin_key( + serai, + // TODO2 + ValidatorSet { network, session: Session(0) }, + ) + .await? + .expect("batch/burn for network which never set keys"), + burns: burns.remove(&network).unwrap(), + }, + )) + .await; } Ok(()) @@ -243,10 +251,11 @@ async fn handle_batch_and_burns( // Handle a specific Substrate block, returning an error when it fails to get data // (not blocking / holding) -async fn handle_block( +async fn handle_block( db: &mut MainDb, key: &Zeroizing<::F>, p2p: &P, + processor: &mut Pro, serai: &Serai, block: Block, ) -> Result<(), SeraiError> { @@ -263,7 +272,7 @@ async fn handle_block( // stable) if !db.handled_event(hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { - handle_new_set(db, key, p2p, serai, &block, set).await?; + handle_new_set(db, key, p2p, processor, serai, &block, set).await?; } else { panic!("NewSet event wasn't NewSet: {new_set:?}"); } @@ -276,7 +285,7 @@ async fn handle_block( for key_gen in serai.get_key_gen_events(hash).await? { if !db.handled_event(hash, event_id) { if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen { - handle_key_gen(db, key, serai, &block, set, key_pair).await?; + handle_key_gen(db, key, processor, serai, &block, set, key_pair).await?; } else { panic!("KeyGen event wasn't KeyGen: {key_gen:?}"); } @@ -291,17 +300,18 @@ async fn handle_block( // This does break the uniqueness of (hash, event_id) -> one event, yet // (network, (hash, event_id)) remains valid as a unique ID for an event if !db.handled_event(hash, event_id) { - handle_batch_and_burns(db, key, serai, &block).await?; + handle_batch_and_burns(db, key, processor, serai, &block).await?; } db.handle_event(hash, event_id); Ok(()) } -pub async fn handle_new_blocks( +pub async fn handle_new_blocks( db: &mut MainDb, key: &Zeroizing<::F>, p2p: &P, + processor: &mut Pro, serai: &Serai, last_substrate_block: &mut u64, ) -> Result<(), SeraiError> { @@ -318,11 +328,15 @@ pub async fn handle_new_blocks( db, key, p2p, + processor, serai, if b == latest_number { latest.take().unwrap() } else { - serai.get_block_by_number(b).await?.unwrap() + serai + .get_block_by_number(b) + .await? + .expect("couldn't get block before the latest finalized block") }, ) .await?;