diff --git a/processor/signers/src/batch/db.rs b/processor/signers/src/batch/db.rs index fec0a894..a895e0bb 100644 --- a/processor/signers/src/batch/db.rs +++ b/processor/signers/src/batch/db.rs @@ -4,7 +4,7 @@ use serai_in_instructions_primitives::{Batch, SignedBatch}; use serai_db::{Get, DbTxn, create_db}; create_db! { - BatchSigner { + SignersBatch { ActiveSigningProtocols: (session: Session) -> Vec, Batches: (id: u32) -> Batch, SignedBatches: (id: u32) -> SignedBatch, diff --git a/processor/signers/src/batch/mod.rs b/processor/signers/src/batch/mod.rs index 410ca378..f08fb5e2 100644 --- a/processor/signers/src/batch/mod.rs +++ b/processor/signers/src/batch/mod.rs @@ -6,7 +6,7 @@ use frost::dkg::ThresholdKeys; use serai_validator_sets_primitives::Session; use serai_in_instructions_primitives::{SignedBatch, batch_message}; -use serai_db::{DbTxn, Db}; +use serai_db::{Get, DbTxn, Db}; use messages::sign::VariantSignId; @@ -23,6 +23,14 @@ use crate::{ mod db; use db::*; +pub(crate) fn last_acknowledged_batch(getter: &impl Get) -> Option { + LastAcknowledgedBatch::get(getter) +} + +pub(crate) fn signed_batch(getter: &impl Get, id: u32) -> Option { + SignedBatches::get(getter, id) +} + // Fetches batches to sign and signs them. pub(crate) struct BatchSignerTask { db: D, diff --git a/processor/signers/src/coordinator/db.rs b/processor/signers/src/coordinator/db.rs new file mode 100644 index 00000000..c8235ede --- /dev/null +++ b/processor/signers/src/coordinator/db.rs @@ -0,0 +1,7 @@ +use serai_db::{Get, DbTxn, create_db}; + +create_db! { + SignersCoordinator { + LastPublishedBatch: () -> u32, + } +} diff --git a/processor/signers/src/coordinator.rs b/processor/signers/src/coordinator/mod.rs similarity index 75% rename from processor/signers/src/coordinator.rs rename to processor/signers/src/coordinator/mod.rs index c87dc4bb..3255603d 100644 --- a/processor/signers/src/coordinator.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -10,6 +10,8 @@ use crate::{ Coordinator, }; +mod db; + // Fetches messages to send the coordinator and sends them. pub(crate) struct CoordinatorTask { db: D, @@ -93,7 +95,26 @@ impl ContinuallyRan for CoordinatorTask { } } - // TODO: For max(last acknowledged batch, last published batch) onwards, publish every batch + // Publish the signed Batches + { + let mut txn = self.db.txn(); + // The last acknowledged Batch may exceed the last Batch we published if we didn't sign for + // the prior Batch(es) (and accordingly didn't publish them) + let last_batch = + crate::batch::last_acknowledged_batch(&txn).max(db::LastPublishedBatch::get(&txn)); + let mut next_batch = last_batch.map_or(0, |id| id + 1); + while let Some(batch) = crate::batch::signed_batch(&txn, next_batch) { + iterated = true; + db::LastPublishedBatch::set(&mut txn, &batch.batch.id); + self + .coordinator + .publish_batch(batch) + .await + .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; + next_batch += 1; + } + txn.commit(); + } Ok(iterated) } diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index def6ef16..024badfa 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -169,7 +169,7 @@ impl Signers { .push(ThresholdKeys::from(ThresholdCore::::read(&mut buf).unwrap())); } - // TODO: Batch signer, cosigner, slash report signers + // TODO: Cosigner and slash report signers let (batch_task, batch_handle) = Task::new(); tokio::spawn( diff --git a/processor/signers/src/transaction/db.rs b/processor/signers/src/transaction/db.rs index b77d38c7..a91881e7 100644 --- a/processor/signers/src/transaction/db.rs +++ b/processor/signers/src/transaction/db.rs @@ -3,7 +3,7 @@ use serai_validator_sets_primitives::Session; use serai_db::{Get, DbTxn, create_db}; create_db! { - TransactionSigner { + SignersTransaction { ActiveSigningProtocols: (session: Session) -> Vec<[u8; 32]>, SerializedSignableTransactions: (id: [u8; 32]) -> Vec, SerializedTransactions: (id: [u8; 32]) -> Vec,