diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index 0c197a92..0541c4fd 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -5,7 +5,10 @@ use std::{ time::Duration, }; -use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex}; +use tokio::{ + task::AbortHandle, + sync::{Mutex as AsyncMutex, mpsc}, +}; use rand_core::{RngCore, OsRng}; @@ -96,7 +99,6 @@ pub struct Handles { pub(crate) message_queue: String, } -#[derive(Clone)] pub struct Processor { network: NetworkId, @@ -104,7 +106,8 @@ pub struct Processor { #[allow(unused)] handles: Handles, - queue: Arc>, + msgs: mpsc::UnboundedReceiver, + queue_for_sending: MessageQueue, abort_handle: Option>, substrate_key: Arc::F>>>>, @@ -145,156 +148,173 @@ impl Processor { // The Serai RPC may or may not be started // Assume it is and continue, so if it's a few seconds late, it's still within tolerance + // Create the queue + let mut queue = ( + 0, + Arc::new(MessageQueue::new( + Service::Processor(network), + message_queue_rpc.clone(), + Zeroizing::new(processor_key), + )), + ); + + let (msg_send, msg_recv) = mpsc::unbounded_channel(); + + let substrate_key = Arc::new(AsyncMutex::new(None)); let mut res = Processor { network, serai_rpc, handles, - queue: Arc::new(AsyncMutex::new(( - 0, - 0, - MessageQueue::new( - Service::Processor(network), - message_queue_rpc, - Zeroizing::new(processor_key), - ), - ))), + queue_for_sending: MessageQueue::new( + Service::Processor(network), + message_queue_rpc, + Zeroizing::new(processor_key), + ), + msgs: msg_recv, abort_handle: None, - substrate_key: Arc::new(AsyncMutex::new(None)), + substrate_key: substrate_key.clone(), }; - // Handle any cosigns which come up - res.abort_handle = Some(Arc::new( - tokio::spawn({ - let mut res = res.clone(); - async move { - loop { - tokio::task::yield_now().await; + // Spawn a task to handle cosigns and forward messages as appropriate + let abort_handle = tokio::spawn({ + async move { + loop { + // Get new messages + let (next_recv_id, queue) = &mut queue; + let msg = queue.next(Service::Coordinator).await; + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, *next_recv_id); + queue.ack(Service::Coordinator, msg.id).await; + *next_recv_id += 1; - let msg = { - let mut queue_lock = res.queue.lock().await; - let (_, next_recv_id, queue) = &mut *queue_lock; - let Ok(msg) = - tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator)) - .await - else { - continue; - }; - assert_eq!(msg.from, Service::Coordinator); - assert_eq!(msg.id, *next_recv_id); + let msg_msg = borsh::from_slice(&msg.msg).unwrap(); - let msg_msg = borsh::from_slice(&msg.msg).unwrap(); - // Remove any BatchReattempts clogging the pipe - // TODO: Set up a wrapper around serai-client so we aren't throwing this away yet - // leave it for the tests - if matches!( - msg_msg, - messages::CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::BatchReattempt { .. } - ) - ) { - queue.ack(Service::Coordinator, msg.id).await; - *next_recv_id += 1; - continue; - } - if !is_cosign_message(&msg_msg) { - continue; - }; - queue.ack(Service::Coordinator, msg.id).await; - *next_recv_id += 1; - msg_msg - }; + // Remove any BatchReattempts clogging the pipe + // TODO: Set up a wrapper around serai-client so we aren't throwing this away yet + // leave it for the tests + if matches!( + msg_msg, + messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::BatchReattempt { .. } + ) + ) { + continue; + } - struct CurrentCosign { - block_number: u64, - block: [u8; 32], - } - static CURRENT_COSIGN: OnceLock>> = OnceLock::new(); - let mut current_cosign = - CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await; - match msg { - // If this is a CosignSubstrateBlock, reset the CurrentCosign - // While technically, each processor should individually track the current cosign, - // this is fine for current testing purposes - CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { - id, - block_number, + if !is_cosign_message(&msg_msg) { + msg_send.send(msg_msg).unwrap(); + continue; + } + let msg = msg_msg; + + let send_message = |msg: ProcessorMessage| async move { + queue + .queue( + Metadata { + from: Service::Processor(network), + to: Service::Coordinator, + intent: msg.intent(), }, - ) => { - let SubstrateSignId { - id: SubstrateSignableId::CosigningSubstrateBlock(block), .. - } = id - else { - panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID") - }; + borsh::to_vec(&msg).unwrap(), + ) + .await; + }; - let new_cosign = CurrentCosign { block_number, block }; - if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) { - *current_cosign = Some(new_cosign); + struct CurrentCosign { + block_number: u64, + block: [u8; 32], + } + static CURRENT_COSIGN: OnceLock>> = OnceLock::new(); + let mut current_cosign = + CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await; + match msg { + // If this is a CosignSubstrateBlock, reset the CurrentCosign + // While technically, each processor should individually track the current cosign, + // this is fine for current testing purposes + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id, block_number }, + ) => { + let SubstrateSignId { + id: SubstrateSignableId::CosigningSubstrateBlock(block), .. + } = id + else { + panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID") + }; + + let new_cosign = CurrentCosign { block_number, block }; + if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) { + *current_cosign = Some(new_cosign); + } + send_message( + messages::coordinator::ProcessorMessage::CosignPreprocess { + id: id.clone(), + preprocesses: vec![[raw_i; 64]], } - res - .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { - id: id.clone(), - preprocesses: vec![[raw_i; 64]], - }) - .await; - } - CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. }, - ) => { - // TODO: Assert the ID matches CURRENT_COSIGN - // TODO: Verify the received preprocesses - res - .send_message(messages::coordinator::ProcessorMessage::SubstrateShare { - id, - shares: vec![[raw_i; 32]], - }) - .await; - } - CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::SubstrateShares { .. }, - ) => { - // TODO: Assert the ID matches CURRENT_COSIGN - // TODO: Verify the shares - - let block_number = current_cosign.as_ref().unwrap().block_number; - let block = current_cosign.as_ref().unwrap().block; - - let substrate_key = res.substrate_key.lock().await.clone().unwrap(); - - // Expand to a key pair as Schnorrkel expects - // It's the private key + 32-bytes of entropy for nonces + the public key - let mut schnorrkel_key_pair = [0; 96]; - schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); - OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); - schnorrkel_key_pair[64 ..].copy_from_slice( - &(::generator() * *substrate_key).to_bytes(), - ); - let signature = Signature( - schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair) - .unwrap() - .sign_simple(b"substrate", &cosign_block_msg(block_number, block)) - .to_bytes(), - ); - - res - .send_message(messages::coordinator::ProcessorMessage::CosignedBlock { - block_number, - block, - signature: signature.0.to_vec(), - }) - .await; - } - _ => panic!("unexpected message passed is_cosign_message"), + .into(), + ) + .await; } + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. }, + ) => { + // TODO: Assert the ID matches CURRENT_COSIGN + // TODO: Verify the received preprocesses + send_message( + messages::coordinator::ProcessorMessage::SubstrateShare { + id, + shares: vec![[raw_i; 32]], + } + .into(), + ) + .await; + } + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstrateShares { .. }, + ) => { + // TODO: Assert the ID matches CURRENT_COSIGN + // TODO: Verify the shares + + let block_number = current_cosign.as_ref().unwrap().block_number; + let block = current_cosign.as_ref().unwrap().block; + + let substrate_key = substrate_key.lock().await.clone().unwrap(); + + // Expand to a key pair as Schnorrkel expects + // It's the private key + 32-bytes of entropy for nonces + the public key + let mut schnorrkel_key_pair = [0; 96]; + schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); + OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); + schnorrkel_key_pair[64 ..].copy_from_slice( + &(::generator() * *substrate_key).to_bytes(), + ); + let signature = Signature( + schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair) + .unwrap() + .sign_simple(b"substrate", &cosign_block_msg(block_number, block)) + .to_bytes(), + ); + + send_message( + messages::coordinator::ProcessorMessage::CosignedBlock { + block_number, + block, + signature: signature.0.to_vec(), + } + .into(), + ) + .await; + } + _ => panic!("unexpected message passed is_cosign_message"), } } - }) - .abort_handle(), - )); + } + }) + .abort_handle(); + + res.abort_handle = Some(Arc::new(abort_handle)); res } @@ -307,9 +327,8 @@ impl Processor { pub async fn send_message(&mut self, msg: impl Into) { let msg: ProcessorMessage = msg.into(); - let mut queue_lock = self.queue.lock().await; - let (next_send_id, _, queue) = &mut *queue_lock; - queue + self + .queue_for_sending .queue( Metadata { from: Service::Processor(self.network), @@ -319,36 +338,13 @@ impl Processor { borsh::to_vec(&msg).unwrap(), ) .await; - *next_send_id += 1; - } - - async fn recv_message_inner(&mut self) -> CoordinatorMessage { - loop { - tokio::task::yield_now().await; - - let mut queue_lock = self.queue.lock().await; - let (_, next_recv_id, queue) = &mut *queue_lock; - let msg = queue.next(Service::Coordinator).await; - assert_eq!(msg.from, Service::Coordinator); - assert_eq!(msg.id, *next_recv_id); - - // If this is a cosign message, let the cosign task handle it - let msg_msg = borsh::from_slice(&msg.msg).unwrap(); - if is_cosign_message(&msg_msg) { - continue; - } - - queue.ack(Service::Coordinator, msg.id).await; - *next_recv_id += 1; - return msg_msg; - } } /// Receive a message from the coordinator as a processor. pub async fn recv_message(&mut self) -> CoordinatorMessage { // Set a timeout of 20 minutes to allow effectively any protocol to occur without a fear of // an arbitrary timeout cutting it short - tokio::time::timeout(Duration::from_secs(20 * 60), self.recv_message_inner()).await.unwrap() + tokio::time::timeout(Duration::from_secs(20 * 60), self.msgs.recv()).await.unwrap().unwrap() } pub async fn set_substrate_key( diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs index 67bafa24..ebba957b 100644 --- a/tests/coordinator/src/tests/batch.rs +++ b/tests/coordinator/src/tests/batch.rs @@ -245,7 +245,7 @@ pub async fn batch( ) ); - // Send the ack as expected, though it shouldn't trigger any observable behavior + // Send the ack as expected processor .send_message(messages::ProcessorMessage::Coordinator( messages::coordinator::ProcessorMessage::SubstrateBlockAck {