mirror of
https://github.com/serai-dex/serai.git
synced 2025-03-15 16:12:44 +00:00
Stops work where it does due to the processor panicking for Monero, yet not Bitcoin, under what's present. Cleans up processor tests to consolidate shared code.
344 lines
11 KiB
Rust
344 lines
11 KiB
Rust
use std::sync::{OnceLock, Mutex};
|
|
|
|
use zeroize::Zeroizing;
|
|
use rand_core::{RngCore, OsRng};
|
|
|
|
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
|
|
|
|
use serai_client::primitives::NetworkId;
|
|
use messages::{ProcessorMessage, CoordinatorMessage};
|
|
use serai_message_queue::{Service, Metadata, client::MessageQueue};
|
|
|
|
use dockertest::{
|
|
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
|
|
DockerOperations,
|
|
};
|
|
|
|
mod networks;
|
|
pub use networks::*;
|
|
|
|
#[cfg(test)]
|
|
mod tests;
|
|
|
|
// Process-wide counter, lazily initialized to 0 by `processor_stack`, which hex-encodes the
// current value into the container names of each stack it builds and then increments it.
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
|
|
|
|
pub fn processor_instance(
|
|
network: NetworkId,
|
|
port: u32,
|
|
message_queue_key: <Ristretto as Ciphersuite>::F,
|
|
) -> Composition {
|
|
serai_docker_tests::build("processor".to_string());
|
|
|
|
let mut entropy = [0; 32];
|
|
OsRng.fill_bytes(&mut entropy);
|
|
|
|
Composition::with_image(
|
|
Image::with_repository("serai-dev-processor").pull_policy(PullPolicy::Never),
|
|
)
|
|
.with_env(
|
|
[
|
|
("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())),
|
|
("ENTROPY".to_string(), hex::encode(entropy)),
|
|
(
|
|
"NETWORK".to_string(),
|
|
(match network {
|
|
NetworkId::Serai => panic!("starting a processor for Serai"),
|
|
NetworkId::Bitcoin => "bitcoin",
|
|
NetworkId::Ethereum => "ethereum",
|
|
NetworkId::Monero => "monero",
|
|
})
|
|
.to_string(),
|
|
),
|
|
("NETWORK_RPC_LOGIN".to_string(), format!("{RPC_USER}:{RPC_PASS}")),
|
|
("NETWORK_RPC_PORT".to_string(), port.to_string()),
|
|
("DB_PATH".to_string(), "./processor-db".to_string()),
|
|
]
|
|
.into(),
|
|
)
|
|
}
|
|
|
|
/// Handles of the compositions in a processor stack, in the order `processor_stack` returns
/// them: (network, message queue, processor).
pub type Handles = (String, String, String);
|
|
pub fn processor_stack(
|
|
network: NetworkId,
|
|
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
|
|
let (network_composition, network_rpc_port) = network_instance(network);
|
|
|
|
let (coord_key, message_queue_keys, message_queue_composition) =
|
|
serai_message_queue_tests::instance();
|
|
|
|
let processor_composition =
|
|
processor_instance(network, network_rpc_port, message_queue_keys[&network]);
|
|
|
|
// Give every item in this stack a unique ID
|
|
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
|
|
let unique_id = {
|
|
let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0));
|
|
let mut unique_id_lock = unique_id_mutex.lock().unwrap();
|
|
let unique_id = hex::encode(unique_id_lock.to_be_bytes());
|
|
*unique_id_lock += 1;
|
|
unique_id
|
|
};
|
|
|
|
let mut compositions = vec![];
|
|
let mut handles = vec![];
|
|
for composition in [network_composition, message_queue_composition, processor_composition] {
|
|
let handle = composition.handle();
|
|
compositions.push(
|
|
composition
|
|
.with_start_policy(StartPolicy::Strict)
|
|
.with_container_name(format!("{handle}-{}", &unique_id))
|
|
.with_log_options(Some(LogOptions {
|
|
action: LogAction::Forward,
|
|
policy: if handle.contains("processor") { LogPolicy::Always } else { LogPolicy::OnError },
|
|
source: LogSource::Both,
|
|
})),
|
|
);
|
|
handles.push(compositions.last().unwrap().handle());
|
|
}
|
|
|
|
let processor_composition = compositions.last_mut().unwrap();
|
|
processor_composition.inject_container_name(handles.remove(0), "NETWORK_RPC_HOSTNAME");
|
|
processor_composition.inject_container_name(handles.remove(0), "MESSAGE_QUEUE_RPC");
|
|
|
|
(
|
|
(compositions[0].handle(), compositions[1].handle(), compositions[2].handle()),
|
|
coord_key,
|
|
compositions,
|
|
)
|
|
}
|
|
|
|
#[derive(serde::Deserialize, Debug)]
|
|
struct EmptyResponse {}
|
|
|
|
pub struct Coordinator {
|
|
network: NetworkId,
|
|
|
|
network_handle: String,
|
|
#[allow(unused)]
|
|
message_queue_handle: String,
|
|
#[allow(unused)]
|
|
processor_handle: String,
|
|
|
|
next_send_id: u64,
|
|
next_recv_id: u64,
|
|
queue: MessageQueue,
|
|
}
|
|
|
|
impl Coordinator {
|
|
pub fn new(
|
|
network: NetworkId,
|
|
ops: &DockerOperations,
|
|
handles: (String, String, String),
|
|
coord_key: <Ristretto as Ciphersuite>::F,
|
|
) -> Coordinator {
|
|
let rpc = ops.handle(&handles.1).host_port(2287).unwrap();
|
|
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
|
|
Coordinator {
|
|
network,
|
|
|
|
network_handle: handles.0,
|
|
message_queue_handle: handles.1,
|
|
processor_handle: handles.2,
|
|
|
|
next_send_id: 0,
|
|
next_recv_id: 0,
|
|
queue: MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key)),
|
|
}
|
|
}
|
|
|
|
/// Send a message to a processor as its coordinator.
|
|
pub async fn send_message(&mut self, msg: impl Into<CoordinatorMessage>) {
|
|
let msg: CoordinatorMessage = msg.into();
|
|
self
|
|
.queue
|
|
.queue(
|
|
Metadata {
|
|
from: Service::Coordinator,
|
|
to: Service::Processor(self.network),
|
|
intent: msg.intent(),
|
|
},
|
|
serde_json::to_string(&msg).unwrap().into_bytes(),
|
|
)
|
|
.await;
|
|
self.next_send_id += 1;
|
|
}
|
|
|
|
/// Receive a message from a processor as its coordinator.
|
|
pub async fn recv_message(&mut self) -> ProcessorMessage {
|
|
let msg =
|
|
tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(msg.from, Service::Processor(self.network));
|
|
assert_eq!(msg.id, self.next_recv_id);
|
|
self.queue.ack(self.next_recv_id).await;
|
|
self.next_recv_id += 1;
|
|
serde_json::from_slice(&msg.msg).unwrap()
|
|
}
|
|
|
|
pub async fn add_block(&self, ops: &DockerOperations) -> ([u8; 32], Vec<u8>) {
|
|
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
|
|
match self.network {
|
|
NetworkId::Bitcoin => {
|
|
use bitcoin_serai::{
|
|
bitcoin::{consensus::Encodable, network::constants::Network, Script, Address},
|
|
rpc::Rpc,
|
|
};
|
|
|
|
// Mine a block
|
|
let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC");
|
|
rpc
|
|
.rpc_call::<Vec<String>>(
|
|
"generatetoaddress",
|
|
serde_json::json!([1, Address::p2sh(Script::empty(), Network::Regtest).unwrap()]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Get it so we can return it
|
|
let hash = rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap();
|
|
let block = rpc.get_block(&hash).await.unwrap();
|
|
let mut block_buf = vec![];
|
|
block.consensus_encode(&mut block_buf).unwrap();
|
|
(hash, block_buf)
|
|
}
|
|
NetworkId::Ethereum => todo!(),
|
|
NetworkId::Monero => {
|
|
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, scalar::Scalar};
|
|
use monero_serai::{
|
|
wallet::{
|
|
ViewPair,
|
|
address::{Network, AddressSpec},
|
|
},
|
|
rpc::HttpRpc,
|
|
};
|
|
|
|
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
|
|
let _: EmptyResponse = rpc
|
|
.json_rpc_call(
|
|
"generateblocks",
|
|
Some(serde_json::json!({
|
|
"wallet_address": ViewPair::new(
|
|
ED25519_BASEPOINT_POINT,
|
|
Zeroizing::new(Scalar::one()),
|
|
).address(Network::Mainnet, AddressSpec::Standard).to_string(),
|
|
"amount_of_blocks": 1,
|
|
})),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
let hash = rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap();
|
|
(hash, rpc.get_block(hash).await.unwrap().serialize())
|
|
}
|
|
NetworkId::Serai => panic!("processor tests adding block to Serai"),
|
|
}
|
|
}
|
|
|
|
pub async fn broadcast_block(&self, ops: &DockerOperations, block: &[u8]) {
|
|
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
|
|
match self.network {
|
|
NetworkId::Bitcoin => {
|
|
use bitcoin_serai::rpc::Rpc;
|
|
|
|
let rpc =
|
|
Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC");
|
|
let res: Option<String> =
|
|
rpc.rpc_call("submitblock", serde_json::json!([hex::encode(block)])).await.unwrap();
|
|
if let Some(err) = res {
|
|
panic!("submitblock failed: {}", err);
|
|
}
|
|
}
|
|
NetworkId::Ethereum => todo!(),
|
|
NetworkId::Monero => {
|
|
use monero_serai::rpc::HttpRpc;
|
|
|
|
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC");
|
|
let res: serde_json::Value = rpc
|
|
.json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)])))
|
|
.await
|
|
.unwrap();
|
|
let err = res.get("error");
|
|
if err.is_some() && (err.unwrap() != &serde_json::Value::Null) {
|
|
panic!("failed to submit Monero block: {res}");
|
|
}
|
|
}
|
|
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
|
|
}
|
|
}
|
|
|
|
pub async fn sync(&self, ops: &DockerOperations, others: &[Coordinator]) {
|
|
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
|
|
match self.network {
|
|
NetworkId::Bitcoin => {
|
|
use bitcoin_serai::{bitcoin::consensus::Encodable, rpc::Rpc};
|
|
|
|
let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC");
|
|
let to = rpc.get_latest_block_number().await.unwrap();
|
|
for coordinator in others {
|
|
let from = Rpc::new(network_rpc(self.network, ops, &coordinator.network_handle))
|
|
.await
|
|
.expect("couldn't connect to the Bitcoin RPC")
|
|
.get_latest_block_number()
|
|
.await
|
|
.unwrap() +
|
|
1;
|
|
for b in from ..= to {
|
|
let mut buf = vec![];
|
|
rpc
|
|
.get_block(&rpc.get_block_hash(b).await.unwrap())
|
|
.await
|
|
.unwrap()
|
|
.consensus_encode(&mut buf)
|
|
.unwrap();
|
|
coordinator.broadcast_block(ops, &buf).await;
|
|
}
|
|
}
|
|
}
|
|
NetworkId::Ethereum => todo!(),
|
|
NetworkId::Monero => {
|
|
use monero_serai::rpc::HttpRpc;
|
|
|
|
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
|
|
let to = rpc.get_height().await.unwrap();
|
|
for coordinator in others {
|
|
let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle))
|
|
.expect("couldn't connect to the Monero RPC")
|
|
.get_height()
|
|
.await
|
|
.unwrap();
|
|
for b in from .. to {
|
|
coordinator
|
|
.broadcast_block(
|
|
ops,
|
|
&rpc.get_block(rpc.get_block_hash(b).await.unwrap()).await.unwrap().serialize(),
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
NetworkId::Serai => panic!("processors tests syncing Serai nodes"),
|
|
}
|
|
}
|
|
|
|
pub async fn publish_transacton(&self, ops: &DockerOperations, tx: &[u8]) {
|
|
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
|
|
match self.network {
|
|
NetworkId::Bitcoin => {
|
|
use bitcoin_serai::rpc::Rpc;
|
|
|
|
let rpc =
|
|
Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC");
|
|
let _: String =
|
|
rpc.rpc_call("sendrawtransaction", serde_json::json!([hex::encode(tx)])).await.unwrap();
|
|
}
|
|
NetworkId::Ethereum => todo!(),
|
|
NetworkId::Monero => {
|
|
use monero_serai::{transaction::Transaction, rpc::HttpRpc};
|
|
|
|
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC");
|
|
rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap();
|
|
}
|
|
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
|
|
}
|
|
}
|
|
}
|