Tidy messages, publish all Batches to the coordinator

Prior, we published SignedBatches, yet Batches are necessary for auditing
purposes.
This commit is contained in:
Luke Parker 2024-09-09 03:06:37 -04:00
parent a3cb514400
commit 0078858c1c
6 changed files with 89 additions and 124 deletions

View file

@ -46,12 +46,6 @@ pub mod key_gen {
} }
} }
impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> {
None
}
}
#[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
// Participated in the specified key generation protocol. // Participated in the specified key generation protocol.
@ -133,10 +127,6 @@ pub mod sign {
} }
impl CoordinatorMessage { impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> {
None
}
pub fn sign_id(&self) -> &SignId { pub fn sign_id(&self) -> &SignId {
match self { match self {
CoordinatorMessage::Preprocesses { id, .. } | CoordinatorMessage::Preprocesses { id, .. } |
@ -160,6 +150,7 @@ pub mod sign {
pub mod coordinator { pub mod coordinator {
use super::*; use super::*;
// TODO: Why does this not simply take the block hash?
pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> { pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> {
const DST: &[u8] = b"Cosign"; const DST: &[u8] = b"Cosign";
let mut res = vec![u8::try_from(DST.len()).unwrap()]; let mut res = vec![u8::try_from(DST.len()).unwrap()];
@ -169,36 +160,10 @@ pub mod coordinator {
res res
} }
#[derive(
Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize,
)]
pub enum SubstrateSignableId {
CosigningSubstrateBlock([u8; 32]),
Batch(u32),
SlashReport,
}
#[derive(Clone, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize)]
pub struct SubstrateSignId {
pub session: Session,
pub id: SubstrateSignableId,
pub attempt: u32,
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage { pub enum CoordinatorMessage {
CosignSubstrateBlock { id: SubstrateSignId, block_number: u64 }, CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] },
SignSlashReport { id: SubstrateSignId, report: Vec<([u8; 32], u32)> }, SignSlashReport { session: Session, report: Vec<([u8; 32], u32)> },
}
impl CoordinatorMessage {
// The Coordinator will only send Batch messages once the Batch ID has been recognized
// The ID will only be recognized when the block is acknowledged by a super-majority of the
// network *and the local node*
// This synchrony obtained lets us ignore the synchrony requirement offered here
pub fn required_block(&self) -> Option<BlockHash> {
None
}
} }
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
@ -209,14 +174,9 @@ pub mod coordinator {
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
InvalidParticipant { id: SubstrateSignId, participant: Participant },
CosignPreprocess { id: SubstrateSignId, preprocesses: Vec<[u8; 64]> },
// TODO: Remove BatchPreprocess? Why does this take a BlockHash here and not in its
// SubstrateSignId?
BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec<[u8; 64]> },
// TODO: Make these signatures [u8; 64]?
CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> }, CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> },
SignedBatch { batch: SignedBatch },
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
SignedSlashReport { session: Session, signature: Vec<u8> }, SignedSlashReport { session: Session, signature: Vec<u8> },
} }
} }
@ -226,33 +186,23 @@ pub mod substrate {
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage { pub enum CoordinatorMessage {
ConfirmKeyPair { /// Keys set on the Serai network.
context: SubstrateContext, SetKeys { serai_time: u64, session: Session, key_pair: KeyPair },
session: Session, /// The data from a block which acknowledged a Batch.
key_pair: KeyPair, BlockWithBatchAcknowledgement {
},
SubstrateBlock {
context: SubstrateContext,
block: u64, block: u64,
batch_id: u32,
in_instruction_succeededs: Vec<bool>,
burns: Vec<OutInstructionWithBalance>, burns: Vec<OutInstructionWithBalance>,
batches: Vec<u32>, key_to_activate: Option<KeyPair>,
}, },
} /// The data from a block which didn't acknowledge a Batch.
BlockWithoutBatchAcknowledgement { block: u64, burns: Vec<OutInstructionWithBalance> },
impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> {
let context = match self {
CoordinatorMessage::ConfirmKeyPair { context, .. } |
CoordinatorMessage::SubstrateBlock { context, .. } => context,
};
Some(context.network_latest_finalized_block)
}
} }
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
Batch { batch: Batch }, Batch { batch: Batch },
SignedBatch { batch: SignedBatch },
} }
} }
@ -279,24 +229,6 @@ impl_from!(sign, CoordinatorMessage, Sign);
impl_from!(coordinator, CoordinatorMessage, Coordinator); impl_from!(coordinator, CoordinatorMessage, Coordinator);
impl_from!(substrate, CoordinatorMessage, Substrate); impl_from!(substrate, CoordinatorMessage, Substrate);
impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> {
let required = match self {
CoordinatorMessage::KeyGen(msg) => msg.required_block(),
CoordinatorMessage::Sign(msg) => msg.required_block(),
CoordinatorMessage::Coordinator(msg) => msg.required_block(),
CoordinatorMessage::Substrate(msg) => msg.required_block(),
};
// 0 is used when Serai hasn't acknowledged *any* block for this network, which also means
// there's no need to wait for the block in question
if required == Some(BlockHash([0; 32])) {
return None;
}
required
}
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
KeyGen(key_gen::ProcessorMessage), KeyGen(key_gen::ProcessorMessage),
@ -315,10 +247,10 @@ impl_from!(substrate, ProcessorMessage, Substrate);
const COORDINATOR_UID: u8 = 0; const COORDINATOR_UID: u8 = 0;
const PROCESSOR_UID: u8 = 1; const PROCESSOR_UID: u8 = 1;
const TYPE_KEY_GEN_UID: u8 = 2; const TYPE_KEY_GEN_UID: u8 = 0;
const TYPE_SIGN_UID: u8 = 3; const TYPE_SIGN_UID: u8 = 1;
const TYPE_COORDINATOR_UID: u8 = 4; const TYPE_COORDINATOR_UID: u8 = 2;
const TYPE_SUBSTRATE_UID: u8 = 5; const TYPE_SUBSTRATE_UID: u8 = 3;
impl CoordinatorMessage { impl CoordinatorMessage {
/// The intent for this message, which should be unique across the validator's entire system, /// The intent for this message, which should be unique across the validator's entire system,
@ -359,11 +291,12 @@ impl CoordinatorMessage {
} }
CoordinatorMessage::Coordinator(msg) => { CoordinatorMessage::Coordinator(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
// Unique since this ID contains the hash of the block being cosigned // We only cosign a block once, and Reattempt is a separate message
coordinator::CoordinatorMessage::CosignSubstrateBlock { id, .. } => (0, id.encode()), coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => {
// Unique since there's only one of these per session/attempt, and ID is inclusive to (0, block_number.encode())
// both }
coordinator::CoordinatorMessage::SignSlashReport { id, .. } => (1, id.encode()), // We only sign one slash report, and Reattempt is a separate message
coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()),
}; };
let mut res = vec![COORDINATOR_UID, TYPE_COORDINATOR_UID, sub]; let mut res = vec![COORDINATOR_UID, TYPE_COORDINATOR_UID, sub];
@ -372,9 +305,13 @@ impl CoordinatorMessage {
} }
CoordinatorMessage::Substrate(msg) => { CoordinatorMessage::Substrate(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
// Unique since there's only one key pair for a session substrate::CoordinatorMessage::SetKeys { session, .. } => (0, session.encode()),
substrate::CoordinatorMessage::ConfirmKeyPair { session, .. } => (0, session.encode()), substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { block, .. } => {
substrate::CoordinatorMessage::SubstrateBlock { block, .. } => (1, block.encode()), (1, block.encode())
}
substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { block, .. } => {
(2, block.encode())
}
}; };
let mut res = vec![COORDINATOR_UID, TYPE_SUBSTRATE_UID, sub]; let mut res = vec![COORDINATOR_UID, TYPE_SUBSTRATE_UID, sub];
@ -430,14 +367,10 @@ impl ProcessorMessage {
} }
ProcessorMessage::Coordinator(msg) => { ProcessorMessage::Coordinator(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()), coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()),
// Unique since SubstrateSignId coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()),
coordinator::ProcessorMessage::InvalidParticipant { id, .. } => (1, id.encode()), coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()),
coordinator::ProcessorMessage::CosignPreprocess { id, .. } => (2, id.encode()), coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()),
coordinator::ProcessorMessage::BatchPreprocess { id, .. } => (3, id.encode()),
// Unique since only one instance of a signature matters
coordinator::ProcessorMessage::CosignedBlock { block, .. } => (4, block.encode()),
coordinator::ProcessorMessage::SignedSlashReport { .. } => (5, vec![]),
}; };
let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub]; let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub];
@ -446,11 +379,7 @@ impl ProcessorMessage {
} }
ProcessorMessage::Substrate(msg) => { ProcessorMessage::Substrate(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
// Unique since network and ID binding substrate::ProcessorMessage::Batch { batch } => (0, batch.id.encode()),
substrate::ProcessorMessage::Batch { batch } => (0, (batch.network, batch.id).encode()),
substrate::ProcessorMessage::SignedBatch { batch, .. } => {
(1, (batch.batch.network, batch.batch.id).encode())
}
}; };
let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub]; let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub];

View file

@ -525,12 +525,19 @@ db_channel! {
pub(crate) struct SubstrateToEventualityDb; pub(crate) struct SubstrateToEventualityDb;
impl SubstrateToEventualityDb { impl SubstrateToEventualityDb {
pub(crate) fn send_burns( pub(crate) fn send_burns<S: ScannerFeed>(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
acknowledged_block: u64, acknowledged_block: u64,
burns: &Vec<OutInstructionWithBalance>, burns: Vec<OutInstructionWithBalance>,
) { ) {
Burns::send(txn, acknowledged_block, burns); // Drop burns less than the dust
let burns = burns
.into_iter()
.filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0)
.collect::<Vec<_>>();
if !burns.is_empty() {
Burns::send(txn, acknowledged_block, &burns);
}
} }
pub(crate) fn try_recv_burns( pub(crate) fn try_recv_burns(
@ -548,6 +555,7 @@ mod _public_db {
db_channel! { db_channel! {
ScannerPublic { ScannerPublic {
Batches: (empty_key: ()) -> Batch,
BatchesToSign: (key: &[u8]) -> Batch, BatchesToSign: (key: &[u8]) -> Batch,
AcknowledgedBatches: (key: &[u8]) -> u32, AcknowledgedBatches: (key: &[u8]) -> u32,
CompletedEventualities: (key: &[u8]) -> [u8; 32], CompletedEventualities: (key: &[u8]) -> [u8; 32],
@ -555,7 +563,24 @@ mod _public_db {
} }
} }
/// The batches to publish.
///
/// This is used for auditing the Batches published to Serai.
pub struct Batches;
impl Batches {
pub(crate) fn send(txn: &mut impl DbTxn, batch: &Batch) {
_public_db::Batches::send(txn, (), batch);
}
/// Receive a batch to publish.
pub fn try_recv(txn: &mut impl DbTxn) -> Option<Batch> {
_public_db::Batches::try_recv(txn, ())
}
}
/// The batches to sign and publish. /// The batches to sign and publish.
///
/// This is used for publishing Batches onto Serai.
pub struct BatchesToSign<K: GroupEncoding>(PhantomData<K>); pub struct BatchesToSign<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> BatchesToSign<K> { impl<K: GroupEncoding> BatchesToSign<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) { pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) {

View file

@ -8,7 +8,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan; use primitives::task::ContinuallyRan;
use crate::{ use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchesToSign}, db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign},
index, index,
scan::next_to_scan_for_outputs_block, scan::next_to_scan_for_outputs_block,
ScannerFeed, KeyFor, ScannerFeed, KeyFor,
@ -160,6 +160,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
} }
for batch in batches { for batch in batches {
Batches::send(&mut txn, &batch);
BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch);
} }
} }

View file

@ -144,16 +144,9 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
} }
} }
// Drop burns less than the dust
let burns = burns
.into_iter()
.filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0)
.collect::<Vec<_>>();
if !burns.is_empty() {
// We send these Burns as stemming from this block we just acknowledged // We send these Burns as stemming from this block we just acknowledged
// This causes them to be acted on after we accumulate the outputs from this block // This causes them to be acted on after we accumulate the outputs from this block
SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns); SubstrateToEventualityDb::send_burns::<S>(&mut txn, block_number, burns);
}
} }
Action::QueueBurns(burns) => { Action::QueueBurns(burns) => {
@ -163,7 +156,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(&txn) let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(&txn)
.expect("queueing Burns yet never acknowledged a block"); .expect("queueing Burns yet never acknowledged a block");
SubstrateToEventualityDb::send_burns(&mut txn, queue_as_of, &burns); SubstrateToEventualityDb::send_burns::<S>(&mut txn, queue_as_of, burns);
} }
} }

View file

@ -95,6 +95,20 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
} }
} }
// Publish the Batches
{
let mut txn = self.db.txn();
while let Some(batch) = scanner::Batches::try_recv(&mut txn) {
iterated = true;
self
.coordinator
.publish_batch(batch)
.await
.map_err(|e| format!("couldn't publish Batch: {e:?}"))?;
}
txn.commit();
}
// Publish the signed Batches // Publish the signed Batches
{ {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
@ -108,7 +122,7 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
db::LastPublishedBatch::set(&mut txn, &batch.batch.id); db::LastPublishedBatch::set(&mut txn, &batch.batch.id);
self self
.coordinator .coordinator
.publish_batch(batch) .publish_signed_batch(batch)
.await .await
.map_err(|e| format!("couldn't publish Batch: {e:?}"))?; .map_err(|e| format!("couldn't publish Batch: {e:?}"))?;
next_batch += 1; next_batch += 1;

View file

@ -46,8 +46,11 @@ pub trait Coordinator: 'static + Send + Sync {
/// Send a `messages::sign::ProcessorMessage`. /// Send a `messages::sign::ProcessorMessage`.
async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>;
/// Publish a `Batch`.
async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>;
/// Publish a `SignedBatch`. /// Publish a `SignedBatch`.
async fn publish_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>;
} }
/// An object capable of publishing a transaction. /// An object capable of publishing a transaction.