E2E test coordinator KeyGen

This commit is contained in:
Luke Parker 2023-08-14 06:54:04 -04:00
parent acc19e2817
commit 666bb3e96b
No known key found for this signature in database
3 changed files with 145 additions and 28 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
target
.vscode
.test-logs

View file

@ -1,6 +1,10 @@
#![allow(clippy::needless_pass_by_ref_mut)] // False positives
use std::sync::{OnceLock, Mutex};
use std::{
sync::{OnceLock, Mutex},
time::Duration,
fs,
};
use zeroize::Zeroizing;
@ -77,32 +81,44 @@ pub fn coordinator_stack(name: &str) -> (Handles, <Ristretto as Ciphersuite>::F,
// Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
let unique_id = {
let (first, unique_id) = {
let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0));
let mut unique_id_lock = unique_id_mutex.lock().unwrap();
let first = *unique_id_lock == 0;
let unique_id = hex::encode(unique_id_lock.to_be_bytes());
*unique_id_lock += 1;
unique_id
(first, unique_id)
};
let logs_path = [std::env::current_dir().unwrap().to_str().unwrap(), ".test-logs", "coordinator"]
.iter()
.collect::<std::path::PathBuf>();
if first {
let _ = fs::remove_dir_all(&logs_path);
fs::create_dir_all(&logs_path).expect("couldn't create logs directory");
assert!(
fs::read_dir(&logs_path).expect("couldn't read the logs folder").next().is_none(),
"logs folder wasn't empty, despite removing it at the start of the run",
);
}
let logs_path = logs_path.to_str().unwrap().to_string();
let mut compositions = vec![];
let mut handles = vec![];
for composition in [serai_composition, message_queue_composition, coordinator_composition] {
let handle = composition.handle();
let name = format!("{}-{}", composition.handle(), &unique_id);
compositions.push(
composition
.with_start_policy(StartPolicy::Strict)
.with_container_name(format!("{handle}-{}", &unique_id))
.with_container_name(name.clone())
.with_log_options(Some(LogOptions {
action: LogAction::Forward,
policy: if handle.contains("coordinator") {
LogPolicy::Always
} else {
LogPolicy::OnError
},
action: LogAction::ForwardToFile { path: logs_path.clone() },
policy: LogPolicy::Always,
source: LogSource::Both,
})),
);
handles.push(compositions.last().unwrap().handle());
}
@ -147,7 +163,7 @@ impl Processor {
let serai_rpc = format!("ws://{}:{}", serai_rpc.0, serai_rpc.1);
// Bound execution to 60 seconds
for _ in 0 .. 60 {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(client) = serai_client::Serai::new(&serai_rpc).await else { continue };
if client.get_latest_block_hash().await.is_err() {
continue;
@ -194,10 +210,9 @@ impl Processor {
/// Receive a message from a processor as its coordinator.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
let msg =
tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id))
.await
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(self.next_recv_id))
.await
.unwrap();
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(self.next_recv_id).await;

View file

@ -1,4 +1,7 @@
use std::time::Duration;
use std::{
time::{Duration, SystemTime},
collections::HashMap,
};
use ciphersuite::{Ciphersuite, Ristretto};
use dkg::{Participant, ThresholdParams};
@ -38,13 +41,16 @@ fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
}
#[tokio::test]
async fn stack_test() {
async fn key_gen_test() {
let (processors, test) = new_test();
let participant_from_i = |i: usize| Participant::new(u16::try_from(i + 1).unwrap()).unwrap();
test
.run_async(|ops| async move {
// Wait for the Serai node to boot
tokio::time::sleep(Duration::from_secs(30)).await;
// Wait for the Serai node to boot, and for the Tendermint chain to get past the first block
// TODO: Replace this with a Coordinator RPC
tokio::time::sleep(Duration::from_secs(150)).await;
// Connect to the Message Queues as the processor
let mut new_processors: Vec<Processor> = vec![];
@ -53,25 +59,120 @@ async fn stack_test() {
}
let mut processors = new_processors;
let set = ValidatorSet { session: Session(0), network: NetworkId::Bitcoin };
let id = KeyGenId { set, attempt: 0 };
for (i, processor) in processors.iter_mut().enumerate() {
assert_eq!(
processor.recv_message().await,
CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::GenerateKey {
id: KeyGenId {
set: ValidatorSet { session: Session(0), network: NetworkId::Bitcoin },
attempt: 0
},
id,
params: ThresholdParams::new(
3,
4,
Participant::new(u16::try_from(i).unwrap() + 1).unwrap()
u16::try_from(((COORDINATORS * 2) / 3) + 1).unwrap(),
u16::try_from(COORDINATORS).unwrap(),
participant_from_i(i),
)
.unwrap()
})
);
processor
.send_message(messages::key_gen::ProcessorMessage::Commitments {
id,
commitments: vec![u8::try_from(i).unwrap()],
})
.await;
}
tokio::time::sleep(Duration::from_secs(30)).await;
// Sleep for 20s to give everything processing time
tokio::time::sleep(Duration::from_secs(20)).await;
for (i, processor) in processors.iter_mut().enumerate() {
let mut commitments = (0 .. u8::try_from(COORDINATORS).unwrap())
.map(|l| (participant_from_i(l.into()), vec![l]))
.collect::<HashMap<_, _>>();
commitments.remove(&participant_from_i(i));
assert_eq!(
processor.recv_message().await,
CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Commitments {
id,
commitments,
})
);
// from (0 .. n), to (1 ..= n)
let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap())
.map(|l| (participant_from_i(l.into()), vec![u8::try_from(i).unwrap(), l + 1]))
.collect::<HashMap<_, _>>();
let i = participant_from_i(i);
shares.remove(&i);
processor.send_message(messages::key_gen::ProcessorMessage::Shares { id, shares }).await;
}
tokio::time::sleep(Duration::from_secs(20)).await;
for (i, processor) in processors.iter_mut().enumerate() {
let i = participant_from_i(i);
assert_eq!(
processor.recv_message().await,
CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Shares {
id,
shares: {
let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap())
.map(|l| {
(participant_from_i(l.into()), vec![l, u8::try_from(u16::from(i)).unwrap()])
})
.collect::<HashMap<_, _>>();
shares.remove(&i);
shares
},
})
);
processor
.send_message(messages::key_gen::ProcessorMessage::GeneratedKeyPair {
id,
substrate_key: [0xaa; 32],
network_key: b"network_key".to_vec(),
})
.await;
}
// Sleeps for longer since we need to wait for a Substrate block as well
tokio::time::sleep(Duration::from_secs(60)).await;
let mut message = None;
for processor in processors.iter_mut() {
let msg = processor.recv_message().await;
if message.is_none() {
match msg {
CoordinatorMessage::Substrate(
messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context,
set: this_set,
ref key_pair,
},
) => {
assert!(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
.abs_diff(context.serai_time) <
70
);
assert_eq!(context.network_latest_finalized_block.0, [0; 32]);
assert_eq!(set, this_set);
assert_eq!(key_pair.0 .0, [0xaa; 32]);
assert_eq!(key_pair.1.to_vec(), b"network_key".to_vec());
}
_ => panic!("coordinator didn't respond with ConfirmKeyPair"),
}
message = Some(msg);
} else {
assert_eq!(message, Some(msg));
}
}
})
.await;
// TODO: Check Substrate actually has this key pair
}