Mirror of https://github.com/serai-dex/serai.git (synced 2025-01-23 03:05:07 +00:00)
Don't return from sync_block until the Tendermint machine returns if it's valid or not
We had a race condition where we'd be informed of blocks 1 .. 3 and immediately try to add 1 .. 3. Because we tried to add 2 immediately after 1, it'd fail: the tip was still the genesis, yet 2 needs the tip to be 1. Adding a channel, while ugly, was the simplest way to make sync_block wait for the machine's verdict.

Also has any added block be broadcast. Otherwise there's a race condition where a node syncs up to the most recent block, yet fails to add the next block when it's committed to.
parent 14388e746c
commit cc491ee1e1
4 changed files with 86 additions and 32 deletions
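The fix described above is a request/response pair of channels: sync_block hands the block to the Tendermint machine over one channel, then waits on a second channel for the machine to report whether the block was actually added, instead of returning true as soon as the message was queued. Below is a minimal sketch of that pattern using the same futures unbounded channels the diff uses; the types and function bodies here are simplified stand-ins, not the actual Tributary code.

// Illustrative sketch only: `SyncedBlock`, `sync_block`, and `machine_loop` are simplified
// stand-ins for the real Tributary/Tendermint types and tasks.
use futures::{SinkExt, StreamExt, channel::mpsc};

struct SyncedBlock {
  number: u64,
  block: Vec<u8>,
}

// Caller side: push the block to the machine, then wait for its verdict instead of
// immediately returning true. This is what serializes "add 1, then add 2, then add 3".
async fn sync_block(
  to_machine: &mut mpsc::UnboundedSender<SyncedBlock>,
  from_machine: &mut mpsc::UnboundedReceiver<bool>,
  block: SyncedBlock,
) -> bool {
  to_machine.send(block).await.unwrap();
  from_machine.next().await.unwrap()
}

// Machine side: for every synced block received, report back whether it was actually added.
async fn machine_loop(
  mut from_caller: mpsc::UnboundedReceiver<SyncedBlock>,
  mut to_caller: mpsc::UnboundedSender<bool>,
  mut tip: u64,
) {
  while let Some(SyncedBlock { number, block: _ }) = from_caller.next().await {
    if number != tip + 1 {
      // Not the block we're currently waiting on (e.g. the tip hasn't advanced yet)
      to_caller.send(false).await.unwrap();
      continue;
    }
    // ... verify the commit and add the block to the blockchain here ...
    tip = number;
    to_caller.send(true).await.unwrap();
  }
}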
@@ -8,11 +8,11 @@ use zeroize::Zeroizing;
 use ciphersuite::{Ciphersuite, Ristretto};
 
 use scale::Decode;
-use futures::SinkExt;
+use futures::{StreamExt, SinkExt};
 use ::tendermint::{
   ext::{BlockNumber, Commit, Block as BlockTrait, Network},
-  SignedMessageFor, SyncedBlock, SyncedBlockSender, MessageSender, TendermintMachine,
-  TendermintHandle,
+  SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
+  TendermintMachine, TendermintHandle,
 };
 
 use serai_db::Db;
@@ -53,8 +53,9 @@ pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
 // participant from flooding disks and causing out of space errors in order processes.
 pub const BLOCK_SIZE_LIMIT: usize = 350_000;
 
-pub(crate) const TRANSACTION_MESSAGE: u8 = 0;
-pub(crate) const TENDERMINT_MESSAGE: u8 = 1;
+pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
+pub(crate) const BLOCK_MESSAGE: u8 = 1;
+pub(crate) const TRANSACTION_MESSAGE: u8 = 2;
 
 /// An item which can be read and written.
 pub trait ReadWrite: Sized {
@@ -89,6 +90,7 @@ pub struct Tributary<D: Db, T: Transaction, P: P2p> {
   network: TendermintNetwork<D, T, P>,
 
   synced_block: SyncedBlockSender<TendermintNetwork<D, T, P>>,
+  synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
   messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
 }
 
@@ -119,11 +121,18 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
 
     let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
 
-    let TendermintHandle { synced_block, messages, machine } =
+    let TendermintHandle { synced_block, synced_block_result, messages, machine } =
       TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
     tokio::task::spawn(machine.run());
 
-    Some(Self { db, genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) })
+    Some(Self {
+      db,
+      genesis,
+      network,
+      synced_block,
+      synced_block_result: Arc::new(RwLock::new(synced_block_result)),
+      messages: Arc::new(RwLock::new(messages)),
+    })
   }
 
   pub fn block_time() -> u32 {
@@ -149,6 +158,9 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
   pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
     Blockchain::<D, T>::commit_from_db(&self.db, hash)
   }
+  pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option<Commit<Validators>> {
+    self.commit(hash).map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap())
+  }
   pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> {
     Blockchain::<D, T>::block_after(&self.db, hash)
   }
@@ -182,32 +194,35 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
   // Sync a block.
   // TODO: Since we have a static validator set, we should only need the tail commit?
   pub async fn sync_block(&mut self, block: Block<T>, commit: Vec<u8>) -> bool {
+    let mut result = self.synced_block_result.write().await;
+
     let (tip, block_number) = {
       let blockchain = self.network.blockchain.read().await;
       (blockchain.tip(), blockchain.block_number())
     };
+
     if block.header.parent != tip {
       log::debug!("told to sync a block whose parent wasn't our tip");
       return false;
     }
+
     let block = TendermintBlock(block.serialize());
     let Ok(commit) = Commit::<Arc<Validators>>::decode(&mut commit.as_ref()) else {
       log::error!("sent an invalidly serialized commit");
       return false;
     };
     if !self.network.verify_commit(block.id(), &commit) {
       log::error!("sent an invalid commit");
       return false;
     }
+
     let number = BlockNumber((block_number + 1).into());
     self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap();
-    true
+    result.next().await.unwrap()
   }
 
   // Return true if the message should be rebroadcasted.
-  // Safe to be &self since the only usage of self is on self.network.blockchain and self.messages,
-  // both which successfully acquire their own write locks and don't rely on each other
-  pub async fn handle_message(&self, msg: &[u8]) -> bool {
+  pub async fn handle_message(&mut self, msg: &[u8]) -> bool {
     match msg.first() {
       Some(&TRANSACTION_MESSAGE) => {
         let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else {
@@ -234,6 +249,19 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
         false
       }
 
+      Some(&BLOCK_MESSAGE) => {
+        let mut msg_ref = &msg[1 ..];
+        let Ok(block) = Block::<T>::read(&mut msg_ref) else {
+          log::error!("received invalid block message");
+          return false;
+        };
+        let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
+        if self.sync_block(block, commit).await {
+          log::debug!("synced block over p2p net instead of building the commit ourselves");
+        }
+        false
+      }
+
       _ => false,
     }
   }

@@ -37,7 +37,8 @@ use tokio::{
 };
 
 use crate::{
-  TENDERMINT_MESSAGE, ReadWrite, Transaction, BlockHeader, Block, BlockError, Blockchain, P2p,
+  TENDERMINT_MESSAGE, BLOCK_MESSAGE, ReadWrite, Transaction, BlockHeader, Block, BlockError,
+  Blockchain, P2p,
 };
 
 fn challenge(
@@ -56,7 +57,7 @@ fn challenge(
 }
 
 #[derive(Clone, PartialEq, Eq, Debug)]
-pub(crate) struct Signer {
+pub struct Signer {
   genesis: [u8; 32],
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
 }
@@ -115,7 +116,7 @@ impl SignerTrait for Signer {
 }
 
 #[derive(Clone, PartialEq, Eq, Debug)]
-pub(crate) struct Validators {
+pub struct Validators {
   genesis: [u8; 32],
   total_weight: u64,
   weights: HashMap<[u8; 32], u64>,
@@ -281,7 +282,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
 
   async fn add_block(
     &mut self,
-    block: Self::Block,
+    serialized_block: Self::Block,
     commit: Commit<Self::SignatureScheme>,
   ) -> Option<Self::Block> {
     let invalid_block = || {
@@ -294,16 +295,25 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
     };
 
     // Tendermint should only produce valid commits
-    assert!(self.verify_commit(block.id(), &commit));
+    assert!(self.verify_commit(serialized_block.id(), &commit));
 
-    let Ok(block) = Block::read::<&[u8]>(&mut block.0.as_ref()) else {
+    let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else {
       return invalid_block();
     };
 
+    let encoded_commit = commit.encode();
     loop {
-      let block_res = self.blockchain.write().await.add_block(&block, commit.encode());
+      let block_res = self.blockchain.write().await.add_block(&block, encoded_commit.clone());
       match block_res {
-        Ok(()) => break,
+        Ok(()) => {
+          // If we successfully added this block, broadcast it
+          // TODO: Move this under the coordinator once we set up on new block notifications?
+          let mut msg = serialized_block.0;
+          msg.insert(0, BLOCK_MESSAGE);
+          msg.extend(encoded_commit);
+          self.p2p.broadcast(self.genesis, msg).await;
+          break;
+        }
         Err(BlockError::NonLocalProvided(hash)) => {
          log::error!(
            "missing provided transaction {} which other validators on tributary {} had",

@@ -9,7 +9,7 @@ use std::{
 use parity_scale_codec::{Encode, Decode};
 
 use futures::{
-  FutureExt, StreamExt,
+  FutureExt, StreamExt, SinkExt,
   future::{self, Fuse},
   channel::mpsc,
 };
@@ -135,6 +135,7 @@ pub struct TendermintMachine<N: Network> {
   queue: VecDeque<MessageFor<N>>,
   msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>,
   synced_block_recv: mpsc::UnboundedReceiver<SyncedBlock<N>>,
+  synced_block_result_send: mpsc::UnboundedSender<bool>,
 
   block: BlockData<N>,
 }
@@ -146,6 +147,7 @@ pub struct SyncedBlock<N: Network> {
 }
 
 pub type SyncedBlockSender<N> = mpsc::UnboundedSender<SyncedBlock<N>>;
+pub type SyncedBlockResultReceiver = mpsc::UnboundedReceiver<bool>;
 
 pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
 
@@ -154,6 +156,8 @@ pub struct TendermintHandle<N: Network> {
   /// Channel to trigger the machine to move to the next block.
   /// Takes in the the previous block's commit, along with the new proposal.
   pub synced_block: SyncedBlockSender<N>,
+  /// A channel to communicate the result of a synced_block message.
+  pub synced_block_result: SyncedBlockResultReceiver,
   /// Channel to send messages received from the P2P layer.
   pub messages: MessageSender<N>,
   /// Tendermint machine to be run on an asynchronous task.
@@ -253,8 +257,10 @@ impl<N: Network + 'static> TendermintMachine<N> {
   ) -> TendermintHandle<N> {
     let (msg_send, msg_recv) = mpsc::unbounded();
     let (synced_block_send, synced_block_recv) = mpsc::unbounded();
+    let (synced_block_result_send, synced_block_result_recv) = mpsc::unbounded();
     TendermintHandle {
       synced_block: synced_block_send,
+      synced_block_result: synced_block_result_recv,
       messages: msg_send,
       machine: {
         let sys_time = sys_time(last_time);
@@ -275,6 +281,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
           queue: VecDeque::new(),
           msg_recv,
           synced_block_recv,
+          synced_block_result_send,
 
           block: BlockData::new(
             weights,
@@ -313,16 +320,19 @@ impl<N: Network + 'static> TendermintMachine<N> {
       if let Some(SyncedBlock { number, block, commit }) = msg {
         // Commit is for a block we've already moved past
         if number != self.block.number {
+          self.synced_block_result_send.send(false).await.unwrap();
           continue;
         }
 
         // Commit is invalid
         if !self.network.verify_commit(block.id(), &commit) {
+          self.synced_block_result_send.send(false).await.unwrap();
           continue;
         }
 
         let proposal = self.network.add_block(block, commit.clone()).await;
         self.reset_by_commit(commit, proposal).await;
+        self.synced_block_result_send.send(true).await.unwrap();
         None
       } else {
         break;

@@ -11,7 +11,8 @@ use futures::SinkExt;
 use tokio::{sync::RwLock, time::sleep};
 
 use tendermint_machine::{
-  ext::*, SignedMessageFor, SyncedBlockSender, MessageSender, TendermintMachine, TendermintHandle,
+  ext::*, SignedMessageFor, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
+  TendermintMachine, TendermintHandle,
 };
 
 type TestValidatorId = u16;
@@ -97,7 +98,10 @@ impl Block for TestBlock {
 }
 
 #[allow(clippy::type_complexity)]
-struct TestNetwork(u16, Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>)>>>);
+struct TestNetwork(
+  u16,
+  Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>,
+);
 
 #[async_trait]
 impl Network for TestNetwork {
@@ -122,7 +126,7 @@ impl Network for TestNetwork {
   }
 
   async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
-    for (messages, _) in self.1.write().await.iter_mut() {
+    for (messages, _, _) in self.1.write().await.iter_mut() {
       messages.send(msg.clone()).await.unwrap();
     }
   }
@@ -151,21 +155,23 @@
 impl TestNetwork {
   async fn new(
     validators: usize,
-  ) -> Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>)>>> {
+  ) -> Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>
+  {
     let arc = Arc::new(RwLock::new(vec![]));
     {
       let mut write = arc.write().await;
       for i in 0 .. validators {
         let i = u16::try_from(i).unwrap();
-        let TendermintHandle { messages, synced_block, machine } = TendermintMachine::new(
-          TestNetwork(i, arc.clone()),
-          BlockNumber(1),
-          SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
-          TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
-        )
-        .await;
+        let TendermintHandle { messages, synced_block, synced_block_result, machine } =
+          TendermintMachine::new(
+            TestNetwork(i, arc.clone()),
+            BlockNumber(1),
+            SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
+            TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
+          )
+          .await;
         tokio::task::spawn(machine.run());
-        write.push((messages, synced_block));
+        write.push((messages, synced_block, synced_block_result));
       }
     }
     arc