diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b21b9a19..05cb3add 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -383,7 +383,6 @@ pub async fn handle_p2p( // TODO2: Rate limit this per timestamp // And/or slash on Heartbeat which justifies a response, since the node obviously // was offline and we must now use our bandwidth to compensate for them? - // TODO: Dedicated task for heartbeats P2pMessageKind::Heartbeat(msg_genesis) => { assert_eq!(msg_genesis, genesis); if msg.msg.len() != 40 { @@ -391,60 +390,59 @@ pub async fn handle_p2p( continue; } - let tributary_read = &tributary.tributary; - - /* - // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = tributary.spec.n().min(3).into(); - } - */ - - // Have up to three nodes respond - let responders = u64::from(tributary.spec.n().min(3)); - - // Decide which nodes will respond by using the latest block's hash as a mutually - // agreed upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = - u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want `start` to be 0 ..= 7 - // (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)) - .unwrap(); - let mut selected = false; - for validator in &tributary.spec.validators() - [start .. (start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; + let p2p = p2p.clone(); + let spec = tributary.spec.clone(); + let reader = tributary.tributary.reader(); + // Spawn a dedicated task as this may require loading large amounts of data from + // disk and take a notable amount of time + tokio::spawn(async move { + /* + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); } - } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - continue; - } + */ - log::debug!("received heartbeat and selected to respond"); + // Have up to three nodes respond + let responders = u64::from(spec.n().min(3)); - let reader = tributary_read.reader(); + // Decide which nodes will respond by using the latest block's hash as a + // mutually agreed upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want `start` to be 0 ..= 7 + // (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(spec.n() + 1) - responders)).unwrap(); + let mut selected = false; + for validator in + &spec.validators()[start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + return; + } - let mut latest = msg.msg[.. 32].try_into().unwrap(); - while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p - .send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res) - .await; - latest = next; - } + log::debug!("received heartbeat and selected to respond"); + + let mut latest = msg.msg[.. 32].try_into().unwrap(); + while let Some(next) = reader.block_after(&latest) { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; + latest = next; + } + }); } P2pMessageKind::Block(msg_genesis) => {