diff --git a/docs/processor/Processor.md b/docs/processor/Processor.md index 46dc8417..53ed7635 100644 --- a/docs/processor/Processor.md +++ b/docs/processor/Processor.md @@ -46,9 +46,9 @@ When the connected network has a new block, which is considered finalized of confirmations), it's scanned. Outputs to the key of Serai's multisig are saved to the database. Outputs which -newly transfer into Serai are used to build a `Batch` for the block. The +newly transfer into Serai are used to build `Batch`s for the block. The processor then begins a threshold signature protocol with its key pair's -Ristretto key to sign the batch. The protocol's preprocess is sent to the +Ristretto key to sign the `Batch`s. The protocol's preprocess is sent to the coordinator in a `coordinator::ProcessorMessage::BatchPreprocess`. As a design comment, we *may* be able to sign now possible, already scheduled, diff --git a/substrate/in-instructions/primitives/src/lib.rs b/substrate/in-instructions/primitives/src/lib.rs index f4a730e1..74349de9 100644 --- a/substrate/in-instructions/primitives/src/lib.rs +++ b/substrate/in-instructions/primitives/src/lib.rs @@ -82,6 +82,7 @@ impl Zeroize for SignedBatch { } } +// TODO: Make this an associated method? /// The message for the batch signature. pub fn batch_message(batch: &Batch) -> Vec { [b"InInstructions-batch".as_ref(), &batch.encode()].concat() diff --git a/tests/coordinator/Cargo.toml b/tests/coordinator/Cargo.toml index 0e94e9aa..71ec2e77 100644 --- a/tests/coordinator/Cargo.toml +++ b/tests/coordinator/Cargo.toml @@ -17,8 +17,10 @@ rustdoc-args = ["--cfg", "docsrs"] hex = "0.4" zeroize = { version = "1", default-features = false } +rand_core = { version = "0.6", default-features = false } ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["ristretto"] } +schnorrkel = { git = "https://github.com/serai-dex/schnorrkel" } dkg = { path = "../../crypto/dkg", default-features = false, features = ["tests"] } messages = { package = "serai-processor-messages", path = "../../processor/messages" } diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs new file mode 100644 index 00000000..602993ff --- /dev/null +++ b/tests/coordinator/src/tests/batch.rs @@ -0,0 +1,227 @@ +use std::{ + time::Duration, + collections::{HashSet, HashMap}, +}; + +use zeroize::Zeroizing; +use rand_core::{RngCore, OsRng}; + +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + +use dkg::Participant; + +use serai_client::{ + primitives::{NetworkId, BlockHash, Signature}, + in_instructions::{ + primitives::{Batch, SignedBatch, batch_message}, + InInstructionsEvent, + }, +}; +use messages::{sign::SignId, CoordinatorMessage}; + +use crate::{*, tests::*}; + +pub async fn batch( + processors: &mut [Processor], + substrate_key: &Zeroizing<::F>, +) { + let mut id = [0; 32]; + OsRng.fill_bytes(&mut id); + let id = SignId { key: vec![], id, attempt: 0 }; + + let block = BlockHash([0x22; 32]); + + // Select a random participant to sign first, guaranteeing their inclusion + let first_signer = + usize::try_from(OsRng.next_u64() % u64::try_from(processors.len()).unwrap()).unwrap(); + processors[first_signer] + .send_message(messages::coordinator::ProcessorMessage::BatchPreprocess { + id: id.clone(), + block, + preprocess: [u8::try_from(first_signer).unwrap(); 64].to_vec(), + }) + .await; + // Sleep twice as for some reason this specific statement hits some latency? + wait_for_tributary().await; + wait_for_tributary().await; + + // Send the rest of the preprocesses + for (i, processor) in processors.iter_mut().enumerate() { + if i == first_signer { + continue; + } + + processor + .send_message(messages::coordinator::ProcessorMessage::BatchPreprocess { + id: id.clone(), + block, + preprocess: [u8::try_from(i).unwrap(); 64].to_vec(), + }) + .await; + } + wait_for_tributary().await; + + // Read from the first signer to find out who was selected to sign + let first_preprocesses = processors[first_signer].recv_message().await; + let participants = match first_preprocesses { + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::BatchPreprocesses { id: this_id, preprocesses }, + ) => { + assert_eq!(&id, &this_id); + assert_eq!(preprocesses.len(), THRESHOLD - 1); + assert!(!preprocesses + .contains_key(&Participant::new(u16::try_from(first_signer).unwrap() + 1).unwrap())); + + let mut participants = + preprocesses.keys().map(|p| usize::from(u16::from(*p)) - 1).collect::>(); + for (p, preprocess) in preprocesses { + assert_eq!(preprocess, vec![u8::try_from(u16::from(p)).unwrap() - 1; 64]); + } + participants.insert(first_signer); + participants + } + _ => panic!("coordinator didn't send back BatchPreprocesses"), + }; + + for i in participants.clone() { + if i == first_signer { + continue; + } + let processor = &mut processors[i]; + let mut preprocesses = participants + .clone() + .into_iter() + .map(|i| { + ( + Participant::new(u16::try_from(i + 1).unwrap()).unwrap(), + [u8::try_from(i).unwrap(); 64].to_vec(), + ) + }) + .collect::>(); + preprocesses.remove(&Participant::new(u16::try_from(i + 1).unwrap()).unwrap()); + + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::BatchPreprocesses { + id: id.clone(), + preprocesses + } + ) + ); + } + + for i in participants.clone() { + let processor = &mut processors[i]; + processor + .send_message(messages::coordinator::ProcessorMessage::BatchShare { + id: id.clone(), + share: [u8::try_from(i).unwrap(); 32], + }) + .await; + } + wait_for_tributary().await; + for i in participants.clone() { + let processor = &mut processors[i]; + let mut shares = participants + .clone() + .into_iter() + .map(|i| { + (Participant::new(u16::try_from(i + 1).unwrap()).unwrap(), [u8::try_from(i).unwrap(); 32]) + }) + .collect::>(); + shares.remove(&Participant::new(u16::try_from(i + 1).unwrap()).unwrap()); + + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::BatchShares { + id: id.clone(), + shares, + }) + ); + } + + let batch = Batch { network: NetworkId::Bitcoin, id: 0, block, instructions: vec![] }; + + // Expand to a key pair as Schnorrkel expects + // It's the private key + 32-bytes of entropy for nonces + the public key + let mut schnorrkel_key_pair = [0; 96]; + schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); + OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); + schnorrkel_key_pair[64 ..] + .copy_from_slice(&(::generator() * **substrate_key).to_bytes()); + let signature = Signature( + schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair) + .unwrap() + .sign_simple(b"substrate", &batch_message(&batch)) + .to_bytes(), + ); + + let batch = SignedBatch { batch, signature }; + + let serai = processors[0].serai().await; + let mut last_serai_block = serai.get_latest_block().await.unwrap().number(); + + for processor in processors.iter_mut() { + processor + .send_message(messages::substrate::ProcessorMessage::Update { batch: batch.clone() }) + .await; + } + + // Verify the Batch was published to Substrate + 'outer: for _ in 0 .. 20 { + tokio::time::sleep(Duration::from_secs(6)).await; + if std::env::var("GITHUB_CI") == Ok("true".to_string()) { + tokio::time::sleep(Duration::from_secs(6)).await; + } + + while last_serai_block <= serai.get_latest_block().await.unwrap().number() { + let batch_events = serai + .get_batch_events( + serai.get_block_by_number(last_serai_block).await.unwrap().unwrap().hash(), + ) + .await + .unwrap(); + + if !batch_events.is_empty() { + assert_eq!(batch_events.len(), 1); + assert_eq!( + batch_events[0], + InInstructionsEvent::Batch { network: NetworkId::Bitcoin, id: 0, block } + ); + break 'outer; + } + last_serai_block += 1; + } + } + + // TODO: Verify the coordinator sends SubstrateBlock to all processors +} + +#[tokio::test] +async fn batch_test() { + let (processors, test) = new_test(); + + test + .run_async(|ops| async move { + // Wait for the Serai node to boot, and for the Tendermint chain to get past the first block + // TODO: Replace this with a Coordinator RPC + tokio::time::sleep(Duration::from_secs(150)).await; + + // Sleep even longer if in the CI due to it being slower than commodity hardware + if std::env::var("GITHUB_CI") == Ok("true".to_string()) { + tokio::time::sleep(Duration::from_secs(120)).await; + } + + // Connect to the Message Queues as the processor + let mut new_processors: Vec = vec![]; + for (handles, key) in processors { + new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + } + let mut processors = new_processors; + + let substrate_key = key_gen(&mut processors).await; + batch(&mut processors, &substrate_key).await; + }) + .await; +} diff --git a/tests/coordinator/src/tests/key_gen.rs b/tests/coordinator/src/tests/key_gen.rs new file mode 100644 index 00000000..1401e139 --- /dev/null +++ b/tests/coordinator/src/tests/key_gen.rs @@ -0,0 +1,193 @@ +use std::{ + time::{Duration, SystemTime}, + collections::HashMap, +}; + +use zeroize::Zeroizing; +use rand_core::OsRng; + +use ciphersuite::{ + group::{ff::Field, GroupEncoding}, + Ciphersuite, Ristretto, +}; +use dkg::{Participant, ThresholdParams}; + +use serai_client::{ + primitives::NetworkId, + Public, + validator_sets::primitives::{Session, ValidatorSet}, +}; +use messages::{key_gen::KeyGenId, CoordinatorMessage}; + +use crate::{*, tests::*}; + +pub async fn key_gen(processors: &mut [Processor]) -> Zeroizing<::F> { + let participant_from_i = |i: usize| Participant::new(u16::try_from(i + 1).unwrap()).unwrap(); + + let set = ValidatorSet { session: Session(0), network: NetworkId::Bitcoin }; + let id = KeyGenId { set, attempt: 0 }; + + for (i, processor) in processors.iter_mut().enumerate() { + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::GenerateKey { + id, + params: ThresholdParams::new( + u16::try_from(((COORDINATORS * 2) / 3) + 1).unwrap(), + u16::try_from(COORDINATORS).unwrap(), + participant_from_i(i), + ) + .unwrap() + }) + ); + + processor + .send_message(messages::key_gen::ProcessorMessage::Commitments { + id, + commitments: vec![u8::try_from(i).unwrap()], + }) + .await; + } + + wait_for_tributary().await; + for (i, processor) in processors.iter_mut().enumerate() { + let mut commitments = (0 .. u8::try_from(COORDINATORS).unwrap()) + .map(|l| (participant_from_i(l.into()), vec![l])) + .collect::>(); + commitments.remove(&participant_from_i(i)); + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Commitments { + id, + commitments, + }) + ); + + // from (0 .. n), to (1 ..= n) + let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap()) + .map(|l| (participant_from_i(l.into()), vec![u8::try_from(i).unwrap(), l + 1])) + .collect::>(); + + let i = participant_from_i(i); + shares.remove(&i); + processor.send_message(messages::key_gen::ProcessorMessage::Shares { id, shares }).await; + } + + let substrate_priv_key = Zeroizing::new(::F::random(&mut OsRng)); + let substrate_key = (::generator() * *substrate_priv_key).to_bytes(); + + let serai = processors[0].serai().await; + let mut last_serai_block = serai.get_latest_block().await.unwrap().number(); + + wait_for_tributary().await; + for (i, processor) in processors.iter_mut().enumerate() { + let i = participant_from_i(i); + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Shares { + id, + shares: { + let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap()) + .map(|l| (participant_from_i(l.into()), vec![l, u8::try_from(u16::from(i)).unwrap()])) + .collect::>(); + shares.remove(&i); + shares + }, + }) + ); + processor + .send_message(messages::key_gen::ProcessorMessage::GeneratedKeyPair { + id, + substrate_key, + network_key: b"network_key".to_vec(), + }) + .await; + } + + // Sleeps for longer since we need to wait for a Substrate block as well + 'outer: for _ in 0 .. 20 { + tokio::time::sleep(Duration::from_secs(6)).await; + if std::env::var("GITHUB_CI") == Ok("true".to_string()) { + tokio::time::sleep(Duration::from_secs(6)).await; + } + + while last_serai_block <= serai.get_latest_block().await.unwrap().number() { + if !serai + .get_key_gen_events( + serai.get_block_by_number(last_serai_block).await.unwrap().unwrap().hash(), + ) + .await + .unwrap() + .is_empty() + { + break 'outer; + } + last_serai_block += 1; + } + } + let mut message = None; + for processor in processors.iter_mut() { + let msg = processor.recv_message().await; + if message.is_none() { + match msg { + CoordinatorMessage::Substrate( + messages::substrate::CoordinatorMessage::ConfirmKeyPair { + context, + set: this_set, + ref key_pair, + }, + ) => { + assert!( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + .abs_diff(context.serai_time) < + 70 + ); + assert_eq!(context.network_latest_finalized_block.0, [0; 32]); + assert_eq!(set, this_set); + assert_eq!(key_pair.0 .0, substrate_key); + assert_eq!(key_pair.1.to_vec(), b"network_key".to_vec()); + } + _ => panic!("coordinator didn't respond with ConfirmKeyPair"), + } + message = Some(msg); + } else { + assert_eq!(message, Some(msg)); + } + } + assert_eq!( + serai.get_keys(set).await.unwrap().unwrap(), + (Public(substrate_key), b"network_key".to_vec().try_into().unwrap()) + ); + + substrate_priv_key +} + +#[tokio::test] +async fn key_gen_test() { + let (processors, test) = new_test(); + + test + .run_async(|ops| async move { + // Wait for the Serai node to boot, and for the Tendermint chain to get past the first block + // TODO: Replace this with a Coordinator RPC + tokio::time::sleep(Duration::from_secs(150)).await; + + // Sleep even longer if in the CI due to it being slower than commodity hardware + if std::env::var("GITHUB_CI") == Ok("true".to_string()) { + tokio::time::sleep(Duration::from_secs(120)).await; + } + + // Connect to the Message Queues as the processor + let mut new_processors: Vec = vec![]; + for (handles, key) in processors { + new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + } + let mut processors = new_processors; + + key_gen(&mut processors).await; + }) + .await; +} diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index 1c016fda..9155c4cd 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -1,25 +1,19 @@ -use std::{ - time::{Duration, SystemTime}, - collections::HashMap, -}; - -use ciphersuite::{Ciphersuite, Ristretto}; -use dkg::{Participant, ThresholdParams}; - -use serai_client::{ - primitives::NetworkId, - validator_sets::primitives::{Session, ValidatorSet}, -}; -use messages::{key_gen::KeyGenId, CoordinatorMessage}; +use ciphersuite::Ristretto; use dockertest::DockerTest; use crate::*; -pub(crate) const COORDINATORS: usize = 4; -// pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; +mod key_gen; +pub use key_gen::key_gen; -fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { +mod batch; +pub use batch::batch; + +pub(crate) const COORDINATORS: usize = 4; +pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; + +pub(crate) fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { let mut coordinators = vec![]; let mut test = DockerTest::new(); for i in 0 .. COORDINATORS { @@ -40,172 +34,12 @@ fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { (coordinators, test) } -#[tokio::test] -async fn key_gen_test() { - let (processors, test) = new_test(); - - let participant_from_i = |i: usize| Participant::new(u16::try_from(i + 1).unwrap()).unwrap(); - - test - .run_async(|ops| async move { - // Wait for the Serai node to boot, and for the Tendermint chain to get past the first block - // TODO: Replace this with a Coordinator RPC - tokio::time::sleep(Duration::from_secs(150)).await; - - // Sleep even longer if in the CI due to it being slower than commodity hardware - if std::env::var("GITHUB_CI") == Ok("true".to_string()) { - tokio::time::sleep(Duration::from_secs(120)).await; - } - - // Connect to the Message Queues as the processor - let mut new_processors: Vec = vec![]; - for (handles, key) in processors { - new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); - } - let mut processors = new_processors; - - let set = ValidatorSet { session: Session(0), network: NetworkId::Bitcoin }; - let id = KeyGenId { set, attempt: 0 }; - - for (i, processor) in processors.iter_mut().enumerate() { - assert_eq!( - processor.recv_message().await, - CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::GenerateKey { - id, - params: ThresholdParams::new( - u16::try_from(((COORDINATORS * 2) / 3) + 1).unwrap(), - u16::try_from(COORDINATORS).unwrap(), - participant_from_i(i), - ) - .unwrap() - }) - ); - - processor - .send_message(messages::key_gen::ProcessorMessage::Commitments { - id, - commitments: vec![u8::try_from(i).unwrap()], - }) - .await; - } - - // Sleep for 20s to give everything processing time - tokio::time::sleep(Duration::from_secs(20)).await; - if std::env::var("GITHUB_CI") == Ok("true".to_string()) { - tokio::time::sleep(Duration::from_secs(20)).await; - } - for (i, processor) in processors.iter_mut().enumerate() { - let mut commitments = (0 .. u8::try_from(COORDINATORS).unwrap()) - .map(|l| (participant_from_i(l.into()), vec![l])) - .collect::>(); - commitments.remove(&participant_from_i(i)); - assert_eq!( - processor.recv_message().await, - CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Commitments { - id, - commitments, - }) - ); - - // from (0 .. n), to (1 ..= n) - let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap()) - .map(|l| (participant_from_i(l.into()), vec![u8::try_from(i).unwrap(), l + 1])) - .collect::>(); - - let i = participant_from_i(i); - shares.remove(&i); - processor.send_message(messages::key_gen::ProcessorMessage::Shares { id, shares }).await; - } - - let serai = processors[0].serai().await; - let mut last_serai_block = serai.get_latest_block().await.unwrap().number(); - - tokio::time::sleep(Duration::from_secs(20)).await; - if std::env::var("GITHUB_CI") == Ok("true".to_string()) { - tokio::time::sleep(Duration::from_secs(20)).await; - } - for (i, processor) in processors.iter_mut().enumerate() { - let i = participant_from_i(i); - assert_eq!( - processor.recv_message().await, - CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::Shares { - id, - shares: { - let mut shares = (0 .. u8::try_from(COORDINATORS).unwrap()) - .map(|l| { - (participant_from_i(l.into()), vec![l, u8::try_from(u16::from(i)).unwrap()]) - }) - .collect::>(); - shares.remove(&i); - shares - }, - }) - ); - - processor - .send_message(messages::key_gen::ProcessorMessage::GeneratedKeyPair { - id, - substrate_key: [0xaa; 32], - network_key: b"network_key".to_vec(), - }) - .await; - } - - // Sleeps for longer since we need to wait for a Substrate block as well - 'outer: for _ in 0 .. 20 { - tokio::time::sleep(Duration::from_secs(6)).await; - if std::env::var("GITHUB_CI") == Ok("true".to_string()) { - tokio::time::sleep(Duration::from_secs(6)).await; - } - - while last_serai_block <= serai.get_latest_block().await.unwrap().number() { - if !serai - .get_key_gen_events( - serai.get_block_by_number(last_serai_block).await.unwrap().unwrap().hash(), - ) - .await - .unwrap() - .is_empty() - { - break 'outer; - } - last_serai_block += 1; - } - } - let mut message = None; - for processor in processors.iter_mut() { - let msg = processor.recv_message().await; - if message.is_none() { - match msg { - CoordinatorMessage::Substrate( - messages::substrate::CoordinatorMessage::ConfirmKeyPair { - context, - set: this_set, - ref key_pair, - }, - ) => { - assert!( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - .abs_diff(context.serai_time) < - 70 - ); - assert_eq!(context.network_latest_finalized_block.0, [0; 32]); - assert_eq!(set, this_set); - assert_eq!(key_pair.0 .0, [0xaa; 32]); - assert_eq!(key_pair.1.to_vec(), b"network_key".to_vec()); - } - _ => panic!("coordinator didn't respond with ConfirmKeyPair"), - } - message = Some(msg); - } else { - assert_eq!(message, Some(msg)); - } - } - }) - .await; - - // TODO: Check Substrate actually has this key pair +// TODO: Don't use a pessimistic sleep +// Use an RPC to enaluate if a condition was met, with the following time being a timeout +// https://github.com/serai-dex/serai/issues/340 +pub(crate) async fn wait_for_tributary() { + tokio::time::sleep(Duration::from_secs(20)).await; + if std::env::var("GITHUB_CI") == Ok("true".to_string()) { + tokio::time::sleep(Duration::from_secs(20)).await; + } }