use std::sync::{OnceLock, Mutex}; use zeroize::Zeroizing; use rand_core::{RngCore, OsRng}; use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto}; use serai_primitives::NetworkId; use messages::{ProcessorMessage, CoordinatorMessage}; use serai_message_queue::{Service, Metadata, client::MessageQueue}; use dockertest::{ PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition, DockerOperations, }; mod networks; pub use networks::*; #[cfg(test)] mod tests; static UNIQUE_ID: OnceLock> = OnceLock::new(); pub fn processor_instance( network: NetworkId, port: u32, message_queue_key: ::F, ) -> Composition { serai_docker_tests::build("processor".to_string()); let mut entropy = [0; 32]; OsRng.fill_bytes(&mut entropy); Composition::with_image( Image::with_repository("serai-dev-processor").pull_policy(PullPolicy::Never), ) .with_env( [ ("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())), ("ENTROPY".to_string(), hex::encode(entropy)), ( "NETWORK".to_string(), (match network { NetworkId::Serai => panic!("starting a processor for Serai"), NetworkId::Bitcoin => "bitcoin", NetworkId::Ethereum => "ethereum", NetworkId::Monero => "monero", }) .to_string(), ), ("NETWORK_RPC_LOGIN".to_string(), format!("{RPC_USER}:{RPC_PASS}")), ("NETWORK_RPC_PORT".to_string(), port.to_string()), ("DB_PATH".to_string(), "./processor-db".to_string()), ] .into(), ) } pub fn processor_stack( network: NetworkId, ) -> ((String, String, String), ::F, Vec) { let (network_composition, network_rpc_port) = network_instance(network); let (coord_key, message_queue_keys, message_queue_composition) = serai_message_queue_tests::instance(); let processor_composition = processor_instance(network, network_rpc_port, message_queue_keys[&network]); // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits let unique_id = { let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0)); let mut unique_id_lock = unique_id_mutex.lock().unwrap(); let unique_id = hex::encode(unique_id_lock.to_be_bytes()); *unique_id_lock += 1; unique_id }; let mut compositions = vec![]; let mut handles = vec![]; for composition in [network_composition, message_queue_composition, processor_composition] { let handle = composition.handle(); compositions.push( composition .with_start_policy(StartPolicy::Strict) .with_container_name(format!("{handle}-{}", &unique_id)) .with_log_options(Some(LogOptions { action: LogAction::Forward, policy: if handle.contains("processor") { LogPolicy::Always } else { LogPolicy::OnError }, source: LogSource::Both, })), ); handles.push(compositions.last().unwrap().handle()); } let processor_composition = compositions.last_mut().unwrap(); processor_composition.inject_container_name(handles.remove(0), "NETWORK_RPC_HOSTNAME"); processor_composition.inject_container_name(handles.remove(0), "MESSAGE_QUEUE_RPC"); ( (compositions[0].handle(), compositions[1].handle(), compositions[2].handle()), coord_key, compositions, ) } #[derive(serde::Deserialize, Debug)] struct EmptyResponse {} pub struct Coordinator { network: NetworkId, network_handle: String, #[allow(unused)] message_queue_handle: String, #[allow(unused)] processor_handle: String, next_send_id: u64, next_recv_id: u64, queue: MessageQueue, } impl Coordinator { pub fn new( network: NetworkId, ops: &DockerOperations, handles: (String, String, String), coord_key: ::F, ) -> Coordinator { let rpc = ops.handle(&handles.1).host_port(2287).unwrap(); let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); Coordinator { network, network_handle: handles.0, message_queue_handle: handles.1, processor_handle: handles.2, next_send_id: 0, next_recv_id: 0, queue: MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key)), } } /// Send a message to a processor as its coordinator. pub async fn send_message(&mut self, msg: impl Into) { let msg: CoordinatorMessage = msg.into(); self .queue .queue( Metadata { from: Service::Coordinator, to: Service::Processor(self.network), intent: msg.intent(), }, serde_json::to_string(&msg).unwrap().into_bytes(), ) .await; self.next_send_id += 1; } /// Receive a message from a processor as its coordinator. pub async fn recv_message(&mut self) -> ProcessorMessage { let msg = tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id)) .await .unwrap(); assert_eq!(msg.from, Service::Processor(self.network)); assert_eq!(msg.id, self.next_recv_id); self.queue.ack(self.next_recv_id).await; self.next_recv_id += 1; serde_json::from_slice(&msg.msg).unwrap() } pub async fn add_block(&self, ops: &DockerOperations) -> ([u8; 32], Vec) { let rpc_url = network_rpc(self.network, ops, &self.network_handle); match self.network { NetworkId::Bitcoin => { use bitcoin_serai::{ bitcoin::{consensus::Encodable, network::constants::Network, Script, Address}, rpc::Rpc, }; // Mine a block let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC"); rpc .rpc_call::>( "generatetoaddress", serde_json::json!([1, Address::p2sh(Script::empty(), Network::Regtest).unwrap()]), ) .await .unwrap(); // Get it so we can return it let hash = rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap(); let block = rpc.get_block(&hash).await.unwrap(); let mut block_buf = vec![]; block.consensus_encode(&mut block_buf).unwrap(); (hash, block_buf) } NetworkId::Ethereum => todo!(), NetworkId::Monero => { use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, scalar::Scalar}; use monero_serai::{ wallet::{ ViewPair, address::{Network, AddressSpec}, }, rpc::HttpRpc, }; let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let _: EmptyResponse = rpc .json_rpc_call( "generateblocks", Some(serde_json::json!({ "wallet_address": ViewPair::new( ED25519_BASEPOINT_POINT, Zeroizing::new(Scalar::one()), ).address(Network::Mainnet, AddressSpec::Standard).to_string(), "amount_of_blocks": 1, })), ) .await .unwrap(); let hash = rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap(); (hash, rpc.get_block(hash).await.unwrap().serialize()) } NetworkId::Serai => panic!("processor tests adding block to Serai"), } } pub async fn broadcast_block(&self, ops: &DockerOperations, block: &[u8]) { let rpc_url = network_rpc(self.network, ops, &self.network_handle); match self.network { NetworkId::Bitcoin => { use bitcoin_serai::rpc::Rpc; let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC"); let res: Option = rpc.rpc_call("submitblock", serde_json::json!([hex::encode(block)])).await.unwrap(); if let Some(err) = res { panic!("submitblock failed: {}", err); } } NetworkId::Ethereum => todo!(), NetworkId::Monero => { use monero_serai::rpc::HttpRpc; let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); let res: serde_json::Value = rpc .json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)]))) .await .unwrap(); let err = res.get("error"); if err.is_some() && (err.unwrap() != &serde_json::Value::Null) { panic!("failed to submit Monero block: {res}"); } } NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), } } pub async fn sync(&self, ops: &DockerOperations, others: &[Coordinator]) { let rpc_url = network_rpc(self.network, ops, &self.network_handle); match self.network { NetworkId::Bitcoin => { use bitcoin_serai::{bitcoin::consensus::Encodable, rpc::Rpc}; let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC"); let to = rpc.get_latest_block_number().await.unwrap(); for coordinator in others { let from = Rpc::new(network_rpc(self.network, ops, &coordinator.network_handle)) .await .expect("couldn't connect to the Bitcoin RPC") .get_latest_block_number() .await .unwrap() + 1; for b in from ..= to { let mut buf = vec![]; rpc .get_block(&rpc.get_block_hash(b).await.unwrap()) .await .unwrap() .consensus_encode(&mut buf) .unwrap(); coordinator.broadcast_block(ops, &buf).await; } } } NetworkId::Ethereum => todo!(), NetworkId::Monero => { use monero_serai::rpc::HttpRpc; let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let to = rpc.get_height().await.unwrap(); for coordinator in others { let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle)) .expect("couldn't connect to the Monero RPC") .get_height() .await .unwrap(); for b in from .. to { coordinator .broadcast_block( ops, &rpc.get_block(rpc.get_block_hash(b).await.unwrap()).await.unwrap().serialize(), ) .await; } } } NetworkId::Serai => panic!("processors tests syncing Serai nodes"), } } pub async fn publish_transacton(&self, ops: &DockerOperations, tx: &[u8]) { let rpc_url = network_rpc(self.network, ops, &self.network_handle); match self.network { NetworkId::Bitcoin => { use bitcoin_serai::rpc::Rpc; let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC"); let _: String = rpc.rpc_call("sendrawtransaction", serde_json::json!([hex::encode(tx)])).await.unwrap(); } NetworkId::Ethereum => todo!(), NetworkId::Monero => { use monero_serai::{transaction::Transaction, rpc::HttpRpc}; let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap(); } NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), } } }