Move from a yielding loop to select in tendermint-machine

This commit is contained in:
Luke Parker 2022-11-08 22:37:06 -05:00
parent 2cb1d35d89
commit 1c8192218a
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6

View file

@ -8,8 +8,11 @@ use std::{
use parity_scale_codec::{Encode, Decode};
use futures::{task::Poll, future, StreamExt, channel::mpsc};
use futures::{
FutureExt, StreamExt,
future::{self, Fuse},
channel::mpsc,
};
use tokio::time::sleep;
/// Traits and types of the external network being integrated with to provide consensus over.
@ -111,10 +114,8 @@ pub struct TendermintMachine<N: Network> {
start_time: Instant,
personal_proposal: N::Block,
queue: VecDeque<(
bool,
Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
)>,
queue:
VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
msg_recv: mpsc::UnboundedReceiver<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
@ -181,10 +182,12 @@ impl<N: Network + 'static> TendermintMachine<N> {
let step = data.step();
// 27, 33, 41, 46, 60, 64
self.step = step;
self.queue.push_back((
true,
Message { sender: self.validator_id, number: self.number, round: self.round, data },
));
self.queue.push_back(Message {
sender: self.validator_id,
number: self.number,
round: self.round,
data,
});
}
// 14-21
@ -234,7 +237,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.start_time = round_end;
self.personal_proposal = proposal;
self.queue = self.queue.drain(..).filter(|msg| msg.1.number == self.number).collect();
self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect();
self.log = MessageLog::new(self.weights.clone());
self.end_time = HashMap::new();
@ -322,17 +325,18 @@ impl<N: Network + 'static> TendermintMachine<N> {
pub async fn run(mut self) {
self.round(Round(0));
'outer: loop {
// Check if any timeouts have been triggered
loop {
// Create futures for the various timeouts
let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied();
async move {
(async move {
if let Some(timeout) = timeout {
sleep(timeout.saturating_duration_since(Instant::now())).await
} else {
future::pending::<()>().await
}
}
})
.fuse()
};
tokio::pin! {
let propose_timeout = timeout_future(Step::Propose);
@ -340,41 +344,56 @@ impl<N: Network + 'static> TendermintMachine<N> {
let precommit_timeout = timeout_future(Step::Precommit);
};
// Propose timeout
if futures::poll!(&mut propose_timeout).is_ready() && (self.step == Step::Propose) {
self.broadcast(Data::Prevote(None));
}
// Also create a future for if the queue has a message
// Does not pop_front as if another message has higher priority, its future will be handled
// instead in this loop, and the popped value would be dropped with the next iteration
// While no other message has a higher priority right now, this is a safer practice
let mut queue_future =
if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() };
// Prevote timeout
if futures::poll!(&mut prevote_timeout).is_ready() && (self.step == Step::Prevote) {
self.broadcast(Data::Precommit(None));
}
if let Some((broadcast, msg)) = futures::select_biased! {
// Handle our messages
_ = queue_future => {
Some((true, self.queue.pop_front().unwrap()))
},
// Precommit timeout
if futures::poll!(&mut precommit_timeout).is_ready() {
self.round(Round(self.round.0.wrapping_add(1)));
}
// Handle any timeouts
_ = &mut propose_timeout => {
// Remove the timeout so it doesn't persist, always being the selected future due to bias
// While this does enable the below get_entry calls to enter timeouts again, they'll
// never attempt to add a timeout after this timeout has expired
self.timeouts.remove(&Step::Propose);
if self.step == Step::Propose {
self.broadcast(Data::Prevote(None));
}
None
},
_ = &mut prevote_timeout => {
self.timeouts.remove(&Step::Prevote);
if self.step == Step::Prevote {
self.broadcast(Data::Precommit(None));
}
None
},
_ = &mut precommit_timeout => {
// Technically unnecessary since round() will clear the timeouts
self.timeouts.remove(&Step::Precommit);
self.round(Round(self.round.0.wrapping_add(1)));
continue;
},
// Drain the channel of messages
loop {
match futures::poll!(self.msg_recv.next()) {
Poll::Ready(Some(msg)) => {
// Handle any received messages
msg = self.msg_recv.next() => {
if let Some(msg) = msg {
if !msg.verify_signature(&self.validators) {
continue;
}
self.queue.push_back((false, msg.msg));
}
Poll::Ready(None) => {
break 'outer;
}
Poll::Pending => {
Some((false, msg.msg))
} else {
break;
}
}
}
// Handle the queue
if let Some((broadcast, msg)) = self.queue.pop_front() {
} {
let res = self.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
@ -416,9 +435,6 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.network.broadcast(SignedMessage { msg, sig }).await;
}
}
// futures::pending here does not work
tokio::task::yield_now().await;
}
}