Add CosignerTask to signers, completing it

This commit is contained in:
Luke Parker 2024-09-09 16:20:04 -04:00
parent 46c12c0e66
commit 8aba71b9c4
7 changed files with 261 additions and 53 deletions

View file

@ -82,7 +82,7 @@ pub mod sign {
#[derive(Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, BorshSerialize, BorshDeserialize)]
pub enum VariantSignId {
Cosign([u8; 32]),
Cosign(u64),
Batch(u32),
SlashReport(Session),
Transaction([u8; 32]),
@ -91,7 +91,7 @@ pub mod sign {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
Self::Cosign(cosign) => {
f.debug_struct("VariantSignId::Cosign").field("0", &hex::encode(cosign)).finish()
f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish()
}
Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(),
Self::SlashReport(session) => {

View file

@ -90,6 +90,21 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
txn.commit();
}
// Publish the cosigns from this session
{
let mut txn = self.db.txn();
while let Some(((block_number, block_id), signature)) = Cosign::try_recv(&mut txn, session)
{
iterated = true;
self
.coordinator
.publish_cosign(block_number, block_id, <_>::decode(&mut signature.as_slice()).unwrap())
.await
.map_err(|e| format!("couldn't publish Cosign: {e:?}"))?;
}
txn.commit();
}
// If this session signed its slash report, publish its signature
{
let mut txn = self.db.txn();

View file

@ -0,0 +1,9 @@
use serai_validator_sets_primitives::Session;
use serai_db::{Get, DbTxn, create_db};
create_db! {
SignersCosigner {
LatestCosigned: (session: Session) -> u64,
}
}

View file

@ -0,0 +1,122 @@
use ciphersuite::Ristretto;
use frost::dkg::ThresholdKeys;
use scale::Encode;
use serai_primitives::Signature;
use serai_validator_sets_primitives::Session;
use serai_db::{DbTxn, Db};
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
use primitives::task::ContinuallyRan;
use frost_attempt_manager::*;
use crate::{
db::{ToCosign, Cosign, CoordinatorToCosignerMessages, CosignerToCoordinatorMessages},
WrappedSchnorrkelMachine,
};
mod db;
use db::LatestCosigned;
/// Fetches the latest cosign information and works on it.
///
/// Only the latest cosign attempt is kept. We don't work on historical attempts as later cosigns
/// supersede them.
#[allow(non_snake_case)]
pub(crate) struct CosignerTask<D: Db> {
db: D,
session: Session,
keys: Vec<ThresholdKeys<Ristretto>>,
current_cosign: Option<(u64, [u8; 32])>,
attempt_manager: AttemptManager<D, WrappedSchnorrkelMachine>,
}
impl<D: Db> CosignerTask<D> {
pub(crate) fn new(db: D, session: Session, keys: Vec<ThresholdKeys<Ristretto>>) -> Self {
let attempt_manager = AttemptManager::new(
db.clone(),
session,
keys.first().expect("creating a cosigner with 0 keys").params().i(),
);
Self { db, session, keys, current_cosign: None, attempt_manager }
}
}
#[async_trait::async_trait]
impl<D: Db> ContinuallyRan for CosignerTask<D> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false;
// Check the cosign to work on
{
let mut txn = self.db.txn();
if let Some(cosign) = ToCosign::get(&txn, self.session) {
// If this wasn't already signed for...
if LatestCosigned::get(&txn, self.session) < Some(cosign.0) {
// If this isn't the cosign we're currently working on, meaning it's fresh
if self.current_cosign != Some(cosign) {
// Retire the current cosign
if let Some(current_cosign) = self.current_cosign {
assert!(current_cosign.0 < cosign.0);
self.attempt_manager.retire(&mut txn, VariantSignId::Cosign(current_cosign.0));
}
// Set the cosign being worked on
self.current_cosign = Some(cosign);
let mut machines = Vec::with_capacity(self.keys.len());
{
let message = cosign_block_msg(cosign.0, cosign.1);
for keys in &self.keys {
machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone()));
}
}
for msg in self.attempt_manager.register(VariantSignId::Cosign(cosign.0), machines) {
CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
txn.commit();
}
}
}
}
// Handle any messages sent to us
loop {
let mut txn = self.db.txn();
let Some(msg) = CoordinatorToCosignerMessages::try_recv(&mut txn, self.session) else {
break;
};
iterated = true;
match self.attempt_manager.handle(msg) {
Response::Messages(msgs) => {
for msg in msgs {
CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
}
Response::Signature { id, signature } => {
let VariantSignId::Cosign(block_number) = id else {
panic!("CosignerTask signed a non-Cosign")
};
assert_eq!(Some(block_number), self.current_cosign.map(|cosign| cosign.0));
let cosign = self.current_cosign.take().unwrap();
LatestCosigned::set(&mut txn, self.session, &cosign.0);
// Send the cosign
Cosign::send(&mut txn, self.session, &(cosign, Signature::from(signature).encode()));
}
}
txn.commit();
}
Ok(iterated)
}
}

View file

@ -10,12 +10,15 @@ create_db! {
SerializedKeys: (session: Session) -> Vec<u8>,
LatestRetiredSession: () -> Session,
ToCleanup: () -> Vec<(Session, Vec<u8>)>,
ToCosign: (session: Session) -> (u64, [u8; 32]),
}
}
db_channel! {
SignersGlobal {
Cosign: (session: Session) -> (u64, [u8; 32]),
Cosign: (session: Session) -> ((u64, [u8; 32]), Vec<u8>),
SlashReport: (session: Session) -> Vec<Slash>,
SlashReportSignature: (session: Session) -> Vec<u8>,

View file

@ -30,6 +30,9 @@ pub(crate) mod db;
mod coordinator;
use coordinator::CoordinatorTask;
mod cosign;
use cosign::CosignerTask;
mod batch;
use batch::BatchSignerTask;
@ -51,6 +54,14 @@ pub trait Coordinator: 'static + Send + Sync {
/// Send a `messages::sign::ProcessorMessage`.
async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>;
/// Publish a cosign.
async fn publish_cosign(
&mut self,
block_number: u64,
block_id: [u8; 32],
signature: Signature,
) -> Result<(), Self::EphemeralError>;
/// Publish a `Batch`.
async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>;
@ -92,7 +103,14 @@ struct Tasks {
/// The signers used by a processor.
#[allow(non_snake_case)]
pub struct Signers<S: ScannerFeed, Sch: Scheduler<S>> {
pub struct Signers<
D: Db,
S: ScannerFeed,
Sch: Scheduler<S>,
P: TransactionPublisher<TransactionFor<SignableTransactionFor<S, Sch>>>,
> {
db: D,
publisher: P,
coordinator_handle: TaskHandle,
tasks: HashMap<Session, Tasks>,
_Sch: PhantomData<Sch>,
@ -115,15 +133,66 @@ type SignableTransactionFor<S, Sch> = <Sch as Scheduler<S>>::SignableTransaction
completion comes in *before* we registered a key, the signer will hold the signing protocol in
memory until the session is retired entirely.
*/
impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
impl<
D: Db,
S: ScannerFeed,
Sch: Scheduler<S>,
P: TransactionPublisher<TransactionFor<SignableTransactionFor<S, Sch>>>,
> Signers<D, S, Sch, P>
{
fn tasks(
db: D,
publisher: P,
coordinator_handle: TaskHandle,
session: Session,
substrate_keys: Vec<ThresholdKeys<Ristretto>>,
external_keys: Vec<ThresholdKeys<CiphersuiteFor<S, Sch>>>,
) -> Tasks {
let (cosign_task, cosign_handle) = Task::new();
tokio::spawn(
CosignerTask::new(db.clone(), session, substrate_keys.clone())
.continually_run(cosign_task, vec![coordinator_handle.clone()]),
);
let (batch_task, batch_handle) = Task::new();
tokio::spawn(
BatchSignerTask::new(
db.clone(),
session,
external_keys[0].group_key(),
substrate_keys.clone(),
)
.continually_run(batch_task, vec![coordinator_handle.clone()]),
);
let (slash_report_task, slash_report_handle) = Task::new();
tokio::spawn(
SlashReportSignerTask::<_, S>::new(db.clone(), session, substrate_keys)
.continually_run(slash_report_task, vec![coordinator_handle.clone()]),
);
let (transaction_task, transaction_handle) = Task::new();
tokio::spawn(
TransactionSignerTask::<_, SignableTransactionFor<S, Sch>, _>::new(
db,
publisher,
session,
external_keys,
)
.continually_run(transaction_task, vec![coordinator_handle]),
);
Tasks {
cosigner: cosign_handle,
batch: batch_handle,
slash_report: slash_report_handle,
transaction: transaction_handle,
}
}
/// Initialize the signers.
///
/// This will spawn tasks for any historically registered keys.
pub fn new(
mut db: impl Db,
coordinator: impl Coordinator,
publisher: &impl TransactionPublisher<TransactionFor<SignableTransactionFor<S, Sch>>>,
) -> Self {
pub fn new(mut db: D, coordinator: impl Coordinator, publisher: P) -> Self {
/*
On boot, perform any database cleanup which was queued.
@ -158,6 +227,8 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
// Drain the completed Eventualities
while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {}
// Delete the cosign this session should be working on
db::ToCosign::del(&mut txn, session);
// Drain our DB channels
while db::Cosign::try_recv(&mut txn, session).is_some() {}
while db::SlashReport::try_recv(&mut txn, session).is_some() {}
@ -195,48 +266,20 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
));
}
// TODO: Cosigner
let (batch_task, batch_handle) = Task::new();
tokio::spawn(
BatchSignerTask::new(
db.clone(),
session,
external_keys[0].group_key(),
substrate_keys.clone(),
)
.continually_run(batch_task, vec![coordinator_handle.clone()]),
);
let (slash_report_task, slash_report_handle) = Task::new();
tokio::spawn(
SlashReportSignerTask::<_, S>::new(db.clone(), session, substrate_keys.clone())
.continually_run(slash_report_task, vec![coordinator_handle.clone()]),
);
let (transaction_task, transaction_handle) = Task::new();
tokio::spawn(
TransactionSignerTask::<_, SignableTransactionFor<S, Sch>, _>::new(
db.clone(),
publisher.clone(),
session,
external_keys,
)
.continually_run(transaction_task, vec![coordinator_handle.clone()]),
);
tasks.insert(
session,
Tasks {
cosigner: todo!("TODO"),
batch: batch_handle,
slash_report: slash_report_handle,
transaction: transaction_handle,
},
Self::tasks(
db.clone(),
publisher.clone(),
coordinator_handle.clone(),
session,
substrate_keys,
external_keys,
),
);
}
Self { coordinator_handle, tasks, _Sch: PhantomData, _S: PhantomData }
Self { db, publisher, coordinator_handle, tasks, _Sch: PhantomData, _S: PhantomData }
}
/// Register a set of keys to sign with.
@ -247,7 +290,7 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
txn: &mut impl DbTxn,
session: Session,
substrate_keys: Vec<ThresholdKeys<Ristretto>>,
network_keys: Vec<ThresholdKeys<CiphersuiteFor<S, Sch>>>,
external_keys: Vec<ThresholdKeys<CiphersuiteFor<S, Sch>>>,
) {
// Don't register already retired keys
if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
@ -262,12 +305,25 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
{
let mut buf = Zeroizing::new(Vec::with_capacity(2 * substrate_keys.len() * 128));
for (substrate_keys, network_keys) in substrate_keys.into_iter().zip(network_keys) {
for (substrate_keys, external_keys) in substrate_keys.iter().zip(&external_keys) {
buf.extend(&*substrate_keys.serialize());
buf.extend(&*network_keys.serialize());
buf.extend(&*external_keys.serialize());
}
db::SerializedKeys::set(txn, session, &buf);
}
// Spawn the tasks
self.tasks.insert(
session,
Self::tasks(
self.db.clone(),
self.publisher.clone(),
self.coordinator_handle.clone(),
session,
substrate_keys,
external_keys,
),
);
}
/// Retire the signers for a session.
@ -302,6 +358,9 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
let mut to_cleanup = db::ToCleanup::get(txn).unwrap_or(vec![]);
to_cleanup.push((session, external_key.to_bytes().as_ref().to_vec()));
db::ToCleanup::set(txn, &to_cleanup);
// Drop the task handles, which will cause the tasks to close
self.tasks.remove(&session);
}
/// Queue handling a message.
@ -348,7 +407,7 @@ impl<S: ScannerFeed, Sch: Scheduler<S>> Signers<S, Sch> {
block_number: u64,
block: [u8; 32],
) {
db::Cosign::send(&mut txn, session, &(block_number, block));
db::ToCosign::set(&mut txn, session, &(block_number, block));
txn.commit();
if let Some(tasks) = self.tasks.get(&session) {

View file

@ -26,7 +26,7 @@ use crate::{
WrappedSchnorrkelMachine,
};
// Fetches slash_reportes to sign and signs them.
// Fetches slash reports to sign and signs them.
#[allow(non_snake_case)]
pub(crate) struct SlashReportSignerTask<D: Db, S: ScannerFeed> {
db: D,
@ -44,7 +44,7 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
let attempt_manager = AttemptManager::new(
db.clone(),
session,
keys.first().expect("creating a slash_report signer with 0 keys").params().i(),
keys.first().expect("creating a slash report signer with 0 keys").params().i(),
);
Self { db, _S: PhantomData, session, keys, has_slash_report: false, attempt_manager }