From 6b4df4f2c0bf9d5e90262906fc813c1b7c505350 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 17 Apr 2024 21:54:10 -0400 Subject: [PATCH] Only have some nodes respond to latent heartbeats Also only respond if they're more than 2 blocks behind to minimize redundant sending of blocks. --- coordinator/src/main.rs | 1 + coordinator/src/p2p.rs | 42 ++++++++++--------- coordinator/src/tests/tributary/handle_p2p.rs | 11 ++++- coordinator/src/tests/tributary/sync.rs | 11 +++-- 4 files changed, 40 insertions(+), 25 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 4de23ae0..e6e49c3e 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1292,6 +1292,7 @@ pub async fn run( p2p.clone(), cosign_channel.clone(), tributary_event_listener_4, + <Ristretto as Ciphersuite>::generator() * key.deref(), )); // Handle all messages from processors diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 19bf299d..627af966 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -9,6 +9,8 @@ use std::{ use async_trait::async_trait; use rand_core::{RngCore, OsRng}; +use ciphersuite::{Ciphersuite, Ristretto}; + use scale::Encode; use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai}; @@ -612,6 +614,7 @@ pub async fn handle_p2p_task( p2p: P, cosign_channel: mpsc::UnboundedSender, mut tributary_event: broadcast::Receiver>, + our_key: <Ristretto as Ciphersuite>::G, ) { let channels = Arc::new(RwLock::new(HashMap::<_, mpsc::UnboundedSender>>::new())); tokio::spawn({ @@ -631,6 +634,8 @@ pub async fn handle_p2p_task( // Subscribe to the topic for this tributary p2p.subscribe(tributary.spec.set(), genesis).await; + let spec_set = tributary.spec.set(); + // Per-Tributary P2P message handler tokio::spawn({ let p2p = p2p.clone(); @@ -645,7 +650,7 @@ pub async fn handle_p2p_task( P2pMessageKind::Tributary(msg_genesis) => { assert_eq!(msg_genesis, genesis); - log::trace!("handling message for 
tributary {:?}", tributary.spec.set()); + log::trace!("handling message for tributary {:?}", spec_set); if tributary.tributary.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } @@ -668,18 +673,13 @@ pub async fn handle_p2p_task( // Spawn a dedicated task as this may require loading large amounts of data // from disk and take a notable amount of time tokio::spawn(async move { - /* // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let mut responders = f32::from(spec.n(&[])).sqrt().floor() as u64; // Try to have at least 3 responders if responders < 3 { - responders = tributary.spec.n().min(3).into(); + responders = spec.n(&[]).min(3).into(); } - */ - - /* - // Have up to three nodes respond - let responders = u64::from(spec.n().min(3)); // Decide which nodes will respond by using the latest block's hash as a // mutually agreed upon entropy source @@ -689,7 +689,7 @@ pub async fn handle_p2p_task( // (so the highest is 7, 8, 9) // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 let start = - usize::try_from(entropy % (u64::from(spec.n() + 1) - responders)) + usize::try_from(entropy % (u64::from(spec.n(&[]) + 1) - responders)) .unwrap(); let mut selected = false; for validator in &spec.validators() @@ -706,22 +706,24 @@ pub async fn handle_p2p_task( } log::debug!("received heartbeat and selected to respond"); - */ - // Have every node respond - // While we could only have a subset respond, LibP2P will sync all messages - // it isn't aware of - // It's cheaper to be aware from our disk than from over the network + // Have the selected nodes respond // TODO: Spawn a dedicated topic for this heartbeat response? let mut latest = msg.msg[.. 
32].try_into().unwrap(); + let mut to_send = vec![]; while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; + to_send.push(next); latest = next; } + if to_send.len() > 1 { + for next in to_send { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; + } + } }); } diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs index 756f4561..daa0cf02 100644 --- a/coordinator/src/tests/tributary/handle_p2p.rs +++ b/coordinator/src/tests/tributary/handle_p2p.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use rand_core::OsRng; +use ciphersuite::{Ciphersuite, Ristretto}; + use tokio::{ sync::{mpsc, broadcast}, time::sleep, @@ -35,12 +37,17 @@ async fn handle_p2p_test() { let mut tributary_senders = vec![]; let mut tributary_arcs = vec![]; - for (p2p, tributary) in tributaries.drain(..) 
{ + for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() { let tributary = Arc::new(tributary); tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); let (cosign_send, _) = mpsc::unbounded_channel(); - tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv)); + tokio::spawn(handle_p2p_task( + p2p, + cosign_send, + new_tributary_recv, + <Ristretto as Ciphersuite>::generator() * *keys[i], + )); new_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary })) .map_err(|_| "failed to send ActiveTributary") diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 0a468c63..1af08fa8 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -45,12 +45,17 @@ async fn sync_test() { let mut tributary_senders = vec![]; let mut tributary_arcs = vec![]; let mut p2p_threads = vec![]; - for (p2p, tributary) in tributaries.drain(..) 
{ + for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() { let tributary = Arc::new(tributary); tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); let (cosign_send, _) = mpsc::unbounded_channel(); - let thread = tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv)); + let thread = tokio::spawn(handle_p2p_task( + p2p, + cosign_send, + new_tributary_recv, + <Ristretto as Ciphersuite>::generator() * *keys[i], + )); new_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary })) .map_err(|_| "failed to send ActiveTributary") @@ -86,7 +91,7 @@ async fn sync_test() { let syncer_tributary = Arc::new(syncer_tributary); let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5); let (cosign_send, _) = mpsc::unbounded_channel(); - tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv)); + tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv, syncer_key)); syncer_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(),