diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs
index a618d1de..66d1e8d1 100644
--- a/coordinator/src/tests/tributary/dkg.rs
+++ b/coordinator/src/tests/tributary/dkg.rs
@@ -1,4 +1,5 @@
 use core::time::Duration;
+use std::collections::HashMap;
 
 use zeroize::Zeroizing;
 use rand_core::{RngCore, OsRng};
@@ -25,7 +26,7 @@ use crate::{
 };
 
 #[tokio::test]
-async fn dkg_commitments_test() {
+async fn dkg_test() {
   let keys = new_keys(&mut OsRng);
   let spec = new_spec(&mut OsRng, &keys);
 
@@ -41,8 +42,7 @@ async fn dkg_commitments_test() {
     let mut commitments = vec![0; 256];
     OsRng.fill_bytes(&mut commitments);
 
-    let mut tx =
-      Transaction::DkgCommitments(attempt, commitments.clone(), Transaction::empty_signed());
+    let mut tx = Transaction::DkgCommitments(attempt, commitments, Transaction::empty_signed());
     tx.sign(&mut OsRng, spec.genesis(), key, 0);
     txs.push(tx);
   }
@@ -59,7 +59,7 @@ async fn dkg_commitments_test() {
     wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
   }
 
-  let expected_msg = CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
+  let expected_commitments = CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
     id: KeyGenId { set: spec.set(), attempt: 0 },
     commitments: txs
       .iter()
@@ -87,32 +87,29 @@ async fn dkg_commitments_test() {
     (scanner_db, processor, last_block)
   }
 
+  // Instantiate a scanner and verify it has nothing to report
+  let (mut scanner_db, mut processor, mut last_block) =
+    new_processor(&keys[0], &spec, &tributaries[0].1).await;
+  assert!(processor.0.read().unwrap().is_empty());
+
+  // Publish the last commitment
+  assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
+  wait_for_tx_inclusion(&tributaries[0].1, last_block, txs[0].hash()).await;
+  sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
+
+  // Verify the scanner emits a KeyGen::Commitments message
+  handle_new_blocks(
+    &mut scanner_db,
+    &keys[0],
+    &mut processor,
+    &spec,
+    &tributaries[0].1,
+    &mut last_block,
+  )
+  .await;
   {
-    // Instantiate a scanner and verify it has nothing to report
-    let (mut scanner_db, mut processor, mut last_block) =
-      new_processor(&keys[0], &spec, &tributaries[0].1).await;
-    assert!(processor.0.read().unwrap().is_empty());
-
-    // Publish the last commitment
-    assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
-    wait_for_tx_inclusion(&tributaries[0].1, last_block, txs[0].hash()).await;
-    sleep(Duration::from_secs(
-      (2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into(),
-    ))
-    .await;
-
-    // Verify the scanner emits a KeyGen::Commitments message
-    handle_new_blocks(
-      &mut scanner_db,
-      &keys[0],
-      &mut processor,
-      &spec,
-      &tributaries[0].1,
-      &mut last_block,
-    )
-    .await;
     let mut msgs = processor.0.write().unwrap();
-    assert_eq!(msgs.pop_front().unwrap(), expected_msg);
+    assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert!(msgs.is_empty());
   }
 
@@ -120,7 +117,95 @@ async fn dkg_commitments_test() {
   for (i, key) in keys.iter().enumerate() {
     let (_, processor, _) = new_processor(key, &spec, &tributaries[i].1).await;
     let mut msgs = processor.0.write().unwrap();
-    assert_eq!(msgs.pop_front().unwrap(), expected_msg);
+    assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
+    assert!(msgs.is_empty());
+  }
+
+  // Now do shares
+  let mut txs = vec![];
+  for key in &keys {
+    let attempt = 0;
+
+    let mut shares = HashMap::new();
+    for i in 0 .. keys.len() {
+      let mut share = vec![0; 256];
+      OsRng.fill_bytes(&mut share);
+      shares.insert(Participant::new((i + 1).try_into().unwrap()).unwrap(), share);
+    }
+
+    let mut tx = Transaction::DkgShares(attempt, shares, Transaction::empty_signed());
+    tx.sign(&mut OsRng, spec.genesis(), key, 1);
+    txs.push(tx);
+  }
+
+  let block_before_tx = tributaries[0].1.tip();
+  for (i, tx) in txs.iter().enumerate().skip(1) {
+    assert!(tributaries[i].1.add_transaction(tx.clone()).await);
+  }
+  for tx in txs.iter().skip(1) {
+    wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
+  }
+
+  // With just 4 sets of shares, nothing should happen yet
+  handle_new_blocks(
+    &mut scanner_db,
+    &keys[0],
+    &mut processor,
+    &spec,
+    &tributaries[0].1,
+    &mut last_block,
+  )
+  .await;
+  assert!(processor.0.write().unwrap().is_empty());
+
+  // Publish the final set of shares
+  assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
+  wait_for_tx_inclusion(&tributaries[0].1, last_block, txs[0].hash()).await;
+  sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
+
+  // Each scanner should emit a distinct shares message
+  let shares_for = |i: usize| {
+    CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares {
+      id: KeyGenId { set: spec.set(), attempt: 0 },
+      shares: txs
+        .iter()
+        .enumerate()
+        .map(|(l, tx)| {
+          if let Transaction::DkgShares(_, shares, _) = tx {
+            (
+              Participant::new((l + 1).try_into().unwrap()).unwrap(),
+              shares[&Participant::new((i + 1).try_into().unwrap()).unwrap()].clone(),
+            )
+          } else {
+            panic!("txs had non-shares");
+          }
+        })
+        .collect::<HashMap<_, _>>(),
+    })
+  };
+
+  // Any scanner which has handled the prior blocks should only emit the new event
+  handle_new_blocks(
+    &mut scanner_db,
+    &keys[0],
+    &mut processor,
+    &spec,
+    &tributaries[0].1,
+    &mut last_block,
+  )
+  .await;
+  {
+    let mut msgs = processor.0.write().unwrap();
+    assert_eq!(msgs.pop_front().unwrap(), shares_for(0));
+    assert!(msgs.is_empty());
+  }
+
+  // Yet new scanners should emit all events
+  for (i, key) in keys.iter().enumerate() {
+    let (_, processor, _) = new_processor(key, &spec, &tributaries[i].1).await;
+    let mut msgs = processor.0.write().unwrap();
+    assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
+    assert_eq!(msgs.pop_front().unwrap(), shares_for(i));
     assert!(msgs.is_empty());
   }
 }