From 96cc5d01573979aa73044fc713afe57a35ca8beb Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 14 Oct 2023 00:41:07 -0400 Subject: [PATCH] Remove a TODO re: an unhandled race condition --- coordinator/src/main.rs | 38 ++++++++++++++++++---------- coordinator/src/tributary/scanner.rs | 2 +- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 91971b7b..4c58a75d 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -62,6 +62,7 @@ pub struct ActiveTributary { } // Creates a new tributary and sends it to all listeners. +// TODO: retire_tributary async fn add_tributary( db: D, key: Zeroizing<::F>, @@ -139,11 +140,10 @@ async fn publish_signed_transaction( .await .expect("we don't have a nonce, meaning we aren't a participant on this tributary"), ) { - // We should've created a valid transaction - // This does assume publish_signed_transaction hasn't been called twice with the same - // transaction, which risks a race condition on the validity of this assert - // Our use case only calls this function sequentially - assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); + // TODO: Assert if we didn't create a valid transaction + // We need to return a proper error here to enable that, due to a race condition around + // multiple publications + tributary.add_transaction(tx).await; } } @@ -261,7 +261,6 @@ async fn handle_processor_message( batch.batch.network, msg.network, "processor sent us a signed batch for a different network than it was for", ); - // TODO: Check this key's key pair's substrate key is authorized to publish batches log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id); @@ -817,14 +816,25 @@ pub async fn run( tx.sign(&mut OsRng, genesis, &key, nonce); - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - // TODO: This may happen if the task above is simply slow - panic!("tributary we don't have came to consensus on an Batch"); - }; - let mut txn = raw_db.txn(); - publish_signed_transaction(&mut txn, tributary, tx).await; - txn.commit(); + let mut first = true; + loop { + if !first { + sleep(Duration::from_millis(100)).await; + } + first = false; + + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + // This may happen if the task above is simply slow + log::warn!("tributary we don't have yet came to consensus on an Batch"); + continue; + }; + // This is safe to perform multiple times and solely needs atomicity within itself + let mut txn = raw_db.txn(); + publish_signed_transaction(&mut txn, tributary, tx).await; + txn.commit(); + break + } } } }; diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index bd0268a0..bbe72dd9 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -110,7 +110,7 @@ async fn handle_block< event_id += 1; } - // TODO2: Trigger any necessary re-attempts + // TODO: Trigger any necessary re-attempts } pub(crate) async fn handle_new_blocks<