diff --git a/Cargo.lock b/Cargo.lock index 77b417d1..3d3cb38e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6593,6 +6593,7 @@ dependencies = [ "serai-client", "serde", "serde_json", + "sp-application-crypto", "thiserror", "tokio", "zeroize", diff --git a/processor/Cargo.toml b/processor/Cargo.toml index 24927166..b14ca99d 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -37,6 +37,9 @@ group = "0.13" transcript = { package = "flexible-transcript", path = "../crypto/transcript" } frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] } +# Substrate +sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } + # Bitcoin secp256k1 = { version = "0.27", features = ["global-context", "rand-std"], optional = true } k256 = { version = "0.13", features = ["arithmetic"], optional = true } @@ -50,7 +53,7 @@ monero-serai = { path = "../coins/monero", features = ["multisig"], optional = t log = "0.4" tokio = { version = "1", features = ["full"] } -serai-client = { path = "../substrate//client", default-features = false } +serai-client = { path = "../substrate/client", default-features = false } messages = { package = "processor-messages", path = "./messages" } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 9eb2c337..fce2b948 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -10,14 +10,15 @@ use serde::{Serialize, Deserialize}; use dkg::{Participant, ThresholdParams}; -use in_instructions_primitives::InInstructionWithBalance; +use serai_primitives::BlockHash; +use in_instructions_primitives::SignedBatch; use tokens_primitives::OutInstructionWithBalance; use validator_sets_primitives::ValidatorSet; #[derive(Clone, Copy, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub struct SubstrateContext { pub time: u64, - pub coin_latest_block_number: u64, + pub coin_latest_finalized_block: BlockHash, } pub mod key_gen { @@ -41,6 +42,16 @@ pub mod key_gen { ConfirmKeyPair { context: SubstrateContext, id: KeyGenId }, } + impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + if let CoordinatorMessage::ConfirmKeyPair { context, .. } = self { + Some(context.coin_latest_finalized_block) + } else { + None + } + } + } + #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum ProcessorMessage { // Created commitments for the specified key generation protocol. @@ -95,6 +106,12 @@ pub mod sign { Completed { key: Vec, id: [u8; 32], tx: Vec }, } + impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + None + } + } + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { // Created preprocess for the specified signing protocol. @@ -116,18 +133,72 @@ pub mod sign { } } +pub mod coordinator { + use super::*; + + #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] + pub enum CoordinatorMessage { + // The validators have off-chain agreeance on this block being finalized. That means it should + // be signed and published to Substrate. + BlockFinalized { key: Vec, block: BlockHash }, + + // Uses Vec instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64] + BlockPreprocesses { key: Vec, block: BlockHash, preprocesses: HashMap> }, + BlockShares { key: Vec, block: BlockHash, shares: HashMap }, + // Needed so a client which didn't participate in signing can still realize signing completed + BlockSigned { key: Vec, block: BlockHash, signature: Vec }, + } + + impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + Some(match self { + CoordinatorMessage::BlockFinalized { block, .. } => *block, + CoordinatorMessage::BlockPreprocesses { block, .. } => *block, + CoordinatorMessage::BlockShares { block, .. } => *block, + CoordinatorMessage::BlockSigned { block, .. } => *block, + }) + } + } + + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] + pub enum ProcessorMessage { + // This should become an inherent transaction by the block producer. + // As an inherent, this should be ~41 bytes per update. + // Ideally, we don't need to put finalized_block on chain though. + Block { key: Vec, latest_number: u64, finalized_block: BlockHash }, + + BlockPreprocess { key: Vec, block: BlockHash, preprocess: Vec }, + BlockSign { key: Vec, block: BlockHash, share: [u8; 32] }, + } +} + pub mod substrate { use super::*; #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum CoordinatorMessage { - BlockAcknowledged { context: SubstrateContext, key: Vec, block: Vec }, + // Substrate acknwoledged the block, meaning it should be acted upon. + // + // This still needs to come from Substrate, not from the validator-chain, due to it mutating + // the scheduler, which the Substrate chain primarily does. To have two causes of mutation + // requires a definitive ordering, which isn't achievable when we have distinct consensus. + BlockAcknowledged { context: SubstrateContext, key: Vec, block: BlockHash }, Burns { context: SubstrateContext, burns: Vec }, } + impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + let context = match self { + CoordinatorMessage::BlockAcknowledged { context, .. } => context, + CoordinatorMessage::Burns { context, .. } => context, + }; + Some(context.coin_latest_finalized_block) + } + } + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { - Update { key: Vec, block: Vec, instructions: Vec }, + Update { key: Vec, batch: SignedBatch }, } } @@ -135,12 +206,25 @@ pub mod substrate { pub enum CoordinatorMessage { KeyGen(key_gen::CoordinatorMessage), Sign(sign::CoordinatorMessage), + Coordinator(coordinator::CoordinatorMessage), Substrate(substrate::CoordinatorMessage), } +impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + match self { + CoordinatorMessage::KeyGen(msg) => msg.required_block(), + CoordinatorMessage::Sign(msg) => msg.required_block(), + CoordinatorMessage::Coordinator(msg) => msg.required_block(), + CoordinatorMessage::Substrate(msg) => msg.required_block(), + } + } +} + #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum ProcessorMessage { KeyGen(key_gen::ProcessorMessage), Sign(sign::ProcessorMessage), + Coordinator(coordinator::ProcessorMessage), Substrate(substrate::ProcessorMessage), } diff --git a/processor/src/coins/bitcoin.rs b/processor/src/coins/bitcoin.rs index 04e9a0af..b780c71a 100644 --- a/processor/src/coins/bitcoin.rs +++ b/processor/src/coins/bitcoin.rs @@ -37,7 +37,7 @@ use bitcoin_serai::bitcoin::{ }; use serai_client::{ - primitives::{MAX_DATA_LEN, BITCOIN, Amount, Balance}, + primitives::{MAX_DATA_LEN, BITCOIN, BITCOIN_NET_ID, NetworkId, Amount, Balance}, coins::bitcoin::Address, }; @@ -286,6 +286,7 @@ impl Coin for Bitcoin { type Address = Address; + const NETWORK: NetworkId = BITCOIN_NET_ID; const ID: &'static str = "Bitcoin"; const CONFIRMATIONS: usize = 3; diff --git a/processor/src/coins/mod.rs b/processor/src/coins/mod.rs index d44ec12c..5c57d4a8 100644 --- a/processor/src/coins/mod.rs +++ b/processor/src/coins/mod.rs @@ -10,7 +10,7 @@ use frost::{ sign::PreprocessMachine, }; -use serai_client::primitives::Balance; +use serai_client::primitives::{NetworkId, Balance}; #[cfg(feature = "bitcoin")] pub mod bitcoin; @@ -157,6 +157,7 @@ impl Default for EventualitiesTracker { } pub trait Block: Send + Sync + Sized + Clone + Debug { + // This is currently bounded to being 32-bytes. type Id: 'static + Id; fn id(&self) -> Self::Id; fn median_fee(&self) -> C::Fee; @@ -249,6 +250,8 @@ pub trait Coin: 'static + Send + Sync + Clone + PartialEq + Eq + Debug { + TryInto> + TryFrom>; + /// Network ID for this coin. + const NETWORK: NetworkId; /// String ID for this coin. const ID: &'static str; /// The amount of confirmations required to consider a block 'final'. diff --git a/processor/src/coins/monero.rs b/processor/src/coins/monero.rs index 6939c5cb..7ba78fb3 100644 --- a/processor/src/coins/monero.rs +++ b/processor/src/coins/monero.rs @@ -26,7 +26,7 @@ use monero_serai::{ use tokio::time::sleep; pub use serai_client::{ - primitives::{MAX_DATA_LEN, MONERO, Amount, Balance}, + primitives::{MAX_DATA_LEN, MONERO, MONERO_NET_ID, NetworkId, Amount, Balance}, coins::monero::Address, }; @@ -217,6 +217,7 @@ impl Coin for Monero { type Address = Address; + const NETWORK: NetworkId = MONERO_NET_ID; const ID: &'static str = "Monero"; const CONFIRMATIONS: usize = 10; diff --git a/processor/src/key_gen.rs b/processor/src/key_gen.rs index 65c75488..85e5faa4 100644 --- a/processor/src/key_gen.rs +++ b/processor/src/key_gen.rs @@ -15,7 +15,7 @@ use frost::{ use log::info; -use serai_client::validator_sets::primitives::ValidatorSet; +use serai_client::{primitives::BlockHash, validator_sets::primitives::ValidatorSet}; use messages::key_gen::*; use crate::{DbTxn, Db, coins::Coin}; @@ -23,7 +23,7 @@ use crate::{DbTxn, Db, coins::Coin}; #[derive(Debug)] pub enum KeyGenEvent { KeyConfirmed { - activation_number: usize, + activation_block: BlockHash, substrate_keys: ThresholdKeys, coin_keys: ThresholdKeys, }, @@ -374,7 +374,7 @@ impl KeyGen { ); KeyGenEvent::KeyConfirmed { - activation_number: context.coin_latest_block_number.try_into().unwrap(), + activation_block: context.coin_latest_finalized_block, substrate_keys, coin_keys, } diff --git a/processor/src/main.rs b/processor/src/main.rs index 2a74fd01..4acdd31b 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -19,12 +19,14 @@ use tokio::time::sleep; use scale::Decode; use serai_client::{ - primitives::MAX_DATA_LEN, + primitives::{MAX_DATA_LEN, BlockHash}, tokens::primitives::{OutInstruction, OutInstructionWithBalance}, - in_instructions::primitives::{Shorthand, RefundableInInstruction, InInstructionWithBalance}, + in_instructions::primitives::{ + Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, SignedBatch, + }, }; -use messages::{SubstrateContext, sign, substrate, CoordinatorMessage, ProcessorMessage}; +use messages::{SubstrateContext, CoordinatorMessage, ProcessorMessage}; mod plan; pub use plan::*; @@ -135,8 +137,15 @@ async fn sign_plans( plans: Vec>, ) { let mut plans = VecDeque::from(plans); + let start = SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(context.time)).unwrap(); - let block_number = context.coin_latest_block_number.try_into().unwrap(); + + let mut block_hash = >::Id::default(); + block_hash.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0); + let block_number = scanner + .block_number(&block_hash) + .await + .expect("told to sign_plans on a context we're not synced to"); let fee = get_fee(coin, block_number).await; @@ -145,7 +154,7 @@ async fn sign_plans( info!("preparing plan {}: {:?}", hex::encode(id), plan); let key = plan.key.to_bytes(); - db.save_signing(key.as_ref(), context.coin_latest_block_number, context.time, &plan); + db.save_signing(key.as_ref(), block_number.try_into().unwrap(), context.time, &plan); let (tx, branches) = prepare_send(coin, &signers[key.as_ref()], block_number, fee, plan).await; // TODO: If we reboot mid-sign_plans, for a DB-backed scheduler, these may be partially @@ -253,92 +262,76 @@ async fn run(raw_db: D, coin: C, mut coordinato // If this message expects a higher block number than we have, halt until synced async fn wait( - coin: &C, scanner: &ScannerHandle, - context: &SubstrateContext + block_hash: &BlockHash ) { - let needed = usize::try_from(context.coin_latest_block_number).unwrap(); + let mut needed_hash = >::Id::default(); + needed_hash.as_mut().copy_from_slice(&block_hash.0); + let block_number; loop { - let Ok(actual) = coin.get_latest_block_number().await else { - error!("couldn't get the latest block number"); - // Sleep for a minute as node errors should be incredibly uncommon yet take multiple - // seconds to resolve - sleep(Duration::from_secs(60)).await; + // Ensure our scanner has scanned this block, which means our daemon has this block at + // a sufficient depth + let Some(block_number_inner) = scanner.block_number(&needed_hash).await else { + warn!( + "node is desynced. we haven't scanned {} which should happen after {} confirms", + hex::encode(&needed_hash), + C::CONFIRMATIONS, + ); + sleep(Duration::from_secs(10)).await; continue; }; - - // Check our daemon has this block - // CONFIRMATIONS - 1 since any block's TXs have one confirmation (the block itself) - let confirmed = actual.saturating_sub(C::CONFIRMATIONS - 1); - if needed > confirmed { - // This may occur within some natural latency window - warn!( - "node is desynced. need block {}, have {}", - // Print the block needed for the needed block to be confirmed - needed + (C::CONFIRMATIONS - 1), - actual, - ); - // Sleep for one second per needed block - // If the node is disconnected from the network, this will be faster than it should - // be, yet presumably it just neeeds a moment to sync up - sleep(Duration::from_secs((needed - confirmed).try_into().unwrap())).await; - } - - // Check our scanner has scanned it - // This check does void the need for the last one, yet it provides a bit better - // debugging - let ram_scanned = scanner.ram_scanned().await; - if ram_scanned < needed { - warn!("scanner is behind. need block {}, scanned up to {}", needed, ram_scanned); - sleep(Duration::from_secs((needed - ram_scanned).try_into().unwrap())).await; - } - - // TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in - // question - - /* - let synced = |context: &SubstrateContext, key| -> Result<(), ()> { - // Check that we've synced this block and can actually operate on it ourselves - let latest = scanner.latest_scanned(key); - if usize::try_from(context.coin_latest_block_number).unwrap() < latest { - log::warn!( - "coin node disconnected/desynced from rest of the network. \ - our block: {latest:?}, network's acknowledged: {}", - context.coin_latest_block_number - ); - Err(())?; - } - Ok(()) - }; - */ - + block_number = block_number_inner; break; } + + // While the scanner has cemented this block, that doesn't mean it's been scanned for all + // keys + // ram_scanned will return the lowest scanned block number out of all keys + while scanner.ram_scanned().await < block_number { + sleep(Duration::from_secs(1)).await; + } + + // TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in + // question + + /* + let synced = |context: &SubstrateContext, key| -> Result<(), ()> { + // Check that we've synced this block and can actually operate on it ourselves + let latest = scanner.latest_scanned(key); + if usize::try_from(context.coin_latest_block_number).unwrap() < latest { + log::warn!( + "coin node disconnected/desynced from rest of the network. \ + our block: {latest:?}, network's acknowledged: {}", + context.coin_latest_block_number + ); + Err(())?; + } + Ok(()) + }; + */ } - match &msg.msg { - CoordinatorMessage::KeyGen(_) => {}, - CoordinatorMessage::Sign(_) => {}, - CoordinatorMessage::Substrate(msg) => { - match msg { - substrate::CoordinatorMessage::BlockAcknowledged { context, .. } => { - wait(&coin, &scanner, context).await; - }, - substrate::CoordinatorMessage::Burns { context, .. } => { - wait(&coin, &scanner, context).await; - }, - } - }, + if let Some(required) = msg.msg.required_block() { + wait(&scanner, &required).await; } match msg.msg.clone() { CoordinatorMessage::KeyGen(msg) => { match key_gen.handle(msg).await { // TODO: Handle substrate_keys - KeyGenEvent::KeyConfirmed { activation_number, substrate_keys: _, coin_keys } => { + KeyGenEvent::KeyConfirmed { activation_block, substrate_keys: _, coin_keys } => { let keys = coin_keys; let key = keys.group_key(); + + let mut activation_block_hash = >::Id::default(); + activation_block_hash.as_mut().copy_from_slice(&activation_block.0); + let activation_number = + scanner + .block_number(&activation_block_hash) + .await + .expect("KeyConfirmed from context we haven't synced"); + scanner.rotate_key(activation_number, key).await; schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::::new(key)); signers.insert( @@ -358,13 +351,19 @@ async fn run(raw_db: D, coin: C, mut coordinato signers[msg.key()].handle(msg).await; } + CoordinatorMessage::Coordinator(_) => todo!(), + CoordinatorMessage::Substrate(msg) => { match msg { - substrate::CoordinatorMessage::BlockAcknowledged { context, key: key_vec, block } => { + messages::substrate::CoordinatorMessage::BlockAcknowledged { + context, + key: key_vec, + block + } => { let key = ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); let mut block_id = >::Id::default(); - block_id.as_mut().copy_from_slice(&block); + block_id.as_mut().copy_from_slice(&block.0); let plans = schedulers .get_mut(&key_vec) @@ -381,7 +380,7 @@ async fn run(raw_db: D, coin: C, mut coordinato ).await; } - substrate::CoordinatorMessage::Burns { context, burns } => { + messages::substrate::CoordinatorMessage::Burns { context, burns } => { // TODO2: Rewrite rotation documentation let schedule_key = active_keys.last().expect("burn event despite no keys"); let scheduler = schedulers.get_mut(schedule_key.to_bytes().as_ref()).unwrap(); @@ -424,9 +423,18 @@ async fn run(raw_db: D, coin: C, mut coordinato // TODO match msg.unwrap() { ScannerEvent::Outputs(key, block, outputs) => { - coordinator.send(ProcessorMessage::Substrate(substrate::ProcessorMessage::Update { - key: key.to_bytes().as_ref().to_vec(), - block: block.as_ref().to_vec(), + let key = key.to_bytes().as_ref().to_vec(); + + let mut block_hash = [0; 32]; + block_hash.copy_from_slice(block.as_ref()); + + // TODO + let id = 0; + + let batch = Batch { + network: C::NETWORK, + id, + block: BlockHash(block_hash), instructions: outputs.iter().filter_map(|output| { // If these aren't externally received funds, don't handle it as an instruction if output.kind() != OutputType::External { @@ -451,8 +459,18 @@ async fn run(raw_db: D, coin: C, mut coordinato instruction: instruction.instruction, balance: output.balance(), }) - }).collect(), - })).await; + }).collect() + }; + + coordinator.send(ProcessorMessage::Substrate( + messages::substrate::ProcessorMessage::Update { + key, + batch: SignedBatch { + batch, + signature: sp_application_crypto::sr25519::Signature([0; 64]), + }, + } + )).await; }, } }, @@ -462,7 +480,7 @@ async fn run(raw_db: D, coin: C, mut coordinato SignerEvent::SignedTransaction { id, tx } => { main_db.finish_signing(&key, id); coordinator - .send(ProcessorMessage::Sign(sign::ProcessorMessage::Completed { + .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { key, id, tx: tx.as_ref().to_vec() diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 771450fe..79450447 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -232,6 +232,10 @@ impl ScannerHandle { scanner.keys.push(key); } + pub async fn block_number(&self, id: &>::Id) -> Option { + self.scanner.read().await.db.block_number(id) + } + /// Acknowledge having handled a block for a key. pub async fn ack_block( &self, @@ -341,7 +345,7 @@ impl Scanner { if let Some(id) = scanner.db.block(i) { // TODO2: Also check this block builds off the previous block - if id != block.id() { + if id != block_id { panic!("{} reorg'd from {id:?} to {:?}", C::ID, hex::encode(block_id)); } } else { diff --git a/processor/src/tests/key_gen.rs b/processor/src/tests/key_gen.rs index 6c8f0503..b178f497 100644 --- a/processor/src/tests/key_gen.rs +++ b/processor/src/tests/key_gen.rs @@ -8,7 +8,7 @@ use group::GroupEncoding; use frost::{Participant, ThresholdParams, tests::clone_without}; use serai_client::{ - primitives::MONERO_NET_ID, + primitives::{MONERO_NET_ID, BlockHash}, validator_sets::primitives::{Session, ValidatorSet}, }; @@ -119,14 +119,14 @@ pub async fn test_key_gen() { for i in 1 ..= 5 { let key_gen = key_gens.get_mut(&i).unwrap(); - if let KeyGenEvent::KeyConfirmed { activation_number, substrate_keys, coin_keys } = key_gen + if let KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } = key_gen .handle(CoordinatorMessage::ConfirmKeyPair { - context: SubstrateContext { time: 0, coin_latest_block_number: 111 }, + context: SubstrateContext { time: 0, coin_latest_finalized_block: BlockHash([0x11; 32]) }, id: ID, }) .await { - assert_eq!(activation_number, 111); + assert_eq!(activation_block, BlockHash([0x11; 32])); let params = ThresholdParams::new(3, 5, Participant::new(u16::try_from(i).unwrap()).unwrap()).unwrap(); assert_eq!(substrate_keys.params(), params);