diff --git a/coordinator/src/cosign_evaluator.rs b/coordinator/src/cosign_evaluator.rs index 60b0cbe1..08719655 100644 --- a/coordinator/src/cosign_evaluator.rs +++ b/coordinator/src/cosign_evaluator.rs @@ -1,20 +1,23 @@ use core::time::Duration; use std::{ - sync::{Arc, Mutex, RwLock}, + sync::Arc, collections::{HashSet, HashMap}, }; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{ + sync::{mpsc, Mutex, RwLock}, + time::sleep, +}; use scale::Encode; use sp_application_crypto::RuntimePublic; use serai_client::{ primitives::{NETWORKS, NetworkId, Signature}, validator_sets::primitives::{Session, ValidatorSet}, - SeraiError, Serai, + SeraiError, TemporalSerai, Serai, }; -use serai_db::{DbTxn, Db}; +use serai_db::{Get, DbTxn, Db, create_db}; use processor_messages::coordinator::cosign_block_msg; @@ -23,6 +26,13 @@ use crate::{ substrate::SubstrateDb, }; +create_db! { + CosignDb { + ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec, + DistinctChain: (set: ValidatorSet) -> () + } +} + pub struct CosignEvaluator { db: Mutex, serai: Arc, @@ -31,14 +41,14 @@ pub struct CosignEvaluator { } impl CosignEvaluator { - fn update_latest_cosign(&self) { - let stakes_lock = self.stakes.read().unwrap(); + async fn update_latest_cosign(&self) { + let stakes_lock = self.stakes.read().await; // If we haven't gotten the stake data yet, return let Some(stakes) = stakes_lock.as_ref() else { return }; let total_stake = stakes.values().cloned().sum::(); - let latest_cosigns = self.latest_cosigns.read().unwrap(); + let latest_cosigns = self.latest_cosigns.read().await; let mut highest_block = 0; for (block_num, _) in latest_cosigns.values() { let mut networks = HashSet::new(); @@ -55,7 +65,7 @@ impl CosignEvaluator { } } - let mut db_lock = self.db.lock().unwrap(); + let mut db_lock = self.db.lock().await; let mut txn = db_lock.txn(); if highest_block > SubstrateDb::::latest_cosigned_block(&txn) { log::info!("setting latest cosigned block to {}", highest_block); @@ -85,61 
+95,179 @@ impl CosignEvaluator {
     }
 
     // Since we've successfully built stakes, set it
-    *self.stakes.write().unwrap() = Some(stakes);
+    *self.stakes.write().await = Some(stakes);
 
-    self.update_latest_cosign();
+    self.update_latest_cosign().await;
 
     Ok(())
   }
 
   // Uses Err to signify a message should be retried
+  // Ok(()) is returned both when the cosign was handled and when it was deliberately dropped
+  // (not newer than what we have, stale, unknown block, no session/keys, invalid signature)
   async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
-    let Some(block) = self.serai.block(cosign.block).await? else {
-      log::warn!("received cosign for an unknown block");
-      return Ok(());
-    };
+    // If we already have this cosign or a newer cosign, return
+    if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
+      if latest.0 >= cosign.block_number {
+        return Ok(());
+      }
+    }
 
-    // If this an old cosign, don't bother handling it
-    if block.number() <
-      self.latest_cosigns.read().unwrap().get(&cosign.network).map(|cosign| cosign.0).unwrap_or(0)
-    {
-      log::debug!("received old cosign from {:?}", cosign.network);
+    // If this is an old cosign (older than a day), drop it
+    // saturating_add as block_number isn't authenticated yet (the signature is checked below)
+    let latest_block = self.serai.latest_block().await?;
+    if cosign.block_number.saturating_add(24 * 60 * 60 / 6) < latest_block.number() {
+      log::debug!("received old cosign supposedly signed by {:?}", cosign.network);
       return Ok(());
     }
 
-    // Get the key for this network as of the prior block
-    let serai = self.serai.as_of(block.header().parent_hash.into());
-
-    let Some(latest_session) = serai.validator_sets().session(cosign.network).await? else {
-      log::warn!("received cosign from {:?}, which doesn't yet have a session", cosign.network);
+    let Some(block) = self.serai.block_by_number(cosign.block_number).await? else {
+      log::warn!("received cosign with a block number which doesn't map to a block");
       return Ok(());
     };
-    let prior_session = Session(latest_session.0.saturating_sub(1));
-    let set_with_keys = if serai
-      .validator_sets()
-      .keys(ValidatorSet { network: cosign.network, session: prior_session })
-      .await?
-      .is_some()
-    {
-      ValidatorSet { network: cosign.network, session: prior_session }
-    } else {
-      ValidatorSet { network: cosign.network, session: latest_session }
-    };
+    // Picks the prior session's set if keys are present for it, else the latest session's set
+    // Yields None if the network doesn't have a session yet
+    async fn set_with_keys_fn(
+      serai: &TemporalSerai<'_>,
+      network: NetworkId,
+    ) -> Result<Option<ValidatorSet>, SeraiError> {
+      let Some(latest_session) = serai.validator_sets().session(network).await? else {
+        log::warn!("received cosign from {:?}, which doesn't yet have a session", network);
+        return Ok(None);
+      };
+      let prior_session = Session(latest_session.0.saturating_sub(1));
+      Ok(Some(
+        if serai
+          .validator_sets()
+          .keys(ValidatorSet { network, session: prior_session })
+          .await?
+          .is_some()
+        {
+          ValidatorSet { network, session: prior_session }
+        } else {
+          ValidatorSet { network, session: latest_session }
+        },
+      ))
+    }
+
+    // Get the key for this network as of the prior block
+    // If we have two chains, this value may be different across chains depending on if one chain
+    // included the set_keys and one didn't
+    // Because set_keys will force a cosign, it will force detection of distinct blocks
+    // re: set_keys using keys prior to set_keys (assumed amenable to all)
+    let serai = self.serai.as_of(block.header().parent_hash.into());
+
+    let Some(set_with_keys) = set_with_keys_fn(&serai, cosign.network).await? else {
+      return Ok(());
+    };
 
     let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
       log::warn!("received cosign for a block we didn't have keys for");
       return Ok(());
     };
 
-    if !keys.0.verify(&cosign_block_msg(cosign.block), &Signature(cosign.signature)) {
+    if !keys
+      .0
+      .verify(&cosign_block_msg(cosign.block_number, cosign.block), &Signature(cosign.signature))
+    {
       log::warn!("received cosigned block with an invalid signature");
       return Ok(());
     }
 
-    log::info!("received cosign for block {} by {:?}", block.number(), cosign.network);
-    self.latest_cosigns.write().unwrap().insert(cosign.network, (block.number(), cosign));
+    log::info!(
+      "received cosign for block {} ({}) by {:?}",
+      block.number(),
+      hex::encode(cosign.block),
+      cosign.network
+    );
 
-    self.update_latest_cosign();
+    // Save this cosign to the DB
+    {
+      let mut db = self.db.lock().await;
+      let mut txn = db.txn();
+      ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
+      txn.commit();
+    }
+
+    if cosign.block != block.hash() {
+      log::error!(
+        "received cosign for a distinct block at {}. we have {}. cosign had {}",
+        cosign.block_number,
+        hex::encode(block.hash()),
+        hex::encode(cosign.block)
+      );
+
+      let serai = self.serai.as_of(latest_block.hash());
+
+      let mut db = self.db.lock().await;
+      // Save this set as being on a different chain
+      let mut txn = db.txn();
+      DistinctChain::set(&mut txn, set_with_keys, &());
+      txn.commit();
+
+      let mut total_stake = 0;
+      let mut total_on_distinct_chain = 0;
+      for network in NETWORKS {
+        if network == NetworkId::Serai {
+          continue;
+        }
+
+        // Get the current set for this network
+        let set_with_keys = {
+          let mut res;
+          while {
+            res = set_with_keys_fn(&serai, network).await;
+            res.is_err()
+          } {
+            log::error!(
+              "couldn't get the set with keys when checking for a distinct chain: {:?}",
+              res
+            );
+            tokio::time::sleep(core::time::Duration::from_secs(3)).await;
+          }
+          res.unwrap()
+        };
+
+        // Get its stake
+        // Doesn't use the stakes inside self to prevent deadlocks re: multi-lock acquisition
+        if let Some(set_with_keys) = set_with_keys {
+          let stake = {
+            let mut res;
+            while {
+              res = serai.validator_sets().total_allocated_stake(set_with_keys.network).await;
+              res.is_err()
+            } {
+              log::error!(
+                "couldn't get total allocated stake when checking for a distinct chain: {:?}",
+                res
+              );
+              tokio::time::sleep(core::time::Duration::from_secs(3)).await;
+            }
+            res.unwrap()
+          };
+
+          if let Some(stake) = stake {
+            total_stake += stake.0;
+
+            if DistinctChain::get(&*db, set_with_keys).is_some() {
+              total_on_distinct_chain += stake.0;
+            }
+          }
+        }
+      }
+
+      // See https://github.com/serai-dex/serai/issues/339 for the reasoning on 17%
+      if (total_stake * 17 / 100) <= total_on_distinct_chain {
+        panic!("17% of validator sets (by stake) have co-signed a distinct chain");
+      }
+    } else {
+      // Drop the write guard before update_latest_cosign acquires the read lock on latest_cosigns
+      {
+        let mut latest_cosigns = self.latest_cosigns.write().await;
+        latest_cosigns.insert(cosign.network, (block.number(), cosign));
+      }
+      self.update_latest_cosign().await;
+    }
 
     Ok(())
   }
@@ -191,7 +319,7 @@ impl CosignEvaluator {
         let cosigns = evaluator
.latest_cosigns .read() - .unwrap() + .await .values() .map(|cosign| cosign.1) .collect::>(); diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index be549d77..ba8e9632 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -294,9 +294,10 @@ async fn handle_processor_message( coordinator::ProcessorMessage::SubstrateShare { id, .. } => { Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) } - coordinator::ProcessorMessage::CosignedBlock { block, signature } => { + coordinator::ProcessorMessage::CosignedBlock { block_number, block, signature } => { let cosigned_block = CosignedBlock { network, + block_number: *block_number, block: *block, signature: { let mut arr = [0; 64]; diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index a78c97a5..a484b42e 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -43,6 +43,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)] pub struct CosignedBlock { pub network: NetworkId, + pub block_number: u64, pub block: [u8; 32], pub signature: [u8; 64], } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 8a4cb186..8144d610 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -30,7 +30,7 @@ use tokio::{sync::mpsc, time::sleep}; use crate::{ Db, processors::Processors, - tributary::{TributarySpec, TributaryDb}, + tributary::{TributarySpec, SeraiBlockNumber, TributaryDb}, }; mod db; @@ -456,6 +456,16 @@ async fn handle_new_blocks( let maximally_latent_cosign_block = skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE); for block in (last_intended_to_cosign_block + 1) ..= latest_number { + SeraiBlockNumber::set( + &mut txn, + serai + .block_by_number(block) + .await? 
+ .expect("couldn't get block which should've been finalized") + .hash(), + &block, + ); + let mut set = false; let block_has_events = block_has_events(&mut txn, serai, block).await?; diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 6ff6f76b..bd771642 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -15,6 +15,12 @@ pub use serai_db::*; use crate::tributary::TributarySpec; +create_db! { + NewTributaryDb { + SeraiBlockNumber: (hash: [u8; 32]) -> u64 + } +} + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum Topic { Dkg, diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index e5f22585..f0e3a22a 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -27,8 +27,8 @@ use serai_db::{Get, Db}; use crate::{ processors::Processors, tributary::{ - Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, DataSet, Accumulation, - TributaryState, + Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, TributaryDb, DataSet, + Accumulation, TributaryState, nonce_decider::NonceDecider, dkg_confirmer::DkgConfirmer, scanner::{RecognizedIdType, RIDTrait}, @@ -528,6 +528,8 @@ pub(crate) async fn handle_application_tx< id: SubstrateSignableId::CosigningSubstrateBlock(hash), attempt: 0, }, + block_number: SeraiBlockNumber::get(txn, hash) + .expect("CosignSubstrateBlock yet didn't save Serai block number"), }, ) .await; diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 055b4180..2905c16e 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -156,10 +156,11 @@ pub mod sign { pub mod coordinator { use super::*; - pub fn cosign_block_msg(block: [u8; 32]) -> Vec { + pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec { const DST: &[u8] = b"Cosign"; let mut res = vec![u8::try_from(DST.len()).unwrap()]; res.extend(DST); + 
res.extend(block_number.to_le_bytes()); res.extend(block); res } @@ -181,7 +182,7 @@ pub mod coordinator { #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum CoordinatorMessage { - CosignSubstrateBlock { id: SubstrateSignId }, + CosignSubstrateBlock { id: SubstrateSignId, block_number: u64 }, // Uses Vec instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64] SubstratePreprocesses { id: SubstrateSignId, preprocesses: HashMap> }, SubstrateShares { id: SubstrateSignId, shares: HashMap }, @@ -205,7 +206,7 @@ pub mod coordinator { pub fn key(&self) -> &[u8] { match self { - CoordinatorMessage::CosignSubstrateBlock { id } => &id.key, + CoordinatorMessage::CosignSubstrateBlock { id, .. } => &id.key, CoordinatorMessage::SubstratePreprocesses { id, .. } => &id.key, CoordinatorMessage::SubstrateShares { id, .. } => &id.key, CoordinatorMessage::BatchReattempt { id } => &id.key, @@ -226,7 +227,7 @@ pub mod coordinator { CosignPreprocess { id: SubstrateSignId, preprocesses: Vec> }, BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec> }, SubstrateShare { id: SubstrateSignId, shares: Vec<[u8; 32]> }, - CosignedBlock { block: [u8; 32], signature: Vec }, + CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec }, } } @@ -372,7 +373,7 @@ impl CoordinatorMessage { CoordinatorMessage::Coordinator(msg) => { let (sub, id) = match msg { // Unique since this is the entire message - coordinator::CoordinatorMessage::CosignSubstrateBlock { id } => (0, id.encode()), + coordinator::CoordinatorMessage::CosignSubstrateBlock { id, .. } => (0, id.encode()), // Unique since this embeds the batch ID (including its network) and attempt coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. } => (1, id.encode()), coordinator::CoordinatorMessage::SubstrateShares { id, .. 
} => (2, id.encode()), diff --git a/processor/src/cosigner.rs b/processor/src/cosigner.rs index dc8008fd..4d552480 100644 --- a/processor/src/cosigner.rs +++ b/processor/src/cosigner.rs @@ -38,6 +38,7 @@ pub struct Cosigner { #[allow(dead_code)] // False positive keys: Vec>, + block_number: u64, id: [u8; 32], attempt: u32, #[allow(clippy::type_complexity)] @@ -50,6 +51,7 @@ impl fmt::Debug for Cosigner { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt .debug_struct("Cosigner") + .field("block_number", &self.block_number) .field("id", &self.id) .field("attempt", &self.attempt) .field("preprocessing", &self.preprocessing.is_some()) @@ -62,6 +64,7 @@ impl Cosigner { pub fn new( txn: &mut impl DbTxn, keys: Vec>, + block_number: u64, id: [u8; 32], attempt: u32, ) -> Option<(Cosigner, ProcessorMessage)> { @@ -104,7 +107,7 @@ impl Cosigner { }; Some(( - Cosigner { keys, id, attempt, preprocessing, signing: None }, + Cosigner { keys, block_number, id, attempt, preprocessing, signing: None }, ProcessorMessage::CosignPreprocess { id: substrate_sign_id, preprocesses: serialized_preprocesses, @@ -176,21 +179,22 @@ impl Cosigner { } } - let (machine, share) = match machine.sign(preprocesses, &cosign_block_msg(self.id)) { - Ok(res) => res, - Err(e) => match e { - FrostError::InternalError(_) | - FrostError::InvalidParticipant(_, _) | - FrostError::InvalidSigningSet(_) | - FrostError::InvalidParticipantQuantity(_, _) | - FrostError::DuplicatedParticipant(_) | - FrostError::MissingParticipant(_) => unreachable!(), + let (machine, share) = + match machine.sign(preprocesses, &cosign_block_msg(self.block_number, self.id)) { + Ok(res) => res, + Err(e) => match e { + FrostError::InternalError(_) | + FrostError::InvalidParticipant(_, _) | + FrostError::InvalidSigningSet(_) | + FrostError::InvalidParticipantQuantity(_, _) | + FrostError::DuplicatedParticipant(_) | + FrostError::MissingParticipant(_) => unreachable!(), - FrostError::InvalidPreprocess(l) | 
FrostError::InvalidShare(l) => { - return Some(ProcessorMessage::InvalidParticipant { id, participant: l }) - } - }, - }; + FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }) + } + }, + }; if m == 0 { signature_machine = Some(machine); } @@ -278,7 +282,11 @@ impl Cosigner { Completed::set(txn, block, &()); - Some(ProcessorMessage::CosignedBlock { block, signature: sig.to_bytes().to_vec() }) + Some(ProcessorMessage::CosignedBlock { + block_number: self.block_number, + block, + signature: sig.to_bytes().to_vec(), + }) } CoordinatorMessage::BatchReattempt { .. } => panic!("BatchReattempt passed to Cosigner"), } diff --git a/processor/src/main.rs b/processor/src/main.rs index 97c66018..abf53165 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -253,7 +253,7 @@ async fn handle_coordinator_msg( } } else { match msg { - CoordinatorCoordinatorMessage::CosignSubstrateBlock { id } => { + CoordinatorCoordinatorMessage::CosignSubstrateBlock { id, block_number } => { let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock") }; @@ -261,7 +261,8 @@ async fn handle_coordinator_msg( else { panic!("didn't have key shares for the key we were told to cosign with"); }; - if let Some((cosigner, msg)) = Cosigner::new(txn, keys, block, id.attempt) { + if let Some((cosigner, msg)) = Cosigner::new(txn, keys, block_number, block, id.attempt) + { tributary_mutable.cosigner = Some(cosigner); coordinator.send(msg).await; } else { diff --git a/processor/src/tests/cosigner.rs b/processor/src/tests/cosigner.rs index a910e6d2..7af79452 100644 --- a/processor/src/tests/cosigner.rs +++ b/processor/src/tests/cosigner.rs @@ -24,6 +24,7 @@ async fn test_cosigner() { let participant_one = Participant::new(1).unwrap(); + let block_number = OsRng.next_u64(); let block = [0xaa; 32]; let actual_id = SubstrateSignId { 
@@ -54,7 +55,7 @@ async fn test_cosigner() { let mut db = MemDb::new(); let mut txn = db.txn(); let (signer, preprocess) = - Cosigner::new(&mut txn, vec![keys], block, actual_id.attempt).unwrap(); + Cosigner::new(&mut txn, vec![keys], block_number, block, actual_id.attempt).unwrap(); match preprocess { // All participants should emit a preprocess @@ -114,10 +115,12 @@ async fn test_cosigner() { .await .unwrap() { - ProcessorMessage::CosignedBlock { block: signed_block, signature } => { + ProcessorMessage::CosignedBlock { block_number, block: signed_block, signature } => { assert_eq!(signed_block, block); - assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) - .verify(&cosign_block_msg(block), &Signature(signature.try_into().unwrap()))); + assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()).verify( + &cosign_block_msg(block_number, block), + &Signature(signature.try_into().unwrap()) + )); } _ => panic!("didn't get cosigned block back"), }