Remove a TODO re: an unhandled race condition

This commit is contained in:
Luke Parker 2023-10-14 00:41:07 -04:00
parent 7275a95907
commit 96cc5d0157
No known key found for this signature in database
2 changed files with 25 additions and 15 deletions

View file

@ -62,6 +62,7 @@ pub struct ActiveTributary<D: Db, P: P2p> {
}
// Creates a new tributary and sends it to all listeners.
// TODO: retire_tributary
async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@ -139,11 +140,10 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
// We should've created a valid transaction
// This does assume publish_signed_transaction hasn't been called twice with the same
// transaction, which risks a race condition on the validity of this assert
// Our use case only calls this function sequentially
assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
// TODO: Assert if we didn't create a valid transaction
// We need to return a proper error here to enable that, due to a race condition around
// multiple publications
tributary.add_transaction(tx).await;
}
}
@ -261,7 +261,6 @@ async fn handle_processor_message<D: Db, P: P2p>(
batch.batch.network, msg.network,
"processor sent us a signed batch for a different network than it was for",
);
// TODO: Check this key's key pair's substrate key is authorized to publish batches
log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id);
@ -817,14 +816,25 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
tx.sign(&mut OsRng, genesis, &key, nonce);
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
// TODO: This may happen if the task above is simply slow
panic!("tributary we don't have came to consensus on an Batch");
};
let mut txn = raw_db.txn();
publish_signed_transaction(&mut txn, tributary, tx).await;
txn.commit();
let mut first = true;
loop {
if !first {
sleep(Duration::from_millis(100)).await;
}
first = false;
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
// This may happen if the task above is simply slow
log::warn!("tributary we don't have yet came to consensus on an Batch");
continue;
};
// This is safe to perform multiple times and solely needs atomicity within itself
let mut txn = raw_db.txn();
publish_signed_transaction(&mut txn, tributary, tx).await;
txn.commit();
break
}
}
}
};

View file

@ -110,7 +110,7 @@ async fn handle_block<
event_id += 1;
}
// TODO2: Trigger any necessary re-attempts
// TODO: Trigger any necessary re-attempts
}
pub(crate) async fn handle_new_blocks<