Send to a processor from a test

Mainly here to build out the infra. Does not automate checking
recipience/batch creation yet.
Luke Parker 2023-07-24 20:06:05 -04:00
parent f05e909d0e
commit 7990ee689a
8 changed files with 848 additions and 289 deletions
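As the commit message says, this mainly builds out the test infra. A minimal sketch of how it is meant to be driven, using the helpers added below (the exact sequence inside test.run is illustrative; a real run spins up COORDINATORS stacks, as in the key_gen test):

let network = NetworkId::Bitcoin;
// Network node + message-queue + processor, as Docker compositions.
let (handles, coord_key, compositions) = processor_stack(network);
let mut test = DockerTest::new();
for composition in compositions {
  test.add_composition(composition);
}
test.run(|ops| async move {
  // Give the message-queue a second to boot, as the added tests do.
  tokio::time::sleep(core::time::Duration::from_secs(1)).await;
  // Act as the processor's coordinator over the message queue.
  let mut coordinator = Coordinator::new(network, &ops, handles, coord_key);
  // Send the processor a message (here, ordering a key gen)...
  coordinator
    .send_message(CoordinatorMessage::KeyGen(
      messages::key_gen::CoordinatorMessage::GenerateKey {
        id: KeyGenId { set: ValidatorSet { session: Session(0), network }, attempt: 0 },
        params: ThresholdParams::new(
          u16::try_from(THRESHOLD).unwrap(),
          u16::try_from(COORDINATORS).unwrap(),
          Participant::new(1).unwrap(),
        )
        .unwrap(),
      },
    ))
    .await;
  // ... and read back its response (Commitments, for GenerateKey).
  let _msg: ProcessorMessage = coordinator.recv_message().await;
});

Checking what the processors actually received, and the Batches they create from it, is the part the message says isn't automated yet.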

Cargo.lock (generated)

@@ -8845,17 +8845,22 @@ dependencies = [
name = "serai-processor-tests"
version = "0.1.0"
dependencies = [
"bitcoin-serai",
"ciphersuite",
"curve25519-dalek 3.2.0",
"dkg",
"dockertest",
"hex",
"monero-serai",
"rand_core 0.6.4",
"serai-docker-tests",
"serai-message-queue",
"serai-message-queue-tests",
"serai-primitives",
"serai-processor",
"serai-processor-messages",
"serai-validator-sets-primitives",
"serde",
"serde_json",
"tokio",
"zeroize",


@@ -802,6 +802,7 @@ impl SignableTransaction {
}
/// Sign this transaction.
// TODO: Remove async
pub async fn sign<R: RngCore + CryptoRng>(
mut self,
rng: &mut R,


@@ -19,19 +19,26 @@ hex = "0.4"
zeroize = "1"
rand_core = "0.6"
curve25519-dalek = "3.2"
ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }
dkg = { path = "../../crypto/dkg", features = ["tests"] }
bitcoin-serai = { path = "../../coins/bitcoin" }
monero-serai = { path = "../../coins/monero" }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }
serai-primitives = { path = "../../substrate/primitives" }
serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives" }
serai-message-queue = { path = "../../message-queue" }
serde = "1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
processor = { package = "serai-processor", path = "../../processor", features = ["bitcoin", "monero"] }
dockertest = "0.3"
serai-docker-tests = { path = "../docker" }
serai-message-queue-tests = { path = "../message-queue" }


@@ -1,78 +1,30 @@
use std::sync::{OnceLock, Mutex};
use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_primitives::NetworkId;
use messages::{ProcessorMessage, CoordinatorMessage};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
DockerOperations,
};
const RPC_USER: &str = "serai";
const RPC_PASS: &str = "seraidex";
mod networks;
pub use networks::*;
#[cfg(test)]
mod tests;
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
pub fn bitcoin_instance() -> (Composition, u16) {
serai_docker_tests::build("bitcoin".to_string());
(
Composition::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"bitcoind".to_string(),
"-txindex".to_string(),
"-regtest".to_string(),
format!("-rpcuser={RPC_USER}"),
format!("-rpcpassword={RPC_PASS}"),
"-rpcbind=0.0.0.0".to_string(),
"-rpcallowip=0.0.0.0/0".to_string(),
"-rpcport=8332".to_string(),
]),
8332,
)
}
pub fn monero_instance() -> (Composition, u16) {
serai_docker_tests::build("monero".to_string());
(
Composition::with_image(
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"monerod".to_string(),
"--regtest".to_string(),
"--offline".to_string(),
"--fixed-difficulty=1".to_string(),
"--rpc-bind-ip=0.0.0.0".to_string(),
format!("--rpc-login={RPC_USER}:{RPC_PASS}"),
"--rpc-access-control-origins=*".to_string(),
"--confirm-external-bind".to_string(),
"--non-interactive".to_string(),
])
.with_start_policy(StartPolicy::Strict),
18081,
)
}
pub fn network_instance(network: NetworkId) -> (Composition, u16) {
match network {
NetworkId::Bitcoin => bitcoin_instance(),
NetworkId::Ethereum => todo!(),
NetworkId::Monero => monero_instance(),
NetworkId::Serai => {
panic!("Serai is not a valid network to spawn an instance of for a processor")
}
}
}
pub fn processor_instance(
network: NetworkId,
port: u16,
port: u32,
message_queue_key: <Ristretto as Ciphersuite>::F,
) -> Composition {
serai_docker_tests::build("processor".to_string());
@@ -107,7 +59,7 @@ pub fn processor_instance(
pub fn processor_stack(
network: NetworkId,
) -> (String, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
) -> ((String, String, String), <Ristretto as Ciphersuite>::F, Vec<Composition>) {
let (network_composition, network_rpc_port) = network_instance(network);
let (coord_key, message_queue_keys, message_queue_composition) =
@@ -147,8 +99,247 @@ pub fn processor_stack(
processor_composition.inject_container_name(handles.remove(0), "NETWORK_RPC_HOSTNAME");
processor_composition.inject_container_name(handles.remove(0), "MESSAGE_QUEUE_RPC");
(compositions[1].handle(), coord_key, compositions)
(
(compositions[0].handle(), compositions[1].handle(), compositions[2].handle()),
coord_key,
compositions,
)
}
#[cfg(test)]
mod tests;
#[derive(serde::Deserialize, Debug)]
struct EmptyResponse {}
pub struct Coordinator {
network: NetworkId,
network_handle: String,
#[allow(unused)]
message_queue_handle: String,
#[allow(unused)]
processor_handle: String,
next_send_id: u64,
next_recv_id: u64,
queue: MessageQueue,
}
impl Coordinator {
pub fn new(
network: NetworkId,
ops: &DockerOperations,
handles: (String, String, String),
coord_key: <Ristretto as Ciphersuite>::F,
) -> Coordinator {
let rpc = ops.handle(&handles.1).host_port(2287).unwrap();
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
Coordinator {
network,
network_handle: handles.0,
message_queue_handle: handles.1,
processor_handle: handles.2,
next_send_id: 0,
next_recv_id: 0,
queue: MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key)),
}
}
/// Send a message to a processor as its coordinator.
pub async fn send_message(&mut self, msg: CoordinatorMessage) {
self
.queue
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(self.network),
intent: self.next_send_id.to_le_bytes().to_vec(),
},
serde_json::to_string(&msg).unwrap().into_bytes(),
)
.await;
self.next_send_id += 1;
}
/// Receive a message from a processor as its coordinator.
pub async fn recv_message(&mut self) -> ProcessorMessage {
let msg =
tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id))
.await
.unwrap();
assert_eq!(msg.from, Service::Processor(self.network));
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(self.next_recv_id).await;
self.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
}
pub async fn add_block(&self, ops: &DockerOperations) -> Vec<u8> {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
use bitcoin_serai::{
bitcoin::{consensus::Encodable, network::constants::Network, Script, Address},
rpc::Rpc,
};
// Mine a block
let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC");
rpc
.rpc_call::<Vec<String>>(
"generatetoaddress",
serde_json::json!([1, Address::p2sh(Script::empty(), Network::Regtest).unwrap()]),
)
.await
.unwrap();
// Get it to return it
let block = rpc
.get_block(
&rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap(),
)
.await
.unwrap();
let mut block_buf = vec![];
block.consensus_encode(&mut block_buf).unwrap();
block_buf
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, scalar::Scalar};
use monero_serai::{
wallet::{
ViewPair,
address::{Network, AddressSpec},
},
rpc::HttpRpc,
};
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
let _: EmptyResponse = rpc
.json_rpc_call(
"generateblocks",
Some(serde_json::json!({
"wallet_address": ViewPair::new(
ED25519_BASEPOINT_POINT,
Zeroizing::new(Scalar::one()),
).address(Network::Mainnet, AddressSpec::Standard).to_string(),
"amount_of_blocks": 1,
})),
)
.await
.unwrap();
rpc
.get_block(rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap())
.await
.unwrap()
.serialize()
}
NetworkId::Serai => panic!("processor tests adding block to Serai"),
}
}
pub async fn broadcast_block(&self, ops: &DockerOperations, block: &[u8]) {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
use bitcoin_serai::rpc::Rpc;
let rpc =
Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC");
let res: Option<String> =
rpc.rpc_call("submitblock", serde_json::json!([hex::encode(block)])).await.unwrap();
if let Some(err) = res {
panic!("submitblock failed: {}", err);
}
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC");
let _: EmptyResponse = rpc
.json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)])))
.await
.unwrap();
}
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
}
}
pub async fn sync(&self, ops: &DockerOperations, others: &[Coordinator]) {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
use bitcoin_serai::{bitcoin::consensus::Encodable, rpc::Rpc};
let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC");
let to = rpc.get_latest_block_number().await.unwrap();
for coordinator in others {
let from = Rpc::new(network_rpc(self.network, ops, &coordinator.network_handle))
.await
.expect("couldn't connect to the Bitcoin RPC")
.get_latest_block_number()
.await
.unwrap() +
1;
for b in from ..= to {
let mut buf = vec![];
rpc
.get_block(&rpc.get_block_hash(b).await.unwrap())
.await
.unwrap()
.consensus_encode(&mut buf)
.unwrap();
coordinator.broadcast_block(ops, &buf).await;
}
}
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
let to = rpc.get_height().await.unwrap();
for coordinator in others {
let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle))
.expect("couldn't connect to the Monero RPC")
.get_height()
.await
.unwrap();
for b in from .. to {
coordinator
.broadcast_block(
ops,
&rpc.get_block(rpc.get_block_hash(b).await.unwrap()).await.unwrap().serialize(),
)
.await;
}
}
}
NetworkId::Serai => panic!("processors tests syncing Serai nodes"),
}
}
pub async fn publish_transaction(&self, ops: &DockerOperations, tx: &[u8]) {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
use bitcoin_serai::rpc::Rpc;
let rpc =
Rpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Bitcoin RPC");
let _: String =
rpc.rpc_call("sendrawtransaction", serde_json::json!([hex::encode(tx)])).await.unwrap();
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
use monero_serai::{transaction::Transaction, rpc::HttpRpc};
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC");
rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap();
}
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
}
}
}


@@ -0,0 +1,351 @@
use std::collections::HashSet;
use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
use serai_primitives::NetworkId;
use serai_validator_sets_primitives::ExternalKey;
use dockertest::{PullPolicy, Image, StartPolicy, Composition, DockerOperations};
use crate::*;
pub const RPC_USER: &str = "serai";
pub const RPC_PASS: &str = "seraidex";
pub const BTC_PORT: u32 = 8332;
pub const XMR_PORT: u32 = 18081;
pub fn bitcoin_instance() -> (Composition, u32) {
serai_docker_tests::build("bitcoin".to_string());
let mut composition = Composition::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"bitcoind".to_string(),
"-txindex".to_string(),
"-regtest".to_string(),
format!("-rpcuser={RPC_USER}"),
format!("-rpcpassword={RPC_PASS}"),
"-rpcbind=0.0.0.0".to_string(),
"-rpcallowip=0.0.0.0/0".to_string(),
"-rpcport=8332".to_string(),
]);
composition.publish_all_ports();
(composition, BTC_PORT)
}
pub fn monero_instance() -> (Composition, u32) {
serai_docker_tests::build("monero".to_string());
let mut composition = Composition::with_image(
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"monerod".to_string(),
"--regtest".to_string(),
"--offline".to_string(),
"--fixed-difficulty=1".to_string(),
"--rpc-bind-ip=0.0.0.0".to_string(),
format!("--rpc-login={RPC_USER}:{RPC_PASS}"),
"--rpc-access-control-origins=*".to_string(),
"--confirm-external-bind".to_string(),
"--non-interactive".to_string(),
])
.with_start_policy(StartPolicy::Strict);
composition.publish_all_ports();
(composition, XMR_PORT)
}
pub fn network_instance(network: NetworkId) -> (Composition, u32) {
match network {
NetworkId::Bitcoin => bitcoin_instance(),
NetworkId::Ethereum => todo!(),
NetworkId::Monero => monero_instance(),
NetworkId::Serai => {
panic!("Serai is not a valid network to spawn an instance of for a processor")
}
}
}
pub fn network_rpc(network: NetworkId, ops: &DockerOperations, handle: &str) -> String {
let (ip, port) = ops
.handle(handle)
.host_port(match network {
NetworkId::Bitcoin => BTC_PORT,
NetworkId::Ethereum => todo!(),
NetworkId::Monero => XMR_PORT,
NetworkId::Serai => panic!("getting port for external network yet it was Serai"),
})
.unwrap();
format!("http://{RPC_USER}:{RPC_PASS}@{}:{}", ip.to_string(), port.to_string())
}
pub fn confirmations(network: NetworkId) -> usize {
use processor::coins::*;
match network {
NetworkId::Bitcoin => Bitcoin::CONFIRMATIONS,
NetworkId::Ethereum => todo!(),
NetworkId::Monero => Monero::CONFIRMATIONS,
NetworkId::Serai => panic!("getting confirmations required for Serai"),
}
}
#[derive(Clone)]
pub enum Wallet {
Bitcoin {
private_key: bitcoin_serai::bitcoin::PrivateKey,
public_key: bitcoin_serai::bitcoin::PublicKey,
input_tx: bitcoin_serai::bitcoin::Transaction,
},
Monero {
handle: String,
spend_key: Zeroizing<curve25519_dalek::scalar::Scalar>,
view_pair: monero_serai::wallet::ViewPair,
inputs: Vec<monero_serai::wallet::ReceivedOutput>,
},
}
// TODO: Merge these functions with the processor's tests, which offers very similar functionality
impl Wallet {
pub async fn new(network: NetworkId, ops: &DockerOperations, handle: String) -> Wallet {
let rpc_url = network_rpc(network, ops, &handle);
match network {
NetworkId::Bitcoin => {
use bitcoin_serai::{
bitcoin::{
secp256k1::{SECP256K1, SecretKey},
PrivateKey, PublicKey, ScriptBuf, Network, Address,
},
rpc::Rpc,
};
let secret_key = SecretKey::new(&mut rand_core::OsRng);
let private_key = PrivateKey::new(secret_key, Network::Regtest);
let public_key = PublicKey::from_private_key(SECP256K1, &private_key);
let main_addr = Address::p2pkh(&public_key, Network::Regtest);
let rpc = Rpc::new(rpc_url).await.expect("couldn't connect to the Bitcoin RPC");
let new_block = rpc.get_latest_block_number().await.unwrap() + 1;
rpc
.rpc_call::<Vec<String>>("generatetoaddress", serde_json::json!([1, main_addr]))
.await
.unwrap();
// Mine it to maturity
rpc
.rpc_call::<Vec<String>>(
"generatetoaddress",
serde_json::json!([100, Address::p2sh(&ScriptBuf::new(), Network::Regtest).unwrap()]),
)
.await
.unwrap();
let funds = rpc
.get_block(&rpc.get_block_hash(new_block).await.unwrap())
.await
.unwrap()
.txdata
.swap_remove(0);
Wallet::Bitcoin { private_key, public_key, input_tx: funds }
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, scalar::Scalar};
use monero_serai::{
wallet::{
ViewPair, Scanner,
address::{Network, AddressSpec},
},
rpc::HttpRpc,
};
let mut bytes = [0; 64];
OsRng.fill_bytes(&mut bytes);
let spend_key = Scalar::from_bytes_mod_order_wide(&bytes);
OsRng.fill_bytes(&mut bytes);
let view_key = Scalar::from_bytes_mod_order_wide(&bytes);
let view_pair =
ViewPair::new(ED25519_BASEPOINT_POINT * spend_key, Zeroizing::new(view_key));
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
let height = rpc.get_height().await.unwrap();
// Mines 200 blocks so sufficient decoys exist, as only 60 is needed for maturity
let _: EmptyResponse = rpc
.json_rpc_call(
"generateblocks",
Some(serde_json::json!({
"wallet_address": view_pair.address(
Network::Mainnet,
AddressSpec::Standard
).to_string(),
"amount_of_blocks": 200,
})),
)
.await
.unwrap();
let block = rpc.get_block(rpc.get_block_hash(height).await.unwrap()).await.unwrap();
let output = Scanner::from_view(view_pair.clone(), Some(HashSet::new()))
.scan(&rpc, &block)
.await
.unwrap()
.remove(0)
.ignore_timelock()
.remove(0);
Wallet::Monero {
handle,
spend_key: Zeroizing::new(spend_key),
view_pair,
inputs: vec![output.output.clone()],
}
}
NetworkId::Serai => panic!("creating a wallet for for Serai"),
}
}
pub async fn send_to_address(&mut self, ops: &DockerOperations, to: &ExternalKey) -> Vec<u8> {
match self {
Wallet::Bitcoin { private_key, public_key, ref mut input_tx } => {
use bitcoin_serai::bitcoin::{
secp256k1::{SECP256K1, Message},
key::{XOnlyPublicKey, TweakedPublicKey},
consensus::Encodable,
sighash::{EcdsaSighashType, SighashCache},
script::{PushBytesBuf, Script, Builder},
address::Payload,
OutPoint, Sequence, Witness, TxIn, TxOut,
absolute::LockTime,
Transaction,
};
let mut tx = Transaction {
version: 2,
lock_time: LockTime::ZERO,
input: vec![TxIn {
previous_output: OutPoint { txid: input_tx.txid(), vout: 0 },
script_sig: Script::empty().into(),
sequence: Sequence(u32::MAX),
witness: Witness::default(),
}],
output: vec![TxOut {
value: input_tx.output[0].value - 10000,
script_pubkey: Payload::p2tr_tweaked(TweakedPublicKey::dangerous_assume_tweaked(
XOnlyPublicKey::from_slice(&to[1 ..]).unwrap(),
))
.script_pubkey(),
}],
};
let mut der = SECP256K1
.sign_ecdsa_low_r(
&Message::from(
SighashCache::new(&tx)
.legacy_signature_hash(
0,
&input_tx.output[0].script_pubkey,
EcdsaSighashType::All.to_u32(),
)
.unwrap()
.to_raw_hash(),
),
&private_key.inner,
)
.serialize_der()
.to_vec();
der.push(1);
tx.input[0].script_sig = Builder::new()
.push_slice(PushBytesBuf::try_from(der).unwrap())
.push_key(public_key)
.into_script();
let mut buf = vec![];
tx.consensus_encode(&mut buf).unwrap();
*input_tx = tx;
buf
}
Wallet::Monero { handle, ref spend_key, ref view_pair, ref mut inputs } => {
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY};
use monero_serai::{
Protocol,
wallet::{
address::{Network, AddressType, AddressMeta, Address},
SpendableOutput, Decoys, Change, FeePriority, Scanner, SignableTransaction,
},
rpc::HttpRpc,
};
use processor::{additional_key, coins::Monero};
let rpc_url = network_rpc(NetworkId::Monero, ops, handle);
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC");
// Prepare inputs
let outputs = inputs.drain(..).collect::<Vec<_>>();
let mut these_inputs = vec![];
for output in outputs {
these_inputs.push(
SpendableOutput::from(&rpc, output)
.await
.expect("prior transaction was never published"),
);
}
let mut decoys = Decoys::select(
&mut OsRng,
&rpc,
Protocol::v16.ring_len(),
rpc.get_height().await.unwrap() - 1,
&these_inputs,
)
.await
.unwrap();
let to_spend_key =
CompressedEdwardsY(<[u8; 32]>::try_from(to.as_ref()).unwrap()).decompress().unwrap();
let to_view_key = additional_key::<Monero>(0);
let to_addr = Address::new(
AddressMeta::new(
Network::Mainnet,
AddressType::Featured { subaddress: false, payment_id: None, guaranteed: true },
),
to_spend_key,
ED25519_BASEPOINT_POINT * to_view_key.0,
);
// Create and sign the TX
let tx = SignableTransaction::new(
Protocol::v16,
None,
these_inputs.drain(..).zip(decoys.drain(..)).collect(),
vec![(to_addr, 1_000_000_000_000)],
Some(Change::new(view_pair, false)),
vec![],
rpc.get_fee(Protocol::v16, FeePriority::Low).await.unwrap(),
)
.unwrap()
.sign(&mut OsRng, spend_key)
.await
.unwrap();
// Push the change output
inputs.push(
Scanner::from_view(view_pair.clone(), Some(HashSet::new()))
.scan_transaction(&tx)
.ignore_timelock()
.remove(0),
);
tx.serialize()
}
}
}
}


@@ -0,0 +1,167 @@
use std::{collections::HashMap, time::SystemTime};
use dkg::{Participant, ThresholdParams, tests::clone_without};
use serai_primitives::{NetworkId, BlockHash, PublicKey};
use serai_validator_sets_primitives::{Session, KeyPair, ValidatorSet};
use messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage, ProcessorMessage};
use dockertest::DockerTest;
use crate::{*, tests::*};
pub(crate) async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId) -> KeyPair {
// Perform an interaction with all processors via their coordinators
async fn interact_with_all<
FS: Fn(Participant) -> messages::key_gen::CoordinatorMessage,
FR: FnMut(Participant, messages::key_gen::ProcessorMessage),
>(
coordinators: &mut [Coordinator],
message: FS,
mut recv: FR,
) {
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let participant = Participant::new(u16::try_from(i + 1).unwrap()).unwrap();
coordinator.send_message(CoordinatorMessage::KeyGen(message(participant))).await;
match coordinator.recv_message().await {
ProcessorMessage::KeyGen(msg) => recv(participant, msg),
_ => panic!("processor didn't return KeyGen message"),
}
}
}
// Order a key gen
let id = KeyGenId { set: ValidatorSet { session: Session(0), network }, attempt: 0 };
let mut commitments = HashMap::new();
interact_with_all(
coordinators,
|participant| messages::key_gen::CoordinatorMessage::GenerateKey {
id,
params: ThresholdParams::new(
u16::try_from(THRESHOLD).unwrap(),
u16::try_from(COORDINATORS).unwrap(),
participant,
)
.unwrap(),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Commitments {
id: this_id,
commitments: these_commitments,
} => {
assert_eq!(this_id, id);
commitments.insert(participant, these_commitments);
}
_ => panic!("processor didn't return Commitments in response to GenerateKey"),
},
)
.await;
// Send the commitments to all parties
let mut shares = HashMap::new();
interact_with_all(
coordinators,
|participant| messages::key_gen::CoordinatorMessage::Commitments {
id,
commitments: clone_without(&commitments, &participant),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Shares { id: this_id, shares: these_shares } => {
assert_eq!(this_id, id);
shares.insert(participant, these_shares);
}
_ => panic!("processor didn't return Shares in response to GenerateKey"),
},
)
.await;
// Send the shares
let mut substrate_key = None;
let mut coin_key = None;
interact_with_all(
coordinators,
|participant| messages::key_gen::CoordinatorMessage::Shares {
id,
shares: shares
.iter()
.filter_map(|(this_participant, shares)| {
shares.get(&participant).cloned().map(|share| (*this_participant, share))
})
.collect(),
},
|_, msg| match msg {
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
id: this_id,
substrate_key: this_substrate_key,
coin_key: this_coin_key,
} => {
assert_eq!(this_id, id);
if substrate_key.is_none() {
substrate_key = Some(this_substrate_key);
coin_key = Some(this_coin_key.clone());
}
assert_eq!(substrate_key.unwrap(), this_substrate_key);
assert_eq!(coin_key.as_ref().unwrap(), &this_coin_key);
}
_ => panic!("processor didn't return GeneratedKeyPair in response to GenerateKey"),
},
)
.await;
// Confirm the key pair
// TODO: Better document coin_latest_finalized_block's genesis state, and error if a set claims
// [0; 32] was finalized
let context = SubstrateContext {
serai_time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
coin_latest_finalized_block: BlockHash([0; 32]),
};
let key_pair =
(PublicKey::from_raw(substrate_key.unwrap()), coin_key.clone().unwrap().try_into().unwrap());
for coordinator in coordinators {
coordinator
.send_message(CoordinatorMessage::Substrate(
messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context,
set: id.set,
key_pair: key_pair.clone(),
},
))
.await;
}
key_pair
}
#[test]
fn key_gen_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network);
coordinators.push((handles, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
test.run(|ops| async move {
// Sleep for a second for the message-queue to boot
// It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
// Connect to the Message Queues as the coordinator
let mut coordinators = coordinators
.into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
key_gen(&mut coordinators, network).await;
});
}
}


@@ -1,229 +1,7 @@
use std::{collections::HashMap, time::SystemTime};
mod key_gen;
pub(crate) use key_gen::key_gen;
use zeroize::Zeroizing;
mod scan;
use ciphersuite::{Ciphersuite, Ristretto};
use dkg::{Participant, ThresholdParams, tests::clone_without};
use serai_primitives::{NetworkId, BlockHash, PublicKey};
use serai_validator_sets_primitives::{Session, ValidatorSet};
use messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage, ProcessorMessage};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::{DockerOperations, DockerTest};
use crate::*;
const COORDINATORS: usize = 4;
const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
struct Coordinator {
next_send_id: u64,
next_recv_id: u64,
queue: MessageQueue,
}
fn coordinator_queue(
ops: &DockerOperations,
handle: String,
coord_key: <Ristretto as Ciphersuite>::F,
) -> Coordinator {
let rpc = ops.handle(&handle).host_port(2287).unwrap();
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
Coordinator {
next_send_id: 0,
next_recv_id: 0,
queue: MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key)),
}
}
// Send a message to a processor via its coordinator
async fn send_message(coordinator: &mut Coordinator, network: NetworkId, msg: CoordinatorMessage) {
coordinator
.queue
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(network),
intent: coordinator.next_send_id.to_le_bytes().to_vec(),
},
serde_json::to_string(&msg).unwrap().into_bytes(),
)
.await;
coordinator.next_send_id += 1;
}
// Receive a message from a processor via its coordinator
async fn recv_message(coordinator: &mut Coordinator, from: NetworkId) -> ProcessorMessage {
let msg = tokio::time::timeout(
core::time::Duration::from_secs(10),
coordinator.queue.next(coordinator.next_recv_id),
)
.await
.unwrap();
assert_eq!(msg.from, Service::Processor(from));
assert_eq!(msg.id, coordinator.next_recv_id);
coordinator.queue.ack(coordinator.next_recv_id).await;
coordinator.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
}
async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId) {
// Perform an interaction with all processors via their coordinators
async fn interact_with_all<
FS: Fn(Participant) -> messages::key_gen::CoordinatorMessage,
FR: FnMut(Participant, messages::key_gen::ProcessorMessage),
>(
coordinators: &mut [Coordinator],
network: NetworkId,
message: FS,
mut recv: FR,
) {
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let participant = Participant::new(u16::try_from(i + 1).unwrap()).unwrap();
send_message(coordinator, network, CoordinatorMessage::KeyGen(message(participant))).await;
match recv_message(coordinator, network).await {
ProcessorMessage::KeyGen(msg) => recv(participant, msg),
_ => panic!("processor didn't return KeyGen message"),
}
}
}
// Order a key gen
let id = KeyGenId { set: ValidatorSet { session: Session(0), network }, attempt: 0 };
let mut commitments = HashMap::new();
interact_with_all(
coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::GenerateKey {
id,
params: ThresholdParams::new(
u16::try_from(THRESHOLD).unwrap(),
u16::try_from(COORDINATORS).unwrap(),
participant,
)
.unwrap(),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Commitments {
id: this_id,
commitments: these_commitments,
} => {
assert_eq!(this_id, id);
commitments.insert(participant, these_commitments);
}
_ => panic!("processor didn't return Commitments in response to GenerateKey"),
},
)
.await;
// Send the commitments to all parties
let mut shares = HashMap::new();
interact_with_all(
coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::Commitments {
id,
commitments: clone_without(&commitments, &participant),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Shares { id: this_id, shares: these_shares } => {
assert_eq!(this_id, id);
shares.insert(participant, these_shares);
}
_ => panic!("processor didn't return Shares in response to GenerateKey"),
},
)
.await;
// Send the shares
let mut substrate_key = None;
let mut coin_key = None;
interact_with_all(
coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::Shares {
id,
shares: shares
.iter()
.filter_map(|(this_participant, shares)| {
shares.get(&participant).cloned().map(|share| (*this_participant, share))
})
.collect(),
},
|_, msg| match msg {
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
id: this_id,
substrate_key: this_substrate_key,
coin_key: this_coin_key,
} => {
assert_eq!(this_id, id);
if substrate_key.is_none() {
substrate_key = Some(this_substrate_key);
coin_key = Some(this_coin_key.clone());
}
assert_eq!(substrate_key.unwrap(), this_substrate_key);
assert_eq!(coin_key.as_ref().unwrap(), &this_coin_key);
}
_ => panic!("processor didn't return GeneratedKeyPair in response to GenerateKey"),
},
)
.await;
// Confirm the key pair
// TODO: Better document coin_latest_finalized_block's genesis state, and error if a set claims
// [0; 32] was finalized
let context = SubstrateContext {
serai_time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
coin_latest_finalized_block: BlockHash([0; 32]),
};
for coordinator in coordinators {
send_message(
coordinator,
network,
CoordinatorMessage::Substrate(messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context,
set: id.set,
key_pair: (
PublicKey::from_raw(substrate_key.unwrap()),
coin_key.clone().unwrap().try_into().unwrap(),
),
}),
)
.await;
}
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
#[test]
fn key_gen_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (coord_handle, coord_key, compositions) = processor_stack(network);
coordinators.push((coord_handle, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
test.run(|ops| async move {
// Sleep for a second for the message-queue to boot
// It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
// Connect to the Message Queues as the coordinator
let mut coordinators = coordinators
.into_iter()
.map(|(handle, key)| coordinator_queue(&ops, handle, key))
.collect::<Vec<_>>();
key_gen(&mut coordinators, network).await;
});
}
}
pub(crate) const COORDINATORS: usize = 4;
pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;


@@ -0,0 +1,59 @@
use serai_primitives::NetworkId;
use dockertest::DockerTest;
use crate::{*, tests::*};
#[test]
fn scan_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network);
coordinators.push((handles, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
test.run(|ops| async move {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
let mut coordinators = coordinators
.into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
// Start by generating keys
let key_pair = key_gen(&mut coordinators, network).await;
// Now we have to mine blocks to activate the key
// (the first key is activated when the coin's block time exceeds the Serai time it was
// confirmed at)
for _ in 0 .. confirmations(network) {
let block = coordinators[0].add_block(&ops).await;
for coordinator in &coordinators[1 ..] {
coordinator.broadcast_block(&ops, &block).await;
}
}
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
let tx = wallet.send_to_address(&ops, &key_pair.1).await;
for coordinator in &coordinators {
coordinator.publish_transaction(&ops, &tx).await;
}
for _ in 0 .. confirmations(network) {
let block = coordinators[0].add_block(&ops).await;
for coordinator in &coordinators[1 ..] {
coordinator.broadcast_block(&ops, &block).await;
}
}
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
});
}
}