From a5f1ddaf1b4b1511d9cb78c9e0292371b99efc50 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 16 Oct 2022 03:29:55 -0400 Subject: [PATCH] Refactor out external parts to generics Also creates a dedicated file for the message log. --- Cargo.lock | 7 + substrate/tendermint/src/ext.rs | 28 +++ substrate/tendermint/src/lib.rs | 253 +++++++++--------------- substrate/tendermint/src/message_log.rs | 84 ++++++++ substrate/tendermint/tests/ext.rs | 29 +++ 5 files changed, 243 insertions(+), 158 deletions(-) create mode 100644 substrate/tendermint/src/ext.rs create mode 100644 substrate/tendermint/src/message_log.rs create mode 100644 substrate/tendermint/tests/ext.rs diff --git a/Cargo.lock b/Cargo.lock index fdb001ba..4c2cbfa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8853,6 +8853,13 @@ dependencies = [ "winapi", ] +[[package]] +name = "tendermint-machine" +version = "0.1.0" +dependencies = [ + "tokio", +] + [[package]] name = "term" version = "0.7.0" diff --git a/substrate/tendermint/src/ext.rs b/substrate/tendermint/src/ext.rs new file mode 100644 index 00000000..d46aa383 --- /dev/null +++ b/substrate/tendermint/src/ext.rs @@ -0,0 +1,28 @@ +use core::{hash::Hash, fmt::Debug}; + +pub trait ValidatorId: Clone + Copy + PartialEq + Eq + Hash + Debug {} +impl ValidatorId for V {} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum BlockError { + // Invalid behavior entirely + Fatal, + // Potentially valid behavior dependent on unsynchronized state + Temporal, +} + +pub trait Block: Clone + PartialEq { + type Id: Copy + Clone + PartialEq; + + fn id(&self) -> Self::Id; +} + +pub trait Network { + fn total_weight(&self) -> u64; + fn weight(&self, validator: V) -> u64; + fn threshold(&self) -> u64 { + ((self.total_weight() * 2) / 3) + 1 + } + + fn validate(&mut self, block: B) -> Result<(), BlockError>; +} diff --git a/substrate/tendermint/src/lib.rs b/substrate/tendermint/src/lib.rs index 54c83c08..08b05675 100644 --- a/substrate/tendermint/src/lib.rs +++ b/substrate/tendermint/src/lib.rs @@ -1,28 +1,13 @@ -use std::collections::HashMap; +pub mod ext; +use ext::*; -use tokio::{task::{JoinHandle, spawn}, sync::mpsc}; +mod message_log; -type ValidatorId = u16; -const VALIDATORS: ValidatorId = 5; - -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -struct Hash; -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -struct Block { - hash: Hash -} - -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum BlockError { - // Invalid behavior entirely - Fatal, - // Potentially valid behavior dependent on unsynchronized state - Temporal, -} - -fn valid(block: &Block) -> Result<(), BlockError> { - Ok(()) -} +// Type aliases which are distinct according to the type system +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub(crate) struct BlockNumber(u32); +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub(crate) struct Round(u32); #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] enum Step { @@ -31,14 +16,14 @@ enum Step { Precommit, } -#[derive(Clone, PartialEq, Eq, Debug)] -enum Data { - Proposal(Option, Block), - Prevote(Option), - Precommit(Option), +#[derive(Clone, PartialEq)] +enum Data { + Proposal(Option, B), + Prevote(Option), + Precommit(Option), } -impl Data { +impl Data { fn step(&self) -> Step { match self { Data::Proposal(..) => Step::Propose, @@ -48,26 +33,36 @@ impl Data { } } -#[derive(Clone, PartialEq, Eq, Debug)] -struct Message { - sender: ValidatorId, +#[derive(Clone, PartialEq)] +struct Message { + sender: V, - height: u32, - round: u32, + number: BlockNumber, + round: Round, - data: Data, + data: Data, } #[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum TendermintError { - MaliciousOrTemporal(u16), // TODO: Remove when we figure this out - Malicious(u16), - Offline(u16), +enum TendermintError { + Malicious(V), + Offline(V), Temporal, } -fn proposer(height: u32, round: u32) -> ValidatorId { - ValidatorId::try_from((height + round) % u32::try_from(VALIDATORS).unwrap()).unwrap() +/* +use std::collections::HashMap; + +use tokio::{ + task::{JoinHandle, spawn}, + sync::mpsc, +}; + +type ValidatorId = u16; +const VALIDATORS: ValidatorId = 5; + +fn proposer(number: u32, round: u32) -> ValidatorId { + ValidatorId::try_from((number + round) % u32::try_from(VALIDATORS).unwrap()).unwrap() } #[derive(Debug)] @@ -75,7 +70,7 @@ struct TendermintMachine { proposer: ValidatorId, personal_proposal: Option, - height: u32, + number: u32, log_map: HashMap>>, precommitted: HashMap, @@ -98,7 +93,7 @@ struct TendermintHandle { impl TendermintMachine { fn broadcast(&self, data: Data) -> Option { - let msg = Message { sender: self.proposer, height: self.height, round: self.round, data }; + let msg = Message { sender: self.proposer, number: self.number, round: self.round, data }; let res = self.message(msg).unwrap(); self.broadcast.send(msg).unwrap(); res @@ -106,15 +101,15 @@ impl TendermintMachine { // 14-21 fn round_propose(&mut self) { - // This will happen if it's a new height and propose hasn't been called yet + // This will happen if it's a new block and propose hasn't been called yet if self.personal_proposal.is_none() { - // Ensure it's actually a new height. Else, the caller failed to provide necessary data yet + // Ensure it's actually a new block. Else, the caller failed to provide necessary data yet // is still executing the machine debug_assert_eq!(self.round, 0); return; } - if proposer(self.height, self.round) == self.proposer { + if proposer(self.number, self.round) == self.proposer { let (round, block) = if let Some((round, block)) = self.valid { (Some(round), block) } else { @@ -122,7 +117,7 @@ impl TendermintMachine { }; debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none()); } else { - self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit)); + self.timeouts.write().unwrap().insert(Step::Propose, self.timeout(Step::Propose)); } } @@ -133,7 +128,7 @@ impl TendermintMachine { self.round_propose(); } - /// Called whenever a new height occurs + /// Called whenever a new block occurs fn propose(&mut self, block: Block) { self.personal_proposal = Some(block); self.round_propose(); @@ -143,7 +138,7 @@ impl TendermintMachine { fn reset(&mut self) { self.personal_proposal = None; - self.height += 1; + self.number += 1; self.log_map = HashMap::new(); self.precommitted = HashMap::new(); @@ -155,7 +150,7 @@ impl TendermintMachine { } // 10 - pub fn new(proposer: ValidatorId, height: u32) -> TendermintHandle { + pub fn new(proposer: ValidatorId, number: u32) -> TendermintHandle { let block = Arc::new(RwLock::new(None)); let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary let (broadcast_send, broadcast_recv) = mpsc::channel(5); @@ -168,7 +163,7 @@ impl TendermintMachine { proposer, personal_proposal: None, - height, + number, log_map: HashMap::new(), precommitted: HashMap::new(), @@ -177,7 +172,7 @@ impl TendermintMachine { valid: None, round: 0, - step: Step::Propose + step: Step::Propose, }; loop { @@ -193,106 +188,31 @@ impl TendermintMachine { let now = Instant::now(); let (t1, t2, t3) = { -let timeouts = self.timeouts.read().unwrap(); -let ready = |step| timeouts.get(step).unwrap_or(now) < now; -(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) -}; + let timeouts = self.timeouts.read().unwrap(); + let ready = |step| timeouts.get(step).unwrap_or(now) < now; + (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) + }; -if t1 { // Propose timeout -} -if t2 { // Prevote timeout -} -if t3 { // Precommit timeout -} + if t1 { // Propose timeout + } + if t2 { // Prevote timeout + } + if t3 { // Precommit timeout + } match recv.try_recv() { Ok(msg) => machine.message(msg), Err(TryRecvError::Empty) => tokio::yield_now().await, - Err(TryRecvError::Disconnected) => break + Err(TryRecvError::Disconnected) => break, } } - }) + }), } } - // Returns true if it's a new message - fn log(&mut self, msg: Message) -> Result { - if matches!(msg.data, Data::Proposal(..)) && (msg.sender != proposer(msg.height, msg.round)) { - Err(TendermintError::Malicious(msg.sender))?; - }; - - if !self.log_map.contains_key(&msg.round) { - self.log_map.insert(msg.round, HashMap::new()); - } - let log = self.log_map.get_mut(&msg.round).unwrap(); - if !log.contains_key(&msg.sender) { - log.insert(msg.sender, HashMap::new()); - } - let log = log.get_mut(&msg.sender).unwrap(); - - // Handle message replays without issue. It's only multiple messages which is malicious - let step = msg.data.step(); - if let Some(existing) = log.get(&step) { - if existing != &msg.data { - Err(TendermintError::Malicious(msg.sender))?; - } - return Ok(false); - } - - // If they already precommitted to a distinct hash, error - if let Data::Precommit(Some(hash)) = msg.data { - if let Some(prev) = self.precommitted.get(&msg.sender) { - if hash != *prev { - Err(TendermintError::Malicious(msg.sender))?; - } - } - self.precommitted.insert(msg.sender, hash); - } - - log.insert(step, msg.data); - Ok(true) - } - - fn message_instances(&self, round: u32, data: Data) -> (usize, usize) { - let participating = 0; - let weight = 0; - for participant in self.log_map[&round].values() { - if let Some(msg) = participant.get(&data.step()) { - let validator_weight = 1; // TODO - participating += validator_weight; - if &data == msg { - weight += validator_weight; - } - - // If the msg exists, yet has a distinct hash, this validator is faulty - // (at least for precommit) - // TODO - } - } - (participating, weight) - } - - fn participation(&self, round: u32, step: Step) -> usize { - let (participating, _) = self.message_instances(round, match step { - Step::Propose => panic!("Checking for participation on Propose"), - Step::Prevote => Data::Prevote(None), - Step::Precommit => Data::Precommit(None), - }); - participating - } - - fn has_participation(&self, round: u32, step: Step) -> bool { - self.participation(round, step) >= ((VALIDATORS / 3 * 2) + 1).into() - } - - fn has_consensus(&self, round: u32, data: Data) -> bool { - let (_, weight) = self.message_instances(round, data); - weight >= ((VALIDATORS / 3 * 2) + 1).into() - } - // 49-54 fn check_committed(&mut self, round_num: u32) -> Option { - let proposer = proposer(self.height, round_num); + let proposer = proposer(self.number, round_num); // Safe as we only check for rounds which we received a message for let round = self.log_map[&round_num]; @@ -302,7 +222,8 @@ if t3 { // Precommit timeout debug_assert!(matches!(proposal, Data::Proposal(..))); if let Data::Proposal(_, block) = proposal { // Check if it has gotten a sufficient amount of precommits - let (participants, weight) = self.message_instances(round_num, Data::Precommit(Some(block.hash))); + let (participants, weight) = + self.message_instances(round_num, Data::Precommit(Some(block.hash))); let threshold = ((VALIDATORS / 3) * 2) + 1; if weight >= threshold.into() { @@ -310,8 +231,12 @@ if t3 { // Precommit timeout return Some(*block); } - if (participants >= threshold.into()) && first { - schedule timeoutPrecommit(self.height, round); + // 47-48 + if participants >= threshold.into() { + let map = self.timeouts.write().unwrap(); + if !map.contains_key(Step::Precommit) { + map.insert(Step::Precommit, self.timeout(Step::Precommit)); + } } } } @@ -320,10 +245,14 @@ if t3 { // Precommit timeout } fn message(&mut self, msg: Message) -> Result, TendermintError> { - if msg.height != self.height { + if msg.number != self.number { Err(TendermintError::Temporal)?; } + if matches!(msg.data, Data::Proposal(..)) && (msg.sender != proposer(msg.height, msg.round)) { + Err(TendermintError::Malicious(msg.sender))?; + }; + if !self.log(msg)? { return Ok(None); } @@ -352,18 +281,23 @@ if t3 { // Precommit timeout } if self.step == Step::Propose { - if let Some(proposal) = round.get(&proposer(self.height, self.round)).map(|p| p.get(&Step::Propose)).flatten() { + if let Some(proposal) = + round.get(&proposer(self.number, self.round)).map(|p| p.get(&Step::Propose)).flatten() + { debug_assert!(matches!(proposal, Data::Proposal(..))); if let Data::Proposal(vr, block) = proposal { if let Some(vr) = vr { // 28-33 let vr = *vr; if (vr < self.round) && self.has_consensus(vr, Data::Prevote(Some(block.hash))) { - debug_assert!(self.broadcast( - Data::Prevote( - Some(block.hash).filter(|_| self.locked.map(|(round, value)| (round <= vr) || (block == &value)).unwrap_or(true)) - ) - ).is_none()); + debug_assert!(self + .broadcast(Data::Prevote(Some(block.hash).filter(|_| { + self + .locked + .map(|(round, value)| (round <= vr) || (block == &value)) + .unwrap_or(true) + }))) + .is_none()); self.step = Step::Prevote; } else { Err(TendermintError::Malicious(msg.sender))?; @@ -371,7 +305,11 @@ if t3 { // Precommit timeout } else { // 22-27 valid(&block).map_err(|_| TendermintError::Malicious(msg.sender))?; - debug_assert!(self.broadcast(Data::Prevote(Some(block.hash).filter(|_| self.locked.is_none() || self.locked.map(|locked| &locked.1) == Some(block)))).is_none()); + debug_assert!(self + .broadcast(Data::Prevote(Some(block.hash).filter( + |_| self.locked.is_none() || self.locked.map(|locked| &locked.1) == Some(block) + ))) + .is_none()); self.step = Step::Prevote; } } @@ -381,8 +319,11 @@ if t3 { // Precommit timeout if self.step == Step::Prevote { let (participation, weight) = self.message_instances(self.round, Data::Prevote(None)); // 34-35 - if (participation > (((VALIDATORS / 3) * 2) + 1).into()) && first { - self.timeouts.write().unwrap().insert(Step::Prevote, self.timeout(Step::Prevote)); + if participation > (((VALIDATORS / 3) * 2) + 1).into() { + let map = self.timeouts.write().unwrap(); + if !map.contains_key(Step::Prevote) { + map.insert(Step::Prevote, self.timeout(Step::Prevote)) + } } // 44-46 @@ -392,11 +333,7 @@ if t3 { // Precommit timeout } } - // 47-48 - if self.has_participation(self.round, Step::Precommit) && first { - self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit)); - } - Ok(None) } } +*/ diff --git a/substrate/tendermint/src/message_log.rs b/substrate/tendermint/src/message_log.rs new file mode 100644 index 00000000..ad240ba0 --- /dev/null +++ b/substrate/tendermint/src/message_log.rs @@ -0,0 +1,84 @@ +use std::{sync::Arc, collections::HashMap}; + +use crate::{ext::*, Round, Step, Data, Message, TendermintError}; + +pub(crate) struct MessageLog> { + network: Arc, + precommitted: HashMap, + log: HashMap>>>, +} + +impl> MessageLog { + pub(crate) fn new(network: Arc) -> MessageLog { + MessageLog { network, precommitted: HashMap::new(), log: HashMap::new() } + } + + // Returns true if it's a new message + pub(crate) fn log(&mut self, msg: Message) -> Result> { + let round = self.log.entry(msg.round).or_insert_with(HashMap::new); + let msgs = round.entry(msg.sender).or_insert_with(HashMap::new); + + // Handle message replays without issue. It's only multiple messages which is malicious + let step = msg.data.step(); + if let Some(existing) = msgs.get(&step) { + if existing != &msg.data { + Err(TendermintError::Malicious(msg.sender))?; + } + return Ok(false); + } + + // If they already precommitted to a distinct hash, error + if let Data::Precommit(Some(hash)) = msg.data { + if let Some(prev) = self.precommitted.get(&msg.sender) { + if hash != *prev { + Err(TendermintError::Malicious(msg.sender))?; + } + } + self.precommitted.insert(msg.sender, hash); + } + + msgs.insert(step, msg.data); + Ok(true) + } + + // For a given round, return the participating weight for this step, and the weight agreeing with + // the data. + pub(crate) fn message_instances(&self, round: Round, data: Data) -> (u64, u64) { + let mut participating = 0; + let mut weight = 0; + for (participant, msgs) in &self.log[&round] { + if let Some(msg) = msgs.get(&data.step()) { + let validator_weight = self.network.weight(*participant); + participating += validator_weight; + if &data == msg { + weight += validator_weight; + } + } + } + (participating, weight) + } + + // Get the participation in a given round for a given step. + pub(crate) fn participation(&self, round: Round, step: Step) -> u64 { + let (participating, _) = self.message_instances( + round, + match step { + Step::Propose => panic!("Checking for participation on Propose"), + Step::Prevote => Data::Prevote(None), + Step::Precommit => Data::Precommit(None), + }, + ); + participating + } + + // Check if there's been a BFT level of participation + pub(crate) fn has_participation(&self, round: Round, step: Step) -> bool { + self.participation(round, step) >= self.network.threshold() + } + + // Check if consensus has been reached on a specific piece of data + pub(crate) fn has_consensus(&self, round: Round, data: Data) -> bool { + let (_, weight) = self.message_instances(round, data); + weight >= self.network.threshold() + } +} diff --git a/substrate/tendermint/tests/ext.rs b/substrate/tendermint/tests/ext.rs new file mode 100644 index 00000000..f4a37922 --- /dev/null +++ b/substrate/tendermint/tests/ext.rs @@ -0,0 +1,29 @@ +use tendermint_machine::ext::{BlockError, Block, Network}; + +#[derive(Clone, PartialEq)] +struct TestBlock { + id: u32, + valid: Result<(), BlockError>, +} + +impl Block for TestBlock { + type Id = u32; + + fn id(&self) -> u32 { + self.id + } +} + +struct TestNetwork; +impl Network for TestNetwork { + fn total_weight(&self) -> u64 { + 5 + } + fn weight(&self, id: u16) -> u64 { + [1, 1, 1, 1, 1][usize::try_from(id).unwrap()] + } + + fn validate(&mut self, block: TestBlock) -> Result<(), BlockError> { + block.valid + } +}