From e73a51bfa55ec8d1b820c5dc77a3402e2f62f557 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 13 Apr 2023 18:43:03 -0400 Subject: [PATCH] Finish binding Tendermint into Tributary and define a Tributary master object --- coordinator/tributary/Cargo.toml | 9 +- coordinator/tributary/src/blockchain.rs | 9 +- coordinator/tributary/src/lib.rs | 151 +++++++++++++++++- coordinator/tributary/src/tendermint.rs | 93 ++++++++--- coordinator/tributary/src/tests/block.rs | 2 +- coordinator/tributary/src/tests/blockchain.rs | 4 +- coordinator/tributary/src/tests/mempool.rs | 2 +- coordinator/tributary/src/tests/merkle.rs | 2 +- .../tributary/src/tests/transaction/mod.rs | 2 +- .../src/tests/transaction/provided.rs | 2 +- .../tributary/src/tests/transaction/signed.rs | 4 +- coordinator/tributary/src/transaction.rs | 2 +- 12 files changed, 245 insertions(+), 37 deletions(-) diff --git a/coordinator/tributary/Cargo.toml b/coordinator/tributary/Cargo.toml index 6ca0da73..89b87634 100644 --- a/coordinator/tributary/Cargo.toml +++ b/coordinator/tributary/Cargo.toml @@ -12,8 +12,10 @@ async-trait = "0.1" thiserror = "1" subtle = "^2" -zeroize = { version = "^1.5", optional = true } -rand_core = { version = "0.6", optional = true } +zeroize = "^1.5" + +rand = "0.8" +rand_chacha = "0.3" blake2 = "0.10" transcript = { package = "flexible-transcript", path = "../../crypto/transcript", features = ["recommended"] } @@ -25,6 +27,7 @@ hex = "0.4" log = "0.4" scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } +futures = "0.3" tendermint = { package = "tendermint-machine", path = "./tendermint" } tokio = { version = "1", features = ["macros", "sync", "time", "rt"] } @@ -34,4 +37,4 @@ zeroize = "^1.5" rand_core = "0.6" [features] -tests = ["zeroize", "rand_core"] +tests = [] diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 93517de1..7a44904f 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -8,6 +8,7 @@ use crate::{Signed, TransactionKind, Transaction, ProvidedTransactions, BlockErr pub struct Blockchain { genesis: [u8; 32], // TODO: db + block_number: u64, tip: [u8; 32], next_nonces: HashMap<::G, u32>, @@ -17,7 +18,7 @@ pub struct Blockchain { impl Blockchain { pub fn new(genesis: [u8; 32], participants: &[::G]) -> Self { - // TODO: Reload next_nonces/provided/mempool + // TODO: Reload block_number/tip/next_nonces/provided/mempool let mut next_nonces = HashMap::new(); for participant in participants { @@ -27,6 +28,7 @@ impl Blockchain { Self { genesis, + block_number: 0, tip: genesis, next_nonces, @@ -39,6 +41,10 @@ impl Blockchain { self.tip } + pub fn block_number(&self) -> u64 { + self.block_number + } + pub fn add_transaction(&mut self, tx: T) -> bool { self.mempool.add(&self.next_nonces, tx) } @@ -73,6 +79,7 @@ impl Blockchain { // None of the following assertions should be reachable since we verified the block self.tip = block.hash(); + self.block_number += 1; for tx in &block.transactions { match tx.kind() { TransactionKind::Provided => { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 7fef0032..5563bc35 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,4 +1,23 @@ -use std::io; +use core::fmt::Debug; +use std::{ + sync::{Arc, RwLock}, + io, + collections::HashMap, +}; + +use async_trait::async_trait; + +use zeroize::Zeroizing; + +use ciphersuite::{Ciphersuite, Ristretto}; + +use scale::Decode; +use futures::SinkExt; +use ::tendermint::{ + ext::{BlockNumber, Commit, Block as BlockTrait, Network as NetworkTrait}, + SignedMessageFor, SyncedBlock, SyncedBlockSender, MessageSender, TendermintMachine, + TendermintHandle, +}; mod merkle; pub(crate) use merkle::*; @@ -19,11 +38,14 @@ mod mempool; pub use mempool::*; mod tendermint; -pub use crate::tendermint::*; +pub(crate) use crate::tendermint::*; #[cfg(any(test, feature = "tests"))] pub mod tests; +pub(crate) const TRANSACTION_MESSAGE: u8 = 0; +pub(crate) const TENDERMINT_MESSAGE: u8 = 1; + /// An item which can be read and written. pub trait ReadWrite: Sized { fn read(reader: &mut R) -> io::Result; @@ -36,3 +58,128 @@ pub trait ReadWrite: Sized { buf } } + +#[async_trait] +pub trait P2p: 'static + Send + Sync + Clone + Debug { + async fn broadcast(&self, msg: Vec); +} + +#[async_trait] +impl P2p for Arc

{ + async fn broadcast(&self, msg: Vec) { + (*self).broadcast(msg).await + } +} + +pub struct Tributary { + network: Network, + + synced_block: SyncedBlockSender>, + messages: MessageSender>, +} + +impl Tributary { + pub async fn new( + genesis: [u8; 32], + start_time: u64, + key: Zeroizing<::F>, + validators: HashMap<::G, u64>, + p2p: P, + ) -> Self { + let validators_vec = validators.keys().cloned().collect::>(); + + let signer = Arc::new(Signer::new(genesis, key)); + let validators = Arc::new(Validators::new(genesis, validators)); + + let mut blockchain = Blockchain::new(genesis, &validators_vec); + let block_number = blockchain.block_number(); + let start_time = start_time; // TODO: Get the start time from the blockchain + let proposal = TendermintBlock(blockchain.build_block().serialize()); + let blockchain = Arc::new(RwLock::new(blockchain)); + + let network = Network { genesis, signer, validators, blockchain, p2p }; + + // The genesis block is 0, so we're working on block #1 + let block_number = BlockNumber(block_number + 1); + let TendermintHandle { synced_block, messages, machine } = + TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; + tokio::task::spawn(machine.run()); + + Self { network, synced_block, messages } + } + + pub fn provide_transaction(&self, tx: T) { + self.network.blockchain.write().unwrap().provide_transaction(tx) + } + + // Returns if the transaction was valid. + pub async fn add_transaction(&self, tx: T) -> bool { + let mut to_broadcast = vec![TRANSACTION_MESSAGE]; + tx.write(&mut to_broadcast).unwrap(); + let res = self.network.blockchain.write().unwrap().add_transaction(tx); + if res { + self.network.p2p.broadcast(to_broadcast).await; + } + res + } + + // Sync a block. + // TODO: Since we have a static validator set, we should only need the tail commit? + pub async fn sync_block(&mut self, block: Block, commit: Vec) -> bool { + let (tip, block_number) = { + let blockchain = self.network.blockchain.read().unwrap(); + (blockchain.tip(), blockchain.block_number()) + }; + + if block.header.parent != tip { + return false; + } + + let block = TendermintBlock(block.serialize()); + let Ok(commit) = Commit::>::decode(&mut commit.as_ref()) else { + return false; + }; + if !self.network.verify_commit(block.id(), &commit) { + return false; + } + + let number = BlockNumber(block_number + 1); + self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap(); + true + } + + // Return true if the message should be rebroadcasted. + pub async fn handle_message(&mut self, msg: Vec) -> bool { + match msg[0] { + TRANSACTION_MESSAGE => { + let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else { + return false; + }; + + // TODO: Sync mempools with fellow peers + // Can we just rebroadcast transactions not included for at least two blocks? + self.network.blockchain.write().unwrap().add_transaction(tx) + } + + TENDERMINT_MESSAGE => { + let Ok(msg) = SignedMessageFor::>::decode::<&[u8]>(&mut &msg[1 ..]) else { + return false; + }; + + // If this message isn't to form consensus on the next block, ignore it + if msg.block().0 != (self.network.blockchain.read().unwrap().block_number() + 1) { + return false; + } + + if !msg.verify_signature(&self.network.validators) { + return false; + } + + self.messages.send(msg).await.unwrap(); + true + } + + _ => false, + } + } +} diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 919df6c7..99b48fb1 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -1,11 +1,17 @@ use core::ops::Deref; -use std::{sync::Arc, collections::HashMap}; +use std::{ + sync::{Arc, RwLock}, + collections::HashMap, +}; use async_trait::async_trait; use subtle::ConstantTimeEq; use zeroize::{Zeroize, Zeroizing}; +use rand::{SeedableRng, seq::SliceRandom}; +use rand_chacha::ChaCha12Rng; + use transcript::{Transcript, RecommendedTranscript}; use ciphersuite::{ @@ -28,7 +34,10 @@ use tendermint::{ use tokio::time::{Duration, sleep}; -use crate::{ReadWrite, Transaction, TransactionError, BlockHeader, Block, BlockError, Blockchain}; +use crate::{ + TENDERMINT_MESSAGE, ReadWrite, Transaction, TransactionError, BlockHeader, Block, BlockError, + Blockchain, P2p, +}; fn challenge( genesis: [u8; 32], @@ -46,11 +55,17 @@ fn challenge( } #[derive(Clone, PartialEq, Eq, Debug)] -struct Signer { +pub(crate) struct Signer { genesis: [u8; 32], key: Zeroizing<::F>, } +impl Signer { + pub(crate) fn new(genesis: [u8; 32], key: Zeroizing<::F>) -> Signer { + Signer { genesis, key } + } +} + #[async_trait] impl SignerTrait for Signer { type ValidatorId = [u8; 32]; @@ -99,13 +114,40 @@ impl SignerTrait for Signer { } #[derive(Clone, PartialEq, Eq, Debug)] -struct Validators { +pub(crate) struct Validators { genesis: [u8; 32], - weight: u64, + total_weight: u64, weights: HashMap<[u8; 32], u64>, robin: Vec<[u8; 32]>, } +impl Validators { + pub(crate) fn new( + genesis: [u8; 32], + validators: HashMap<::G, u64>, + ) -> Validators { + let mut total_weight = 0; + let mut weights = HashMap::new(); + + let mut transcript = RecommendedTranscript::new(b"Round Robin Randomization"); + let mut robin = vec![]; + for (validator, weight) in validators { + let validator = validator.to_bytes(); + // TODO: Make an error out of this + assert!(weight != 0); + total_weight += weight; + weights.insert(validator, weight); + + transcript.append_message(b"validator", validator); + transcript.append_message(b"weight", weight.to_le_bytes()); + robin.extend(vec![validator; usize::try_from(weight).unwrap()]); + } + robin.shuffle(&mut ChaCha12Rng::from_seed(transcript.rng_seed(b"robin"))); + + Validators { genesis, total_weight, weights, robin } + } +} + impl SignatureScheme for Validators { type ValidatorId = [u8; 32]; type Signature = [u8; 64]; @@ -151,7 +193,7 @@ impl Weights for Validators { type ValidatorId = [u8; 32]; fn total_weight(&self) -> u64 { - self.weight + self.total_weight } fn weight(&self, validator: Self::ValidatorId) -> u64 { self.weights[&validator] @@ -170,7 +212,7 @@ impl Weights for Validators { } #[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] -struct TendermintBlock(Vec); +pub(crate) struct TendermintBlock(pub Vec); impl BlockTrait for TendermintBlock { type Id = [u8; 32]; fn id(&self) -> Self::Id { @@ -178,16 +220,17 @@ impl BlockTrait for TendermintBlock { } } -#[derive(Clone, PartialEq, Eq, Debug)] -struct Network { - genesis: [u8; 32], - signer: Arc, - validators: Arc, - blockchain: Blockchain, +#[derive(Clone, Debug)] +pub(crate) struct Network { + pub(crate) genesis: [u8; 32], + pub(crate) signer: Arc, + pub(crate) validators: Arc, + pub(crate) blockchain: Arc>>, + pub(crate) p2p: P, } #[async_trait] -impl NetworkTrait for Network { +impl NetworkTrait for Network { type ValidatorId = [u8; 32]; type SignatureScheme = Arc; type Weights = Arc; @@ -206,12 +249,14 @@ impl NetworkTrait for Network { self.validators.clone() } - async fn broadcast(&mut self, _msg: SignedMessageFor) { - todo!() + async fn broadcast(&mut self, msg: SignedMessageFor) { + let mut to_broadcast = vec![TENDERMINT_MESSAGE]; + to_broadcast.extend(msg.encode()); + self.p2p.broadcast(to_broadcast).await } async fn slash(&mut self, validator: Self::ValidatorId) { log::error!( - "validator {} was slashed on tributary {}", + "validator {} triggered a slash event on tributary {}", hex::encode(validator), hex::encode(self.genesis) ); @@ -220,7 +265,7 @@ impl NetworkTrait for Network { async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; - self.blockchain.verify_block(&block).map_err(|e| match e { + self.blockchain.read().unwrap().verify_block(&block).map_err(|e| match e { BlockError::TransactionError(TransactionError::MissingProvided(_)) => { TendermintBlockError::Temporal } @@ -231,7 +276,7 @@ impl NetworkTrait for Network { async fn add_block( &mut self, block: Self::Block, - _commit: Commit, + commit: Commit, ) -> Option { let invalid_block = || { // There's a fatal flaw in the code, it's behind a hard fork, or the validators turned @@ -242,12 +287,15 @@ impl NetworkTrait for Network { panic!("validators added invalid block to tributary {}", hex::encode(self.genesis)); }; + assert!(self.verify_commit(block.id(), commit)); + let Ok(block) = Block::read::<&[u8]>(&mut block.0.as_ref()) else { return invalid_block(); }; loop { - match self.blockchain.add_block(&block) { + let block_res = self.blockchain.write().unwrap().add_block(&block); + match block_res { Ok(()) => break, Err(BlockError::TransactionError(TransactionError::MissingProvided(hash))) => { log::error!( @@ -261,7 +309,8 @@ impl NetworkTrait for Network { } } - // TODO: Handle the commit and return the next proposal - todo!() + // TODO: Save the commit to disk + + Some(TendermintBlock(self.blockchain.write().unwrap().build_block().serialize())) } } diff --git a/coordinator/tributary/src/tests/block.rs b/coordinator/tributary/src/tests/block.rs index ba2fffd1..aa1c9c09 100644 --- a/coordinator/tributary/src/tests/block.rs +++ b/coordinator/tributary/src/tests/block.rs @@ -3,7 +3,7 @@ use std::{ collections::{HashSet, HashMap}, }; -use rand_core::{RngCore, OsRng}; +use rand::{RngCore, rngs::OsRng}; use blake2::{Digest, Blake2s256}; diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary/src/tests/blockchain.rs index 803d80d3..e027b00d 100644 --- a/coordinator/tributary/src/tests/blockchain.rs +++ b/coordinator/tributary/src/tests/blockchain.rs @@ -1,7 +1,7 @@ use std::collections::{HashSet, HashMap}; use zeroize::Zeroizing; -use rand_core::{RngCore, OsRng}; +use rand::{RngCore, rngs::OsRng}; use blake2::{Digest, Blake2s256}; @@ -24,6 +24,7 @@ fn new_blockchain( ) -> Blockchain { let blockchain = Blockchain::new(genesis, participants); assert_eq!(blockchain.tip(), genesis); + assert_eq!(blockchain.block_number(), 0); blockchain } @@ -37,6 +38,7 @@ fn block_addition() { blockchain.verify_block(&block).unwrap(); assert!(blockchain.add_block(&block).is_ok()); assert_eq!(blockchain.tip(), block.hash()); + assert_eq!(blockchain.block_number(), 1); } #[test] diff --git a/coordinator/tributary/src/tests/mempool.rs b/coordinator/tributary/src/tests/mempool.rs index b9a68db8..cf569a9d 100644 --- a/coordinator/tributary/src/tests/mempool.rs +++ b/coordinator/tributary/src/tests/mempool.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use zeroize::Zeroizing; -use rand_core::{RngCore, OsRng}; +use rand::{RngCore, rngs::OsRng}; use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; diff --git a/coordinator/tributary/src/tests/merkle.rs b/coordinator/tributary/src/tests/merkle.rs index c3b0e626..6c3ece22 100644 --- a/coordinator/tributary/src/tests/merkle.rs +++ b/coordinator/tributary/src/tests/merkle.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use rand_core::{RngCore, OsRng}; +use rand::{RngCore, rngs::OsRng}; #[test] fn merkle() { diff --git a/coordinator/tributary/src/tests/transaction/mod.rs b/coordinator/tributary/src/tests/transaction/mod.rs index 1ecafa55..14d2ab8d 100644 --- a/coordinator/tributary/src/tests/transaction/mod.rs +++ b/coordinator/tributary/src/tests/transaction/mod.rs @@ -4,7 +4,7 @@ use std::{ }; use zeroize::Zeroizing; -use rand_core::{RngCore, CryptoRng}; +use rand::{RngCore, CryptoRng}; use blake2::{Digest, Blake2s256}; diff --git a/coordinator/tributary/src/tests/transaction/provided.rs b/coordinator/tributary/src/tests/transaction/provided.rs index 15538206..3a80359d 100644 --- a/coordinator/tributary/src/tests/transaction/provided.rs +++ b/coordinator/tributary/src/tests/transaction/provided.rs @@ -1,6 +1,6 @@ use std::collections::{HashSet, HashMap}; -use rand_core::OsRng; +use rand::rngs::OsRng; use crate::{Transaction, verify_transaction, tests::random_provided_transaction}; diff --git a/coordinator/tributary/src/tests/transaction/signed.rs b/coordinator/tributary/src/tests/transaction/signed.rs index a6f301a6..8144cd6d 100644 --- a/coordinator/tributary/src/tests/transaction/signed.rs +++ b/coordinator/tributary/src/tests/transaction/signed.rs @@ -1,6 +1,6 @@ use std::collections::{HashSet, HashMap}; -use rand_core::OsRng; +use rand::rngs::OsRng; use blake2::{Digest, Blake2s256}; @@ -13,7 +13,7 @@ use crate::{ #[test] fn serialize_signed() { - let signed = random_signed(&mut rand_core::OsRng); + let signed = random_signed(&mut rand::rngs::OsRng); assert_eq!(Signed::read::<&[u8]>(&mut signed.serialize().as_ref()).unwrap(), signed); } diff --git a/coordinator/tributary/src/transaction.rs b/coordinator/tributary/src/transaction.rs index c0fa48f3..53f265ce 100644 --- a/coordinator/tributary/src/transaction.rs +++ b/coordinator/tributary/src/transaction.rs @@ -73,7 +73,7 @@ pub enum TransactionKind<'a> { Signed(&'a Signed), } -pub trait Transaction: Send + Sync + Clone + Eq + Debug + ReadWrite { +pub trait Transaction: 'static + Send + Sync + Clone + Eq + Debug + ReadWrite { /// Return what type of transaction this is. fn kind(&self) -> TransactionKind<'_>;