From 8c74576cf03b22072f51720d2e056b5ce2f26e05 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 22 Apr 2023 10:49:52 -0400 Subject: [PATCH] Add a test to the coordinator for running a Tributary Impls a LocalP2p for testing. Moves rebroadcasting into Tendermint, since it's what knows if a message is fully valid + original. Removes TributarySpec::validators() HashMap, as its non-determinism caused different instances to have different round robin schedules. It was already prior moved to a Vec for this issue, so I'm unsure why this remnant existed. Also renames the GH no-std workflow from the prior commit. --- .github/workflows/no-std.yml | 4 +- Cargo.lock | 1 + coordinator/Cargo.toml | 2 + coordinator/src/main.rs | 2 +- coordinator/src/p2p.rs | 80 +++++++++-- coordinator/src/tests/tributary/chain.rs | 128 ++++++++++++++++++ .../tests/{tributary.rs => tributary/mod.rs} | 2 + coordinator/src/tributary/db.rs | 6 +- coordinator/src/tributary/mod.rs | 8 +- coordinator/src/tributary/scanner.rs | 11 +- coordinator/tributary/src/lib.rs | 37 +++-- coordinator/tributary/src/tendermint.rs | 2 +- coordinator/tributary/tendermint/src/lib.rs | 45 +++--- 13 files changed, 259 insertions(+), 69 deletions(-) create mode 100644 coordinator/src/tests/tributary/chain.rs rename coordinator/src/tests/{tributary.rs => tributary/mod.rs} (99%) diff --git a/.github/workflows/no-std.yml b/.github/workflows/no-std.yml index edf7fcd7..e7a2639f 100644 --- a/.github/workflows/no-std.yml +++ b/.github/workflows/no-std.yml @@ -1,4 +1,4 @@ -name: Tests +name: no-std build on: push: @@ -7,7 +7,7 @@ on: pull_request: jobs: - test: + build: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Cargo.lock b/Cargo.lock index 61b105c3..986e3d88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1316,6 +1316,7 @@ dependencies = [ "rand_core 0.6.4", "serai-client", "serai-db", + "sp-application-crypto", "tokio", "tributary-chain", "zeroize", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index f113a69d..0d958c49 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -26,6 +26,8 @@ frost = { package = "modular-frost", path = "../crypto/frost" } scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } +sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } + serai-db = { path = "../common/db" } processor-messages = { package = "processor-messages", path = "../processor/messages" } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0044c08b..f6d842dd 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -94,7 +94,7 @@ async fn main() { let db = MemDb::new(); // TODO let key = Zeroizing::new(::F::ZERO); // TODO - let p2p = LocalP2p {}; // TODO + let p2p = LocalP2p::new(1).swap_remove(0); // TODO let processor = processor::MemProcessor::new(); // TODO diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 918a8c1f..7df40a46 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,25 +1,79 @@ use core::fmt::Debug; +use std::{ + sync::{Arc, RwLock}, + collections::VecDeque, +}; use async_trait::async_trait; -use tributary::P2p as TributaryP2p; +pub use tributary::P2p as TributaryP2p; -// TODO -#[async_trait] -pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {} +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum P2pMessageKind { + Tributary, +} -// TODO -#[derive(Clone, Debug)] -pub struct LocalP2p {} +impl P2pMessageKind { + fn to_byte(self) -> u8 { + match self { + P2pMessageKind::Tributary => 0, + } + } -#[async_trait] -impl TributaryP2p for LocalP2p { - async fn broadcast(&self, msg: Vec) { - // TODO - todo!() + fn from_byte(byte: u8) -> Option { + match byte { + 0 => Some(P2pMessageKind::Tributary), + _ => None, + } } } // TODO #[async_trait] -impl P2p for LocalP2p {} +pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { + async fn broadcast(&self, msg: Vec); + async fn receive(&self) -> Option<(P2pMessageKind, Vec)>; +} + +#[derive(Clone, Debug)] +pub struct LocalP2p(usize, Arc>>>>); + +impl LocalP2p { + pub fn new(validators: usize) -> Vec { + let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators])); + let mut res = vec![]; + for i in 0 .. validators { + res.push(LocalP2p(i, shared.clone())); + } + res + } +} + +#[async_trait] +impl P2p for LocalP2p { + async fn broadcast(&self, msg: Vec) { + for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() { + if i == self.0 { + continue; + } + msg_queue.push_back(msg.clone()); + } + } + + async fn receive(&self) -> Option<(P2pMessageKind, Vec)> { + let mut msg = self.1.write().unwrap()[self.0].pop_front()?; + if msg.is_empty() { + log::error!("empty p2p message"); + return None; + } + Some((P2pMessageKind::from_byte(msg.remove(0))?, msg)) + } +} + +#[async_trait] +impl TributaryP2p for LocalP2p { + async fn broadcast(&self, mut msg: Vec) { + msg.insert(0, P2pMessageKind::Tributary.to_byte()); + ::broadcast(self, msg).await + } +} diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs new file mode 100644 index 00000000..e2c5575d --- /dev/null +++ b/coordinator/src/tests/tributary/chain.rs @@ -0,0 +1,128 @@ +use std::time::{Duration, SystemTime}; + +use zeroize::Zeroizing; + +use rand_core::{RngCore, OsRng}; + +use ciphersuite::{ + group::{ff::Field, GroupEncoding}, + Ciphersuite, Ristretto, +}; + +use sp_application_crypto::sr25519; + +use serai_client::{ + primitives::{NETWORKS, NetworkId, Amount}, + validator_sets::primitives::{Session, ValidatorSet, ValidatorSetData}, +}; + +use tokio::time::sleep; + +use serai_db::MemDb; + +use tributary::Tributary; + +use crate::{P2pMessageKind, P2p, LocalP2p, processor::MemProcessor, tributary::TributarySpec}; + +fn new_spec(keys: &[Zeroizing<::F>]) -> TributarySpec { + let mut serai_block = [0; 32]; + OsRng.fill_bytes(&mut serai_block); + + let start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + let set = ValidatorSet { session: Session(0), network: NetworkId::Bitcoin }; + + let set_data = ValidatorSetData { + bond: Amount(100), + network: NETWORKS[&NetworkId::Bitcoin].clone(), + participants: keys + .iter() + .map(|key| { + (sr25519::Public((::generator() * **key).to_bytes()), Amount(100)) + }) + .collect::>() + .try_into() + .unwrap(), + }; + + TributarySpec::new(serai_block, start_time, set, set_data) +} + +#[tokio::test] +async fn tributary_test() { + let mut keys = vec![]; + for _ in 0 .. 5 { + keys.push(Zeroizing::new(::F::random(&mut OsRng))); + } + + let processor = MemProcessor::new(); + + let spec = new_spec(&keys); + + let p2p = LocalP2p::new(keys.len()); + + let mut tributaries = vec![]; + + for (i, key) in keys.iter().enumerate() { + tributaries.push( + Tributary::<_, crate::tributary::Transaction, _>::new( + MemDb::new(), + spec.genesis(), + spec.start_time(), + key.clone(), + spec.validators(), + p2p[i].clone(), + ) + .await + .unwrap(), + ); + } + + let mut blocks = 0; + let mut last_block = spec.genesis(); + + let timeout = SystemTime::now() + Duration::from_secs(70); + while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) { + for (i, p2p) in p2p.iter().enumerate() { + while let Some(msg) = p2p.receive().await { + match msg.0 { + P2pMessageKind::Tributary => { + tributaries[i].handle_message(&msg.1).await; + } + } + } + } + + let tip = tributaries[0].tip(); + if tip != last_block { + last_block = tip; + blocks += 1; + } + + sleep(Duration::from_millis(100)).await; + } + + if blocks != 10 { + panic!("tributary chain test hit timeout"); + } + + // Handle all existing messages + for (i, p2p) in p2p.iter().enumerate() { + while let Some(msg) = p2p.receive().await { + match msg.0 { + P2pMessageKind::Tributary => { + tributaries[i].handle_message(&msg.1).await; + } + } + } + } + + // All tributaries should agree on the tip + let mut final_block = None; + for tributary in tributaries { + final_block = final_block.or_else(|| Some(tributary.tip())); + if tributary.tip() != final_block.unwrap() { + panic!("tributary had different tip"); + } + } +} diff --git a/coordinator/src/tests/tributary.rs b/coordinator/src/tests/tributary/mod.rs similarity index 99% rename from coordinator/src/tests/tributary.rs rename to coordinator/src/tests/tributary/mod.rs index 0ec4aed0..0d1741eb 100644 --- a/coordinator/src/tests/tributary.rs +++ b/coordinator/src/tests/tributary/mod.rs @@ -9,6 +9,8 @@ use tributary::{ReadWrite, tests::random_signed}; use crate::tributary::{SignData, Transaction}; +mod chain; + fn random_u32(rng: &mut R) -> u32 { u32::try_from(rng.next_u64() >> 32).unwrap() } diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 92095e82..e860d1bd 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -97,7 +97,7 @@ impl TributaryDb { genesis: [u8; 32], id: [u8; 32], attempt: u32, - signer: &::G, + signer: ::G, ) -> Vec { Self::tributary_key( b"data", @@ -117,7 +117,7 @@ impl TributaryDb { genesis: [u8; 32], id: [u8; 32], attempt: u32, - signer: &::G, + signer: ::G, ) -> Option> { getter.get(Self::data_key(label, genesis, id, attempt, signer)) } @@ -127,7 +127,7 @@ impl TributaryDb { genesis: [u8; 32], id: [u8; 32], attempt: u32, - signer: &::G, + signer: ::G, data: &[u8], ) -> u16 { let received_key = Self::data_received_key(label, genesis, id, attempt); diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 3a7d06a0..fbad2fca 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -91,12 +91,8 @@ impl TributarySpec { None } - pub fn validators(&self) -> HashMap<::G, u64> { - let mut res = HashMap::new(); - for (key, amount) in self.validators.clone() { - res.insert(key, amount); - } - res + pub fn validators(&self) -> Vec<(::G, u64)> { + self.validators.clone() } } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index bfac1eac..d3cc6656 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -70,7 +70,7 @@ async fn handle_block( // If they've already published a TX for this attempt, slash if let Some(data) = - TributaryDb::::data(label, &txn, tributary.genesis(), id, attempt, &signed.signer) + TributaryDb::::data(label, &txn, tributary.genesis(), id, attempt, signed.signer) { if data != bytes { // TODO: Full slash @@ -99,17 +99,18 @@ async fn handle_block( tributary.genesis(), id, attempt, - &signed.signer, + signed.signer, &bytes, ); // If we have all the needed commitments/preprocesses/shares, tell the processor + // TODO: This needs to be coded by weight, not by validator count if received == needed { let mut data = HashMap::new(); - for validator in spec.validators().keys() { + for validator in spec.validators().iter().map(|validator| validator.0) { data.insert( - spec.i(*validator).unwrap(), - if validator == &signed.signer { + spec.i(validator).unwrap(), + if validator == signed.signer { bytes.split_off(0) } else if let Some(data) = TributaryDb::::data(label, &txn, tributary.genesis(), id, attempt, validator) diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index e9ea3539..fec02dee 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -2,7 +2,6 @@ use core::fmt::Debug; use std::{ sync::{Arc, RwLock}, io, - collections::HashMap, }; use async_trait::async_trait; @@ -96,16 +95,16 @@ impl Tributary { genesis: [u8; 32], start_time: u64, key: Zeroizing<::F>, - validators: HashMap<::G, u64>, + validators: Vec<(::G, u64)>, p2p: P, ) -> Option { - let validators_vec = validators.keys().cloned().collect::>(); + let validators_vec = validators.iter().map(|validator| validator.0).collect::>(); let signer = Arc::new(Signer::new(genesis, key)); let validators = Arc::new(Validators::new(genesis, validators)?); let mut blockchain = Blockchain::new(db, genesis, &validators_vec); - let block_number = blockchain.block_number(); + let block_number = BlockNumber(blockchain.block_number().into()); let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) { Commit::::decode(&mut commit.as_ref()).unwrap().end_time @@ -117,8 +116,6 @@ impl Tributary { let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; - // The genesis block is 0, so we're working on block #1 - let block_number = BlockNumber((block_number + 1).into()); let TendermintHandle { synced_block, messages, machine } = TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; tokio::task::spawn(machine.run()); @@ -129,6 +126,9 @@ impl Tributary { pub fn genesis(&self) -> [u8; 32] { self.network.blockchain.read().unwrap().genesis() } + pub fn block_number(&self) -> u32 { + self.network.blockchain.read().unwrap().block_number() + } pub fn tip(&self) -> [u8; 32] { self.network.blockchain.read().unwrap().tip() } @@ -184,36 +184,31 @@ impl Tributary { } // Return true if the message should be rebroadcasted. - pub async fn handle_message(&mut self, msg: Vec) -> bool { - match msg[0] { - TRANSACTION_MESSAGE => { + pub async fn handle_message(&mut self, msg: &[u8]) -> bool { + match msg.first() { + Some(&TRANSACTION_MESSAGE) => { let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else { + log::error!("received invalid transaction message"); return false; }; // TODO: Sync mempools with fellow peers // Can we just rebroadcast transactions not included for at least two blocks? - self.network.blockchain.write().unwrap().add_transaction(false, tx) + let res = self.network.blockchain.write().unwrap().add_transaction(false, tx); + log::debug!("received transaction message. valid new transaction: {res}"); + res } - TENDERMINT_MESSAGE => { + Some(&TENDERMINT_MESSAGE) => { let Ok(msg) = SignedMessageFor::>::decode::<&[u8]>( &mut &msg[1 ..] ) else { + log::error!("received invalid tendermint message"); return false; }; - // If this message isn't to form consensus on the next block, ignore it - if msg.block().0 != (self.network.blockchain.read().unwrap().block_number() + 1).into() { - return false; - } - - if !msg.verify_signature(&self.network.validators) { - return false; - } - self.messages.send(msg).await.unwrap(); - true + false } _ => false, diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 59814ffc..5145651a 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -125,7 +125,7 @@ pub(crate) struct Validators { impl Validators { pub(crate) fn new( genesis: [u8; 32], - validators: HashMap<::G, u64>, + validators: Vec<(::G, u64)>, ) -> Option { let mut total_weight = 0; let mut weights = HashMap::new(); diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index ee355b02..7ed632a9 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -6,8 +6,6 @@ use std::{ collections::VecDeque, }; -use log::debug; - use parity_scale_codec::{Encode, Decode}; use futures::{ @@ -109,6 +107,7 @@ impl SignedMessage { enum TendermintError { Malicious(V), Temporal, + AlreadyHandled, } // Type aliases to abstract over generic hell @@ -236,7 +235,7 @@ impl TendermintMachine { async fn slash(&mut self, validator: N::ValidatorId) { if !self.block.slashes.contains(&validator) { - debug!(target: "tendermint", "Slashing validator {:?}", validator); + log::info!(target: "tendermint", "Slashing validator {:?}", validator); self.block.slashes.insert(validator); self.network.slash(validator).await; } @@ -307,7 +306,7 @@ impl TendermintMachine { let mut queue_future = if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; - if let Some((broadcast, msg)) = futures::select_biased! { + if let Some((our_message, msg, mut sig)) = futures::select_biased! { // Handle a new block occuring externally (an external sync loop) // Has the highest priority as it makes all other futures here irrelevant msg = self.synced_block_recv.next() => { @@ -332,7 +331,7 @@ impl TendermintMachine { // Handle our messages _ = queue_future => { - Some((true, self.queue.pop_front().unwrap())) + Some((true, self.queue.pop_front().unwrap(), None)) }, // Handle any timeouts @@ -346,7 +345,7 @@ impl TendermintMachine { match step { Step::Propose => { // Slash the validator for not proposing when they should've - debug!(target: "tendermint", "Validator didn't propose when they should have"); + log::debug!(target: "tendermint", "Validator didn't propose when they should have"); self.slash( self.weights.proposer(self.block.number, self.block.round().number) ).await; @@ -368,19 +367,27 @@ impl TendermintMachine { if !msg.verify_signature(&self.validators) { continue; } - Some((false, msg.msg)) + Some((false, msg.msg, Some(msg.sig))) } else { break; } } } { let res = self.message(msg.clone()).await; - if res.is_err() && broadcast { - panic!("honest node had invalid behavior"); + if res.is_err() && our_message { + panic!("honest node (ourselves) had invalid behavior"); } match res { - Ok(None) => (), + Ok(None) => { + if let Some(sig) = sig.take() { + // If it's our own message, it shouldn't already be signed + assert!(!our_message); + + // Re-broadcast this since it's an original consensus message + self.network.broadcast(SignedMessage { msg: msg.clone(), sig }).await; + } + } Ok(Some(block)) => { let mut validators = vec![]; let mut sigs = vec![]; @@ -407,9 +414,11 @@ impl TendermintMachine { } Err(TendermintError::Malicious(validator)) => self.slash(validator).await, Err(TendermintError::Temporal) => (), + Err(TendermintError::AlreadyHandled) => (), } - if broadcast { + if our_message { + assert!(sig.is_none()); let sig = self.signer.sign(&msg.encode()).await; self.network.broadcast(SignedMessage { msg, sig }).await; } @@ -433,7 +442,7 @@ impl TendermintMachine { // which forces us to calculate every end time if let Some(end_time) = self.block.end_time.get(&round) { if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) { - debug!(target: "tendermint", "Validator produced an invalid commit signature"); + log::warn!(target: "tendermint", "Validator produced an invalid commit signature"); Err(TendermintError::Malicious(sender))?; } return Ok(true); @@ -457,13 +466,14 @@ impl TendermintMachine { if matches!(msg.data, Data::Proposal(..)) && (msg.sender != self.weights.proposer(msg.block, msg.round)) { - debug!(target: "tendermint", "Validator who wasn't the proposer proposed"); + log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); Err(TendermintError::Malicious(msg.sender))?; }; if !self.block.log.log(msg.clone())? { - return Ok(None); + return Err(TendermintError::AlreadyHandled); } + log::debug!(target: "tendermint", "received new tendermint message"); // All functions, except for the finalizer and the jump, are locked to the current round @@ -481,6 +491,7 @@ impl TendermintMachine { msg.round, Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), ) { + log::debug!(target: "tendermint", "block {} has consensus", msg.block.0); return Ok(Some(block.clone())); } } @@ -573,7 +584,7 @@ impl TendermintMachine { Ok(_) => (true, Ok(None)), Err(BlockError::Temporal) => (false, Ok(None)), Err(BlockError::Fatal) => (false, { - debug!(target: "tendermint", "Validator proposed a fatally invalid block"); + log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); Err(TendermintError::Malicious(proposer)) }), }; @@ -590,7 +601,7 @@ impl TendermintMachine { if let Some(vr) = vr { // Malformed message if vr.0 >= self.block.round().number.0 { - debug!(target: "tendermint", "Validator claimed a round from the future was valid"); + log::warn!(target: "tendermint", "Validator claimed a round from the future was valid"); Err(TendermintError::Malicious(msg.sender))?; } @@ -629,7 +640,7 @@ impl TendermintMachine { Ok(_) => (), Err(BlockError::Temporal) => (), Err(BlockError::Fatal) => { - debug!(target: "tendermint", "Validator proposed a fatally invalid block"); + log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); Err(TendermintError::Malicious(proposer))? } };