Add BatchSignerTask

Uses a wrapper around AlgorithmMachine Schnorrkel to let the message be &[].
This commit is contained in:
Luke Parker 2024-09-09 01:01:29 -04:00
parent 4152bcacb2
commit ed0221d804
13 changed files with 371 additions and 51 deletions

3
Cargo.lock generated
View file

@ -8724,10 +8724,13 @@ dependencies = [
"async-trait",
"borsh",
"ciphersuite",
"frost-schnorrkel",
"log",
"modular-frost",
"parity-scale-codec",
"rand_core",
"serai-db",
"serai-in-instructions-primitives",
"serai-processor-frost-attempt-manager",
"serai-processor-messages",
"serai-processor-primitives",

View file

@ -548,36 +548,36 @@ mod _public_db {
db_channel! {
ScannerPublic {
BatchToSign: (key: &[u8]) -> Batch,
AcknowledgedBatch: (key: &[u8]) -> u32,
BatchesToSign: (key: &[u8]) -> Batch,
AcknowledgedBatches: (key: &[u8]) -> u32,
CompletedEventualities: (key: &[u8]) -> [u8; 32],
}
}
}
/// The batches to sign and publish.
pub struct BatchToSign<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> BatchToSign<K> {
pub struct BatchesToSign<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> BatchesToSign<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) {
_public_db::BatchToSign::send(txn, key.to_bytes().as_ref(), batch);
_public_db::BatchesToSign::send(txn, key.to_bytes().as_ref(), batch);
}
/// Receive a batch to sign and publish.
pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<Batch> {
_public_db::BatchToSign::try_recv(txn, key.to_bytes().as_ref())
_public_db::BatchesToSign::try_recv(txn, key.to_bytes().as_ref())
}
}
/// The batches which were acknowledged on-chain.
pub struct AcknowledgedBatch<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> AcknowledgedBatch<K> {
pub struct AcknowledgedBatches<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> AcknowledgedBatches<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: u32) {
_public_db::AcknowledgedBatch::send(txn, key.to_bytes().as_ref(), &batch);
_public_db::AcknowledgedBatches::send(txn, key.to_bytes().as_ref(), &batch);
}
/// Receive the ID of a batch which was acknowledged.
pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<u32> {
_public_db::AcknowledgedBatch::try_recv(txn, key.to_bytes().as_ref())
_public_db::AcknowledgedBatches::try_recv(txn, key.to_bytes().as_ref())
}
}

View file

@ -21,7 +21,7 @@ pub use lifetime::LifetimeStage;
// Database schema definition and associated functions.
mod db;
use db::ScannerGlobalDb;
pub use db::{BatchToSign, AcknowledgedBatch, CompletedEventualities};
pub use db::{BatchesToSign, AcknowledgedBatches, CompletedEventualities};
// Task to index the blockchain, ensuring we don't reorganize finalized blocks.
mod index;
// Scans blocks for received coins.

View file

@ -8,7 +8,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan;
use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchToSign},
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchesToSign},
index,
scan::next_to_scan_for_outputs_block,
ScannerFeed, KeyFor,
@ -160,7 +160,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
}
for batch in batches {
BatchToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch);
BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch);
}
}

View file

@ -6,7 +6,7 @@ use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use primitives::task::ContinuallyRan;
use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatch},
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
report, ScannerFeed, KeyFor,
};
@ -82,7 +82,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
{
let external_key_for_session_to_sign_batch =
report::take_external_key_for_session_to_sign_batch::<S>(&mut txn, batch_id).unwrap();
AcknowledgedBatch::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id);
AcknowledgedBatches::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id);
}
// Mark we made progress and handle this

View file

@ -21,15 +21,18 @@ workspace = true
[dependencies]
async-trait = { version = "0.1", default-features = false }
rand_core = { version = "0.6", default-features = false }
zeroize = { version = "1", default-features = false, features = ["std"] }
ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false }
frost-schnorrkel = { path = "../../crypto/schnorrkel", default-features = false }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] }
serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] }
serai-db = { path = "../../common/db" }
log = { version = "0.4", default-features = false, features = ["std"] }

View file

@ -0,0 +1,13 @@
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{Batch, SignedBatch};
use serai_db::{Get, DbTxn, create_db};
create_db! {
BatchSigner {
ActiveSigningProtocols: (session: Session) -> Vec<u32>,
Batches: (id: u32) -> Batch,
SignedBatches: (id: u32) -> SignedBatch,
LastAcknowledgedBatch: () -> u32,
}
}

View file

@ -0,0 +1,180 @@
use std::collections::HashSet;
use ciphersuite::{group::GroupEncoding, Ristretto};
use frost::dkg::ThresholdKeys;
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{SignedBatch, batch_message};
use serai_db::{DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use scanner::{BatchesToSign, AcknowledgedBatches};
use frost_attempt_manager::*;
use crate::{
db::{CoordinatorToBatchSignerMessages, BatchSignerToCoordinatorMessages},
WrappedSchnorrkelMachine,
};
mod db;
use db::*;
// Fetches batches to sign and signs them.
pub(crate) struct BatchSignerTask<D: Db, E: GroupEncoding> {
db: D,
session: Session,
external_key: E,
keys: Vec<ThresholdKeys<Ristretto>>,
active_signing_protocols: HashSet<u32>,
attempt_manager: AttemptManager<D, WrappedSchnorrkelMachine>,
}
impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
pub(crate) fn new(
db: D,
session: Session,
external_key: E,
keys: Vec<ThresholdKeys<Ristretto>>,
) -> Self {
let mut active_signing_protocols = HashSet::new();
let mut attempt_manager = AttemptManager::new(
db.clone(),
session,
keys.first().expect("creating a batch signer with 0 keys").params().i(),
);
// Re-register all active signing protocols
for id in ActiveSigningProtocols::get(&db, session).unwrap_or(vec![]) {
active_signing_protocols.insert(id);
let batch = Batches::get(&db, id).unwrap();
assert_eq!(batch.id, id);
let mut machines = Vec::with_capacity(keys.len());
for keys in &keys {
machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch)));
}
attempt_manager.register(VariantSignId::Batch(id), machines);
}
Self { db, session, external_key, keys, active_signing_protocols, attempt_manager }
}
}
#[async_trait::async_trait]
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false;
// Check for new batches to sign
loop {
let mut txn = self.db.txn();
let Some(batch) = BatchesToSign::try_recv(&mut txn, &self.external_key) else {
break;
};
iterated = true;
// Save this to the database as a transaction to sign
self.active_signing_protocols.insert(batch.id);
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
Batches::set(&mut txn, batch.id, &batch);
let mut machines = Vec::with_capacity(self.keys.len());
for keys in &self.keys {
machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch)));
}
for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) {
BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
txn.commit();
}
// Check for acknowledged Batches (meaning we should no longer sign for these Batches)
loop {
let mut txn = self.db.txn();
let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else {
break;
};
{
let last_acknowledged = LastAcknowledgedBatch::get(&txn);
if Some(id) > last_acknowledged {
LastAcknowledgedBatch::set(&mut txn, &id);
}
}
/*
We may have yet to register this signing protocol.
While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically have
`BatchesToSign` populated with a new batch _while iterating over `AcknowledgedBatches`_, and
then have `AcknowledgedBatched` populated. In that edge case, we will see the
acknowledgement notification before we see the transaction.
In such a case, we break (dropping the txn, re-queueing the acknowledgement notification).
On the task's next iteration, we'll process the Batch from `BatchesToSign` and be
able to make progress.
*/
if !self.active_signing_protocols.remove(&id) {
break;
}
iterated = true;
// Since it was, remove this as an active signing protocol
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
Batches::del(&mut txn, id);
SignedBatches::del(&mut txn, id);
// We retire with a txn so we either successfully flag this Batch as acknowledged, and
// won't re-register it (making this retire safe), or we don't flag it, meaning we will
// re-register it, yet that's safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id));
txn.commit();
}
// Handle any messages sent to us
loop {
let mut txn = self.db.txn();
let Some(msg) = CoordinatorToBatchSignerMessages::try_recv(&mut txn, self.session) else {
break;
};
iterated = true;
match self.attempt_manager.handle(msg) {
Response::Messages(msgs) => {
for msg in msgs {
BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
}
Response::Signature { id, signature } => {
let VariantSignId::Batch(id) = id else { panic!("BatchSignerTask signed a non-Batch") };
let batch =
Batches::get(&txn, id).expect("signed a Batch we didn't save to the database");
let signed_batch = SignedBatch { batch, signature: signature.into() };
SignedBatches::set(&mut txn, signed_batch.batch.id, &signed_batch);
}
}
txn.commit();
}
Ok(iterated)
}
}

View file

@ -93,6 +93,8 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
}
}
// TODO: For max(last acknowledged batch, last published batch) onwards, publish every batch
Ok(iterated)
}
}

View file

@ -11,6 +11,7 @@ use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::dkg::{ThresholdCore, ThresholdKeys};
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::SignedBatch;
use serai_db::{DbTxn, Db};
@ -19,25 +20,34 @@ use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage};
use primitives::task::{Task, TaskHandle, ContinuallyRan};
use scheduler::{Transaction, SignableTransaction, TransactionFor};
mod wrapped_schnorrkel;
pub(crate) use wrapped_schnorrkel::WrappedSchnorrkelMachine;
pub(crate) mod db;
mod coordinator;
use coordinator::CoordinatorTask;
mod batch;
use batch::BatchSignerTask;
mod transaction;
use transaction::TransactionTask;
use transaction::TransactionSignerTask;
/// A connection to the Coordinator which messages can be published with.
#[async_trait::async_trait]
pub trait Coordinator: 'static + Send + Sync {
/// An error encountered when sending a message.
/// An error encountered when interacting with a coordinator.
///
/// This MUST be an ephemeral error. Retrying sending a message MUST eventually resolve without
/// This MUST be an ephemeral error. Retrying an interaction MUST eventually resolve without
/// manual intervention/changing the arguments.
type EphemeralError: Debug;
/// Send a `messages::sign::ProcessorMessage`.
async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>;
/// Publish a `SignedBatch`.
async fn publish_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>;
}
/// An object capable of publishing a transaction.
@ -111,13 +121,18 @@ impl<ST: SignableTransaction> Signers<ST> {
<ST::Ciphersuite as Ciphersuite>::read_G(&mut external_key_bytes).unwrap();
assert!(external_key_bytes.is_empty());
// Drain the Batches to sign
// This will be fully populated by the scanner before retiry occurs, making this perfect
// in not leaving any pending blobs behind
while scanner::BatchesToSign::try_recv(&mut txn, &external_key).is_some() {}
// Drain the acknowledged batches to no longer sign
while scanner::AcknowledgedBatches::try_recv(&mut txn, &external_key).is_some() {}
// Drain the transactions to sign
// TransactionsToSign will be fully populated by the scheduler before retiry occurs, making
// this perfect in not leaving any pending blobs behind
// This will be fully populated by the scheduler before retiry
while scheduler::TransactionsToSign::<ST>::try_recv(&mut txn, &external_key).is_some() {}
// Drain the completed Eventualities
// This will be fully populated by the scanner before retiry
while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {}
// Drain our DB channels
@ -156,18 +171,37 @@ impl<ST: SignableTransaction> Signers<ST> {
// TODO: Batch signer, cosigner, slash report signers
let (batch_task, batch_handle) = Task::new();
tokio::spawn(
BatchSignerTask::new(
db.clone(),
session,
external_keys[0].group_key(),
substrate_keys.clone(),
)
.continually_run(batch_task, vec![coordinator_handle.clone()]),
);
let (transaction_task, transaction_handle) = Task::new();
tokio::spawn(
TransactionTask::<_, ST, _>::new(db.clone(), publisher.clone(), session, external_keys)
TransactionSignerTask::<_, ST, _>::new(
db.clone(),
publisher.clone(),
session,
external_keys,
)
.continually_run(transaction_task, vec![coordinator_handle.clone()]),
);
tasks.insert(session, Tasks {
tasks.insert(
session,
Tasks {
cosigner: todo!("TODO"),
batch: todo!("TODO"),
batch: batch_handle,
slash_report: todo!("TODO"),
transaction: transaction_handle,
});
},
);
}
Self { coordinator_handle, tasks, _ST: PhantomData }
@ -246,19 +280,27 @@ impl<ST: SignableTransaction> Signers<ST> {
match sign_id.id {
VariantSignId::Cosign(_) => {
db::CoordinatorToCosignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.cosigner.run_now(); }
if let Some(tasks) = tasks {
tasks.cosigner.run_now();
}
}
VariantSignId::Batch(_) => {
db::CoordinatorToBatchSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.batch.run_now(); }
if let Some(tasks) = tasks {
tasks.batch.run_now();
}
}
VariantSignId::SlashReport(_) => {
db::CoordinatorToSlashReportSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.slash_report.run_now(); }
if let Some(tasks) = tasks {
tasks.slash_report.run_now();
}
}
VariantSignId::Transaction(_) => {
db::CoordinatorToTransactionSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.transaction.run_now(); }
if let Some(tasks) = tasks {
tasks.transaction.run_now();
}
}
}
}

View file

@ -26,7 +26,7 @@ mod db;
use db::*;
// Fetches transactions to sign and signs them.
pub(crate) struct TransactionTask<
pub(crate) struct TransactionSignerTask<
D: Db,
ST: SignableTransaction,
P: TransactionPublisher<TransactionFor<ST>>,
@ -44,7 +44,7 @@ pub(crate) struct TransactionTask<
}
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>>
TransactionTask<D, ST, P>
TransactionSignerTask<D, ST, P>
{
pub(crate) fn new(
db: D,
@ -90,7 +90,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
#[async_trait::async_trait]
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
for TransactionTask<D, ST, P>
for TransactionSignerTask<D, ST, P>
{
async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false;
@ -193,17 +193,16 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
&mut txn,
match id {
VariantSignId::Transaction(id) => id,
_ => panic!("TransactionTask signed a non-transaction"),
_ => panic!("TransactionSignerTask signed a non-transaction"),
},
&buf,
);
}
self
.publisher
.publish(signed_tx)
.await
.map_err(|e| format!("couldn't publish transaction: {e:?}"))?;
match self.publisher.publish(signed_tx).await {
Ok(()) => {}
Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"),
}
}
}

View file

@ -0,0 +1,86 @@
use std::{
collections::HashMap,
io::{self, Read},
};
use rand_core::{RngCore, CryptoRng};
use ciphersuite::Ristretto;
use frost::{
dkg::{Participant, ThresholdKeys},
FrostError,
algorithm::Algorithm,
sign::*,
};
use frost_schnorrkel::Schnorrkel;
// This wraps a Schnorrkel sign machine into one with a preset message.
#[derive(Clone)]
pub(crate) struct WrappedSchnorrkelMachine(ThresholdKeys<Ristretto>, Vec<u8>);
impl WrappedSchnorrkelMachine {
pub(crate) fn new(keys: ThresholdKeys<Ristretto>, msg: Vec<u8>) -> Self {
Self(keys, msg)
}
}
pub(crate) struct WrappedSchnorrkelSignMachine(
<AlgorithmMachine<Ristretto, Schnorrkel> as PreprocessMachine>::SignMachine,
Vec<u8>,
);
type Signature = <AlgorithmMachine<Ristretto, Schnorrkel> as PreprocessMachine>::Signature;
impl PreprocessMachine for WrappedSchnorrkelMachine {
type Preprocess = <AlgorithmMachine<Ristretto, Schnorrkel> as PreprocessMachine>::Preprocess;
type Signature = Signature;
type SignMachine = WrappedSchnorrkelSignMachine;
fn preprocess<R: RngCore + CryptoRng>(
self,
rng: &mut R,
) -> (Self::SignMachine, Preprocess<Ristretto, <Schnorrkel as Algorithm<Ristretto>>::Addendum>)
{
let WrappedSchnorrkelMachine(keys, batch) = self;
let (machine, preprocess) =
AlgorithmMachine::new(Schnorrkel::new(b"substrate"), keys).preprocess(rng);
(WrappedSchnorrkelSignMachine(machine, batch), preprocess)
}
}
impl SignMachine<Signature> for WrappedSchnorrkelSignMachine {
type Params = <AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<Signature>>::Params;
type Keys = <AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<Signature>>::Keys;
type Preprocess =
<AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<Signature>>::Preprocess;
type SignatureShare =
<AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<Signature>>::SignatureShare;
type SignatureMachine =
<AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<Signature>>::SignatureMachine;
fn cache(self) -> CachedPreprocess {
unimplemented!()
}
fn from_cache(
_algorithm: Schnorrkel,
_keys: ThresholdKeys<Ristretto>,
_cache: CachedPreprocess,
) -> (Self, Self::Preprocess) {
unimplemented!()
}
fn read_preprocess<R: Read>(&self, reader: &mut R) -> io::Result<Self::Preprocess> {
self.0.read_preprocess(reader)
}
fn sign(
self,
preprocesses: HashMap<
Participant,
Preprocess<Ristretto, <Schnorrkel as Algorithm<Ristretto>>::Addendum>,
>,
msg: &[u8],
) -> Result<(Self::SignatureMachine, SignatureShare<Ristretto>), FrostError> {
assert!(msg.is_empty());
self.0.sign(preprocesses, &self.1)
}
}

View file

@ -1,8 +0,0 @@
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub enum MultisigEvent<N: Network> {
// Batches to publish
Batches(Option<(<N::Curve as Ciphersuite>::G, <N::Curve as Ciphersuite>::G)>, Vec<Batch>),
// Eventuality completion found on-chain
Completed(Vec<u8>, [u8; 32], N::Eventuality),
}