From 9175383e8966d981ef2112e75a7e2ea461ec0b16 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 9 May 2023 16:58:53 -0400 Subject: [PATCH] Spawn a new async task for each block message This probably should be done with n-long lived tasks, one per Tributary. While this may not be suitably performant long-term (potential DoS vector), this at least resolves the halting concerns. --- coordinator/src/main.rs | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b6ba622d..9a20ab16 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -306,18 +306,32 @@ pub async fn handle_p2p( // Get just the commit msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received block message for unknown network"); - continue; - }; + // Spawn a dedicated task to add this block, as it may take a notable amount of time + // While we could use a long-lived task to add each block, that task would only add one + // block at a time *across all tributaries* + // We either need: + // 1) One task per tributary + // 2) Background tasks + // 3) For sync_block to return instead of waiting for provided transactions which are + // missing + // sync_block waiting is preferable since we know the block is valid by its commit, meaning + // we are the node behind + // A for 1/2, 1 may be preferable since this message may frequently occur + // We at least need to check if we take value from this message before running spawn + // TODO + tokio::spawn({ + let tributaries = tributaries.clone(); + async move { + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + log::debug!("received block message for unknown network"); + return; + }; - // TODO: We take a notable amount of time to add blocks when we're missing provided - // transactions - // Any tributary with missing provided transactions will cause this P2P loop to halt - // Make a separate queue for this - let res = tributary.tributary.write().await.sync_block(block, msg.msg).await; - log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); + let res = tributary.tributary.write().await.sync_block(block, msg.msg).await; + log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); + } + }); } } } @@ -558,7 +572,7 @@ pub async fn run( publish_transaction(&tributary, tx).await; } else { - log::warn!("recognized_id_recv was dropped. are we shutting down?"); + log::warn!("recognized_id_send was dropped. are we shutting down?"); break; } }