Have the machine respond to advances made by an external sync loop

This commit is contained in:
Luke Parker 2022-11-12 05:35:41 -05:00
parent ca3a29f616
commit b53759c6ec
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
3 changed files with 39 additions and 2 deletions
substrate/tendermint/machine

View file

@ -121,7 +121,7 @@ impl<S: SignatureScheme> SignatureScheme for Arc<S> {
/// a valid commit.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct Commit<S: SignatureScheme> {
/// End time of the round, used as the start time of next round.
/// End time of the round which created this commit, used as the start time of the next block.
pub end_time: u64,
/// Validators participating in the signature.
pub validators: Vec<S::ValidatorId>,

View file

@ -119,6 +119,7 @@ pub struct TendermintMachine<N: Network> {
msg_recv: mpsc::UnboundedReceiver<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>,
log: MessageLog<N>,
slashes: HashSet<N::ValidatorId>,
@ -142,6 +143,9 @@ pub type MessageSender<N> = mpsc::UnboundedSender<
/// A Tendermint machine and its channel to receive messages from the gossip layer over.
pub struct TendermintHandle<N: Network> {
/// Channel to trigger the machine to move to the next height.
/// Takes in the the previous block's commit, along with the new proposal.
pub step: mpsc::UnboundedSender<(Commit<N::SignatureScheme>, N::Block)>,
/// Channel to send messages received from the P2P layer.
pub messages: MessageSender<N>,
/// Tendermint machine to be run on an asynchronous task.
@ -251,6 +255,26 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.round(Round(0));
}
async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) {
// Determine the Round number this commit ended on
let mut round = Round(0);
// Use < to prevent an infinite loop
while self.canonical_end_time(round) < commit.end_time {
round.0 += 1;
}
debug_assert_eq!(
self.canonical_end_time(round),
commit.end_time,
"resetting by commit for a different block"
);
// Populate the various pieces of round info
if self.round.0 < round.0 {
self.round(round);
}
self.reset(round, proposal).await;
}
async fn slash(&mut self, validator: N::ValidatorId) {
if !self.slashes.contains(&validator) {
self.slashes.insert(validator);
@ -268,7 +292,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
proposal: N::Block,
) -> TendermintHandle<N> {
let (msg_send, msg_recv) = mpsc::unbounded();
let (step_send, step_recv) = mpsc::unbounded();
TendermintHandle {
step: step_send,
messages: msg_send,
machine: {
let last_end = UNIX_EPOCH + Duration::from_secs(last.1);
@ -317,6 +343,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
queue: VecDeque::new(),
msg_recv,
step_recv,
log: MessageLog::new(weights),
slashes: HashSet::new(),
@ -364,6 +391,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() };
if let Some((broadcast, msg)) = futures::select_biased! {
// Handle a new height occuring externally (an external sync loop)
msg = self.step_recv.next() => {
if let Some((commit, proposal)) = msg {
self.reset_by_commit(commit, proposal).await;
None
} else {
break;
}
},
// Handle our messages
_ = queue_future => {
Some((true, self.queue.pop_front().unwrap()))

View file

@ -152,7 +152,7 @@ impl TestNetwork {
let mut write = arc.write().await;
for i in 0 .. validators {
let i = u16::try_from(i).unwrap();
let TendermintHandle { messages, machine } = TendermintMachine::new(
let TendermintHandle { messages, machine, .. } = TendermintMachine::new(
TestNetwork(i, arc.clone()),
(BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()),
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },