Move more code into block.rs

Introduces type-aliases to obtain Data/Message/SignedMessage solely from 
a Network object.

Fixes a bug regarding stepping when you're not an active validator.
This commit is contained in:
Luke Parker 2022-11-13 18:11:09 -05:00
parent 4ba469e653
commit c13e0c75ae
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
5 changed files with 106 additions and 92 deletions
substrate/tendermint/machine

View file

@ -8,6 +8,7 @@ use crate::{
ext::{RoundNumber, BlockNumber, Block, Network},
round::RoundData,
message_log::MessageLog,
Step, Data, DataFor, Message, MessageFor,
};
pub(crate) struct BlockData<N: Network> {
@ -56,4 +57,71 @@ impl<N: Network> BlockData<N> {
pub(crate) fn round_mut(&mut self) -> &mut RoundData<N> {
self.round.as_mut().unwrap()
}
pub(crate) fn populate_end_time(&mut self, round: RoundNumber) {
for r in (self.round().number.0 + 1) .. round.0 {
self.end_time.insert(
RoundNumber(r),
RoundData::<N>::new(RoundNumber(r), self.end_time[&RoundNumber(r - 1)]).end_time(),
);
}
}
// Start a new round. Optionally takes in the time for when this is the first round, and the time
// isn't simply the time of the prior round (yet rather the prior block). Returns the proposal
// data, if we are the proposer.
pub(crate) fn new_round(
&mut self,
round: RoundNumber,
proposer: N::ValidatorId,
time: Option<CanonicalInstant>,
) -> Option<DataFor<N>> {
debug_assert_eq!(round.0 == 0, time.is_some());
// If skipping rounds, populate end_time
if round.0 != 0 {
self.populate_end_time(round);
}
// 11-13
self.round = Some(RoundData::<N>::new(
round,
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
));
self.end_time.insert(round, self.round().end_time());
// 14-21
if Some(proposer) == self.validator_id {
let (round, block) = if let Some((round, block)) = &self.valid {
(Some(*round), block.clone())
} else {
(None, self.proposal.clone())
};
Some(Data::Proposal(round, block))
} else {
self.round_mut().set_timeout(Step::Propose);
None
}
}
// Transform Data into an actual Message, using the contextual data from this block
pub(crate) fn message(&mut self, data: DataFor<N>) -> Option<MessageFor<N>> {
debug_assert_eq!(
self.round().step,
match data.step() {
Step::Propose | Step::Prevote => Step::Propose,
Step::Precommit => Step::Prevote,
},
);
// 27, 33, 41, 46, 60, 64
self.round_mut().step = data.step();
// Only return a message to if we're actually a current validator
self.validator_id.map(|validator_id| Message {
sender: validator_id,
number: self.number,
round: self.round().number,
data,
})
}
}

View file

@ -6,7 +6,7 @@ use thiserror::Error;
use parity_scale_codec::{Encode, Decode};
use crate::{SignedMessage, commit_msg};
use crate::{SignedMessageFor, commit_msg};
/// An alias for a series of traits required for a type to be usable as a validator ID,
/// automatically implemented for all types satisfying those traits.
@ -249,14 +249,7 @@ pub trait Network: Send + Sync {
/// established, this will double-authenticate. Switching to unauthenticated channels in a system
/// already providing authenticated channels is not recommended as this is a minor, temporal
/// inefficiency while downgrading channels may have wider implications.
async fn broadcast(
&mut self,
msg: SignedMessage<
Self::ValidatorId,
Self::Block,
<Self::SignatureScheme as SignatureScheme>::Signature,
>,
);
async fn broadcast(&mut self, msg: SignedMessageFor<Self>);
/// Trigger a slash for the validator in question who was definitively malicious.
/// The exact process of triggering a slash is undefined and left to the network as a whole.

View file

@ -19,7 +19,6 @@ mod time;
use time::{sys_time, CanonicalInstant};
mod round;
use round::RoundData;
mod block;
use block::BlockData;
@ -108,6 +107,21 @@ enum TendermintError<V: ValidatorId> {
Temporal,
}
// Type aliases to abstract over generic hell
pub(crate) type DataFor<N> =
Data<<N as Network>::Block, <<N as Network>::SignatureScheme as SignatureScheme>::Signature>;
pub(crate) type MessageFor<N> = Message<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>;
/// Type alias to the SignedMessage type for a given Network
pub type SignedMessageFor<N> = SignedMessage<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>;
/// A machine executing the Tendermint protocol.
pub struct TendermintMachine<N: Network> {
network: N,
@ -115,11 +129,8 @@ pub struct TendermintMachine<N: Network> {
validators: N::SignatureScheme,
weights: Arc<N::Weights>,
queue:
VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
msg_recv: mpsc::UnboundedReceiver<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
queue: VecDeque<MessageFor<N>>,
msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>,
step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>,
block: BlockData<N>,
@ -128,13 +139,7 @@ pub struct TendermintMachine<N: Network> {
pub type StepSender<N> =
mpsc::UnboundedSender<(Commit<<N as Network>::SignatureScheme>, <N as Network>::Block)>;
pub type MessageSender<N> = mpsc::UnboundedSender<
SignedMessage<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>,
>;
pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
/// A Tendermint machine and its channel to receive messages from the gossip layer over.
pub struct TendermintHandle<N: Network> {
@ -148,58 +153,20 @@ pub struct TendermintHandle<N: Network> {
}
impl<N: Network + 'static> TendermintMachine<N> {
fn broadcast(
&mut self,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) {
if let Some(validator_id) = self.block.validator_id {
// 27, 33, 41, 46, 60, 64
self.block.round_mut().step = data.step();
self.queue.push_back(Message {
sender: validator_id,
number: self.block.number,
round: self.block.round().number,
data,
});
}
}
fn populate_end_time(&mut self, round: RoundNumber) {
for r in (self.block.round().number.0 + 1) .. round.0 {
self.block.end_time.insert(
RoundNumber(r),
RoundData::<N>::new(RoundNumber(r), self.block.end_time[&RoundNumber(r - 1)]).end_time(),
);
fn broadcast(&mut self, data: DataFor<N>) {
if let Some(msg) = self.block.message(data) {
self.queue.push_back(msg);
}
}
// Start a new round. Returns true if we were the proposer
fn round(&mut self, round: RoundNumber, time: Option<CanonicalInstant>) -> bool {
// If skipping rounds, populate end_time
if round.0 != 0 {
self.populate_end_time(round);
}
// 11-13
self.block.round = Some(RoundData::<N>::new(
round,
time.unwrap_or_else(|| self.block.end_time[&RoundNumber(round.0 - 1)]),
));
self.block.end_time.insert(round, self.block.round().end_time());
// 14-21
if Some(self.weights.proposer(self.block.number, self.block.round().number)) ==
self.block.validator_id
if let Some(data) =
self.block.new_round(round, self.weights.proposer(self.block.number, round), time)
{
let (round, block) = if let Some((round, block)) = &self.block.valid {
(Some(*round), block.clone())
} else {
(None, self.block.proposal.clone())
};
self.broadcast(Data::Proposal(round, block));
self.broadcast(data);
true
} else {
self.block.round_mut().set_timeout(Step::Propose);
false
}
}
@ -207,7 +174,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// 53-54
async fn reset(&mut self, end_round: RoundNumber, proposal: N::Block) {
// Ensure we have the end time data for the last round
self.populate_end_time(end_round);
self.block.populate_end_time(end_round);
// Sleep until this round ends
let round_end = self.block.end_time[&end_round];
@ -233,7 +200,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// If this commit is for a round we don't have, jump up to it
while self.block.end_time[&round].canonical() < commit.end_time {
round.0 += 1;
self.populate_end_time(round);
self.block.populate_end_time(round);
}
// If this commit is for a prior round, find it
while self.block.end_time[&round].canonical() > commit.end_time {
@ -417,7 +384,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
&self,
sender: N::ValidatorId,
round: RoundNumber,
data: &Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
data: &DataFor<N>,
) -> Result<(), TendermintError<N::ValidatorId>> {
if let Data::Precommit(Some((id, sig))) = data {
// Also verify the end_time of the commit
@ -435,7 +402,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
async fn message(
&mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
msg: MessageFor<N>,
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
if msg.number != self.block.number {
Err(TendermintError::Temporal)?;

View file

@ -1,6 +1,6 @@
use std::{sync::Arc, collections::HashMap};
use crate::{ext::*, RoundNumber, Step, Data, Message, TendermintError};
use crate::{ext::*, RoundNumber, Step, Data, DataFor, MessageFor, TendermintError};
pub(crate) struct MessageLog<N: Network> {
weights: Arc<N::Weights>,
@ -8,13 +8,7 @@ pub(crate) struct MessageLog<N: Network> {
N::ValidatorId,
(<N::Block as Block>::Id, <N::SignatureScheme as SignatureScheme>::Signature),
>,
pub(crate) log: HashMap<
RoundNumber,
HashMap<
N::ValidatorId,
HashMap<Step, Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
>,
>,
pub(crate) log: HashMap<RoundNumber, HashMap<N::ValidatorId, HashMap<Step, DataFor<N>>>>,
}
impl<N: Network> MessageLog<N> {
@ -25,7 +19,7 @@ impl<N: Network> MessageLog<N> {
// Returns true if it's a new message
pub(crate) fn log(
&mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
msg: MessageFor<N>,
) -> Result<bool, TendermintError<N::ValidatorId>> {
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
let msgs = round.entry(msg.sender).or_insert_with(HashMap::new);
@ -55,11 +49,7 @@ impl<N: Network> MessageLog<N> {
// For a given round, return the participating weight for this step, and the weight agreeing with
// the data.
pub(crate) fn message_instances(
&self,
round: RoundNumber,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> (u64, u64) {
pub(crate) fn message_instances(&self, round: RoundNumber, data: DataFor<N>) -> (u64, u64) {
let mut participating = 0;
let mut weight = 0;
for (participant, msgs) in &self.log[&round] {
@ -97,11 +87,7 @@ impl<N: Network> MessageLog<N> {
}
// Check if consensus has been reached on a specific piece of data
pub(crate) fn has_consensus(
&self,
round: RoundNumber,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> bool {
pub(crate) fn has_consensus(&self, round: RoundNumber, data: DataFor<N>) -> bool {
let (_, weight) = self.message_instances(round, data);
weight >= self.weights.threshold()
}
@ -111,7 +97,7 @@ impl<N: Network> MessageLog<N> {
round: RoundNumber,
sender: N::ValidatorId,
step: Step,
) -> Option<&Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>> {
) -> Option<&DataFor<N>> {
self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step)))
}
}

View file

@ -11,7 +11,7 @@ use futures::SinkExt;
use tokio::{sync::RwLock, time::sleep};
use tendermint_machine::{
ext::*, SignedMessage, StepSender, MessageSender, TendermintMachine, TendermintHandle,
ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle,
};
type TestValidatorId = u16;
@ -120,7 +120,7 @@ impl Network for TestNetwork {
TestWeights
}
async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) {
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
for (messages, _) in self.1.write().await.iter_mut() {
messages.send(msg.clone()).await.unwrap();
}