diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index bea2f984..6f8c6969 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -330,115 +330,103 @@ pub async fn handle_p2p( ) { loop { let mut msg = p2p.receive().await; - match msg.kind { - P2pMessageKind::KeepAlive => {} + // Spawn a dedicated task to handle this message, ensuring any singularly latent message + // doesn't hold everything up + // TODO2: Move to one task per tributary + tokio::spawn({ + let p2p = p2p.clone(); + let tributaries = tributaries.clone(); + async move { + match msg.kind { + P2pMessageKind::KeepAlive => {} - P2pMessageKind::Tributary(genesis) => { - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received p2p message for unknown network"); - continue; - }; + P2pMessageKind::Tributary(genesis) => { + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + log::debug!("received p2p message for unknown network"); + return; + }; - log::trace!("handling message for tributary {:?}", tributary.spec.set()); - if tributary.tributary.read().await.handle_message(&msg.msg).await { - P2p::broadcast(&p2p, msg.kind, msg.msg).await; - } - } - - // TODO2: Rate limit this per timestamp - P2pMessageKind::Heartbeat(genesis) => { - if msg.msg.len() != 40 { - log::error!("validator sent invalid heartbeat"); - continue; - } - - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received heartbeat message for unknown network"); - continue; - }; - let tributary_read = tributary.tributary.read().await; - - /* - // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = tributary.spec.n().min(3).into(); - } - */ - - // Have up to three nodes respond - let responders = u64::from(tributary.spec.n().min(3)); - - // Decide which nodes will respond by using the latest block's hash as a mutually agreed - // upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap(); - let mut selected = false; - for validator in - &tributary.spec.validators()[start .. (start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; + log::trace!("handling message for tributary {:?}", tributary.spec.set()); + if tributary.tributary.read().await.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } } - } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - continue; - } - log::debug!("received heartbeat and selected to respond"); + // TODO2: Rate limit this per timestamp + P2pMessageKind::Heartbeat(genesis) => { + if msg.msg.len() != 40 { + log::error!("validator sent invalid heartbeat"); + return; + } - let reader = tributary_read.reader(); - drop(tributary_read); + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + log::debug!("received heartbeat message for unknown network"); + return; + }; + let tributary_read = tributary.tributary.read().await; - let mut latest = msg.msg[.. 32].try_into().unwrap(); - while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; - latest = next; - } - } + /* + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); + } + */ - P2pMessageKind::Block(genesis) => { - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); - continue; - }; - // Get just the commit - msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); + // Have up to three nodes respond + let responders = u64::from(tributary.spec.n().min(3)); + + // Decide which nodes will respond by using the latest block's hash as a mutually agreed + // upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap(); + let mut selected = false; + for validator in + &tributary.spec.validators()[start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + return; + } + + log::debug!("received heartbeat and selected to respond"); + + let reader = tributary_read.reader(); + drop(tributary_read); + + let mut latest = msg.msg[.. 32].try_into().unwrap(); + while let Some(next) = reader.block_after(&latest) { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; + latest = next; + } + } + + P2pMessageKind::Block(genesis) => { + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received block message with an invalidly serialized block"); + return; + }; + // Get just the commit + msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); + msg.msg.drain((msg.msg.len() - 8) ..); - // Spawn a dedicated task to add this block, as it may take a notable amount of time - // While we could use a long-lived task to add each block, that task would only add one - // block at a time *across all tributaries* - // We either need: - // 1) One task per tributary - // 2) Background tasks - // 3) For sync_block to return instead of waiting for provided transactions which are - // missing - // sync_block waiting is preferable since we know the block is valid by its commit, meaning - // we are the node behind - // As for 1/2, 1 may be preferable since this message may frequently occur - // This is suitably performant, as tokio HTTP servers will even spawn a new task per - // connection - // In order to reduce congestion though, we should at least check if we take value from - // this message before running spawn - // TODO2 - tokio::spawn({ - let tributaries = tributaries.clone(); - async move { let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { log::debug!("received block message for unknown network"); @@ -448,9 +436,9 @@ pub async fn handle_p2p( let res = tributary.tributary.read().await.sync_block(block, msg.msg).await; log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } - }); + } } - } + }); } } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index ae3966e4..c078826d 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -399,6 +399,8 @@ impl Network for TendermintNetwork hex::encode(hash), hex::encode(self.genesis) ); + // TODO: Use a notification system for when we have a new provided, in order to minimize + // latency sleep(Duration::from_secs(Self::block_time().into())).await; } _ => return invalid_block(), diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index be9d5e5d..e45dc4f5 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -189,8 +189,7 @@ impl TendermintMachine { // Push it on to the queue. This is done so we only handle one message at a time, and so we // can handle our own message before broadcasting it. That way, we fail before before // becoming malicious - // push_front to prioritize our own messages - self.queue.push_front(msg); + self.queue.push_back(msg); } } @@ -220,6 +219,12 @@ impl TendermintMachine { // Sleep until this round ends let round_end = self.block.end_time[&end_round]; let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now()); + if time_until_round_end == Duration::ZERO { + log::trace!( + "resetting when prior round ended {}ms ago", + Instant::now().saturating_duration_since(round_end.instant()).as_millis(), + ); + } log::trace!("sleeping until round ends in {}ms", time_until_round_end.as_millis()); sleep(time_until_round_end).await; @@ -575,6 +580,13 @@ impl TendermintMachine { Err(TendermintError::Temporal)?; } + if (msg.block == self.block.number) && + (msg.round == self.block.round().number) && + (msg.data.step() == Step::Propose) + { + log::trace!("received Propose for block {}, round {}", msg.block.0, msg.round.0); + } + // If this is a precommit, verify its signature self.verify_precommit_signature(signed)?;