Use futures mpsc instead of tokio

This commit is contained in:
Luke Parker 2022-11-08 21:14:03 -05:00
parent 16a2c9a2dc
commit 56a21ca6a6
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
5 changed files with 137 additions and 113 deletions
Cargo.lock
substrate/tendermint
client/src/authority
machine

1
Cargo.lock generated
View file

@ -8893,6 +8893,7 @@ name = "tendermint-machine"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"parity-scale-codec",
"sp-runtime",
"thiserror",

View file

@ -8,7 +8,7 @@ use async_trait::async_trait;
use log::{warn, error};
use futures::{
StreamExt,
SinkExt, StreamExt,
channel::mpsc::{self, UnboundedSender},
};
@ -170,7 +170,7 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
let (gossip_tx, mut gossip_rx) = mpsc::unbounded();
// Create the Tendermint machine
let handle = {
let mut handle = {
// Set this struct as active
*self.import.providers.write().await = Some(providers);
self.active = Some(ActiveAuthority {

View file

@ -13,6 +13,7 @@ thiserror = "1"
parity-scale-codec = { version = "3.2", features = ["derive"] }
futures = "0.3"
tokio = { version = "1", features = ["macros", "sync", "time", "rt"] }
sp-runtime = { git = "https://github.com/serai-dex/substrate", version = "6.0.0", optional = true }

View file

@ -3,16 +3,14 @@ use core::fmt::Debug;
use std::{
sync::Arc,
time::{UNIX_EPOCH, SystemTime, Instant, Duration},
collections::HashMap,
collections::{VecDeque, HashMap},
};
use parity_scale_codec::{Encode, Decode};
use tokio::{
task::{JoinHandle, yield_now},
sync::mpsc::{self, error::TryRecvError},
time::sleep,
};
use futures::{task::Poll, StreamExt, channel::mpsc};
use tokio::time::sleep;
/// Traits and types of the external network being integrated with to provide consensus over.
pub mod ext;
@ -113,10 +111,13 @@ pub struct TendermintMachine<N: Network> {
start_time: Instant,
personal_proposal: N::Block,
queue: Vec<(
queue: VecDeque<(
bool,
Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
)>,
msg_recv: mpsc::UnboundedReceiver<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
log: MessageLog<N>,
round: Round,
@ -129,15 +130,20 @@ pub struct TendermintMachine<N: Network> {
timeouts: HashMap<Step, Instant>,
}
/// A handle to an asynchronous task, along with a channel to inform of it of messages received.
pub type MessageSender<N> = mpsc::UnboundedSender<
SignedMessage<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>,
>;
/// A Tendermint machine and its channel to receive messages from the gossip layer over.
pub struct TendermintHandle<N: Network> {
/// Channel to send messages received from the P2P layer.
pub messages: mpsc::Sender<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
/// Handle for the asynchronous task executing the machine. The task will automatically exit
/// when the channel is dropped.
pub handle: JoinHandle<()>,
pub messages: MessageSender<N>,
/// Tendermint machine to be run on an asynchronous task.
pub machine: TendermintMachine<N>,
}
impl<N: Network + 'static> TendermintMachine<N> {
@ -175,7 +181,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
let step = data.step();
// 27, 33, 41, 46, 60, 64
self.step = step;
self.queue.push((
self.queue.push_back((
true,
Message { sender: self.validator_id, number: self.number, round: self.round, data },
));
@ -239,14 +245,19 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.round(Round(0));
}
/// Create a new Tendermint machine, for the specified proposer, from the specified block, with
/// the specified block as the one to propose next, returning a handle for the machine.
/// Create a new Tendermint machine, from the specified point, with the specified block as the
/// one to propose next. This will return a channel to send messages from the gossip layer and
/// the machine itself. The machine should have `run` called from an asynchronous task.
#[allow(clippy::new_ret_no_self)]
pub fn new(network: N, last: (BlockNumber, u64), proposal: N::Block) -> TendermintHandle<N> {
let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary
pub async fn new(
network: N,
last: (BlockNumber, u64),
proposal: N::Block,
) -> TendermintHandle<N> {
let (msg_send, msg_recv) = mpsc::unbounded();
TendermintHandle {
messages: msg_send,
handle: tokio::spawn(async move {
machine: {
let last_end = UNIX_EPOCH + Duration::from_secs(last.1);
// If the last block hasn't ended yet, sleep until it has
@ -271,7 +282,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
let weights = Arc::new(network.weights());
let validator_id = signer.validator_id().await;
// 01-10
let mut machine = TendermintMachine {
TendermintMachine {
network,
signer,
validators,
@ -284,14 +295,15 @@ impl<N: Network + 'static> TendermintMachine<N> {
// The end time of the last block is the start time for this one
// The Commit explicitly contains the end time, so loading the last commit will provide
// this. The only exception is for the genesis block, which doesn't have a commit
// Using the genesis time in place will cause this block to be created immediately after
// it, without the standard amount of separation (so their times will be equivalent or
// minimally offset)
// Using the genesis time in place will cause this block to be created immediately
// after it, without the standard amount of separation (so their times will be
// equivalent or minimally offset)
// For callers wishing to avoid this, they should pass (0, GENESIS + BLOCK_TIME)
start_time: last_time,
personal_proposal: proposal,
queue: vec![],
queue: VecDeque::new(),
msg_recv,
log: MessageLog::new(weights),
round: Round(0),
@ -302,95 +314,101 @@ impl<N: Network + 'static> TendermintMachine<N> {
valid: None,
timeouts: HashMap::new(),
};
machine.round(Round(0));
}
},
}
}
loop {
// Check if any timeouts have been triggered
let now = Instant::now();
let (t1, t2, t3) = {
let ready = |step| machine.timeouts.get(&step).unwrap_or(&now) < &now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
};
pub async fn run(mut self) {
self.round(Round(0));
// Propose timeout
if t1 && (machine.step == Step::Propose) {
machine.broadcast(Data::Prevote(None));
}
'outer: loop {
// Check if any timeouts have been triggered
let now = Instant::now();
let (t1, t2, t3) = {
let ready = |step| self.timeouts.get(&step).unwrap_or(&now) < &now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
};
// Prevote timeout
if t2 && (machine.step == Step::Prevote) {
machine.broadcast(Data::Precommit(None));
}
// Propose timeout
if t1 && (self.step == Step::Propose) {
self.broadcast(Data::Prevote(None));
}
// Precommit timeout
if t3 {
machine.round(Round(machine.round.0.wrapping_add(1)));
}
// Prevote timeout
if t2 && (self.step == Step::Prevote) {
self.broadcast(Data::Precommit(None));
}
// Drain the channel of messages
let mut broken = false;
loop {
match msg_recv.try_recv() {
Ok(msg) => {
if !msg.verify_signature(&machine.validators) {
continue;
}
machine.queue.push((false, msg.msg));
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => broken = true,
// Precommit timeout
if t3 {
self.round(Round(self.round.0.wrapping_add(1)));
}
// Drain the channel of messages
loop {
match futures::poll!(self.msg_recv.next()) {
Poll::Ready(Some(msg)) => {
if !msg.verify_signature(&self.validators) {
continue;
}
self.queue.push_back((false, msg.msg));
}
if broken {
Poll::Ready(None) => {
break 'outer;
}
Poll::Pending => {
break;
}
// Handle the queue
let mut queue = machine.queue.drain(..).collect::<Vec<_>>();
for (broadcast, msg) in queue.drain(..) {
let res = machine.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
}
match res {
Ok(None) => (),
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| {
Some((*k, sig.clone())).filter(|_| id == &block.id())
}) {
validators.push(v);
sigs.push(sig);
}
let commit = Commit {
end_time: machine.canonical_end_time(msg.round),
validators,
signature: N::SignatureScheme::aggregate(&sigs),
};
debug_assert!(machine.network.verify_commit(block.id(), &commit));
let proposal = machine.network.add_block(block, commit).await;
machine.reset(msg.round, proposal).await;
}
Err(TendermintError::Malicious(validator)) => {
machine.network.slash(validator).await;
}
Err(TendermintError::Temporal) => (),
}
if broadcast {
let sig = machine.signer.sign(&msg.encode()).await;
machine.network.broadcast(SignedMessage { msg, sig }).await;
}
}
yield_now().await;
}
}),
}
// Handle the queue
if let Some((broadcast, msg)) = self.queue.pop_front() {
let res = self.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
}
match res {
Ok(None) => (),
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
for (v, sig) in self
.log
.precommitted
.iter()
.filter_map(|(k, (id, sig))| Some((*k, sig.clone())).filter(|_| id == &block.id()))
{
validators.push(v);
sigs.push(sig);
}
let commit = Commit {
end_time: self.canonical_end_time(msg.round),
validators,
signature: N::SignatureScheme::aggregate(&sigs),
};
debug_assert!(self.network.verify_commit(block.id(), &commit));
let proposal = self.network.add_block(block, commit).await;
self.reset(msg.round, proposal).await;
}
Err(TendermintError::Malicious(validator)) => {
self.network.slash(validator).await;
}
Err(TendermintError::Temporal) => (),
}
if broadcast {
let sig = self.signer.sign(&msg.encode()).await;
self.network.broadcast(SignedMessage { msg, sig }).await;
}
}
// futures::pending here does not work
tokio::task::yield_now().await;
}
}

View file

@ -7,9 +7,10 @@ use async_trait::async_trait;
use parity_scale_codec::{Encode, Decode};
use futures::SinkExt;
use tokio::{sync::RwLock, time::sleep};
use tendermint_machine::{ext::*, SignedMessage, TendermintMachine, TendermintHandle};
use tendermint_machine::{ext::*, SignedMessage, MessageSender, TendermintMachine, TendermintHandle};
type TestValidatorId = u16;
type TestBlockId = [u8; 4];
@ -93,7 +94,7 @@ impl Block for TestBlock {
}
}
struct TestNetwork(u16, Arc<RwLock<Vec<TendermintHandle<Self>>>>);
struct TestNetwork(u16, Arc<RwLock<Vec<MessageSender<Self>>>>);
#[async_trait]
impl Network for TestNetwork {
@ -117,8 +118,8 @@ impl Network for TestNetwork {
}
async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) {
for handle in self.1.write().await.iter_mut() {
handle.messages.send(msg.clone()).await.unwrap();
for messages in self.1.write().await.iter_mut() {
messages.send(msg.clone()).await.unwrap();
}
}
@ -144,17 +145,20 @@ impl Network for TestNetwork {
}
impl TestNetwork {
async fn new(validators: usize) -> Arc<RwLock<Vec<TendermintHandle<Self>>>> {
async fn new(validators: usize) -> Arc<RwLock<Vec<MessageSender<Self>>>> {
let arc = Arc::new(RwLock::new(vec![]));
{
let mut write = arc.write().await;
for i in 0 .. validators {
let i = u16::try_from(i).unwrap();
write.push(TendermintMachine::new(
let TendermintHandle { messages, machine } = TendermintMachine::new(
TestNetwork(i, arc.clone()),
(BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()),
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
));
)
.await;
tokio::task::spawn(machine.run());
write.push(messages);
}
}
arc