diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 90df8192..5784a5ff 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -34,14 +34,20 @@ impl MainDb { getter.get(Self::handled_message_key(network, id)).is_some() } - fn acive_tributaries_key() -> Vec { + fn in_tributary_key(set: ValidatorSet) -> Vec { + Self::main_key(b"in_tributary", set.encode()) + } + fn active_tributaries_key() -> Vec { Self::main_key(b"active_tributaries", []) } fn retired_tributary_key(set: ValidatorSet) -> Vec { Self::main_key(b"retired_tributary", set.encode()) } + pub fn in_tributary(getter: &G, set: ValidatorSet) -> bool { + getter.get(Self::in_tributary_key(set)).is_some() + } pub fn active_tributaries(getter: &G) -> (Vec, Vec) { - let bytes = getter.get(Self::acive_tributaries_key()).unwrap_or(vec![]); + let bytes = getter.get(Self::active_tributaries_key()).unwrap_or(vec![]); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -52,7 +58,9 @@ impl MainDb { (bytes, tributaries) } pub fn add_active_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) { - let key = Self::acive_tributaries_key(); + txn.put(Self::in_tributary_key(spec.set()), []); + + let key = Self::active_tributaries_key(); let (mut existing_bytes, existing) = Self::active_tributaries(txn); for tributary in &existing { if tributary == spec { @@ -76,7 +84,7 @@ impl MainDb { for active in active { active.write(&mut bytes).unwrap(); } - txn.put(Self::acive_tributaries_key(), bytes); + txn.put(Self::active_tributaries_key(), bytes); txn.put(Self::retired_tributary_key(set), []); } pub fn is_tributary_retired(getter: &G, set: ValidatorSet) -> bool { diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 7a3451fd..4b86d80c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -2,7 +2,7 @@ use core::ops::Deref; use std::{ sync::Arc, time::Duration, - collections::{VecDeque, HashMap}, + collections::{VecDeque, HashSet, HashMap}, }; use zeroize::{Zeroize, Zeroizing}; @@ -204,12 +204,33 @@ async fn handle_processor_message( "processor claimed to be a different network than it was for SubstrateBlockAck", ); - // TODO: Find all Tributaries active at this Substrate block, and make sure we have - // them all (if we were present in them) + // Get the sessions for these keys + let keys = plans.iter().map(|plan| plan.key.clone()).collect::>(); + let mut sessions = vec![]; + for key in keys { + let session = SubstrateDb::::session_for_key(&txn, &key).unwrap(); + // Only keep them if we're in the Tributary AND they haven't been retied + let set = ValidatorSet { network: *network, session }; + if MainDb::::in_tributary(&txn, set) && (!MainDb::::is_tributary_retired(&txn, set)) + { + sessions.push((session, key)); + } + } - for tributary in tributaries.values() { - // TODO: This needs to be scoped per multisig - TributaryDb::::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, plans); + // Ensure we have the Tributaries + for (session, _) in &sessions { + if !tributaries.contains_key(session) { + return false; + } + } + + for (session, key) in sessions { + let tributary = &tributaries[&session]; + let plans = plans + .iter() + .filter_map(|plan| Some(plan.id).filter(|_| plan.key == key)) + .collect::>(); + TributaryDb::::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, &plans); let tx = Transaction::SubstrateBlock(*block); log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 01ab54cf..dbb139c0 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -139,9 +139,15 @@ pub mod coordinator { } } + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Encode, Decode, Serialize, Deserialize)] + pub struct PlanMeta { + pub key: Vec, + pub id: [u8; 32], + } + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Encode, Decode, Serialize, Deserialize)] pub enum ProcessorMessage { - SubstrateBlockAck { network: NetworkId, block: u64, plans: Vec<[u8; 32]> }, + SubstrateBlockAck { network: NetworkId, block: u64, plans: Vec }, BatchPreprocess { id: SignId, block: BlockHash, preprocess: Vec }, BatchShare { id: SignId, share: [u8; 32] }, } diff --git a/processor/src/main.rs b/processor/src/main.rs index 471d9328..18a1428c 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -13,7 +13,7 @@ use serai_client::{ validator_sets::primitives::{ValidatorSet, KeyPair}, }; -use messages::CoordinatorMessage; +use messages::{coordinator::PlanMeta, CoordinatorMessage}; use serai_env as env; @@ -344,7 +344,13 @@ async fn handle_coordinator_msg( .send(messages::coordinator::ProcessorMessage::SubstrateBlockAck { network: N::NETWORK, block: substrate_block, - plans: to_sign.iter().map(|signable| signable.1).collect(), + plans: to_sign + .iter() + .map(|signable| PlanMeta { + key: signable.0.to_bytes().as_ref().to_vec(), + id: signable.1, + }) + .collect(), }) .await; } diff --git a/tests/coordinator/src/tests/sign.rs b/tests/coordinator/src/tests/sign.rs index c7a61e91..3afac8af 100644 --- a/tests/coordinator/src/tests/sign.rs +++ b/tests/coordinator/src/tests/sign.rs @@ -24,7 +24,7 @@ use serai_client::{ in_instructions::primitives::{InInstruction, InInstructionWithBalance, Batch}, SeraiCoins, }; -use messages::{sign::SignId, SubstrateContext, CoordinatorMessage}; +use messages::{coordinator::PlanMeta, sign::SignId, SubstrateContext, CoordinatorMessage}; use crate::{*, tests::*}; @@ -346,7 +346,10 @@ async fn sign_test() { messages::coordinator::ProcessorMessage::SubstrateBlockAck { network: NetworkId::Bitcoin, block: last_serai_block.number(), - plans: vec![plan_id], + plans: vec![PlanMeta { + key: (Secp256k1::generator() * *network_key).to_bytes().to_vec(), + id: plan_id, + }], }, )) .await; diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index b7883664..f0ccb3c6 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -5,7 +5,7 @@ use std::{ use dkg::{Participant, tests::clone_without}; -use messages::{sign::SignId, SubstrateContext}; +use messages::{coordinator::PlanMeta, sign::SignId, SubstrateContext}; use serai_client::{ primitives::{BlockHash, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId}, @@ -155,7 +155,7 @@ pub(crate) async fn sign_batch( pub(crate) async fn substrate_block( coordinator: &mut Coordinator, block: messages::substrate::CoordinatorMessage, -) -> Vec<[u8; 32]> { +) -> Vec { match block.clone() { messages::substrate::CoordinatorMessage::SubstrateBlock { context: _, diff --git a/tests/processor/src/tests/send.rs b/tests/processor/src/tests/send.rs index 268692b9..15826279 100644 --- a/tests/processor/src/tests/send.rs +++ b/tests/processor/src/tests/send.rs @@ -236,7 +236,7 @@ fn send_test() { let (mut id, mut preprocesses) = recv_sign_preprocesses(&mut coordinators, key_pair.1.to_vec(), 0).await; // TODO: Should this use the Substrate key? - assert_eq!(id, SignId { key: key_pair.1.to_vec(), id: plans[0], attempt: 0 }); + assert_eq!(id, SignId { key: key_pair.1.to_vec(), id: plans[0].id, attempt: 0 }); // Trigger a random amount of re-attempts for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {