diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index ef907f97..4a191b68 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -46,12 +46,6 @@ pub mod key_gen { } } - impl CoordinatorMessage { - pub fn required_block(&self) -> Option { - None - } - } - #[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { // Participated in the specified key generation protocol. @@ -133,10 +127,6 @@ pub mod sign { } impl CoordinatorMessage { - pub fn required_block(&self) -> Option { - None - } - pub fn sign_id(&self) -> &SignId { match self { CoordinatorMessage::Preprocesses { id, .. } | @@ -160,6 +150,7 @@ pub mod sign { pub mod coordinator { use super::*; + // TODO: Why does this not simply take the block hash? pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec { const DST: &[u8] = b"Cosign"; let mut res = vec![u8::try_from(DST.len()).unwrap()]; @@ -169,36 +160,10 @@ pub mod coordinator { res } - #[derive( - Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize, - )] - pub enum SubstrateSignableId { - CosigningSubstrateBlock([u8; 32]), - Batch(u32), - SlashReport, - } - - #[derive(Clone, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize)] - pub struct SubstrateSignId { - pub session: Session, - pub id: SubstrateSignableId, - pub attempt: u32, - } - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum CoordinatorMessage { - CosignSubstrateBlock { id: SubstrateSignId, block_number: u64 }, - SignSlashReport { id: SubstrateSignId, report: Vec<([u8; 32], u32)> }, - } - - impl CoordinatorMessage { - // The Coordinator will only send Batch messages once the Batch ID has been recognized - // The ID will only be recognized when the block is acknowledged by a super-majority of the - // network *and the local node* - // This synchrony obtained lets us ignore the synchrony requirement offered here - pub fn required_block(&self) -> Option { - None - } + CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] }, + SignSlashReport { session: Session, report: Vec<([u8; 32], u32)> }, } #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] @@ -209,14 +174,9 @@ pub mod coordinator { #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { - SubstrateBlockAck { block: u64, plans: Vec }, - InvalidParticipant { id: SubstrateSignId, participant: Participant }, - CosignPreprocess { id: SubstrateSignId, preprocesses: Vec<[u8; 64]> }, - // TODO: Remove BatchPreprocess? Why does this take a BlockHash here and not in its - // SubstrateSignId? - BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec<[u8; 64]> }, - // TODO: Make these signatures [u8; 64]? CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec }, + SignedBatch { batch: SignedBatch }, + SubstrateBlockAck { block: u64, plans: Vec }, SignedSlashReport { session: Session, signature: Vec }, } } @@ -226,33 +186,23 @@ pub mod substrate { #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum CoordinatorMessage { - ConfirmKeyPair { - context: SubstrateContext, - session: Session, - key_pair: KeyPair, - }, - SubstrateBlock { - context: SubstrateContext, + /// Keys set on the Serai network. + SetKeys { serai_time: u64, session: Session, key_pair: KeyPair }, + /// The data from a block which acknowledged a Batch. + BlockWithBatchAcknowledgement { block: u64, + batch_id: u32, + in_instruction_succeededs: Vec, burns: Vec, - batches: Vec, + key_to_activate: Option, }, - } - - impl CoordinatorMessage { - pub fn required_block(&self) -> Option { - let context = match self { - CoordinatorMessage::ConfirmKeyPair { context, .. } | - CoordinatorMessage::SubstrateBlock { context, .. } => context, - }; - Some(context.network_latest_finalized_block) - } + /// The data from a block which didn't acknowledge a Batch. + BlockWithoutBatchAcknowledgement { block: u64, burns: Vec }, } #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { Batch { batch: Batch }, - SignedBatch { batch: SignedBatch }, } } @@ -279,24 +229,6 @@ impl_from!(sign, CoordinatorMessage, Sign); impl_from!(coordinator, CoordinatorMessage, Coordinator); impl_from!(substrate, CoordinatorMessage, Substrate); -impl CoordinatorMessage { - pub fn required_block(&self) -> Option { - let required = match self { - CoordinatorMessage::KeyGen(msg) => msg.required_block(), - CoordinatorMessage::Sign(msg) => msg.required_block(), - CoordinatorMessage::Coordinator(msg) => msg.required_block(), - CoordinatorMessage::Substrate(msg) => msg.required_block(), - }; - - // 0 is used when Serai hasn't acknowledged *any* block for this network, which also means - // there's no need to wait for the block in question - if required == Some(BlockHash([0; 32])) { - return None; - } - required - } -} - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { KeyGen(key_gen::ProcessorMessage), @@ -315,10 +247,10 @@ impl_from!(substrate, ProcessorMessage, Substrate); const COORDINATOR_UID: u8 = 0; const PROCESSOR_UID: u8 = 1; -const TYPE_KEY_GEN_UID: u8 = 2; -const TYPE_SIGN_UID: u8 = 3; -const TYPE_COORDINATOR_UID: u8 = 4; -const TYPE_SUBSTRATE_UID: u8 = 5; +const TYPE_KEY_GEN_UID: u8 = 0; +const TYPE_SIGN_UID: u8 = 1; +const TYPE_COORDINATOR_UID: u8 = 2; +const TYPE_SUBSTRATE_UID: u8 = 3; impl CoordinatorMessage { /// The intent for this message, which should be unique across the validator's entire system, @@ -359,11 +291,12 @@ impl CoordinatorMessage { } CoordinatorMessage::Coordinator(msg) => { let (sub, id) = match msg { - // Unique since this ID contains the hash of the block being cosigned - coordinator::CoordinatorMessage::CosignSubstrateBlock { id, .. } => (0, id.encode()), - // Unique since there's only one of these per session/attempt, and ID is inclusive to - // both - coordinator::CoordinatorMessage::SignSlashReport { id, .. } => (1, id.encode()), + // We only cosign a block once, and Reattempt is a separate message + coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => { + (0, block_number.encode()) + } + // We only sign one slash report, and Reattempt is a separate message + coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()), }; let mut res = vec![COORDINATOR_UID, TYPE_COORDINATOR_UID, sub]; @@ -372,9 +305,13 @@ impl CoordinatorMessage { } CoordinatorMessage::Substrate(msg) => { let (sub, id) = match msg { - // Unique since there's only one key pair for a session - substrate::CoordinatorMessage::ConfirmKeyPair { session, .. } => (0, session.encode()), - substrate::CoordinatorMessage::SubstrateBlock { block, .. } => (1, block.encode()), + substrate::CoordinatorMessage::SetKeys { session, .. } => (0, session.encode()), + substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { block, .. } => { + (1, block.encode()) + } + substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { block, .. } => { + (2, block.encode()) + } }; let mut res = vec![COORDINATOR_UID, TYPE_SUBSTRATE_UID, sub]; @@ -430,14 +367,10 @@ impl ProcessorMessage { } ProcessorMessage::Coordinator(msg) => { let (sub, id) = match msg { - coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()), - // Unique since SubstrateSignId - coordinator::ProcessorMessage::InvalidParticipant { id, .. } => (1, id.encode()), - coordinator::ProcessorMessage::CosignPreprocess { id, .. } => (2, id.encode()), - coordinator::ProcessorMessage::BatchPreprocess { id, .. } => (3, id.encode()), - // Unique since only one instance of a signature matters - coordinator::ProcessorMessage::CosignedBlock { block, .. } => (4, block.encode()), - coordinator::ProcessorMessage::SignedSlashReport { .. } => (5, vec![]), + coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()), + coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()), + coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()), + coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()), }; let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub]; @@ -446,11 +379,7 @@ impl ProcessorMessage { } ProcessorMessage::Substrate(msg) => { let (sub, id) = match msg { - // Unique since network and ID binding - substrate::ProcessorMessage::Batch { batch } => (0, (batch.network, batch.id).encode()), - substrate::ProcessorMessage::SignedBatch { batch, .. } => { - (1, (batch.batch.network, batch.batch.id).encode()) - } + substrate::ProcessorMessage::Batch { batch } => (0, batch.id.encode()), }; let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub]; diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 3dd5a2e2..52a36419 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -525,12 +525,19 @@ db_channel! { pub(crate) struct SubstrateToEventualityDb; impl SubstrateToEventualityDb { - pub(crate) fn send_burns( + pub(crate) fn send_burns( txn: &mut impl DbTxn, acknowledged_block: u64, - burns: &Vec, + burns: Vec, ) { - Burns::send(txn, acknowledged_block, burns); + // Drop burns less than the dust + let burns = burns + .into_iter() + .filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0) + .collect::>(); + if !burns.is_empty() { + Burns::send(txn, acknowledged_block, &burns); + } } pub(crate) fn try_recv_burns( @@ -548,6 +555,7 @@ mod _public_db { db_channel! { ScannerPublic { + Batches: (empty_key: ()) -> Batch, BatchesToSign: (key: &[u8]) -> Batch, AcknowledgedBatches: (key: &[u8]) -> u32, CompletedEventualities: (key: &[u8]) -> [u8; 32], @@ -555,7 +563,24 @@ mod _public_db { } } +/// The batches to publish. +/// +/// This is used for auditing the Batches published to Serai. +pub struct Batches; +impl Batches { + pub(crate) fn send(txn: &mut impl DbTxn, batch: &Batch) { + _public_db::Batches::send(txn, (), batch); + } + + /// Receive a batch to publish. + pub fn try_recv(txn: &mut impl DbTxn) -> Option { + _public_db::Batches::try_recv(txn, ()) + } +} + /// The batches to sign and publish. +/// +/// This is used for publishing Batches onto Serai. pub struct BatchesToSign(PhantomData); impl BatchesToSign { pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) { diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index 309b44aa..5fd2c7eb 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -8,7 +8,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::task::ContinuallyRan; use crate::{ - db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchesToSign}, + db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign}, index, scan::next_to_scan_for_outputs_block, ScannerFeed, KeyFor, @@ -160,6 +160,7 @@ impl ContinuallyRan for ReportTask { } for batch in batches { + Batches::send(&mut txn, &batch); BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); } } diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index 76961c37..fc97daf3 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -144,16 +144,9 @@ impl ContinuallyRan for SubstrateTask { } } - // Drop burns less than the dust - let burns = burns - .into_iter() - .filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0) - .collect::>(); - if !burns.is_empty() { - // We send these Burns as stemming from this block we just acknowledged - // This causes them to be acted on after we accumulate the outputs from this block - SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns); - } + // We send these Burns as stemming from this block we just acknowledged + // This causes them to be acted on after we accumulate the outputs from this block + SubstrateToEventualityDb::send_burns::(&mut txn, block_number, burns); } Action::QueueBurns(burns) => { @@ -163,7 +156,7 @@ impl ContinuallyRan for SubstrateTask { let queue_as_of = ScannerGlobalDb::::highest_acknowledged_block(&txn) .expect("queueing Burns yet never acknowledged a block"); - SubstrateToEventualityDb::send_burns(&mut txn, queue_as_of, &burns); + SubstrateToEventualityDb::send_burns::(&mut txn, queue_as_of, burns); } } diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 3255603d..77cdef59 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -95,6 +95,20 @@ impl ContinuallyRan for CoordinatorTask { } } + // Publish the Batches + { + let mut txn = self.db.txn(); + while let Some(batch) = scanner::Batches::try_recv(&mut txn) { + iterated = true; + self + .coordinator + .publish_batch(batch) + .await + .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; + } + txn.commit(); + } + // Publish the signed Batches { let mut txn = self.db.txn(); @@ -108,7 +122,7 @@ impl ContinuallyRan for CoordinatorTask { db::LastPublishedBatch::set(&mut txn, &batch.batch.id); self .coordinator - .publish_batch(batch) + .publish_signed_batch(batch) .await .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; next_batch += 1; diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index 024badfa..36e2db2e 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -46,8 +46,11 @@ pub trait Coordinator: 'static + Send + Sync { /// Send a `messages::sign::ProcessorMessage`. async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; + /// Publish a `Batch`. + async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>; + /// Publish a `SignedBatch`. - async fn publish_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; + async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; } /// An object capable of publishing a transaction.