Refactor out external parts to generics

Also creates a dedicated file for the message log.
This commit is contained in:
Luke Parker 2022-10-16 03:29:55 -04:00
parent 1237c41c53
commit a5f1ddaf1b
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
5 changed files with 243 additions and 158 deletions

7
Cargo.lock generated
View file

@ -8853,6 +8853,13 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "tendermint-machine"
version = "0.1.0"
dependencies = [
"tokio",
]
[[package]] [[package]]
name = "term" name = "term"
version = "0.7.0" version = "0.7.0"

View file

@ -0,0 +1,28 @@
use core::{hash::Hash, fmt::Debug};
pub trait ValidatorId: Clone + Copy + PartialEq + Eq + Hash + Debug {}
impl<V: Clone + Copy + PartialEq + Eq + Hash + Debug> ValidatorId for V {}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BlockError {
// Invalid behavior entirely
Fatal,
// Potentially valid behavior dependent on unsynchronized state
Temporal,
}
pub trait Block: Clone + PartialEq {
type Id: Copy + Clone + PartialEq;
fn id(&self) -> Self::Id;
}
pub trait Network<V: ValidatorId, B: Block> {
fn total_weight(&self) -> u64;
fn weight(&self, validator: V) -> u64;
fn threshold(&self) -> u64 {
((self.total_weight() * 2) / 3) + 1
}
fn validate(&mut self, block: B) -> Result<(), BlockError>;
}

View file

@ -1,28 +1,13 @@
use std::collections::HashMap; pub mod ext;
use ext::*;
use tokio::{task::{JoinHandle, spawn}, sync::mpsc}; mod message_log;
type ValidatorId = u16; // Type aliases which are distinct according to the type system
const VALIDATORS: ValidatorId = 5; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) struct BlockNumber(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Hash; pub(crate) struct Round(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Block {
hash: Hash
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum BlockError {
// Invalid behavior entirely
Fatal,
// Potentially valid behavior dependent on unsynchronized state
Temporal,
}
fn valid(block: &Block) -> Result<(), BlockError> {
Ok(())
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Step { enum Step {
@ -31,14 +16,14 @@ enum Step {
Precommit, Precommit,
} }
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq)]
enum Data { enum Data<B: Block> {
Proposal(Option<u32>, Block), Proposal(Option<u32>, B),
Prevote(Option<Hash>), Prevote(Option<B::Id>),
Precommit(Option<Hash>), Precommit(Option<B::Id>),
} }
impl Data { impl<B: Block> Data<B> {
fn step(&self) -> Step { fn step(&self) -> Step {
match self { match self {
Data::Proposal(..) => Step::Propose, Data::Proposal(..) => Step::Propose,
@ -48,26 +33,36 @@ impl Data {
} }
} }
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq)]
struct Message { struct Message<V: ValidatorId, B: Block> {
sender: ValidatorId, sender: V,
height: u32, number: BlockNumber,
round: u32, round: Round,
data: Data, data: Data<B>,
} }
#[derive(Clone, Copy, PartialEq, Eq, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TendermintError { enum TendermintError<V: ValidatorId> {
MaliciousOrTemporal(u16), // TODO: Remove when we figure this out Malicious(V),
Malicious(u16), Offline(V),
Offline(u16),
Temporal, Temporal,
} }
fn proposer(height: u32, round: u32) -> ValidatorId { /*
ValidatorId::try_from((height + round) % u32::try_from(VALIDATORS).unwrap()).unwrap() use std::collections::HashMap;
use tokio::{
task::{JoinHandle, spawn},
sync::mpsc,
};
type ValidatorId = u16;
const VALIDATORS: ValidatorId = 5;
fn proposer(number: u32, round: u32) -> ValidatorId {
ValidatorId::try_from((number + round) % u32::try_from(VALIDATORS).unwrap()).unwrap()
} }
#[derive(Debug)] #[derive(Debug)]
@ -75,7 +70,7 @@ struct TendermintMachine {
proposer: ValidatorId, proposer: ValidatorId,
personal_proposal: Option<Block>, personal_proposal: Option<Block>,
height: u32, number: u32,
log_map: HashMap<u32, HashMap<ValidatorId, HashMap<Step, Data>>>, log_map: HashMap<u32, HashMap<ValidatorId, HashMap<Step, Data>>>,
precommitted: HashMap<ValidatorId, Hash>, precommitted: HashMap<ValidatorId, Hash>,
@ -98,7 +93,7 @@ struct TendermintHandle {
impl TendermintMachine { impl TendermintMachine {
fn broadcast(&self, data: Data) -> Option<Block> { fn broadcast(&self, data: Data) -> Option<Block> {
let msg = Message { sender: self.proposer, height: self.height, round: self.round, data }; let msg = Message { sender: self.proposer, number: self.number, round: self.round, data };
let res = self.message(msg).unwrap(); let res = self.message(msg).unwrap();
self.broadcast.send(msg).unwrap(); self.broadcast.send(msg).unwrap();
res res
@ -106,15 +101,15 @@ impl TendermintMachine {
// 14-21 // 14-21
fn round_propose(&mut self) { fn round_propose(&mut self) {
// This will happen if it's a new height and propose hasn't been called yet // This will happen if it's a new block and propose hasn't been called yet
if self.personal_proposal.is_none() { if self.personal_proposal.is_none() {
// Ensure it's actually a new height. Else, the caller failed to provide necessary data yet // Ensure it's actually a new block. Else, the caller failed to provide necessary data yet
// is still executing the machine // is still executing the machine
debug_assert_eq!(self.round, 0); debug_assert_eq!(self.round, 0);
return; return;
} }
if proposer(self.height, self.round) == self.proposer { if proposer(self.number, self.round) == self.proposer {
let (round, block) = if let Some((round, block)) = self.valid { let (round, block) = if let Some((round, block)) = self.valid {
(Some(round), block) (Some(round), block)
} else { } else {
@ -122,7 +117,7 @@ impl TendermintMachine {
}; };
debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none()); debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none());
} else { } else {
self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit)); self.timeouts.write().unwrap().insert(Step::Propose, self.timeout(Step::Propose));
} }
} }
@ -133,7 +128,7 @@ impl TendermintMachine {
self.round_propose(); self.round_propose();
} }
/// Called whenever a new height occurs /// Called whenever a new block occurs
fn propose(&mut self, block: Block) { fn propose(&mut self, block: Block) {
self.personal_proposal = Some(block); self.personal_proposal = Some(block);
self.round_propose(); self.round_propose();
@ -143,7 +138,7 @@ impl TendermintMachine {
fn reset(&mut self) { fn reset(&mut self) {
self.personal_proposal = None; self.personal_proposal = None;
self.height += 1; self.number += 1;
self.log_map = HashMap::new(); self.log_map = HashMap::new();
self.precommitted = HashMap::new(); self.precommitted = HashMap::new();
@ -155,7 +150,7 @@ impl TendermintMachine {
} }
// 10 // 10
pub fn new(proposer: ValidatorId, height: u32) -> TendermintHandle { pub fn new(proposer: ValidatorId, number: u32) -> TendermintHandle {
let block = Arc::new(RwLock::new(None)); let block = Arc::new(RwLock::new(None));
let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary
let (broadcast_send, broadcast_recv) = mpsc::channel(5); let (broadcast_send, broadcast_recv) = mpsc::channel(5);
@ -168,7 +163,7 @@ impl TendermintMachine {
proposer, proposer,
personal_proposal: None, personal_proposal: None,
height, number,
log_map: HashMap::new(), log_map: HashMap::new(),
precommitted: HashMap::new(), precommitted: HashMap::new(),
@ -177,7 +172,7 @@ impl TendermintMachine {
valid: None, valid: None,
round: 0, round: 0,
step: Step::Propose step: Step::Propose,
}; };
loop { loop {
@ -193,106 +188,31 @@ impl TendermintMachine {
let now = Instant::now(); let now = Instant::now();
let (t1, t2, t3) = { let (t1, t2, t3) = {
let timeouts = self.timeouts.read().unwrap(); let timeouts = self.timeouts.read().unwrap();
let ready = |step| timeouts.get(step).unwrap_or(now) < now; let ready = |step| timeouts.get(step).unwrap_or(now) < now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
}; };
if t1 { // Propose timeout if t1 { // Propose timeout
} }
if t2 { // Prevote timeout if t2 { // Prevote timeout
} }
if t3 { // Precommit timeout if t3 { // Precommit timeout
} }
match recv.try_recv() { match recv.try_recv() {
Ok(msg) => machine.message(msg), Ok(msg) => machine.message(msg),
Err(TryRecvError::Empty) => tokio::yield_now().await, Err(TryRecvError::Empty) => tokio::yield_now().await,
Err(TryRecvError::Disconnected) => break Err(TryRecvError::Disconnected) => break,
} }
} }
}) }),
} }
} }
// Returns true if it's a new message
fn log(&mut self, msg: Message) -> Result<bool, TendermintError> {
if matches!(msg.data, Data::Proposal(..)) && (msg.sender != proposer(msg.height, msg.round)) {
Err(TendermintError::Malicious(msg.sender))?;
};
if !self.log_map.contains_key(&msg.round) {
self.log_map.insert(msg.round, HashMap::new());
}
let log = self.log_map.get_mut(&msg.round).unwrap();
if !log.contains_key(&msg.sender) {
log.insert(msg.sender, HashMap::new());
}
let log = log.get_mut(&msg.sender).unwrap();
// Handle message replays without issue. It's only multiple messages which is malicious
let step = msg.data.step();
if let Some(existing) = log.get(&step) {
if existing != &msg.data {
Err(TendermintError::Malicious(msg.sender))?;
}
return Ok(false);
}
// If they already precommitted to a distinct hash, error
if let Data::Precommit(Some(hash)) = msg.data {
if let Some(prev) = self.precommitted.get(&msg.sender) {
if hash != *prev {
Err(TendermintError::Malicious(msg.sender))?;
}
}
self.precommitted.insert(msg.sender, hash);
}
log.insert(step, msg.data);
Ok(true)
}
fn message_instances(&self, round: u32, data: Data) -> (usize, usize) {
let participating = 0;
let weight = 0;
for participant in self.log_map[&round].values() {
if let Some(msg) = participant.get(&data.step()) {
let validator_weight = 1; // TODO
participating += validator_weight;
if &data == msg {
weight += validator_weight;
}
// If the msg exists, yet has a distinct hash, this validator is faulty
// (at least for precommit)
// TODO
}
}
(participating, weight)
}
fn participation(&self, round: u32, step: Step) -> usize {
let (participating, _) = self.message_instances(round, match step {
Step::Propose => panic!("Checking for participation on Propose"),
Step::Prevote => Data::Prevote(None),
Step::Precommit => Data::Precommit(None),
});
participating
}
fn has_participation(&self, round: u32, step: Step) -> bool {
self.participation(round, step) >= ((VALIDATORS / 3 * 2) + 1).into()
}
fn has_consensus(&self, round: u32, data: Data) -> bool {
let (_, weight) = self.message_instances(round, data);
weight >= ((VALIDATORS / 3 * 2) + 1).into()
}
// 49-54 // 49-54
fn check_committed(&mut self, round_num: u32) -> Option<Block> { fn check_committed(&mut self, round_num: u32) -> Option<Block> {
let proposer = proposer(self.height, round_num); let proposer = proposer(self.number, round_num);
// Safe as we only check for rounds which we received a message for // Safe as we only check for rounds which we received a message for
let round = self.log_map[&round_num]; let round = self.log_map[&round_num];
@ -302,7 +222,8 @@ if t3 { // Precommit timeout
debug_assert!(matches!(proposal, Data::Proposal(..))); debug_assert!(matches!(proposal, Data::Proposal(..)));
if let Data::Proposal(_, block) = proposal { if let Data::Proposal(_, block) = proposal {
// Check if it has gotten a sufficient amount of precommits // Check if it has gotten a sufficient amount of precommits
let (participants, weight) = self.message_instances(round_num, Data::Precommit(Some(block.hash))); let (participants, weight) =
self.message_instances(round_num, Data::Precommit(Some(block.hash)));
let threshold = ((VALIDATORS / 3) * 2) + 1; let threshold = ((VALIDATORS / 3) * 2) + 1;
if weight >= threshold.into() { if weight >= threshold.into() {
@ -310,8 +231,12 @@ if t3 { // Precommit timeout
return Some(*block); return Some(*block);
} }
if (participants >= threshold.into()) && first { // 47-48
schedule timeoutPrecommit(self.height, round); if participants >= threshold.into() {
let map = self.timeouts.write().unwrap();
if !map.contains_key(Step::Precommit) {
map.insert(Step::Precommit, self.timeout(Step::Precommit));
}
} }
} }
} }
@ -320,10 +245,14 @@ if t3 { // Precommit timeout
} }
fn message(&mut self, msg: Message) -> Result<Option<Block>, TendermintError> { fn message(&mut self, msg: Message) -> Result<Option<Block>, TendermintError> {
if msg.height != self.height { if msg.number != self.number {
Err(TendermintError::Temporal)?; Err(TendermintError::Temporal)?;
} }
if matches!(msg.data, Data::Proposal(..)) && (msg.sender != proposer(msg.height, msg.round)) {
Err(TendermintError::Malicious(msg.sender))?;
};
if !self.log(msg)? { if !self.log(msg)? {
return Ok(None); return Ok(None);
} }
@ -352,18 +281,23 @@ if t3 { // Precommit timeout
} }
if self.step == Step::Propose { if self.step == Step::Propose {
if let Some(proposal) = round.get(&proposer(self.height, self.round)).map(|p| p.get(&Step::Propose)).flatten() { if let Some(proposal) =
round.get(&proposer(self.number, self.round)).map(|p| p.get(&Step::Propose)).flatten()
{
debug_assert!(matches!(proposal, Data::Proposal(..))); debug_assert!(matches!(proposal, Data::Proposal(..)));
if let Data::Proposal(vr, block) = proposal { if let Data::Proposal(vr, block) = proposal {
if let Some(vr) = vr { if let Some(vr) = vr {
// 28-33 // 28-33
let vr = *vr; let vr = *vr;
if (vr < self.round) && self.has_consensus(vr, Data::Prevote(Some(block.hash))) { if (vr < self.round) && self.has_consensus(vr, Data::Prevote(Some(block.hash))) {
debug_assert!(self.broadcast( debug_assert!(self
Data::Prevote( .broadcast(Data::Prevote(Some(block.hash).filter(|_| {
Some(block.hash).filter(|_| self.locked.map(|(round, value)| (round <= vr) || (block == &value)).unwrap_or(true)) self
) .locked
).is_none()); .map(|(round, value)| (round <= vr) || (block == &value))
.unwrap_or(true)
})))
.is_none());
self.step = Step::Prevote; self.step = Step::Prevote;
} else { } else {
Err(TendermintError::Malicious(msg.sender))?; Err(TendermintError::Malicious(msg.sender))?;
@ -371,7 +305,11 @@ if t3 { // Precommit timeout
} else { } else {
// 22-27 // 22-27
valid(&block).map_err(|_| TendermintError::Malicious(msg.sender))?; valid(&block).map_err(|_| TendermintError::Malicious(msg.sender))?;
debug_assert!(self.broadcast(Data::Prevote(Some(block.hash).filter(|_| self.locked.is_none() || self.locked.map(|locked| &locked.1) == Some(block)))).is_none()); debug_assert!(self
.broadcast(Data::Prevote(Some(block.hash).filter(
|_| self.locked.is_none() || self.locked.map(|locked| &locked.1) == Some(block)
)))
.is_none());
self.step = Step::Prevote; self.step = Step::Prevote;
} }
} }
@ -381,8 +319,11 @@ if t3 { // Precommit timeout
if self.step == Step::Prevote { if self.step == Step::Prevote {
let (participation, weight) = self.message_instances(self.round, Data::Prevote(None)); let (participation, weight) = self.message_instances(self.round, Data::Prevote(None));
// 34-35 // 34-35
if (participation > (((VALIDATORS / 3) * 2) + 1).into()) && first { if participation > (((VALIDATORS / 3) * 2) + 1).into() {
self.timeouts.write().unwrap().insert(Step::Prevote, self.timeout(Step::Prevote)); let map = self.timeouts.write().unwrap();
if !map.contains_key(Step::Prevote) {
map.insert(Step::Prevote, self.timeout(Step::Prevote))
}
} }
// 44-46 // 44-46
@ -392,11 +333,7 @@ if t3 { // Precommit timeout
} }
} }
// 47-48
if self.has_participation(self.round, Step::Precommit) && first {
self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit));
}
Ok(None) Ok(None)
} }
} }
*/

View file

@ -0,0 +1,84 @@
use std::{sync::Arc, collections::HashMap};
use crate::{ext::*, Round, Step, Data, Message, TendermintError};
pub(crate) struct MessageLog<V: ValidatorId, B: Block, N: Network<V, B>> {
network: Arc<N>,
precommitted: HashMap<V, B::Id>,
log: HashMap<Round, HashMap<V, HashMap<Step, Data<B>>>>,
}
impl<V: ValidatorId, B: Block, N: Network<V, B>> MessageLog<V, B, N> {
pub(crate) fn new(network: Arc<N>) -> MessageLog<V, B, N> {
MessageLog { network, precommitted: HashMap::new(), log: HashMap::new() }
}
// Returns true if it's a new message
pub(crate) fn log(&mut self, msg: Message<V, B>) -> Result<bool, TendermintError<V>> {
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
let msgs = round.entry(msg.sender).or_insert_with(HashMap::new);
// Handle message replays without issue. It's only multiple messages which is malicious
let step = msg.data.step();
if let Some(existing) = msgs.get(&step) {
if existing != &msg.data {
Err(TendermintError::Malicious(msg.sender))?;
}
return Ok(false);
}
// If they already precommitted to a distinct hash, error
if let Data::Precommit(Some(hash)) = msg.data {
if let Some(prev) = self.precommitted.get(&msg.sender) {
if hash != *prev {
Err(TendermintError::Malicious(msg.sender))?;
}
}
self.precommitted.insert(msg.sender, hash);
}
msgs.insert(step, msg.data);
Ok(true)
}
// For a given round, return the participating weight for this step, and the weight agreeing with
// the data.
pub(crate) fn message_instances(&self, round: Round, data: Data<B>) -> (u64, u64) {
let mut participating = 0;
let mut weight = 0;
for (participant, msgs) in &self.log[&round] {
if let Some(msg) = msgs.get(&data.step()) {
let validator_weight = self.network.weight(*participant);
participating += validator_weight;
if &data == msg {
weight += validator_weight;
}
}
}
(participating, weight)
}
// Get the participation in a given round for a given step.
pub(crate) fn participation(&self, round: Round, step: Step) -> u64 {
let (participating, _) = self.message_instances(
round,
match step {
Step::Propose => panic!("Checking for participation on Propose"),
Step::Prevote => Data::Prevote(None),
Step::Precommit => Data::Precommit(None),
},
);
participating
}
// Check if there's been a BFT level of participation
pub(crate) fn has_participation(&self, round: Round, step: Step) -> bool {
self.participation(round, step) >= self.network.threshold()
}
// Check if consensus has been reached on a specific piece of data
pub(crate) fn has_consensus(&self, round: Round, data: Data<B>) -> bool {
let (_, weight) = self.message_instances(round, data);
weight >= self.network.threshold()
}
}

View file

@ -0,0 +1,29 @@
use tendermint_machine::ext::{BlockError, Block, Network};
#[derive(Clone, PartialEq)]
struct TestBlock {
id: u32,
valid: Result<(), BlockError>,
}
impl Block for TestBlock {
type Id = u32;
fn id(&self) -> u32 {
self.id
}
}
struct TestNetwork;
impl Network<u16, TestBlock> for TestNetwork {
fn total_weight(&self) -> u64 {
5
}
fn weight(&self, id: u16) -> u64 {
[1, 1, 1, 1, 1][usize::try_from(id).unwrap()]
}
fn validate(&mut self, block: TestBlock) -> Result<(), BlockError> {
block.valid
}
}