Add a test for Tributary

Further fleshes out the Tributary testing code.
This commit is contained in:
Luke Parker 2023-04-22 22:27:12 -04:00
parent 8c74576cf0
commit af84b7f707
No known key found for this signature in database
9 changed files with 185 additions and 39 deletions

1
Cargo.lock generated
View file

@ -1314,6 +1314,7 @@ dependencies = [
"parity-scale-codec", "parity-scale-codec",
"processor-messages", "processor-messages",
"rand_core 0.6.4", "rand_core 0.6.4",
"schnorr-signatures",
"serai-client", "serai-client",
"serai-db", "serai-db",
"sp-application-crypto", "sp-application-crypto",

View file

@ -22,6 +22,7 @@ blake2 = "0.10"
transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] } transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] }
ciphersuite = { path = "../crypto/ciphersuite" } ciphersuite = { path = "../crypto/ciphersuite" }
schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr" }
frost = { package = "modular-frost", path = "../crypto/frost" } frost = { package = "modular-frost", path = "../crypto/frost" }
scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } scale = { package = "parity-scale-codec", version = "3", features = ["derive"] }

View file

@ -25,7 +25,7 @@ use processor::Processor;
mod substrate; mod substrate;
#[cfg(test)] #[cfg(test)]
mod tests; pub mod tests;
async fn run<D: Db, Pro: Processor, P: P2p>( async fn run<D: Db, Pro: Processor, P: P2p>(
raw_db: D, raw_db: D,

View file

@ -31,7 +31,7 @@ impl P2pMessageKind {
// TODO // TODO
#[async_trait] #[async_trait]
pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {
async fn broadcast(&self, msg: Vec<u8>); async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>);
async fn receive(&self) -> Option<(P2pMessageKind, Vec<u8>)>; async fn receive(&self) -> Option<(P2pMessageKind, Vec<u8>)>;
} }
@ -51,7 +51,8 @@ impl LocalP2p {
#[async_trait] #[async_trait]
impl P2p for LocalP2p { impl P2p for LocalP2p {
async fn broadcast(&self, msg: Vec<u8>) { async fn broadcast(&self, kind: P2pMessageKind, mut msg: Vec<u8>) {
msg.insert(0, kind.to_byte());
for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() { for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() {
if i == self.0 { if i == self.0 {
continue; continue;
@ -72,8 +73,7 @@ impl P2p for LocalP2p {
#[async_trait] #[async_trait]
impl TributaryP2p for LocalP2p { impl TributaryP2p for LocalP2p {
async fn broadcast(&self, mut msg: Vec<u8>) { async fn broadcast(&self, msg: Vec<u8>) {
msg.insert(0, P2pMessageKind::Tributary.to_byte()); <Self as P2p>::broadcast(self, P2pMessageKind::Tributary, msg).await
<Self as P2p>::broadcast(self, msg).await
} }
} }

View file

@ -1 +1 @@
mod tributary; pub mod tributary;

View file

@ -2,7 +2,7 @@ use std::time::{Duration, SystemTime};
use zeroize::Zeroizing; use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, CryptoRng, OsRng};
use ciphersuite::{ use ciphersuite::{
group::{ff::Field, GroupEncoding}, group::{ff::Field, GroupEncoding},
@ -22,11 +22,27 @@ use serai_db::MemDb;
use tributary::Tributary; use tributary::Tributary;
use crate::{P2pMessageKind, P2p, LocalP2p, processor::MemProcessor, tributary::TributarySpec}; use crate::{
P2pMessageKind, P2p, LocalP2p,
tributary::{Transaction, TributarySpec},
};
fn new_spec(keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>]) -> TributarySpec { pub fn new_keys<R: RngCore + CryptoRng>(
rng: &mut R,
) -> Vec<Zeroizing<<Ristretto as Ciphersuite>::F>> {
let mut keys = vec![];
for _ in 0 .. 5 {
keys.push(Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut *rng)));
}
keys
}
pub fn new_spec<R: RngCore + CryptoRng>(
rng: &mut R,
keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>],
) -> TributarySpec {
let mut serai_block = [0; 32]; let mut serai_block = [0; 32];
OsRng.fill_bytes(&mut serai_block); rng.fill_bytes(&mut serai_block);
let start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); let start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
@ -48,24 +64,16 @@ fn new_spec(keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>]) -> TributarySpec
TributarySpec::new(serai_block, start_time, set, set_data) TributarySpec::new(serai_block, start_time, set, set_data)
} }
#[tokio::test] pub async fn new_tributaries(
async fn tributary_test() { keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>],
let mut keys = vec![]; spec: &TributarySpec,
for _ in 0 .. 5 { ) -> Vec<(LocalP2p, Tributary<MemDb, Transaction, LocalP2p>)> {
keys.push(Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)));
}
let processor = MemProcessor::new();
let spec = new_spec(&keys);
let p2p = LocalP2p::new(keys.len()); let p2p = LocalP2p::new(keys.len());
let mut res = vec![];
let mut tributaries = vec![];
for (i, key) in keys.iter().enumerate() { for (i, key) in keys.iter().enumerate() {
tributaries.push( res.push((
Tributary::<_, crate::tributary::Transaction, _>::new( p2p[i].clone(),
Tributary::<_, Transaction, _>::new(
MemDb::new(), MemDb::new(),
spec.genesis(), spec.genesis(),
spec.start_time(), spec.start_time(),
@ -75,25 +83,56 @@ async fn tributary_test() {
) )
.await .await
.unwrap(), .unwrap(),
); ));
} }
res
}
let mut blocks = 0; pub async fn run_tributaries(
let mut last_block = spec.genesis(); mut tributaries: Vec<(LocalP2p, Tributary<MemDb, Transaction, LocalP2p>)>,
) {
let timeout = SystemTime::now() + Duration::from_secs(70); loop {
while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) { for (p2p, tributary) in tributaries.iter_mut() {
for (i, p2p) in p2p.iter().enumerate() {
while let Some(msg) = p2p.receive().await { while let Some(msg) = p2p.receive().await {
match msg.0 { match msg.0 {
P2pMessageKind::Tributary => { P2pMessageKind::Tributary => {
tributaries[i].handle_message(&msg.1).await; if tributary.handle_message(&msg.1).await {
p2p.broadcast(msg.0, msg.1).await;
}
} }
} }
} }
} }
let tip = tributaries[0].tip(); sleep(Duration::from_millis(100)).await;
}
}
#[tokio::test]
async fn tributary_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec).await;
let mut blocks = 0;
let mut last_block = spec.genesis();
// Doesn't use run_tributaries as we want to wind these down at a certain point
// run_tributaries will run them ad infinitum
let timeout = SystemTime::now() + Duration::from_secs(65);
while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) {
for (p2p, tributary) in tributaries.iter_mut() {
while let Some(msg) = p2p.receive().await {
match msg.0 {
P2pMessageKind::Tributary => {
tributary.handle_message(&msg.1).await;
}
}
}
}
let tip = tributaries[0].1.tip();
if tip != last_block { if tip != last_block {
last_block = tip; last_block = tip;
blocks += 1; blocks += 1;
@ -107,11 +146,11 @@ async fn tributary_test() {
} }
// Handle all existing messages // Handle all existing messages
for (i, p2p) in p2p.iter().enumerate() { for (p2p, tributary) in tributaries.iter_mut() {
while let Some(msg) = p2p.receive().await { while let Some(msg) = p2p.receive().await {
match msg.0 { match msg.0 {
P2pMessageKind::Tributary => { P2pMessageKind::Tributary => {
tributaries[i].handle_message(&msg.1).await; tributary.handle_message(&msg.1).await;
} }
} }
} }
@ -119,7 +158,7 @@ async fn tributary_test() {
// All tributaries should agree on the tip // All tributaries should agree on the tip
let mut final_block = None; let mut final_block = None;
for tributary in tributaries { for (_, tributary) in tributaries {
final_block = final_block.or_else(|| Some(tributary.tip())); final_block = final_block.or_else(|| Some(tributary.tip()));
if tributary.tip() != final_block.unwrap() { if tributary.tip() != final_block.unwrap() {
panic!("tributary had different tip"); panic!("tributary had different tip");

View file

@ -10,6 +10,9 @@ use tributary::{ReadWrite, tests::random_signed};
use crate::tributary::{SignData, Transaction}; use crate::tributary::{SignData, Transaction};
mod chain; mod chain;
pub use chain::*;
mod tx;
fn random_u32<R: RngCore>(rng: &mut R) -> u32 { fn random_u32<R: RngCore>(rng: &mut R) -> u32 {
u32::try_from(rng.next_u64() >> 32).unwrap() u32::try_from(rng.next_u64() >> 32).unwrap()

View file

@ -0,0 +1,97 @@
use core::time::Duration;
use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use schnorr::SchnorrSignature;
use tokio::time::sleep;
use serai_db::MemDb;
use tributary::{Signed, Transaction as TransactionTrait, Tributary};
use crate::{
LocalP2p,
tributary::Transaction,
tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries},
};
#[tokio::test]
async fn tx_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec).await;
// Run the tributaries in the background
tokio::spawn(run_tributaries(tributaries.clone()));
// Send a TX from a random Tributary
let sender =
usize::try_from(OsRng.next_u64() % u64::try_from(tributaries.len()).unwrap()).unwrap();
let key = keys[sender].clone();
let pub_key = Ristretto::generator() * *key;
let attempt = 0;
let mut commitments = vec![0; 256];
OsRng.fill_bytes(&mut commitments);
// Create the TX with a null signature so we can get its sig hash
let tx = Transaction::DkgCommitments(
attempt,
commitments.clone(),
Signed {
signer: pub_key,
nonce: 0,
signature: SchnorrSignature::<Ristretto> {
R: Ristretto::generator(),
s: <Ristretto as Ciphersuite>::F::ZERO,
},
},
);
// Re-create it with the actual signature
// We could mutate the existing one, we'd just have to match to the DkgCommitments enum variant
let tx = Transaction::DkgCommitments(
attempt,
commitments,
Signed {
signer: pub_key,
nonce: 0,
signature: SchnorrSignature::<Ristretto>::sign(
&key,
Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)),
tx.sig_hash(spec.genesis()),
),
},
);
assert!(tributaries[sender].1.add_transaction(tx.clone()).await);
// Sleep for two blocks
sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into()))
.await;
// All tributaries should have acknowledged this transaction in a block
let mut included_in = None;
for (_, tributary) in tributaries {
if included_in.is_none() {
let mut found = tributary.tip();
let mut block;
while {
block = tributary.block(&found).unwrap();
block.transactions.is_empty()
} {
found = block.parent();
}
included_in = Some(found);
}
let block = tributary.block(&included_in.unwrap()).unwrap();
assert_eq!(block.transactions, vec![tx.clone()]);
}
}

View file

@ -82,6 +82,7 @@ impl<P: P2p> P2p for Arc<P> {
} }
} }
#[derive(Clone)]
pub struct Tributary<D: Db, T: Transaction, P: P2p> { pub struct Tributary<D: Db, T: Transaction, P: P2p> {
network: TendermintNetwork<D, T, P>, network: TendermintNetwork<D, T, P>,
@ -123,6 +124,10 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
Some(Self { network, synced_block, messages }) Some(Self { network, synced_block, messages })
} }
pub fn block_time() -> u32 {
TendermintNetwork::<D, T, P>::block_time()
}
pub fn genesis(&self) -> [u8; 32] { pub fn genesis(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().genesis() self.network.blockchain.read().unwrap().genesis()
} }