Implement an (almost) full Key Gen test for processor's Docker tests

It doesn't confirm the key pair yet.

Adds the infra neded to test processors against each other.
This commit is contained in:
Luke Parker 2023-07-22 04:04:17 -04:00
parent c26beae0f9
commit d07447fe97
No known key found for this signature in database
7 changed files with 325 additions and 110 deletions

5
Cargo.lock generated
View file

@ -8739,6 +8739,9 @@ dependencies = [
[[package]]
name = "serai-docker-tests"
version = "0.1.0"
dependencies = [
"chrono",
]
[[package]]
name = "serai-env"
@ -8813,6 +8816,7 @@ dependencies = [
"serai-message-queue",
"serai-primitives",
"tokio",
"zeroize",
]
[[package]]
@ -8953,6 +8957,7 @@ dependencies = [
"serai-validator-sets-primitives",
"serde_json",
"tokio",
"zeroize",
]
[[package]]

View file

@ -92,6 +92,8 @@ pub fn build(name: String) {
}
}
} else {
// Recursively crawl since we care when the folder's contents were edited, not the folder
// itself
for entry in fs::read_dir(path.clone()).expect("couldn't read directory") {
metadatas
.push(meta(path.join(entry.expect("couldn't access item in directory").file_name())));

View file

@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
hex = "0.4"
zeroize = "1"
rand_core = "0.6"
ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }

View file

@ -57,9 +57,7 @@ pub fn instance(
#[test]
fn basic_functionality() {
use std::env;
use ciphersuite::group::ff::PrimeField;
use zeroize::Zeroizing;
use dockertest::DockerTest;
@ -74,12 +72,11 @@ fn basic_functionality() {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
// TODO: Add new to MessageQueue to avoid needing to use set_var
env::set_var("MESSAGE_QUEUE_RPC", rpc.0.to_string() + ":" + &rpc.1.to_string());
env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr()));
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
// Queue some messages
let coordinator = MessageQueue::from_env(Service::Coordinator);
let coordinator =
MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key));
coordinator
.queue(
Metadata {
@ -103,8 +100,11 @@ fn basic_functionality() {
.await;
// Successfully get it
env::set_var("MESSAGE_QUEUE_KEY", hex::encode(priv_keys[&NetworkId::Bitcoin].to_repr()));
let bitcoin = MessageQueue::from_env(Service::Processor(NetworkId::Bitcoin));
let bitcoin = MessageQueue::new(
Service::Processor(NetworkId::Bitcoin),
rpc,
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
);
let msg = bitcoin.next(0).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, 0);

View file

@ -16,10 +16,11 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
hex = "0.4"
zeroize = "1"
rand_core = "0.6"
ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }
dkg = { path = "../../crypto/dkg" }
dkg = { path = "../../crypto/dkg", features = ["tests"] }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }

View file

@ -1,36 +1,88 @@
use std::sync::{OnceLock, Mutex};
use rand_core::{RngCore, OsRng};
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_primitives::NetworkId;
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
};
pub fn bitcoin_instance() -> Composition {
serai_docker_tests::build("bitcoin".to_string());
const RPC_USER: &str = "serai";
const RPC_PASS: &str = "seraidex";
Composition::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
)
.with_log_options(Some(LogOptions {
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
fn log_options() -> Option<LogOptions> {
Some(LogOptions {
action: LogAction::Forward,
policy: LogPolicy::Always,
source: LogSource::Both,
}))
.with_cmd(vec![
"bitcoind".to_string(),
"-txindex".to_string(),
"-regtest".to_string(),
"-rpcuser=serai".to_string(),
"-rpcpassword=seraidex".to_string(),
"-rpcbind=0.0.0.0".to_string(),
"-rpcallowip=0.0.0.0/0".to_string(),
"-rpcport=8332".to_string(),
])
.with_start_policy(StartPolicy::Strict)
})
}
pub fn instance(message_queue_key: <Ristretto as Ciphersuite>::F) -> Composition {
pub fn bitcoin_instance() -> (Composition, u16) {
serai_docker_tests::build("bitcoin".to_string());
(
Composition::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"bitcoind".to_string(),
"-txindex".to_string(),
"-regtest".to_string(),
format!("-rpcuser={RPC_USER}"),
format!("-rpcpassword={RPC_PASS}"),
"-rpcbind=0.0.0.0".to_string(),
"-rpcallowip=0.0.0.0/0".to_string(),
"-rpcport=8332".to_string(),
]),
8332,
)
}
pub fn monero_instance() -> (Composition, u16) {
serai_docker_tests::build("monero".to_string());
(
Composition::with_image(
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"monerod".to_string(),
"--regtest".to_string(),
"--offline".to_string(),
"--fixed-difficulty=1".to_string(),
"--rpc-bind-ip=0.0.0.0".to_string(),
format!("--rpc-login={RPC_USER}:{RPC_PASS}"),
"--rpc-access-control-origins=*".to_string(),
"--confirm-external-bind".to_string(),
"--non-interactive".to_string(),
])
.with_start_policy(StartPolicy::Strict),
18081,
)
}
pub fn network_instance(network: NetworkId) -> (Composition, u16) {
match network {
NetworkId::Bitcoin => bitcoin_instance(),
NetworkId::Ethereum => todo!(),
NetworkId::Monero => monero_instance(),
NetworkId::Serai => {
panic!("Serai is not a valid network to spawn an instance of for a processor")
}
}
}
pub fn processor_instance(
network: NetworkId,
port: u16,
message_queue_key: <Ristretto as Ciphersuite>::F,
) -> Composition {
serai_docker_tests::build("processor".to_string());
let mut entropy = [0; 32];
@ -39,104 +91,68 @@ pub fn instance(message_queue_key: <Ristretto as Ciphersuite>::F) -> Composition
Composition::with_image(
Image::with_repository("serai-dev-processor").pull_policy(PullPolicy::Never),
)
.with_log_options(Some(LogOptions {
action: LogAction::Forward,
policy: LogPolicy::Always,
source: LogSource::Both,
}))
.with_env(
[
("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())),
("ENTROPY".to_string(), hex::encode(entropy)),
("NETWORK".to_string(), "bitcoin".to_string()),
("NETWORK_RPC_LOGIN".to_string(), "serai:seraidex".to_string()),
("NETWORK_RPC_PORT".to_string(), "8332".to_string()),
(
"NETWORK".to_string(),
(match network {
NetworkId::Serai => panic!("starting a processor for Serai"),
NetworkId::Bitcoin => "bitcoin",
NetworkId::Ethereum => "ethereum",
NetworkId::Monero => "monero",
})
.to_string(),
),
("NETWORK_RPC_LOGIN".to_string(), format!("{RPC_USER}:{RPC_PASS}")),
("NETWORK_RPC_PORT".to_string(), port.to_string()),
("DB_PATH".to_string(), "./processor-db".to_string()),
]
.into(),
)
.with_start_policy(StartPolicy::Strict)
}
#[test]
fn basic_functionality() {
use std::env;
use serai_primitives::NetworkId;
use serai_validator_sets_primitives::{Session, ValidatorSet};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::DockerTest;
let bitcoin_composition = bitcoin_instance();
pub fn processor_stack(
network: NetworkId,
) -> (String, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
let (network_composition, network_rpc_port) = network_instance(network);
let (coord_key, message_queue_keys, message_queue_composition) =
serai_message_queue_tests::instance();
let message_queue_composition = message_queue_composition.with_start_policy(StartPolicy::Strict);
let mut processor_composition = instance(message_queue_keys[&NetworkId::Bitcoin]);
processor_composition.inject_container_name(bitcoin_composition.handle(), "NETWORK_RPC_HOSTNAME");
processor_composition
.inject_container_name(message_queue_composition.handle(), "MESSAGE_QUEUE_RPC");
let processor_composition =
processor_instance(network, network_rpc_port, message_queue_keys[&network]);
let mut test = DockerTest::new();
test.add_composition(bitcoin_composition);
test.add_composition(message_queue_composition);
test.add_composition(processor_composition);
// Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
let unique_id = {
let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0));
let mut unique_id_lock = unique_id_mutex.lock().unwrap();
let unique_id = hex::encode(unique_id_lock.to_be_bytes());
*unique_id_lock += 1;
unique_id
};
test.run(|ops| async move {
// Sleep for 10 seconds to be polite and let things boot
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
// Connect to the Message Queue as the coordinator
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
// TODO: MessageQueue::new
env::set_var(
"MESSAGE_QUEUE_RPC",
"http://".to_string() + &rpc.0.to_string() + ":" + &rpc.1.to_string(),
let mut compositions = vec![];
let mut handles = vec![];
for composition in [network_composition, message_queue_composition, processor_composition] {
let handle = composition.handle();
compositions.push(
composition
.with_start_policy(StartPolicy::Strict)
.with_container_name(format!("{handle}-{}", &unique_id))
.with_log_options(log_options()),
);
env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr()));
let coordinator = MessageQueue::from_env(Service::Coordinator);
handles.push(compositions.last().unwrap().handle());
}
// Order a key gen
let id = messages::key_gen::KeyGenId {
set: ValidatorSet { session: Session(0), network: NetworkId::Bitcoin },
attempt: 0,
};
let processor_composition = compositions.last_mut().unwrap();
processor_composition.inject_container_name(handles.remove(0), "NETWORK_RPC_HOSTNAME");
processor_composition.inject_container_name(handles.remove(0), "MESSAGE_QUEUE_RPC");
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Bitcoin),
intent: b"key_gen_0".to_vec(),
},
serde_json::to_string(&messages::CoordinatorMessage::KeyGen(
messages::key_gen::CoordinatorMessage::GenerateKey {
id,
params: dkg::ThresholdParams::new(3, 4, dkg::Participant::new(1).unwrap()).unwrap(),
},
))
.unwrap()
.into_bytes(),
)
.await;
// Read the created commitments
let msg = coordinator.next(0).await;
assert_eq!(msg.from, Service::Processor(NetworkId::Bitcoin));
assert_eq!(msg.id, 0);
let msg: messages::ProcessorMessage = serde_json::from_slice(&msg.msg).unwrap();
match msg {
messages::ProcessorMessage::KeyGen(messages::key_gen::ProcessorMessage::Commitments {
id: this_id,
commitments: _,
}) => {
assert_eq!(this_id, id);
}
_ => panic!("processor didn't return Commitments in response to GenerateKey"),
}
coordinator.ack(0).await;
});
(compositions[1].handle(), coord_key, compositions)
}
#[cfg(test)]
mod tests;

View file

@ -0,0 +1,190 @@
use std::collections::HashMap;
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use dkg::{Participant, ThresholdParams, tests::clone_without};
use serai_primitives::NetworkId;
use serai_validator_sets_primitives::{Session, ValidatorSet};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::{DockerOperations, DockerTest};
use crate::*;
const COORDINATORS: usize = 4;
const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
fn coordinator_queue(
ops: &DockerOperations,
handle: String,
coord_key: <Ristretto as Ciphersuite>::F,
) -> MessageQueue {
let rpc = ops.handle(&handle).host_port(2287).unwrap();
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key))
}
// Receive a message from a processor via its coordinator
async fn recv_message(
coordinator: &MessageQueue,
from: NetworkId,
id: u64,
) -> messages::ProcessorMessage {
let msg =
tokio::time::timeout(core::time::Duration::from_secs(10), coordinator.next(id)).await.unwrap();
assert_eq!(msg.from, Service::Processor(from));
assert_eq!(msg.id, id);
coordinator.ack(id).await;
serde_json::from_slice(&msg.msg).unwrap()
}
// Perform an interaction with all processors via their coordinators
async fn interact_with_all<
FS: Fn(Participant) -> messages::key_gen::CoordinatorMessage,
FR: FnMut(Participant, messages::key_gen::ProcessorMessage),
>(
id: u64,
coordinators: &[MessageQueue],
network: NetworkId,
message: FS,
mut recv: FR,
) {
for (i, coordinator) in coordinators.iter().enumerate() {
let participant = Participant::new(u16::try_from(i + 1).unwrap()).unwrap();
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(network),
intent: id.to_le_bytes().to_vec(),
},
serde_json::to_string(&messages::CoordinatorMessage::KeyGen(message(participant)))
.unwrap()
.into_bytes(),
)
.await;
match recv_message(coordinator, network, id).await {
messages::ProcessorMessage::KeyGen(msg) => recv(participant, msg),
_ => panic!("processor didn't return KeyGen message"),
}
}
}
#[test]
fn key_gen() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (coord_handle, coord_key, compositions) = processor_stack(network);
coordinators.push((coord_handle, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
test.run(|ops| async move {
// Sleep for a second for the message-queue to boot
// It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
// Connect to the Message Queues as the coordinator
let coordinators = coordinators
.into_iter()
.map(|(handle, key)| coordinator_queue(&ops, handle, key))
.collect::<Vec<_>>();
// Order a key gen
let id = messages::key_gen::KeyGenId {
set: ValidatorSet { session: Session(0), network },
attempt: 0,
};
let mut commitments = HashMap::new();
interact_with_all(
0,
&coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::GenerateKey {
id,
params: ThresholdParams::new(
u16::try_from(THRESHOLD).unwrap(),
u16::try_from(COORDINATORS).unwrap(),
participant,
)
.unwrap(),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Commitments {
id: this_id,
commitments: these_commitments,
} => {
assert_eq!(this_id, id);
commitments.insert(participant, these_commitments);
}
_ => panic!("processor didn't return Commitments in response to GenerateKey"),
},
)
.await;
// Send the commitments to all parties
let mut shares = HashMap::new();
interact_with_all(
1,
&coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::Commitments {
id,
commitments: clone_without(&commitments, &participant),
},
|participant, msg| match msg {
messages::key_gen::ProcessorMessage::Shares { id: this_id, shares: these_shares } => {
assert_eq!(this_id, id);
shares.insert(participant, these_shares);
}
_ => panic!("processor didn't return Shares in response to GenerateKey"),
},
)
.await;
// Send the shares
let mut substrate_key = None;
let mut coin_key = None;
interact_with_all(
2,
&coordinators,
network,
|participant| messages::key_gen::CoordinatorMessage::Shares {
id,
shares: shares
.iter()
.filter_map(|(this_participant, shares)| {
shares.get(&participant).cloned().map(|share| (*this_participant, share))
})
.collect(),
},
|_, msg| match msg {
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
id: this_id,
substrate_key: this_substrate_key,
coin_key: this_coin_key,
} => {
assert_eq!(this_id, id);
if substrate_key.is_none() {
substrate_key = Some(this_substrate_key);
coin_key = Some(this_coin_key.clone());
}
assert_eq!(substrate_key.unwrap(), this_substrate_key);
assert_eq!(coin_key.as_ref().unwrap(), &this_coin_key);
}
_ => panic!("processor didn't return GeneratedKeyPair in response to GenerateKey"),
},
)
.await;
});
}
}