Start on the new processor main loop

This commit is contained in:
Luke Parker 2024-09-11 04:54:03 -04:00
parent a8159e9070
commit 4054e44471
5 changed files with 99 additions and 268 deletions

View file

@ -1,5 +1,13 @@
use serai_client::validator_sets::primitives::Session;
use serai_db::{Get, DbTxn, create_db};
create_db! {
Processor {
ExternalKeyForSession: (session: Session) -> Vec<u8>,
}
}
create_db! {
BitcoinProcessor {
LatestBlockToYieldAsFinalized: () -> u64,

View file

@ -1,12 +1,10 @@
use ciphersuite::{group::GroupEncoding, Ciphersuite, Secp256k1};
use frost::ThresholdKeys;
use key_gen::KeyGenParams;
use crate::scan::scanner;
pub(crate) struct KeyGen;
impl KeyGenParams for KeyGen {
pub(crate) struct KeyGenParams;
impl key_gen::KeyGenParams for KeyGenParams {
const ID: &'static str = "Bitcoin";
type ExternalNetworkCurve = Secp256k1;

View file

@ -6,6 +6,10 @@
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
use ciphersuite::Ciphersuite;
use serai_db::{DbTxn, Db};
mod primitives;
pub(crate) use primitives::*;
@ -14,8 +18,11 @@ mod scan;
// App-logic trait satisfactions
mod key_gen;
use crate::key_gen::KeyGenParams;
mod rpc;
use rpc::Rpc;
mod scheduler;
use scheduler::Scheduler;
// Our custom code for Bitcoin
mod db;
@ -29,6 +36,82 @@ pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) ->
res
}
/// Fetch the next message from the Coordinator.
///
/// This message is guaranteed to have never been handled before, where handling is defined as
/// this `txn` being committed.
async fn next_message(_txn: &mut impl DbTxn) -> messages::CoordinatorMessage {
todo!("TODO")
}
async fn send_message(_msg: messages::ProcessorMessage) {
todo!("TODO")
}
async fn coordinator_loop<D: Db>(
mut db: D,
mut key_gen: ::key_gen::KeyGen<KeyGenParams, D>,
mut signers: signers::Signers<D, Rpc<D>, Scheduler<D>, Rpc<D>>,
mut scanner: Option<scanner::Scanner<Rpc<D>>>,
) {
loop {
let mut txn = Some(db.txn());
let msg = next_message(txn.as_mut().unwrap()).await;
match msg {
messages::CoordinatorMessage::KeyGen(msg) => {
// This is a computationally expensive call yet it happens infrequently
for msg in key_gen.handle(txn.as_mut().unwrap(), msg) {
send_message(messages::ProcessorMessage::KeyGen(msg)).await;
}
}
// These are cheap calls which are fine to be here in this loop
messages::CoordinatorMessage::Sign(msg) => signers.queue_message(txn.as_mut().unwrap(), &msg),
messages::CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session,
block_number,
block,
},
) => signers.cosign_block(txn.take().unwrap(), session, block_number, block),
messages::CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::SignSlashReport { session, report },
) => signers.sign_slash_report(txn.take().unwrap(), session, &report),
messages::CoordinatorMessage::Substrate(msg) => match msg {
messages::substrate::CoordinatorMessage::SetKeys { serai_time, session, key_pair } => {
db::ExternalKeyForSession::set(txn.as_mut().unwrap(), session, &key_pair.1.into_inner());
todo!("TODO: Register in signers");
todo!("TODO: Scanner activation")
}
messages::substrate::CoordinatorMessage::SlashesReported { session } => {
let key_bytes = db::ExternalKeyForSession::get(txn.as_ref().unwrap(), session).unwrap();
let mut key_bytes = key_bytes.as_slice();
let key =
<KeyGenParams as ::key_gen::KeyGenParams>::ExternalNetworkCurve::read_G(&mut key_bytes)
.unwrap();
assert!(key_bytes.is_empty());
signers.retire_session(txn.as_mut().unwrap(), session, &key)
}
messages::substrate::CoordinatorMessage::BlockWithBatchAcknowledgement {
block,
batch_id,
in_instruction_succeededs,
burns,
key_to_activate,
} => todo!("TODO"),
messages::substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement {
block,
burns,
} => todo!("TODO"),
},
};
// If the txn wasn't already consumed and committed, commit it
if let Some(txn) = txn {
txn.commit();
}
}
}
#[tokio::main]
async fn main() {}

View file

@ -113,8 +113,6 @@ pub mod sign {
pub attempt: u32,
}
// TODO: Make this generic to the ID once we introduce topics into the message-queue and remove
// the global ProcessorMessage/CoordinatorMessage
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage {
// Received preprocesses for the specified signing protocol.
@ -185,8 +183,10 @@ pub mod substrate {
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage {
/// Keys set on the Serai network.
/// Keys set on the Serai blockchain.
SetKeys { serai_time: u64, session: Session, key_pair: KeyPair },
/// Slashes reported on the Serai blockchain OR the process timed out.
SlashesReported { session: Session },
/// The data from a block which acknowledged a Batch.
BlockWithBatchAcknowledgement {
block: u64,
@ -305,11 +305,12 @@ impl CoordinatorMessage {
CoordinatorMessage::Substrate(msg) => {
let (sub, id) = match msg {
substrate::CoordinatorMessage::SetKeys { session, .. } => (0, session.encode()),
substrate::CoordinatorMessage::SlashesReported { session } => (1, session.encode()),
substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { block, .. } => {
(1, block.encode())
(2, block.encode())
}
substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { block, .. } => {
(2, block.encode())
(3, block.encode())
}
};

View file

@ -1,21 +1,3 @@
use std::{time::Duration, collections::HashMap};
use zeroize::{Zeroize, Zeroizing};
use ciphersuite::{
group::{ff::PrimeField, GroupEncoding},
Ciphersuite, Ristretto,
};
use dkg::evrf::EvrfCurve;
use log::{info, warn};
use tokio::time::sleep;
use serai_client::{
primitives::{BlockHash, NetworkId},
validator_sets::primitives::{Session, KeyPair},
};
use messages::{
coordinator::{
SubstrateSignableId, PlanMeta, CoordinatorMessage as CoordinatorCoordinatorMessage,
@ -27,112 +9,18 @@ use serai_env as env;
use message_queue::{Service, client::MessageQueue};
mod networks;
use networks::{Block, Network};
#[cfg(feature = "bitcoin")]
use networks::Bitcoin;
#[cfg(feature = "ethereum")]
use networks::Ethereum;
#[cfg(feature = "monero")]
use networks::Monero;
mod db;
pub use db::*;
mod coordinator;
pub use coordinator::*;
use serai_processor_key_gen as key_gen;
use key_gen::{SessionDb, KeyConfirmed, KeyGen};
mod signer;
use signer::Signer;
mod cosigner;
use cosigner::Cosigner;
mod batch_signer;
use batch_signer::BatchSigner;
mod slash_report_signer;
use slash_report_signer::SlashReportSigner;
mod multisigs;
use multisigs::{MultisigEvent, MultisigManager};
#[cfg(test)]
mod tests;
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
// Items which are mutably borrowed by Tributary.
// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't
// violated.
struct TributaryMutable<N: Network, D: Db> {
// The following are actually mutably borrowed by Substrate as well.
// - Substrate triggers key gens, and determines which to use.
// - SubstrateBlock events cause scheduling which causes signing.
//
// This is still considered Tributary-mutable as most mutation (preprocesses/shares) happens by
// the Tributary.
//
// Creation of tasks is by Substrate, yet this is safe since the mutable borrow is transferred to
// Tributary.
//
// Tributary stops mutating a key gen attempt before Substrate is made aware of it, ensuring
// Tributary drops its mutable borrow before Substrate acquires it. Tributary will maintain a
// mutable borrow on the *key gen task*, yet the finalization code can successfully run for any
// attempt.
//
// The only other note is how the scanner may cause a signer task to be dropped, effectively
// invalidating the Tributary's mutable borrow. The signer is coded to allow for attempted usage
// of a dropped task.
key_gen: KeyGen<N, D>,
signers: HashMap<Session, Signer<N, D>>,
// This is also mutably borrowed by the Scanner.
// The Scanner starts new sign tasks.
// The Tributary mutates already-created signed tasks, potentially completing them.
// Substrate may mark tasks as completed, invalidating any existing mutable borrows.
// The safety of this follows as written above.
// There should only be one BatchSigner at a time (see #277)
batch_signer: Option<BatchSigner<D>>,
// Solely mutated by the tributary.
cosigner: Option<Cosigner>,
slash_report_signer: Option<SlashReportSigner>,
}
// Items which are mutably borrowed by Substrate.
// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't
// violated.
/*
The MultisigManager contains the Scanner and Schedulers.
The scanner is expected to autonomously operate, scanning blocks as they appear. When a block is
sufficiently confirmed, the scanner causes the Substrate signer to sign a batch. It itself only
mutates its list of finalized blocks, to protect against re-orgs, and its in-memory state though.
Disk mutations to the scan-state only happens once the relevant `Batch` is included on Substrate.
It can't be mutated as soon as the `Batch` is signed as we need to know the order of `Batch`s
relevant to `Burn`s.
Schedulers take in new outputs, confirmed in `Batch`s, and outbound payments, triggered by
`Burn`s.
Substrate also decides when to move to a new multisig, hence why this entire object is
Substrate-mutable.
Since MultisigManager should always be verifiable, and the Tributary is temporal, MultisigManager
being entirely SubstrateMutable shows proper data pipe-lining.
*/
type SubstrateMutable<N, D> = MultisigManager<D, N>;
async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
txn: &mut D::Transaction<'_>,
network: &N,
@ -141,54 +29,6 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
substrate_mutable: &mut SubstrateMutable<N, D>,
msg: &Message,
) {
// If this message expects a higher block number than we have, halt until synced
async fn wait<N: Network, D: Db>(
txn: &D::Transaction<'_>,
substrate_mutable: &SubstrateMutable<N, D>,
block_hash: &BlockHash,
) {
let mut needed_hash = <N::Block as Block<N>>::Id::default();
needed_hash.as_mut().copy_from_slice(&block_hash.0);
loop {
// Ensure our scanner has scanned this block, which means our daemon has this block at
// a sufficient depth
if substrate_mutable.block_number(txn, &needed_hash).await.is_none() {
warn!(
"node is desynced. we haven't scanned {} which should happen after {} confirms",
hex::encode(&needed_hash),
N::CONFIRMATIONS,
);
sleep(Duration::from_secs(10)).await;
continue;
};
break;
}
// TODO2: Sanity check we got an AckBlock (or this is the AckBlock) for the block in question
/*
let synced = |context: &SubstrateContext, key| -> Result<(), ()> {
// Check that we've synced this block and can actually operate on it ourselves
let latest = scanner.latest_scanned(key);
if usize::try_from(context.network_latest_finalized_block).unwrap() < latest {
log::warn!(
"external network node disconnected/desynced from rest of the network. \
our block: {latest:?}, network's acknowledged: {}",
context.network_latest_finalized_block,
);
Err(())?;
}
Ok(())
};
*/
}
if let Some(required) = msg.msg.required_block() {
// wait only reads from, it doesn't mutate, substrate_mutable
wait(txn, substrate_mutable, &required).await;
}
async fn activate_key<N: Network, D: Db>(
network: &N,
substrate_mutable: &mut SubstrateMutable<N, D>,
@ -220,105 +60,6 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
}
match msg.msg.clone() {
CoordinatorMessage::KeyGen(msg) => {
for msg in tributary_mutable.key_gen.handle(txn, msg) {
coordinator.send(msg).await;
}
}
CoordinatorMessage::Sign(msg) => {
if let Some(msg) = tributary_mutable
.signers
.get_mut(&msg.session())
.expect("coordinator told us to sign with a signer we don't have")
.handle(txn, msg)
.await
{
coordinator.send(msg).await;
}
}
CoordinatorMessage::Coordinator(msg) => match msg {
CoordinatorCoordinatorMessage::CosignSubstrateBlock { id, block_number } => {
let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else {
panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock")
};
let Some(keys) = tributary_mutable.key_gen.substrate_keys_by_session(id.session) else {
panic!("didn't have key shares for the key we were told to cosign with");
};
if let Some((cosigner, msg)) =
Cosigner::new(txn, id.session, keys, block_number, block, id.attempt)
{
tributary_mutable.cosigner = Some(cosigner);
coordinator.send(msg).await;
} else {
log::warn!("Cosigner::new returned None");
}
}
CoordinatorCoordinatorMessage::SignSlashReport { id, report } => {
assert_eq!(id.id, SubstrateSignableId::SlashReport);
let Some(keys) = tributary_mutable.key_gen.substrate_keys_by_session(id.session) else {
panic!("didn't have key shares for the key we were told to perform a slash report with");
};
if let Some((slash_report_signer, msg)) =
SlashReportSigner::new(txn, N::NETWORK, id.session, keys, report, id.attempt)
{
tributary_mutable.slash_report_signer = Some(slash_report_signer);
coordinator.send(msg).await;
} else {
log::warn!("SlashReportSigner::new returned None");
}
}
_ => {
let (is_cosign, is_batch, is_slash_report) = match msg {
CoordinatorCoordinatorMessage::CosignSubstrateBlock { .. } |
CoordinatorCoordinatorMessage::SignSlashReport { .. } => (false, false, false),
CoordinatorCoordinatorMessage::SubstratePreprocesses { ref id, .. } |
CoordinatorCoordinatorMessage::SubstrateShares { ref id, .. } => (
matches!(&id.id, SubstrateSignableId::CosigningSubstrateBlock(_)),
matches!(&id.id, SubstrateSignableId::Batch(_)),
matches!(&id.id, SubstrateSignableId::SlashReport),
),
CoordinatorCoordinatorMessage::BatchReattempt { .. } => (false, true, false),
};
if is_cosign {
if let Some(cosigner) = tributary_mutable.cosigner.as_mut() {
if let Some(msg) = cosigner.handle(txn, msg) {
coordinator.send(msg).await;
}
} else {
log::warn!(
"received message for cosigner yet didn't have a cosigner. {}",
"this is an error if we didn't reboot",
);
}
} else if is_batch {
if let Some(msg) = tributary_mutable
.batch_signer
.as_mut()
.expect(
"coordinator told us to sign a batch when we don't currently have a Substrate signer",
)
.handle(txn, msg)
{
coordinator.send(msg).await;
}
} else if is_slash_report {
if let Some(slash_report_signer) = tributary_mutable.slash_report_signer.as_mut() {
if let Some(msg) = slash_report_signer.handle(txn, msg) {
coordinator.send(msg).await;
}
} else {
log::warn!(
"received message for slash report signer yet didn't have {}",
"a slash report signer. this is an error if we didn't reboot",
);
}
}
}
},
CoordinatorMessage::Substrate(msg) => {
match msg {
messages::substrate::CoordinatorMessage::ConfirmKeyPair { context, session, key_pair } => {