diff --git a/Cargo.lock b/Cargo.lock index 986e3d88..e005ce89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1314,6 +1314,7 @@ dependencies = [ "parity-scale-codec", "processor-messages", "rand_core 0.6.4", + "schnorr-signatures", "serai-client", "serai-db", "sp-application-crypto", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 0d958c49..ad5df6be 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -22,6 +22,7 @@ blake2 = "0.10" transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] } ciphersuite = { path = "../crypto/ciphersuite" } +schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr" } frost = { package = "modular-frost", path = "../crypto/frost" } scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index f6d842dd..6a1f63cf 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -25,7 +25,7 @@ use processor::Processor; mod substrate; #[cfg(test)] -mod tests; +pub mod tests; async fn run( raw_db: D, diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 7df40a46..97c78827 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -31,7 +31,7 @@ impl P2pMessageKind { // TODO #[async_trait] pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { - async fn broadcast(&self, msg: Vec); + async fn broadcast(&self, kind: P2pMessageKind, msg: Vec); async fn receive(&self) -> Option<(P2pMessageKind, Vec)>; } @@ -51,7 +51,8 @@ impl LocalP2p { #[async_trait] impl P2p for LocalP2p { - async fn broadcast(&self, msg: Vec) { + async fn broadcast(&self, kind: P2pMessageKind, mut msg: Vec) { + msg.insert(0, kind.to_byte()); for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() { if i == self.0 { continue; @@ -72,8 +73,7 @@ impl P2p for LocalP2p { #[async_trait] impl TributaryP2p for LocalP2p { - async fn broadcast(&self, mut msg: Vec) { - msg.insert(0, P2pMessageKind::Tributary.to_byte()); - ::broadcast(self, msg).await + async fn broadcast(&self, msg: Vec) { + ::broadcast(self, P2pMessageKind::Tributary, msg).await } } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index a21e73aa..e263b4da 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -1 +1 @@ -mod tributary; +pub mod tributary; diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index e2c5575d..416e9329 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -2,7 +2,7 @@ use std::time::{Duration, SystemTime}; use zeroize::Zeroizing; -use rand_core::{RngCore, OsRng}; +use rand_core::{RngCore, CryptoRng, OsRng}; use ciphersuite::{ group::{ff::Field, GroupEncoding}, @@ -22,11 +22,27 @@ use serai_db::MemDb; use tributary::Tributary; -use crate::{P2pMessageKind, P2p, LocalP2p, processor::MemProcessor, tributary::TributarySpec}; +use crate::{ + P2pMessageKind, P2p, LocalP2p, + tributary::{Transaction, TributarySpec}, +}; -fn new_spec(keys: &[Zeroizing<::F>]) -> TributarySpec { +pub fn new_keys( + rng: &mut R, +) -> Vec::F>> { + let mut keys = vec![]; + for _ in 0 .. 5 { + keys.push(Zeroizing::new(::F::random(&mut *rng))); + } + keys +} + +pub fn new_spec( + rng: &mut R, + keys: &[Zeroizing<::F>], +) -> TributarySpec { let mut serai_block = [0; 32]; - OsRng.fill_bytes(&mut serai_block); + rng.fill_bytes(&mut serai_block); let start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); @@ -48,24 +64,16 @@ fn new_spec(keys: &[Zeroizing<::F>]) -> TributarySpec TributarySpec::new(serai_block, start_time, set, set_data) } -#[tokio::test] -async fn tributary_test() { - let mut keys = vec![]; - for _ in 0 .. 5 { - keys.push(Zeroizing::new(::F::random(&mut OsRng))); - } - - let processor = MemProcessor::new(); - - let spec = new_spec(&keys); - +pub async fn new_tributaries( + keys: &[Zeroizing<::F>], + spec: &TributarySpec, +) -> Vec<(LocalP2p, Tributary)> { let p2p = LocalP2p::new(keys.len()); - - let mut tributaries = vec![]; - + let mut res = vec![]; for (i, key) in keys.iter().enumerate() { - tributaries.push( - Tributary::<_, crate::tributary::Transaction, _>::new( + res.push(( + p2p[i].clone(), + Tributary::<_, Transaction, _>::new( MemDb::new(), spec.genesis(), spec.start_time(), @@ -75,25 +83,56 @@ async fn tributary_test() { ) .await .unwrap(), - ); + )); } + res +} - let mut blocks = 0; - let mut last_block = spec.genesis(); - - let timeout = SystemTime::now() + Duration::from_secs(70); - while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) { - for (i, p2p) in p2p.iter().enumerate() { +pub async fn run_tributaries( + mut tributaries: Vec<(LocalP2p, Tributary)>, +) { + loop { + for (p2p, tributary) in tributaries.iter_mut() { while let Some(msg) = p2p.receive().await { match msg.0 { P2pMessageKind::Tributary => { - tributaries[i].handle_message(&msg.1).await; + if tributary.handle_message(&msg.1).await { + p2p.broadcast(msg.0, msg.1).await; + } } } } } - let tip = tributaries[0].tip(); + sleep(Duration::from_millis(100)).await; + } +} + +#[tokio::test] +async fn tributary_test() { + let keys = new_keys(&mut OsRng); + let spec = new_spec(&mut OsRng, &keys); + + let mut tributaries = new_tributaries(&keys, &spec).await; + + let mut blocks = 0; + let mut last_block = spec.genesis(); + + // Doesn't use run_tributaries as we want to wind these down at a certain point + // run_tributaries will run them ad infinitum + let timeout = SystemTime::now() + Duration::from_secs(65); + while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) { + for (p2p, tributary) in tributaries.iter_mut() { + while let Some(msg) = p2p.receive().await { + match msg.0 { + P2pMessageKind::Tributary => { + tributary.handle_message(&msg.1).await; + } + } + } + } + + let tip = tributaries[0].1.tip(); if tip != last_block { last_block = tip; blocks += 1; @@ -107,11 +146,11 @@ async fn tributary_test() { } // Handle all existing messages - for (i, p2p) in p2p.iter().enumerate() { + for (p2p, tributary) in tributaries.iter_mut() { while let Some(msg) = p2p.receive().await { match msg.0 { P2pMessageKind::Tributary => { - tributaries[i].handle_message(&msg.1).await; + tributary.handle_message(&msg.1).await; } } } @@ -119,7 +158,7 @@ async fn tributary_test() { // All tributaries should agree on the tip let mut final_block = None; - for tributary in tributaries { + for (_, tributary) in tributaries { final_block = final_block.or_else(|| Some(tributary.tip())); if tributary.tip() != final_block.unwrap() { panic!("tributary had different tip"); diff --git a/coordinator/src/tests/tributary/mod.rs b/coordinator/src/tests/tributary/mod.rs index 0d1741eb..4abd549a 100644 --- a/coordinator/src/tests/tributary/mod.rs +++ b/coordinator/src/tests/tributary/mod.rs @@ -10,6 +10,9 @@ use tributary::{ReadWrite, tests::random_signed}; use crate::tributary::{SignData, Transaction}; mod chain; +pub use chain::*; + +mod tx; fn random_u32(rng: &mut R) -> u32 { u32::try_from(rng.next_u64() >> 32).unwrap() diff --git a/coordinator/src/tests/tributary/tx.rs b/coordinator/src/tests/tributary/tx.rs new file mode 100644 index 00000000..d9194eb5 --- /dev/null +++ b/coordinator/src/tests/tributary/tx.rs @@ -0,0 +1,97 @@ +use core::time::Duration; + +use zeroize::Zeroizing; + +use rand_core::{RngCore, OsRng}; + +use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; +use schnorr::SchnorrSignature; + +use tokio::time::sleep; + +use serai_db::MemDb; + +use tributary::{Signed, Transaction as TransactionTrait, Tributary}; + +use crate::{ + LocalP2p, + tributary::Transaction, + tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries}, +}; + +#[tokio::test] +async fn tx_test() { + let keys = new_keys(&mut OsRng); + let spec = new_spec(&mut OsRng, &keys); + + let mut tributaries = new_tributaries(&keys, &spec).await; + + // Run the tributaries in the background + tokio::spawn(run_tributaries(tributaries.clone())); + + // Send a TX from a random Tributary + let sender = + usize::try_from(OsRng.next_u64() % u64::try_from(tributaries.len()).unwrap()).unwrap(); + let key = keys[sender].clone(); + let pub_key = Ristretto::generator() * *key; + + let attempt = 0; + let mut commitments = vec![0; 256]; + OsRng.fill_bytes(&mut commitments); + + // Create the TX with a null signature so we can get its sig hash + let tx = Transaction::DkgCommitments( + attempt, + commitments.clone(), + Signed { + signer: pub_key, + nonce: 0, + signature: SchnorrSignature:: { + R: Ristretto::generator(), + s: ::F::ZERO, + }, + }, + ); + + // Re-create it with the actual signature + // We could mutate the existing one, we'd just have to match to the DkgCommitments enum variant + let tx = Transaction::DkgCommitments( + attempt, + commitments, + Signed { + signer: pub_key, + nonce: 0, + signature: SchnorrSignature::::sign( + &key, + Zeroizing::new(::F::random(&mut OsRng)), + tx.sig_hash(spec.genesis()), + ), + }, + ); + + assert!(tributaries[sender].1.add_transaction(tx.clone()).await); + // Sleep for two blocks + sleep(Duration::from_secs((2 * Tributary::::block_time()).into())) + .await; + + // All tributaries should have acknowledged this transaction in a block + let mut included_in = None; + for (_, tributary) in tributaries { + if included_in.is_none() { + let mut found = tributary.tip(); + + let mut block; + while { + block = tributary.block(&found).unwrap(); + block.transactions.is_empty() + } { + found = block.parent(); + } + + included_in = Some(found); + } + + let block = tributary.block(&included_in.unwrap()).unwrap(); + assert_eq!(block.transactions, vec![tx.clone()]); + } +} diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index fec02dee..7b2516d4 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -82,6 +82,7 @@ impl P2p for Arc

{ } } +#[derive(Clone)] pub struct Tributary { network: TendermintNetwork, @@ -123,6 +124,10 @@ impl Tributary { Some(Self { network, synced_block, messages }) } + pub fn block_time() -> u32 { + TendermintNetwork::::block_time() + } + pub fn genesis(&self) -> [u8; 32] { self.network.blockchain.read().unwrap().genesis() }