Handle batch n+1 being signed before batch n is

Luke Parker 2023-08-31 22:09:29 -04:00
parent 9bf24480f4
commit 2dc35193c9
2 changed files with 78 additions and 31 deletions


@@ -1,5 +1,8 @@
-use scale::Encode;
-use serai_client::primitives::{NetworkId, BlockHash};
+use scale::{Encode, Decode};
+use serai_client::{
+  primitives::{NetworkId, BlockHash},
+  in_instructions::primitives::SignedBatch,
+};
 
 pub use serai_db::*;
 
@@ -95,4 +98,19 @@ impl<'a, D: Db> MainDb<'a, D> {
   pub fn first_preprocess<G: Get>(getter: &G, id: [u8; 32]) -> Option<Vec<u8>> {
     getter.get(Self::first_preprocess_key(id))
   }
+
+  fn batch_key(network: NetworkId, id: u32) -> Vec<u8> {
+    Self::main_key(b"batch", (network, id).encode())
+  }
+  pub fn save_batch(&mut self, batch: SignedBatch) {
+    let mut txn = self.0.txn();
+    txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode());
+    txn.commit();
+  }
+  pub fn batch(&self, network: NetworkId, id: u32) -> Option<SignedBatch> {
+    self
+      .0
+      .get(Self::batch_key(network, id))
+      .map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap())
+  }
 }
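
For context on the new storage methods: they key each SignedBatch by the SCALE encoding of (network, id) and decode on read, so a batch signed out of order can still be recovered later by ID. Below is a minimal standalone sketch of that round-trip, assuming parity-scale-codec (with its derive feature); MockDb, Network, and the trimmed-down SignedBatch are hypothetical stand-ins for the serai_db and serai-client types, not the real API.

use std::collections::HashMap;
use parity_scale_codec::{Encode, Decode};

// Hypothetical stand-ins for NetworkId and SignedBatch; the real types live in serai-client.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
enum Network { Bitcoin, Ethereum, Monero }

#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)]
struct SignedBatch { network: Network, id: u32, block: [u8; 32] }

// An in-memory map standing in for the serai_db transaction API.
#[derive(Default)]
struct MockDb(HashMap<Vec<u8>, Vec<u8>>);

impl MockDb {
  // Key scheme mirroring batch_key: a domain prefix plus the SCALE-encoded (network, id).
  fn batch_key(network: Network, id: u32) -> Vec<u8> {
    let mut key = b"batch".to_vec();
    key.extend((network, id).encode());
    key
  }
  fn save_batch(&mut self, batch: &SignedBatch) {
    self.0.insert(Self::batch_key(batch.network, batch.id), batch.encode());
  }
  fn batch(&self, network: Network, id: u32) -> Option<SignedBatch> {
    self
      .0
      .get(&Self::batch_key(network, id))
      .map(|raw| SignedBatch::decode(&mut raw.as_slice()).unwrap())
  }
}

fn main() {
  let mut db = MockDb::default();
  let batch = SignedBatch { network: Network::Monero, id: 1, block: [0; 32] };
  db.save_batch(&batch);
  // The batch can be re-read by (network, id) later, e.g. after a reboot with a real DB.
  assert_eq!(db.batch(Network::Monero, 1), Some(batch));
  assert!(db.batch(Network::Monero, 0).is_none());
}

Keying on the encoded (network, id) pair is what lets the publish task below ask "do we have batch `next` for this network?" without scanning.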


@@ -332,7 +332,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
     let mut msg = p2p.receive().await;
     // Spawn a dedicated task to handle this message, ensuring any singularly latent message
     // doesn't hold everything up
-    // TODO2: Move to one task per tributary
+    // TODO2: Move to one task per tributary (or two. One for Tendermint, one for Tributary)
     tokio::spawn({
       let p2p = p2p.clone();
       let tributaries = tributaries.clone();
@@ -474,6 +474,8 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
   let pub_key = Ristretto::generator() * key.deref();
 
   loop {
+    // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
+    // processor from holding up all the others
     let msg = processors.recv().await;
 
     // TODO2: This is slow, and only works as long as a network only has a single Tributary
@@ -626,41 +628,68 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
             "processor sent us a batch for a different network than it was for",
           );
           // TODO: Check this key's key pair's substrate key is authorized to publish batches
-          // TODO: Handle the fact batch n+1 can be signed before batch n
 
-          let tx = Serai::execute_batch(batch.clone());
-          loop {
-            match serai.publish(&tx).await {
-              Ok(_) => {
-                log::info!(
-                  "executed batch {:?} {} (block {})",
-                  batch.batch.network,
-                  batch.batch.id,
-                  hex::encode(batch.batch.block),
-                );
-                break;
-              }
-              Err(e) => {
-                if let Ok(latest_block) = serai.get_latest_block().await {
-                  if let Ok(Some(last)) =
-                    serai.get_last_batch_for_network(latest_block.hash(), batch.batch.network).await
-                  {
-                    if last >= batch.batch.id {
-                      log::info!(
-                        "another coordinator executed batch {:?} {} (block {})",
-                        batch.batch.network,
-                        batch.batch.id,
-                        hex::encode(batch.batch.block),
-                      );
-                      break;
-                    }
-                  }
-                }
-                log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e);
-                tokio::time::sleep(Duration::from_secs(10)).await;
-              }
-            }
-          }
+          // Save this batch to the disk
+          MainDb::new(&mut db).save_batch(batch);
+
+          /*
+            Use a dedicated task to publish batches due to the latency potentially incurred.
+
+            This does not guarantee the batch has actually been published when the message is
+            `ack`ed to message-queue. Accordingly, if we reboot, these batches would be dropped
+            (as we wouldn't see the `Update` again, triggering our re-attempt to publish).
+
+            The solution to this is to have the task try to publish not just the batch which caused
+            it to be spawned, but all saved batches which have yet to be published. This does risk
+            having multiple tasks trying to publish all pending batches, yet these aren't notably complex.
+          */
+          tokio::spawn({
+            let mut db = db.clone();
+            let serai = serai.clone();
+            let network = msg.network;
+            async move {
+              // Since we have a new batch, publish all batches yet to be published to Serai
+              // This handles the edge-case where batch n+1 is signed before batch n is
+              while let Some(batch) = {
+                // Get the next-to-execute batch ID
+                let next = {
+                  let mut first = true;
+                  loop {
+                    if !first {
+                      log::error!(
+                        "couldn't connect to Serai node to get the next batch ID for {network:?}",
+                      );
+                      tokio::time::sleep(Duration::from_secs(5)).await;
+                    }
+                    first = false;
+
+                    let Ok(latest_block) = serai.get_latest_block().await else { continue };
+                    let Ok(last) =
+                      serai.get_last_batch_for_network(latest_block.hash(), network).await
+                    else {
+                      continue;
+                    };
+                    break if let Some(last) = last { last + 1 } else { 0 };
+                  }
+                };
+
+                // If we have this batch, attempt to publish it
+                MainDb::new(&mut db).batch(network, next)
+              } {
+                let id = batch.batch.id;
+                let block = batch.batch.block;
+
+                let tx = Serai::execute_batch(batch);
+                // This publish may fail if this transaction already exists in the mempool, which
+                // is possible, or if this batch was already executed on-chain
+                // Either case will have eventual resolution and be handled by the above check on
+                // if this batch should execute
+                if serai.publish(&tx).await.is_ok() {
+                  log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
+                }
+              }
+            }
+          });
 
           None
         }
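
To make the catch-up behavior concrete: the spawned task derives the next expected batch ID from the chain (last published plus one, or zero), then publishes consecutively saved batches from there, so a batch n+1 signed early simply waits in the DB until batch n lands. Below is a minimal synchronous sketch of that logic, with a plain map as the batch store and a counter as the on-chain state; Coordinator and publish_pending are illustrative names, not the coordinator's actual API.

use std::collections::HashMap;

// Illustrative stand-ins: saved batches keyed by ID, plus the last batch ID seen on-chain.
struct Coordinator {
  saved: HashMap<u32, &'static str>,
  last_on_chain: Option<u32>,
}

impl Coordinator {
  // Mirrors the task's logic: compute the next-to-execute ID, then publish every
  // consecutively-saved batch from there. A gap (batch n not yet signed) stops the loop.
  fn publish_pending(&mut self) {
    let mut next = self.last_on_chain.map_or(0, |last| last + 1);
    while let Some(batch) = self.saved.get(&next) {
      println!("published batch {next}: {batch}");
      self.last_on_chain = Some(next);
      next += 1;
    }
  }
}

fn main() {
  let mut coordinator = Coordinator { saved: HashMap::new(), last_on_chain: None };

  // Batch 1 is signed before batch 0: saving it publishes nothing, as batch 0 is missing.
  coordinator.saved.insert(1, "batch n+1");
  coordinator.publish_pending();
  assert_eq!(coordinator.last_on_chain, None);

  // Once batch 0 arrives, a single catch-up pass publishes 0 and then 1, in order.
  coordinator.saved.insert(0, "batch n");
  coordinator.publish_pending();
  assert_eq!(coordinator.last_on_chain, Some(1));
}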