Test the Coordinator emits KeyGen

Mainly just a test that the full stack is properly set up and we've hit basic
functioning for further testing.
This commit is contained in:
Luke Parker 2023-08-06 12:38:44 -04:00
parent bebe2fae0e
commit cee788eac3
No known key found for this signature in database
7 changed files with 224 additions and 35 deletions
Cargo.lock
coordinator/src
main.rs
substrate
processor/src
tests/coordinator

2
Cargo.lock generated
View file

@ -7975,6 +7975,7 @@ name = "serai-coordinator-tests"
version = "0.1.0"
dependencies = [
"ciphersuite",
"dkg",
"dockertest",
"hex",
"serai-client",
@ -7985,6 +7986,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"zeroize",
]
[[package]]

View file

@ -9,10 +9,10 @@ use std::{
collections::{VecDeque, HashMap},
};
use zeroize::Zeroizing;
use zeroize::{Zeroize, Zeroizing};
use rand_core::OsRng;
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_db::{DbTxn, Db};
use serai_env as env;
@ -684,17 +684,31 @@ async fn main() {
let db = serai_db::new_rocksdb(&env::var("DB_PATH").expect("path to DB wasn't specified"));
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::ZERO); // TODO
let key = {
let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided");
let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded");
key_hex.zeroize();
if key_vec.len() != 32 {
key_vec.zeroize();
panic!("Serai key had an invalid length");
}
let mut key_bytes = [0; 32];
key_bytes.copy_from_slice(&key_vec);
key_vec.zeroize();
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::from_repr(key_bytes).unwrap());
key_bytes.zeroize();
key
};
let p2p = LocalP2p::new(1).swap_remove(0); // TODO
let processors = Arc::new(MessageQueue::from_env(Service::Coordinator));
let serai = || async {
loop {
let Ok(serai) = Serai::new(&dbg!(format!(
let Ok(serai) = Serai::new(&format!(
"ws://{}:9944",
serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided")
)))
))
.await
else {
log::error!("couldn't connect to the Serai node");

View file

@ -1,4 +1,4 @@
use core::{ops::Deref, future::Future};
use core::{ops::Deref, time::Duration, future::Future};
use std::collections::{HashSet, HashMap};
use zeroize::Zeroizing;
@ -21,6 +21,8 @@ use serai_db::DbTxn;
use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage};
use tokio::time::sleep;
use crate::{Db, processors::Processors, tributary::TributarySpec};
mod db;
@ -53,9 +55,24 @@ async fn handle_new_set<
set: ValidatorSet,
) -> Result<(), SeraiError> {
if in_set(key, serai, set).await?.expect("NewSet for set which doesn't exist") {
log::info!("present in set {:?}", set);
let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist");
let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data);
let time = if let Ok(time) = block.time() {
time
} else {
assert_eq!(block.number(), 0);
// Use the next block's time
loop {
let Ok(Some(res)) = serai.get_block_by_number(1).await else {
sleep(Duration::from_secs(5)).await;
continue;
};
break res.time().unwrap();
}
};
let spec = TributarySpec::new(block.hash(), time, set, set_data);
create_new_tributary(db, spec.clone());
// Trigger a DKG
@ -79,6 +96,8 @@ async fn handle_new_set<
}),
)
.await;
} else {
log::info!("not present in set {:?}", set);
}
Ok(())
@ -241,6 +260,7 @@ async fn handle_block<
// stable)
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::NewSet { set } = new_set {
log::info!("found fresh new set event {:?}", new_set);
handle_new_set(
&mut db.0,
key,
@ -264,6 +284,7 @@ async fn handle_block<
// If a key pair was confirmed, inform the processor
for key_gen in serai.get_key_gen_events(hash).await? {
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
log::info!("found fresh key gen event {:?}", key_gen);
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
handle_key_gen(key, processors, serai, &block, set, key_pair).await?;
} else {

View file

@ -471,7 +471,12 @@ async fn boot<N: Network, D: Db>(
if entropy.len() != 64 {
panic!("entropy isn't the right length");
}
let bytes = Zeroizing::new(hex::decode(entropy).expect("entropy wasn't hex-formatted"));
let mut bytes =
Zeroizing::new(hex::decode(entropy).map_err(|_| ()).expect("entropy wasn't hex-formatted"));
if bytes.len() != 32 {
bytes.zeroize();
panic!("entropy wasn't 32 bytes");
}
let mut entropy = Zeroizing::new([0; 32]);
let entropy_mut: &mut [u8] = entropy.as_mut();
entropy_mut.copy_from_slice(bytes.as_ref());

View file

@ -16,13 +16,19 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
hex = "0.4"
zeroize = { version = "1", default-features = false }
ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["ristretto"] }
dkg = { path = "../../crypto/dkg", default-features = false, features = ["tests"] }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }
serai-client = { path = "../../substrate/client" }
serai-client = { path = "../../substrate/client", features = ["serai"] }
serai-message-queue = { path = "../../message-queue" }
serde = { version = "1", default-features = false }
serde_json = { version = "1", default-features = false }
tokio = { version = "1", features = ["time"] }
dockertest = "0.3"

View file

@ -2,12 +2,18 @@
use std::sync::{OnceLock, Mutex};
use zeroize::Zeroizing;
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_client::primitives::NetworkId;
use messages::{CoordinatorMessage, ProcessorMessage};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
DockerOperations,
};
#[cfg(test)]
@ -15,7 +21,10 @@ mod tests;
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
pub fn coordinator_instance(message_queue_key: <Ristretto as Ciphersuite>::F) -> Composition {
pub fn coordinator_instance(
name: &str,
message_queue_key: <Ristretto as Ciphersuite>::F,
) -> Composition {
serai_docker_tests::build("coordinator".to_string());
Composition::with_image(
@ -25,6 +34,10 @@ pub fn coordinator_instance(message_queue_key: <Ristretto as Ciphersuite>::F) ->
[
("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())),
("DB_PATH".to_string(), "./coordinator-db".to_string()),
("SERAI_KEY".to_string(), {
use serai_client::primitives::insecure_pair_from_name;
hex::encode(insecure_pair_from_name(name).as_ref().secret.to_bytes()[.. 32].as_ref())
}),
]
.into(),
)
@ -33,16 +46,20 @@ pub fn coordinator_instance(message_queue_key: <Ristretto as Ciphersuite>::F) ->
pub fn serai_composition(name: &str) -> Composition {
serai_docker_tests::build("serai".to_string());
Composition::with_image(Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never))
.with_cmd(vec![
"serai-node".to_string(),
"--unsafe-rpc-external".to_string(),
"--rpc-cors".to_string(),
"all".to_string(),
"--chain".to_string(),
"devnet".to_string(),
format!("--{name}"),
])
let mut composition = Composition::with_image(
Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never),
)
.with_cmd(vec![
"serai-node".to_string(),
"--unsafe-rpc-external".to_string(),
"--rpc-cors".to_string(),
"all".to_string(),
"--chain".to_string(),
"local".to_string(),
format!("--{}", name.to_lowercase()),
]);
composition.publish_all_ports();
composition
}
pub type Handles = (String, String, String);
@ -52,7 +69,7 @@ pub fn coordinator_stack(name: &str) -> (Handles, <Ristretto as Ciphersuite>::F,
let (coord_key, message_queue_keys, message_queue_composition) =
serai_message_queue_tests::instance();
let coordinator_composition = coordinator_instance(message_queue_keys[&NetworkId::Bitcoin]);
let coordinator_composition = coordinator_instance(name, coord_key);
// Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
@ -91,7 +108,96 @@ pub fn coordinator_stack(name: &str) -> (Handles, <Ristretto as Ciphersuite>::F,
(
(compositions[0].handle(), compositions[1].handle(), compositions[2].handle()),
coord_key,
message_queue_keys[&NetworkId::Bitcoin],
compositions,
)
}
pub struct Processor {
network: NetworkId,
#[allow(unused)]
serai_handle: String,
#[allow(unused)]
message_queue_handle: String,
#[allow(unused)]
coordinator_handle: String,
next_send_id: u64,
next_recv_id: u64,
queue: MessageQueue,
}
impl Processor {
pub async fn new(
network: NetworkId,
ops: &DockerOperations,
handles: (String, String, String),
processor_key: <Ristretto as Ciphersuite>::F,
) -> Processor {
let message_queue_rpc = ops.handle(&handles.1).host_port(2287).unwrap();
let message_queue_rpc = format!("{}:{}", message_queue_rpc.0, message_queue_rpc.1);
// Sleep until the Substrate RPC starts
let serai_rpc = ops.handle(&handles.0).host_port(9944).unwrap();
let serai_rpc = format!("ws://{}:{}", serai_rpc.0, serai_rpc.1);
// Bound execution to 60 seconds
for _ in 0 .. 60 {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
let Ok(client) = serai_client::Serai::new(&serai_rpc).await else { continue };
if client.get_latest_block_hash().await.is_err() {
continue;
}
break;
}
// The Serai RPC may or may not be started
// Assume it is and continue, so if it's a few seconds late, it's still within tolerance
Processor {
network,
serai_handle: handles.0,
message_queue_handle: handles.1,
coordinator_handle: handles.2,
next_send_id: 0,
next_recv_id: 0,
queue: MessageQueue::new(
Service::Processor(network),
message_queue_rpc,
Zeroizing::new(processor_key),
),
}
}
/// Send a message to a processor as its coordinator.
pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
let msg: ProcessorMessage = msg.into();
self
.queue
.queue(
Metadata {
from: Service::Processor(self.network),
to: Service::Coordinator,
intent: msg.intent(),
},
serde_json::to_string(&msg).unwrap().into_bytes(),
)
.await;
self.next_send_id += 1;
}
/// Receive a message from a processor as its coordinator.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
let msg =
tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id))
.await
.unwrap();
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(self.next_recv_id).await;
self.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
}
}

View file

@ -1,12 +1,19 @@
use std::time::Duration;
use ciphersuite::{Ciphersuite, Ristretto};
use dkg::{Participant, ThresholdParams};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
};
use messages::{key_gen::KeyGenId, CoordinatorMessage};
use dockertest::DockerTest;
use crate::*;
pub(crate) const COORDINATORS: usize = 4;
pub(crate) const COORDINATORS: usize = 3;
// pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
@ -14,12 +21,12 @@ fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
let mut test = DockerTest::new();
for i in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = coordinator_stack(match i {
0 => "alice",
1 => "bob",
2 => "charlie",
3 => "dave",
4 => "eve",
5 => "ferdie",
0 => "Alice",
1 => "Bob",
2 => "Charlie",
3 => "Dave",
4 => "Eve",
5 => "Ferdie",
_ => panic!("needed a 6th name for a serai node"),
});
coordinators.push((handles, coord_key));
@ -30,11 +37,39 @@ fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
(coordinators, test)
}
#[test]
fn stack_test() {
let (_coordinators, test) = new_test();
#[tokio::test]
async fn stack_test() {
let (processors, test) = new_test();
test.run(|_ops| async move {
tokio::time::sleep(Duration::from_secs(30)).await;
});
test
.run_async(|ops| async move {
// Wait for the Serai node to boot
tokio::time::sleep(Duration::from_secs(30)).await;
// Connect to the Message Queues as the processor
let mut new_processors: Vec<Processor> = vec![];
for (handles, key) in processors {
new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await);
}
let mut processors = new_processors;
for (i, processor) in processors.iter_mut().enumerate() {
assert_eq!(
processor.recv_message().await,
CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::GenerateKey {
id: KeyGenId {
set: ValidatorSet { session: Session(0), network: NetworkId::Bitcoin },
attempt: 0
},
params: ThresholdParams::new(
3,
3,
Participant::new(u16::try_from(i).unwrap() + 1).unwrap()
)
.unwrap()
})
);
}
})
.await;
}