Test multiple batches, re-attempts, randomized selected signers

This commit is contained in:
Luke Parker 2023-07-26 05:55:47 -04:00
parent b205391b28
commit 7823ece4fe
No known key found for this signature in database
3 changed files with 181 additions and 108 deletions
tests/processor/src

View file

@ -252,10 +252,14 @@ impl Coordinator {
use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC");
let _: EmptyResponse = rpc
let res: serde_json::Value = rpc
.json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)])))
.await
.unwrap();
let err = res.get("error");
if err.is_some() && (err.unwrap() != &serde_json::Value::Null) {
panic!("failed to submit Monero block: {res}");
}
}
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
}

View file

@ -228,6 +228,7 @@ impl Wallet {
Transaction,
};
const AMOUNT: u64 = 100000000;
let mut tx = Transaction {
version: 2,
lock_time: LockTime::ZERO,
@ -237,13 +238,19 @@ impl Wallet {
sequence: Sequence(u32::MAX),
witness: Witness::default(),
}],
output: vec![TxOut {
value: input_tx.output[0].value - 10000,
script_pubkey: Payload::p2tr_tweaked(TweakedPublicKey::dangerous_assume_tweaked(
XOnlyPublicKey::from_slice(&to[1 ..]).unwrap(),
))
.script_pubkey(),
}],
output: vec![
TxOut {
value: input_tx.output[0].value - AMOUNT - 10000,
script_pubkey: input_tx.output[0].script_pubkey.clone(),
},
TxOut {
value: AMOUNT,
script_pubkey: Payload::p2tr_tweaked(TweakedPublicKey::dangerous_assume_tweaked(
XOnlyPublicKey::from_slice(&to[1 ..]).unwrap(),
))
.script_pubkey(),
},
],
};
let mut der = SECP256K1

View file

@ -2,13 +2,135 @@ use std::collections::HashMap;
use dkg::{Participant, tests::clone_without};
use messages::sign::SignId;
use serai_primitives::{NetworkId, BlockHash, crypto::RuntimePublic, PublicKey};
use serai_in_instructions_primitives::batch_message;
use serai_in_instructions_primitives::{SignedBatch, batch_message};
use dockertest::DockerTest;
use crate::{*, tests::*};
pub(crate) async fn recv_batch_preprocesses(
coordinators: &mut [Coordinator],
key: [u8; 32],
attempt: u32,
) -> (SignId, HashMap<Participant, Vec<u8>>) {
let mut id = None;
let mut preprocesses = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
let msg = coordinator.recv_message().await;
match msg {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess },
) => {
if id.is_none() {
assert_eq!(&this_id.key, &key);
assert_eq!(this_id.attempt, attempt);
id = Some(this_id.clone());
}
assert_eq!(&this_id, id.as_ref().unwrap());
preprocesses.insert(i, preprocess);
}
_ => panic!("processor didn't send batch preprocess"),
}
}
// Reduce the preprocesses down to the threshold
while preprocesses.len() > THRESHOLD {
preprocesses.remove(
&Participant::new(
u16::try_from(OsRng.next_u64() % u64::try_from(COORDINATORS).unwrap()).unwrap() + 1,
)
.unwrap(),
);
}
(id.unwrap(), preprocesses)
}
pub(crate) async fn sign_batch(
coordinators: &mut [Coordinator],
id: SignId,
preprocesses: HashMap<Participant, Vec<u8>>,
) -> SignedBatch {
assert_eq!(preprocesses.len(), THRESHOLD);
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses {
id: id.clone(),
preprocesses: clone_without(&preprocesses, &i),
})
.await;
}
}
let mut shares = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share },
) => {
assert_eq!(&this_id, &id);
shares.insert(i, share);
}
_ => panic!("processor didn't send batch share"),
}
}
}
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchShares {
id: id.clone(),
shares: clone_without(&shares, &i),
})
.await;
}
}
// The selected processors should yield the batch
let mut batch = None;
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Update {
key,
batch: this_batch,
}) => {
assert_eq!(&key, &id.key);
if batch.is_none() {
assert!(PublicKey::from_raw(id.key.clone().try_into().unwrap())
.verify(&batch_message(&this_batch.batch), &this_batch.signature));
batch = Some(this_batch.clone());
}
assert_eq!(batch.as_ref().unwrap(), &this_batch);
}
_ => panic!("processor didn't send batch"),
}
}
}
batch.unwrap()
}
#[test]
fn batch_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
@ -46,115 +168,55 @@ fn batch_test() {
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Send into the processor's wallet
let tx = wallet.send_to_address(&ops, &key_pair.1).await;
for coordinator in &coordinators {
coordinator.publish_transacton(&ops, &tx).await;
}
// Put the TX past the confirmation depth
let mut block_with_tx = None;
for _ in 0 .. confirmations(network) {
let (hash, _) = coordinators[0].add_block(&ops).await;
if block_with_tx.is_none() {
block_with_tx = Some(hash);
for i in 0 .. 2 {
// Send into the processor's wallet
let tx = wallet.send_to_address(&ops, &key_pair.1).await;
for coordinator in &mut coordinators {
coordinator.publish_transacton(&ops, &tx).await;
}
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Sleep for 10s
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let mut id = None;
let mut preprocesses = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
// Only use their preprocess if they're within the threshold
let i = if i < THRESHOLD {
Some(Participant::new(u16::try_from(i).unwrap() + 1).unwrap())
} else {
None
};
let msg = coordinator.recv_message().await;
match msg {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess },
) => {
assert_eq!(&this_id.key, &key_pair.0 .0);
assert_eq!(this_id.attempt, 0);
if id.is_none() {
id = Some(this_id.clone());
}
assert_eq!(&this_id, id.as_ref().unwrap());
if let Some(i) = i {
preprocesses.insert(i, preprocess);
}
// Put the TX past the confirmation depth
let mut block_with_tx = None;
for _ in 0 .. confirmations(network) {
let (hash, _) = coordinators[0].add_block(&ops).await;
if block_with_tx.is_none() {
block_with_tx = Some(hash);
}
_ => panic!("processor didn't send batch preprocess"),
}
}
let id = id.unwrap();
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Continue with batch siging by sending the preprocesses to selected parties
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
// Sleep for 10s
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses {
id: id.clone(),
preprocesses: clone_without(&preprocesses, &i),
})
.await;
}
let mut shares = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
match coordinator.recv_message().await {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share },
) => {
assert_eq!(&this_id, &id);
shares.insert(i, share);
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let (mut id, mut preprocesses) =
recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, 0).await;
// Trigger a random amount of re-attempts
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
// TODO: Double check how the processor handles this ID field
// It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchReattempt {
id: id.clone(),
})
.await;
}
_ => panic!("processor didn't send batch share"),
(id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, attempt).await;
}
}
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
// Continue with signing the batch
let batch = sign_batch(&mut coordinators, id, preprocesses).await;
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchShares {
id: id.clone(),
shares: clone_without(&shares, &i),
})
.await;
}
// The selected processors should yield the batch
for coordinator in coordinators.iter_mut().take(THRESHOLD) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::Update { key, batch },
) => {
assert_eq!(&key, &key_pair.0 .0);
assert_eq!(batch.batch.network, network);
assert_eq!(batch.batch.id, 0);
assert!(PublicKey::from_raw(key_pair.0 .0)
.verify(&batch_message(&batch.batch), &batch.signature));
assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap()));
// This shouldn't have an instruction as we didn't add any data into the TX we sent
assert!(batch.batch.instructions.is_empty());
}
_ => panic!("processor didn't send batch"),
}
// Check it
assert_eq!(batch.batch.network, network);
assert_eq!(batch.batch.id, i);
assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap()));
// This shouldn't have an instruction as we didn't add any data into the TX we sent
assert!(batch.batch.instructions.is_empty());
}
});
}