mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-22 10:44:53 +00:00
Replace Tendermint step with sync_block
Step moved a step forward after an externally synced/added block. This created a race condition to add the block between the sync process and the Tendermint machine. Now that the block routes through Tendermint, there is no such race condition.
This commit is contained in:
parent
9bea368d36
commit
5858b6c03e
3 changed files with 43 additions and 22 deletions
|
@ -117,9 +117,10 @@ impl<S: SignatureScheme> SignatureScheme for Arc<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A commit for a specific block. The list of validators have weight exceeding the threshold for
|
/// A commit for a specific block.
|
||||||
/// a valid commit.
|
///
|
||||||
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
|
/// The list of validators have weight exceeding the threshold for a valid commit.
|
||||||
|
#[derive(PartialEq, Debug, Encode, Decode)]
|
||||||
pub struct Commit<S: SignatureScheme> {
|
pub struct Commit<S: SignatureScheme> {
|
||||||
/// End time of the round which created this commit, used as the start time of the next block.
|
/// End time of the round which created this commit, used as the start time of the next block.
|
||||||
pub end_time: u64,
|
pub end_time: u64,
|
||||||
|
@ -129,6 +130,16 @@ pub struct Commit<S: SignatureScheme> {
|
||||||
pub signature: S::AggregateSignature,
|
pub signature: S::AggregateSignature,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<S: SignatureScheme> Clone for Commit<S> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
end_time: self.end_time,
|
||||||
|
validators: self.validators.clone(),
|
||||||
|
signature: self.signature.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Weights for the validators present.
|
/// Weights for the validators present.
|
||||||
pub trait Weights: Send + Sync {
|
pub trait Weights: Send + Sync {
|
||||||
type ValidatorId: ValidatorId;
|
type ValidatorId: ValidatorId;
|
||||||
|
|
|
@ -135,17 +135,18 @@ pub struct TendermintMachine<N: Network> {
|
||||||
|
|
||||||
queue: VecDeque<MessageFor<N>>,
|
queue: VecDeque<MessageFor<N>>,
|
||||||
msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>,
|
msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>,
|
||||||
#[allow(clippy::type_complexity)]
|
synced_block_recv: mpsc::UnboundedReceiver<SyncedBlock<N>>,
|
||||||
step_recv: mpsc::UnboundedReceiver<(BlockNumber, Commit<N::SignatureScheme>, Option<N::Block>)>,
|
|
||||||
|
|
||||||
block: BlockData<N>,
|
block: BlockData<N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type StepSender<N> = mpsc::UnboundedSender<(
|
pub struct SyncedBlock<N: Network> {
|
||||||
BlockNumber,
|
pub number: BlockNumber,
|
||||||
Commit<<N as Network>::SignatureScheme>,
|
pub block: <N as Network>::Block,
|
||||||
Option<<N as Network>::Block>,
|
pub commit: Commit<<N as Network>::SignatureScheme>,
|
||||||
)>;
|
}
|
||||||
|
|
||||||
|
pub type SyncedBlockSender<N> = mpsc::UnboundedSender<SyncedBlock<N>>;
|
||||||
|
|
||||||
pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
|
pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
|
||||||
|
|
||||||
|
@ -153,7 +154,7 @@ pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
|
||||||
pub struct TendermintHandle<N: Network> {
|
pub struct TendermintHandle<N: Network> {
|
||||||
/// Channel to trigger the machine to move to the next block.
|
/// Channel to trigger the machine to move to the next block.
|
||||||
/// Takes in the the previous block's commit, along with the new proposal.
|
/// Takes in the the previous block's commit, along with the new proposal.
|
||||||
pub step: StepSender<N>,
|
pub synced_block: SyncedBlockSender<N>,
|
||||||
/// Channel to send messages received from the P2P layer.
|
/// Channel to send messages received from the P2P layer.
|
||||||
pub messages: MessageSender<N>,
|
pub messages: MessageSender<N>,
|
||||||
/// Tendermint machine to be run on an asynchronous task.
|
/// Tendermint machine to be run on an asynchronous task.
|
||||||
|
@ -252,9 +253,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||||
proposal: N::Block,
|
proposal: N::Block,
|
||||||
) -> TendermintHandle<N> {
|
) -> TendermintHandle<N> {
|
||||||
let (msg_send, msg_recv) = mpsc::unbounded();
|
let (msg_send, msg_recv) = mpsc::unbounded();
|
||||||
let (step_send, step_recv) = mpsc::unbounded();
|
let (synced_block_send, synced_block_recv) = mpsc::unbounded();
|
||||||
TendermintHandle {
|
TendermintHandle {
|
||||||
step: step_send,
|
synced_block: synced_block_send,
|
||||||
messages: msg_send,
|
messages: msg_send,
|
||||||
machine: {
|
machine: {
|
||||||
let sys_time = sys_time(last_time);
|
let sys_time = sys_time(last_time);
|
||||||
|
@ -274,7 +275,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||||
|
|
||||||
queue: VecDeque::new(),
|
queue: VecDeque::new(),
|
||||||
msg_recv,
|
msg_recv,
|
||||||
step_recv,
|
synced_block_recv,
|
||||||
|
|
||||||
block: BlockData::new(
|
block: BlockData::new(
|
||||||
weights,
|
weights,
|
||||||
|
@ -309,12 +310,19 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||||
if let Some((broadcast, msg)) = futures::select_biased! {
|
if let Some((broadcast, msg)) = futures::select_biased! {
|
||||||
// Handle a new block occuring externally (an external sync loop)
|
// Handle a new block occuring externally (an external sync loop)
|
||||||
// Has the highest priority as it makes all other futures here irrelevant
|
// Has the highest priority as it makes all other futures here irrelevant
|
||||||
msg = self.step_recv.next() => {
|
msg = self.synced_block_recv.next() => {
|
||||||
if let Some((block_number, commit, proposal)) = msg {
|
if let Some(SyncedBlock { number, block, commit }) = msg {
|
||||||
// Commit is for a block we've already moved past
|
// Commit is for a block we've already moved past
|
||||||
if block_number != self.block.number {
|
if number != self.block.number {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Commit is invalid
|
||||||
|
if !self.network.verify_commit(block.id(), &commit) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let proposal = self.network.add_block(block, commit.clone()).await;
|
||||||
self.reset_by_commit(commit, proposal).await;
|
self.reset_by_commit(commit, proposal).await;
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -11,7 +11,7 @@ use futures::SinkExt;
|
||||||
use tokio::{sync::RwLock, time::sleep};
|
use tokio::{sync::RwLock, time::sleep};
|
||||||
|
|
||||||
use tendermint_machine::{
|
use tendermint_machine::{
|
||||||
ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle,
|
ext::*, SignedMessageFor, SyncedBlockSender, MessageSender, TendermintMachine, TendermintHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
type TestValidatorId = u16;
|
type TestValidatorId = u16;
|
||||||
|
@ -97,7 +97,7 @@ impl Block for TestBlock {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
struct TestNetwork(u16, Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>>);
|
struct TestNetwork(u16, Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>)>>>);
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Network for TestNetwork {
|
impl Network for TestNetwork {
|
||||||
|
@ -149,13 +149,15 @@ impl Network for TestNetwork {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestNetwork {
|
impl TestNetwork {
|
||||||
async fn new(validators: usize) -> Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>> {
|
async fn new(
|
||||||
|
validators: usize,
|
||||||
|
) -> Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>)>>> {
|
||||||
let arc = Arc::new(RwLock::new(vec![]));
|
let arc = Arc::new(RwLock::new(vec![]));
|
||||||
{
|
{
|
||||||
let mut write = arc.write().await;
|
let mut write = arc.write().await;
|
||||||
for i in 0 .. validators {
|
for i in 0 .. validators {
|
||||||
let i = u16::try_from(i).unwrap();
|
let i = u16::try_from(i).unwrap();
|
||||||
let TendermintHandle { messages, machine, step } = TendermintMachine::new(
|
let TendermintHandle { messages, synced_block, machine } = TendermintMachine::new(
|
||||||
TestNetwork(i, arc.clone()),
|
TestNetwork(i, arc.clone()),
|
||||||
BlockNumber(1),
|
BlockNumber(1),
|
||||||
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
|
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
|
||||||
|
@ -163,7 +165,7 @@ impl TestNetwork {
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
tokio::task::spawn(machine.run());
|
tokio::task::spawn(machine.run());
|
||||||
write.push((messages, step));
|
write.push((messages, synced_block));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
arc
|
arc
|
||||||
|
|
Loading…
Reference in a new issue