mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-22 02:34:55 +00:00
Use &self for handle_message and sync_block in Tributary
They used &mut self to prevent execution at the same time. This uses a lock
over the channel to achieve the same security, without requiring a lock over
the entire tributary.
This fixes post-provided Provided transactions. sync_block waited for the TX to
be provided, yet it never would as sync_block held a mutable reference over the
entire Tributary, preventing any other read/write operations of any scope.
A timeout increased (bc2f23f72b
) due to this bug
not being identified has been decreased back, thankfully.
Also shims in basic support for Completed, which was the WIP before this bug
was identified.
This commit is contained in:
parent
6268bbd7c8
commit
2db53d5434
5 changed files with 49 additions and 22 deletions
|
@ -281,7 +281,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!("handling message for tributary {:?}", tributary.spec.set());
|
log::trace!("handling message for tributary {:?}", tributary.spec.set());
|
||||||
if tributary.tributary.write().await.handle_message(&msg.msg).await {
|
if tributary.tributary.read().await.handle_message(&msg.msg).await {
|
||||||
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -385,11 +385,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: Add a check which doesn't require write to see if this is the next block in
|
let res = tributary.tributary.read().await.sync_block(block, msg.msg).await;
|
||||||
// line
|
|
||||||
// If it's in the future, hold it for up to T time
|
|
||||||
|
|
||||||
let res = tributary.tributary.write().await.sync_block(block, msg.msg).await;
|
|
||||||
log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
|
log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -518,8 +514,9 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
|
||||||
data: share,
|
data: share,
|
||||||
signed: Transaction::empty_signed(),
|
signed: Transaction::empty_signed(),
|
||||||
})),
|
})),
|
||||||
// TODO
|
sign::ProcessorMessage::Completed { key: _, id, tx } => {
|
||||||
sign::ProcessorMessage::Completed { .. } => todo!(),
|
Some(Transaction::SignCompleted(id, tx, Transaction::empty_signed()))
|
||||||
|
}
|
||||||
},
|
},
|
||||||
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
|
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
|
||||||
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
|
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
|
||||||
|
@ -624,7 +621,9 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
|
||||||
|
|
||||||
// If this created a transaction, publish it
|
// If this created a transaction, publish it
|
||||||
if let Some(mut tx) = tx {
|
if let Some(mut tx) = tx {
|
||||||
|
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
|
||||||
let tributaries = tributaries.read().await;
|
let tributaries = tributaries.read().await;
|
||||||
|
log::trace!("read global tributaries");
|
||||||
let Some(tributary) = tributaries.get(&genesis) else {
|
let Some(tributary) = tributaries.get(&genesis) else {
|
||||||
// TODO: This can happen since Substrate tells the Processor to generate commitments
|
// TODO: This can happen since Substrate tells the Processor to generate commitments
|
||||||
// at the same time it tells the Tributary to be created
|
// at the same time it tells the Tributary to be created
|
||||||
|
@ -632,9 +631,11 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
|
||||||
panic!("processor is operating on tributary we don't have");
|
panic!("processor is operating on tributary we don't have");
|
||||||
};
|
};
|
||||||
let tributary = tributary.tributary.read().await;
|
let tributary = tributary.tributary.read().await;
|
||||||
|
log::trace!("read specific tributary");
|
||||||
|
|
||||||
match tx.kind() {
|
match tx.kind() {
|
||||||
TransactionKind::Provided(_) => {
|
TransactionKind::Provided(_) => {
|
||||||
|
log::trace!("providing transaction {}", hex::encode(tx.hash()));
|
||||||
let res = tributary.provide_transaction(tx).await;
|
let res = tributary.provide_transaction(tx).await;
|
||||||
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
||||||
panic!("provided an invalid transaction: {res:?}");
|
panic!("provided an invalid transaction: {res:?}");
|
||||||
|
|
|
@ -589,8 +589,20 @@ pub async fn handle_application_tx<
|
||||||
None => {}
|
None => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Transaction::SignCompleted(_, _, _) => {
|
Transaction::SignCompleted(id, tx, signed) => {
|
||||||
// TODO
|
// TODO: Confirm this is a valid ID
|
||||||
|
// TODO: Confirm this signer hasn't prior published a completion
|
||||||
|
let Some(key_pair) = TributaryDb::<D>::key_pair(txn, spec.set()) else { todo!() };
|
||||||
|
processors
|
||||||
|
.send(
|
||||||
|
spec.set().network,
|
||||||
|
CoordinatorMessage::Sign(sign::CoordinatorMessage::Completed {
|
||||||
|
key: key_pair.1.to_vec(),
|
||||||
|
id,
|
||||||
|
tx,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -243,6 +243,9 @@ pub enum Transaction {
|
||||||
|
|
||||||
SignPreprocess(SignData),
|
SignPreprocess(SignData),
|
||||||
SignShare(SignData),
|
SignShare(SignData),
|
||||||
|
// TODO: We can't make this an Unsigned as we need to prevent spam, which requires a max of 1
|
||||||
|
// claim per sender
|
||||||
|
// Can we de-duplicate across senders though, if they claim the same hash completes?
|
||||||
SignCompleted([u8; 32], Vec<u8>, Signed),
|
SignCompleted([u8; 32], Vec<u8>, Signed),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ use zeroize::Zeroizing;
|
||||||
use ciphersuite::{Ciphersuite, Ristretto};
|
use ciphersuite::{Ciphersuite, Ristretto};
|
||||||
|
|
||||||
use scale::Decode;
|
use scale::Decode;
|
||||||
use futures::{StreamExt, SinkExt};
|
use futures::{StreamExt, SinkExt, channel::mpsc::UnboundedReceiver};
|
||||||
use ::tendermint::{
|
use ::tendermint::{
|
||||||
ext::{BlockNumber, Commit, Block as BlockTrait, Network},
|
ext::{BlockNumber, Commit, Block as BlockTrait, Network},
|
||||||
SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
|
SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
|
||||||
|
@ -144,7 +144,7 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
|
||||||
genesis: [u8; 32],
|
genesis: [u8; 32],
|
||||||
network: TendermintNetwork<D, T, P>,
|
network: TendermintNetwork<D, T, P>,
|
||||||
|
|
||||||
synced_block: SyncedBlockSender<TendermintNetwork<D, T, P>>,
|
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
|
||||||
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
|
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
|
||||||
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
|
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
db,
|
db,
|
||||||
genesis,
|
genesis,
|
||||||
network,
|
network,
|
||||||
synced_block,
|
synced_block: Arc::new(RwLock::new(synced_block)),
|
||||||
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
|
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
|
||||||
messages: Arc::new(RwLock::new(messages)),
|
messages: Arc::new(RwLock::new(messages)),
|
||||||
})
|
})
|
||||||
|
@ -239,11 +239,12 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync a block.
|
async fn sync_block_internal(
|
||||||
// TODO: Since we have a static validator set, we should only need the tail commit?
|
&self,
|
||||||
pub async fn sync_block(&mut self, block: Block<T>, commit: Vec<u8>) -> bool {
|
block: Block<T>,
|
||||||
let mut result = self.synced_block_result.write().await;
|
commit: Vec<u8>,
|
||||||
|
result: &mut UnboundedReceiver<bool>,
|
||||||
|
) -> bool {
|
||||||
let (tip, block_number) = {
|
let (tip, block_number) = {
|
||||||
let blockchain = self.network.blockchain.read().await;
|
let blockchain = self.network.blockchain.read().await;
|
||||||
(blockchain.tip(), blockchain.block_number())
|
(blockchain.tip(), blockchain.block_number())
|
||||||
|
@ -272,12 +273,22 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let number = BlockNumber((block_number + 1).into());
|
let number = BlockNumber((block_number + 1).into());
|
||||||
self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap();
|
self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap();
|
||||||
result.next().await.unwrap()
|
result.next().await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync a block.
|
||||||
|
// TODO: Since we have a static validator set, we should only need the tail commit?
|
||||||
|
pub async fn sync_block(&self, block: Block<T>, commit: Vec<u8>) -> bool {
|
||||||
|
let mut result = self.synced_block_result.write().await;
|
||||||
|
self.sync_block_internal(block, commit, &mut result).await
|
||||||
|
}
|
||||||
|
|
||||||
// Return true if the message should be rebroadcasted.
|
// Return true if the message should be rebroadcasted.
|
||||||
pub async fn handle_message(&mut self, msg: &[u8]) -> bool {
|
pub async fn handle_message(&self, msg: &[u8]) -> bool {
|
||||||
|
// Acquire the lock now to prevent sync_block from being run at the same time
|
||||||
|
let mut sync_block = self.synced_block_result.write().await;
|
||||||
|
|
||||||
match msg.first() {
|
match msg.first() {
|
||||||
Some(&TRANSACTION_MESSAGE) => {
|
Some(&TRANSACTION_MESSAGE) => {
|
||||||
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
|
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
|
||||||
|
@ -316,7 +327,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
|
let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
|
||||||
if self.sync_block(block, commit).await {
|
if self.sync_block_internal(block, commit, &mut sync_block).await {
|
||||||
log::debug!("synced block over p2p net instead of building the commit ourselves");
|
log::debug!("synced block over p2p net instead of building the commit ourselves");
|
||||||
}
|
}
|
||||||
false
|
false
|
||||||
|
|
|
@ -43,6 +43,6 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
|
||||||
pub(crate) async fn wait_for_tributary() {
|
pub(crate) async fn wait_for_tributary() {
|
||||||
tokio::time::sleep(Duration::from_secs(20)).await;
|
tokio::time::sleep(Duration::from_secs(20)).await;
|
||||||
if std::env::var("GITHUB_CI") == Ok("true".to_string()) {
|
if std::env::var("GITHUB_CI") == Ok("true".to_string()) {
|
||||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
tokio::time::sleep(Duration::from_secs(40)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue