diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index 699d275c..83a329d6 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -1,21 +1,31 @@ #![allow(clippy::needless_pass_by_ref_mut)] // False positives use std::{ - sync::{OnceLock, Mutex}, + sync::{OnceLock, Arc, Mutex}, time::Duration, fs, }; +use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex}; + +use rand_core::{RngCore, OsRng}; + use zeroize::Zeroizing; -use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto}; +use ciphersuite::{ + group::{ff::PrimeField, GroupEncoding}, + Ciphersuite, Ristretto, +}; use serai_client::primitives::NetworkId; -use messages::{CoordinatorMessage, ProcessorMessage}; +use messages::{ + coordinator::{SubstrateSignableId, SubstrateSignId, cosign_block_msg}, + CoordinatorMessage, ProcessorMessage, +}; use serai_message_queue::{Service, Metadata, client::MessageQueue}; -use serai_client::Serai; +use serai_client::{primitives::Signature, Serai}; use dockertest::{ PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, @@ -143,6 +153,30 @@ pub fn coordinator_stack( ) } +fn is_cosign_message(msg: &CoordinatorMessage) -> bool { + matches!( + msg, + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { .. } + ) + ) || matches!( + msg, + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { + id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. }, + .. + } + ), + ) || matches!( + msg, + CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares { + id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. }, + .. + }), + ) +} + +#[derive(Clone)] pub struct Processor { network: NetworkId, @@ -152,13 +186,23 @@ pub struct Processor { #[allow(unused)] coordinator_handle: String, - next_send_id: u64, - next_recv_id: u64, - queue: MessageQueue, + queue: Arc>, + abort_handle: Option>, + + substrate_key: Arc::F>>>>, +} + +impl Drop for Processor { + fn drop(&mut self) { + if let Some(abort_handle) = self.abort_handle.take() { + abort_handle.abort(); + }; + } } impl Processor { pub async fn new( + raw_i: u8, network: NetworkId, ops: &DockerOperations, handles: (String, String, String), @@ -183,21 +227,147 @@ impl Processor { // The Serai RPC may or may not be started // Assume it is and continue, so if it's a few seconds late, it's still within tolerance - Processor { + let mut res = Processor { network, serai_rpc, message_queue_handle: handles.1, coordinator_handle: handles.2, - next_send_id: 0, - next_recv_id: 0, - queue: MessageQueue::new( - Service::Processor(network), - message_queue_rpc, - Zeroizing::new(processor_key), - ), - } + queue: Arc::new(AsyncMutex::new(( + 0, + 0, + MessageQueue::new( + Service::Processor(network), + message_queue_rpc, + Zeroizing::new(processor_key), + ), + ))), + abort_handle: None, + + substrate_key: Arc::new(AsyncMutex::new(None)), + }; + + // Handle any cosigns which come up + res.abort_handle = Some(Arc::new( + tokio::spawn({ + let mut res = res.clone(); + async move { + loop { + tokio::task::yield_now().await; + + let msg = { + let mut queue_lock = res.queue.lock().await; + let (_, next_recv_id, queue) = &mut *queue_lock; + let Ok(msg) = + tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator)) + .await + else { + continue; + }; + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, *next_recv_id); + + let msg_msg = serde_json::from_slice(&msg.msg).unwrap(); + if !is_cosign_message(&msg_msg) { + continue; + } + queue.ack(Service::Coordinator, msg.id).await; + *next_recv_id += 1; + msg_msg + }; + + struct CurrentCosign { + block_number: u64, + block: [u8; 32], + } + static CURRENT_COSIGN: OnceLock>> = OnceLock::new(); + let mut current_cosign = + CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await; + match msg { + // If this is a CosignSubstrateBlock, reset the CurrentCosign + // While technically, each processor should individually track the current cosign, + // this is fine for current testing purposes + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { + id, + block_number, + }, + ) => { + let block = match id { + SubstrateSignId { + id: SubstrateSignableId::CosigningSubstrateBlock(block), + .. + } => block, + _ => panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID"), + }; + + let new_cosign = CurrentCosign { block_number, block }; + if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) { + *current_cosign = Some(new_cosign); + } + res + .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { + id: id.clone(), + preprocesses: vec![vec![raw_i; 64]], + }) + .await; + } + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. }, + ) => { + // TODO: Assert the ID matches CURRENT_COSIGN + // TODO: Verify the received preprocesses + res + .send_message(messages::coordinator::ProcessorMessage::SubstrateShare { + id, + shares: vec![[raw_i; 32]], + }) + .await; + } + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstrateShares { .. }, + ) => { + // TODO: Assert the ID matches CURRENT_COSIGN + // TODO: Verify the shares + + let block_number = current_cosign.as_ref().unwrap().block_number; + let block = current_cosign.as_ref().unwrap().block; + + let substrate_key = res.substrate_key.lock().await.clone().unwrap(); + + // Expand to a key pair as Schnorrkel expects + // It's the private key + 32-bytes of entropy for nonces + the public key + let mut schnorrkel_key_pair = [0; 96]; + schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); + OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); + schnorrkel_key_pair[64 ..].copy_from_slice( + &(::generator() * *substrate_key).to_bytes(), + ); + let signature = Signature( + schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair) + .unwrap() + .sign_simple(b"substrate", &cosign_block_msg(block_number, block)) + .to_bytes(), + ); + + res + .send_message(messages::coordinator::ProcessorMessage::CosignedBlock { + block_number, + block, + signature: signature.0.to_vec(), + }) + .await; + } + _ => panic!("unexpected message passed is_cosign_message"), + } + } + } + }) + .abort_handle(), + )); + + res } pub async fn serai(&self) -> Serai { @@ -207,8 +377,10 @@ impl Processor { /// Send a message to the coordinator as a processor. pub async fn send_message(&mut self, msg: impl Into) { let msg: ProcessorMessage = msg.into(); - self - .queue + + let mut queue_lock = self.queue.lock().await; + let (next_send_id, _, queue) = &mut *queue_lock; + queue .queue( Metadata { from: Service::Processor(self.network), @@ -218,20 +390,39 @@ impl Processor { serde_json::to_string(&msg).unwrap().into_bytes(), ) .await; - self.next_send_id += 1; + *next_send_id += 1; } /// Receive a message from the coordinator as a processor. pub async fn recv_message(&mut self) -> CoordinatorMessage { - // Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes - let msg = - tokio::time::timeout(Duration::from_secs(6 * 60), self.queue.next(Service::Coordinator)) + loop { + tokio::task::yield_now().await; + + let mut queue_lock = self.queue.lock().await; + let (_, next_recv_id, queue) = &mut *queue_lock; + // Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes + let msg = tokio::time::timeout(Duration::from_secs(6 * 60), queue.next(Service::Coordinator)) .await .unwrap(); - assert_eq!(msg.from, Service::Coordinator); - assert_eq!(msg.id, self.next_recv_id); - self.queue.ack(Service::Coordinator, msg.id).await; - self.next_recv_id += 1; - serde_json::from_slice(&msg.msg).unwrap() + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, *next_recv_id); + + // If this is a cosign message, let the cosign task handle it + let msg_msg = serde_json::from_slice(&msg.msg).unwrap(); + if is_cosign_message(&msg_msg) { + continue; + } + + queue.ack(Service::Coordinator, msg.id).await; + *next_recv_id += 1; + return msg_msg; + } + } + + pub async fn set_substrate_key( + &mut self, + substrate_key: Zeroizing<::F>, + ) { + *self.substrate_key.lock().await = Some(substrate_key); } } diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs index 8945716c..21318d7b 100644 --- a/tests/coordinator/src/tests/batch.rs +++ b/tests/coordinator/src/tests/batch.rs @@ -223,9 +223,9 @@ pub async fn batch( // Verify the coordinator sends SubstrateBlock to all processors let last_block = serai.block_by_number(last_serai_block).await.unwrap().unwrap(); - for i in 0 .. processors.len() { + for processor in processors { assert_eq!( - potentially_cosign(processors, i, processor_is, substrate_key).await, + processor.recv_message().await, messages::CoordinatorMessage::Substrate( messages::substrate::CoordinatorMessage::SubstrateBlock { context: SubstrateContext { @@ -241,7 +241,7 @@ pub async fn batch( ); // Send the ack as expected, though it shouldn't trigger any observable behavior - processors[i] + processor .send_message(messages::ProcessorMessage::Coordinator( messages::coordinator::ProcessorMessage::SubstrateBlockAck { network: batch.batch.network, @@ -272,8 +272,10 @@ async fn batch_test() { // Connect to the Message Queues as the processor let mut new_processors: Vec = vec![]; - for (handles, key) in processors { - new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + for (i, (handles, key)) in processors.into_iter().enumerate() { + new_processors.push( + Processor::new(i.try_into().unwrap(), NetworkId::Bitcoin, &ops, handles, key).await, + ); } let mut processors = new_processors; diff --git a/tests/coordinator/src/tests/cosign.rs b/tests/coordinator/src/tests/cosign.rs deleted file mode 100644 index 1a36ecda..00000000 --- a/tests/coordinator/src/tests/cosign.rs +++ /dev/null @@ -1,173 +0,0 @@ -use std::collections::{HashSet, HashMap}; - -use zeroize::Zeroizing; -use rand_core::{RngCore, OsRng}; - -use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; -use dkg::Participant; - -use serai_client::primitives::Signature; -use messages::{ - coordinator::{SubstrateSignableId, cosign_block_msg}, - CoordinatorMessage, -}; - -use crate::{*, tests::*}; - -pub async fn potentially_cosign( - processors: &mut [Processor], - primary_processor: usize, - processor_is: &[u8], - substrate_key: &Zeroizing<::F>, -) -> CoordinatorMessage { - let msg = processors[primary_processor].recv_message().await; - let messages::CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, id }, - ) = msg.clone() - else { - return msg; - }; - let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { - panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock id") - }; - - for (i, processor) in processors.iter_mut().enumerate() { - if i == primary_processor { - continue; - } - assert_eq!(msg, processor.recv_message().await); - } - - // Select a random participant to exclude, so we know for sure who *is* participating - assert_eq!(COORDINATORS - THRESHOLD, 1); - let excluded_signer = - usize::try_from(OsRng.next_u64() % u64::try_from(processors.len()).unwrap()).unwrap(); - for (i, processor) in processors.iter_mut().enumerate() { - if i == excluded_signer { - continue; - } - - processor - .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { - id: id.clone(), - preprocesses: vec![[processor_is[i]; 64].to_vec()], - }) - .await; - } - - // Send from the excluded signer so they don't stay stuck - processors[excluded_signer] - .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { - id: id.clone(), - preprocesses: vec![[processor_is[excluded_signer]; 64].to_vec()], - }) - .await; - - // Read from a known signer to find out who was selected to sign - let known_signer = (excluded_signer + 1) % COORDINATORS; - let first_preprocesses = processors[known_signer].recv_message().await; - let participants = match first_preprocesses { - CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::SubstratePreprocesses { - id: this_id, - preprocesses, - }, - ) => { - assert_eq!(&id, &this_id); - assert_eq!(preprocesses.len(), THRESHOLD - 1); - let known_signer_i = Participant::new(u16::from(processor_is[known_signer])).unwrap(); - assert!(!preprocesses.contains_key(&known_signer_i)); - - let mut participants = preprocesses.keys().cloned().collect::>(); - for (p, preprocess) in preprocesses { - assert_eq!(preprocess, vec![u8::try_from(u16::from(p)).unwrap(); 64]); - } - participants.insert(known_signer_i); - participants - } - other => panic!("coordinator didn't send back SubstratePreprocesses: {:?}", other), - }; - - for i in participants.clone() { - if u16::from(i) == u16::from(processor_is[known_signer]) { - continue; - } - - let processor = - &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; - let mut preprocesses = participants - .clone() - .into_iter() - .map(|i| (i, [u8::try_from(u16::from(i)).unwrap(); 64].to_vec())) - .collect::>(); - preprocesses.remove(&i); - - assert_eq!( - processor.recv_message().await, - CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::SubstratePreprocesses { - id: id.clone(), - preprocesses - } - ) - ); - } - - for i in participants.clone() { - let processor = - &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; - processor - .send_message(messages::coordinator::ProcessorMessage::SubstrateShare { - id: id.clone(), - shares: vec![[u8::try_from(u16::from(i)).unwrap(); 32]], - }) - .await; - } - for i in participants.clone() { - let processor = - &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; - let mut shares = participants - .clone() - .into_iter() - .map(|i| (i, [u8::try_from(u16::from(i)).unwrap(); 32])) - .collect::>(); - shares.remove(&i); - - assert_eq!( - processor.recv_message().await, - CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares { - id: id.clone(), - shares, - }) - ); - } - - // Expand to a key pair as Schnorrkel expects - // It's the private key + 32-bytes of entropy for nonces + the public key - let mut schnorrkel_key_pair = [0; 96]; - schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); - OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); - schnorrkel_key_pair[64 ..] - .copy_from_slice(&(::generator() * **substrate_key).to_bytes()); - let signature = Signature( - schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair) - .unwrap() - .sign_simple(b"substrate", &cosign_block_msg(block_number, block)) - .to_bytes(), - ); - - for (i, processor) in processors.iter_mut().enumerate() { - if i == excluded_signer { - continue; - } - processor - .send_message(messages::coordinator::ProcessorMessage::CosignedBlock { - block_number, - block, - signature: signature.0.to_vec(), - }) - .await; - } - - processors[primary_processor].recv_message().await -} diff --git a/tests/coordinator/src/tests/key_gen.rs b/tests/coordinator/src/tests/key_gen.rs index 122c6c7f..8c3d54c6 100644 --- a/tests/coordinator/src/tests/key_gen.rs +++ b/tests/coordinator/src/tests/key_gen.rs @@ -20,7 +20,7 @@ use serai_client::{ }; use messages::{key_gen::KeyGenId, CoordinatorMessage}; -use crate::{*, tests::*}; +use crate::tests::*; pub async fn key_gen( processors: &mut [Processor], @@ -208,6 +208,10 @@ pub async fn key_gen( (Public(substrate_key), network_key.try_into().unwrap()) ); + for processor in processors.iter_mut() { + processor.set_substrate_key(substrate_priv_key.clone()).await; + } + ( participant_is.into_iter().map(|i| u8::try_from(u16::from(i)).unwrap()).collect(), substrate_priv_key, @@ -233,8 +237,10 @@ async fn key_gen_test() { // Connect to the Message Queues as the processor let mut new_processors: Vec = vec![]; - for (handles, key) in processors { - new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + for (i, (handles, key)) in processors.into_iter().enumerate() { + new_processors.push( + Processor::new(i.try_into().unwrap(), NetworkId::Bitcoin, &ops, handles, key).await, + ); } let mut processors = new_processors; diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index b8fd248c..0e84ec66 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -9,9 +9,6 @@ use crate::*; mod key_gen; pub use key_gen::key_gen; -mod cosign; -pub use cosign::potentially_cosign; - mod batch; pub use batch::batch; diff --git a/tests/coordinator/src/tests/sign.rs b/tests/coordinator/src/tests/sign.rs index 7ee0c1e1..8066df83 100644 --- a/tests/coordinator/src/tests/sign.rs +++ b/tests/coordinator/src/tests/sign.rs @@ -26,7 +26,7 @@ use serai_client::{ }; use messages::{coordinator::PlanMeta, sign::SignId, SubstrateContext, CoordinatorMessage}; -use crate::{*, tests::*}; +use crate::tests::*; pub async fn sign( processors: &mut [Processor], @@ -189,8 +189,10 @@ async fn sign_test() { // Connect to the Message Queues as the processor let mut new_processors: Vec = vec![]; - for (handles, key) in processors { - new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + for (i, (handles, key)) in processors.into_iter().enumerate() { + new_processors.push( + Processor::new(i.try_into().unwrap(), NetworkId::Bitcoin, &ops, handles, key).await, + ); } let mut processors = new_processors; @@ -328,9 +330,9 @@ async fn sign_test() { let plan_id = plan_id; // We should now get a SubstrateBlock - for i in 0 .. processors.len() { + for processor in &mut processors { assert_eq!( - potentially_cosign(&mut processors, i, &participant_is, &substrate_key).await, + processor.recv_message().await, messages::CoordinatorMessage::Substrate( messages::substrate::CoordinatorMessage::SubstrateBlock { context: SubstrateContext { @@ -346,7 +348,7 @@ async fn sign_test() { ); // Send the ACK, claiming there's a plan to sign - processors[i] + processor .send_message(messages::ProcessorMessage::Coordinator( messages::coordinator::ProcessorMessage::SubstrateBlockAck { network: NetworkId::Bitcoin,