mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-25 03:55:58 +00:00
Test handle_p2p and Tributary syncing
Includes bug fixes.
This commit is contained in:
parent
cc491ee1e1
commit
2feebe536e
5 changed files with 224 additions and 14 deletions
|
@ -54,8 +54,8 @@ async fn create_new_tributary<D: Db>(db: D, spec: TributarySpec) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ActiveTributary<D: Db, P: P2p> {
|
pub struct ActiveTributary<D: Db, P: P2p> {
|
||||||
spec: TributarySpec,
|
pub spec: TributarySpec,
|
||||||
tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
|
pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds a tributary into the specified HahMap
|
// Adds a tributary into the specified HahMap
|
||||||
|
@ -145,8 +145,9 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Instead of holding this lock long term, should this take in Arc RwLock and
|
// TODO: Make a TributaryReader which only requires a DB handle and safely doesn't require
|
||||||
// re-acquire read locks?
|
// locks
|
||||||
|
// Use that here
|
||||||
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
|
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
|
||||||
tributary::scanner::handle_new_blocks::<_, _, P>(
|
tributary::scanner::handle_new_blocks::<_, _, P>(
|
||||||
&mut tributary_db,
|
&mut tributary_db,
|
||||||
|
@ -170,7 +171,7 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
|
||||||
tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
|
tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
|
||||||
) {
|
) {
|
||||||
let ten_blocks_of_time =
|
let ten_blocks_of_time =
|
||||||
Duration::from_secs((Tributary::<D, Transaction, P>::block_time() * 10).into());
|
Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
|
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
|
||||||
|
@ -207,18 +208,16 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
// This is misleading being read, as it will mutate the Tributary, yet there's
|
if tributary.tributary.write().await.handle_message(&msg.msg).await {
|
||||||
// greater efficiency when it is read
|
|
||||||
// The safety of it is also justified by Tributary::handle_message's documentation
|
|
||||||
if tributary.tributary.read().await.handle_message(&msg.msg).await {
|
|
||||||
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Rate limit this
|
||||||
P2pMessageKind::Heartbeat(genesis) => {
|
P2pMessageKind::Heartbeat(genesis) => {
|
||||||
let tributaries_read = tributaries.read().await;
|
let tributaries_read = tributaries.read().await;
|
||||||
let Some(tributary) = tributaries_read.get(&genesis) else {
|
let Some(tributary) = tributaries_read.get(&genesis) else {
|
||||||
log::debug!("received hearttbeat message for unknown network");
|
log::debug!("received heartbeat message for unknown network");
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -229,14 +228,21 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
|
|
||||||
let tributary_read = tributary.tributary.read().await;
|
let tributary_read = tributary.tributary.read().await;
|
||||||
|
|
||||||
|
/*
|
||||||
// Have sqrt(n) nodes reply with the blocks
|
// Have sqrt(n) nodes reply with the blocks
|
||||||
let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
|
let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
|
||||||
// Try to have at least 3 responders
|
// Try to have at least 3 responders
|
||||||
if responders < 3 {
|
if responders < 3 {
|
||||||
responders = tributary.spec.n().min(3).into();
|
responders = tributary.spec.n().min(3).into();
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
// Only respond to this if randomly chosen
|
// Have up to three nodes respond
|
||||||
|
let responders = u64::from(tributary.spec.n().min(3));
|
||||||
|
|
||||||
|
// Decide which nodes will respond by using the latest block's hash as a mutually agreed
|
||||||
|
// upon entropy source
|
||||||
|
// THis isn't a secure source of entropy, yet it's fine for this
|
||||||
let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap());
|
let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap());
|
||||||
// If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9)
|
// If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9)
|
||||||
// entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
|
// entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
|
||||||
|
@ -252,9 +258,12 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !selected {
|
if !selected {
|
||||||
|
log::debug!("received heartbeat and not selected to respond");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log::debug!("received heartbeat and selected to respond");
|
||||||
|
|
||||||
let mut latest = msg.msg.try_into().unwrap();
|
let mut latest = msg.msg.try_into().unwrap();
|
||||||
// TODO: All of these calls don't *actually* need a read lock, just access to a DB handle
|
// TODO: All of these calls don't *actually* need a read lock, just access to a DB handle
|
||||||
// We can reduce lock contention accordingly
|
// We can reduce lock contention accordingly
|
||||||
|
@ -273,7 +282,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
// Get just the commit
|
// Get just the commit
|
||||||
msg.msg.drain((msg.msg.len() - msg_ref.len()) ..);
|
msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
|
||||||
|
|
||||||
let tributaries = tributaries.read().await;
|
let tributaries = tributaries.read().await;
|
||||||
let Some(tributary) = tributaries.get(&genesis) else {
|
let Some(tributary) = tributaries.get(&genesis) else {
|
||||||
|
@ -281,7 +290,12 @@ pub async fn handle_p2p<D: Db, P: P2p>(
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
tributary.tributary.write().await.sync_block(block, msg.msg).await;
|
// TODO: We take a notable amount of time to add blocks when we're missing provided
|
||||||
|
// transactions
|
||||||
|
// Any tributary with missing provided transactions will cause this P2P loop to halt
|
||||||
|
// Make a separate queue for this
|
||||||
|
let res = tributary.tributary.write().await.sync_block(block, msg.msg).await;
|
||||||
|
log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {
|
||||||
// TODO: Move this to tests
|
// TODO: Move this to tests
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct LocalP2p(usize, Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
|
pub struct LocalP2p(usize, pub Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
|
||||||
|
|
||||||
impl LocalP2p {
|
impl LocalP2p {
|
||||||
pub fn new(validators: usize) -> Vec<LocalP2p> {
|
pub fn new(validators: usize) -> Vec<LocalP2p> {
|
||||||
|
|
64
coordinator/src/tests/tributary/handle_p2p.rs
Normal file
64
coordinator/src/tests/tributary/handle_p2p.rs
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
use core::time::Duration;
|
||||||
|
use std::{sync::Arc, collections::HashMap};
|
||||||
|
|
||||||
|
use rand_core::OsRng;
|
||||||
|
|
||||||
|
use ciphersuite::{Ciphersuite, Ristretto};
|
||||||
|
|
||||||
|
use tokio::{sync::RwLock, time::sleep};
|
||||||
|
|
||||||
|
use serai_db::MemDb;
|
||||||
|
|
||||||
|
use tributary::Tributary;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
tributary::Transaction,
|
||||||
|
LocalP2p, ActiveTributary, handle_p2p,
|
||||||
|
tests::tributary::{new_keys, new_spec, new_tributaries},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn handle_p2p_test() {
|
||||||
|
let keys = new_keys(&mut OsRng);
|
||||||
|
let spec = new_spec(&mut OsRng, &keys);
|
||||||
|
|
||||||
|
let mut tributaries = new_tributaries(&keys, &spec).await;
|
||||||
|
|
||||||
|
let mut tributary_arcs = vec![];
|
||||||
|
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
|
||||||
|
let tributary = Arc::new(RwLock::new(tributary));
|
||||||
|
tributary_arcs.push(tributary.clone());
|
||||||
|
tokio::spawn(handle_p2p(
|
||||||
|
Ristretto::generator() * *keys[i],
|
||||||
|
p2p,
|
||||||
|
Arc::new(RwLock::new(HashMap::from([(
|
||||||
|
spec.genesis(),
|
||||||
|
ActiveTributary { spec: spec.clone(), tributary },
|
||||||
|
)]))),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let tributaries = tributary_arcs;
|
||||||
|
|
||||||
|
// After two blocks of time, we should have a new block
|
||||||
|
// We don't wait one block of time as we may have missed the chance for this block
|
||||||
|
sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into()))
|
||||||
|
.await;
|
||||||
|
let tip = tributaries[0].read().await.tip().await;
|
||||||
|
assert!(tip != spec.genesis());
|
||||||
|
|
||||||
|
// Sleep one second to make sure this block propagates
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
// Make sure every tributary has it
|
||||||
|
for tributary in &tributaries {
|
||||||
|
assert!(tributary.read().await.block(&tip).is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then after another block of time, we should have yet another new block
|
||||||
|
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
|
||||||
|
let new_tip = tributaries[0].read().await.tip().await;
|
||||||
|
assert!(new_tip != tip);
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
for tributary in tributaries {
|
||||||
|
assert!(tributary.read().await.block(&new_tip).is_some());
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,9 @@ mod tx;
|
||||||
mod dkg;
|
mod dkg;
|
||||||
// TODO: Test the other transactions
|
// TODO: Test the other transactions
|
||||||
|
|
||||||
|
mod handle_p2p;
|
||||||
|
mod sync;
|
||||||
|
|
||||||
fn random_u32<R: RngCore>(rng: &mut R) -> u32 {
|
fn random_u32<R: RngCore>(rng: &mut R) -> u32 {
|
||||||
u32::try_from(rng.next_u64() >> 32).unwrap()
|
u32::try_from(rng.next_u64() >> 32).unwrap()
|
||||||
}
|
}
|
||||||
|
|
129
coordinator/src/tests/tributary/sync.rs
Normal file
129
coordinator/src/tests/tributary/sync.rs
Normal file
|
@ -0,0 +1,129 @@
|
||||||
|
use core::time::Duration;
|
||||||
|
use std::{
|
||||||
|
sync::Arc,
|
||||||
|
collections::{HashSet, HashMap},
|
||||||
|
};
|
||||||
|
|
||||||
|
use rand_core::OsRng;
|
||||||
|
|
||||||
|
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
|
||||||
|
|
||||||
|
use tokio::{sync::RwLock, time::sleep};
|
||||||
|
|
||||||
|
use serai_db::MemDb;
|
||||||
|
|
||||||
|
use tributary::Tributary;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
tributary::Transaction,
|
||||||
|
LocalP2p, ActiveTributary, handle_p2p, heartbeat_tributaries,
|
||||||
|
tests::tributary::{new_keys, new_spec, new_tributaries},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn sync_test() {
|
||||||
|
let mut keys = new_keys(&mut OsRng);
|
||||||
|
let spec = new_spec(&mut OsRng, &keys);
|
||||||
|
// Ensure this can have a node fail
|
||||||
|
assert!(spec.n() > spec.t());
|
||||||
|
|
||||||
|
let mut tributaries = new_tributaries(&keys, &spec).await;
|
||||||
|
|
||||||
|
// Keep a Tributary back, effectively having it offline
|
||||||
|
let syncer_key = keys.pop().unwrap();
|
||||||
|
let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap();
|
||||||
|
|
||||||
|
// Have the rest form a P2P net
|
||||||
|
let mut tributary_arcs = vec![];
|
||||||
|
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
|
||||||
|
let tributary = Arc::new(RwLock::new(tributary));
|
||||||
|
tributary_arcs.push(tributary.clone());
|
||||||
|
tokio::spawn(handle_p2p(
|
||||||
|
Ristretto::generator() * *keys[i],
|
||||||
|
p2p,
|
||||||
|
Arc::new(RwLock::new(HashMap::from([(
|
||||||
|
spec.genesis(),
|
||||||
|
ActiveTributary { spec: spec.clone(), tributary },
|
||||||
|
)]))),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let tributaries = tributary_arcs;
|
||||||
|
|
||||||
|
// After three blocks of time, we should have a new block
|
||||||
|
// We don't wait one block of time as we may have missed the chance for the first block
|
||||||
|
// We don't wait two blocks because we may have missed the chance, and then had a failure to
|
||||||
|
// propose by our 'offline' validator
|
||||||
|
let block_time = u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time());
|
||||||
|
sleep(Duration::from_secs(3 * block_time)).await;
|
||||||
|
let tip = tributaries[0].read().await.tip().await;
|
||||||
|
assert!(tip != spec.genesis());
|
||||||
|
|
||||||
|
// Sleep one second to make sure this block propagates
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
// Make sure every tributary has it
|
||||||
|
for tributary in &tributaries {
|
||||||
|
assert!(tributary.read().await.block(&tip).is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's
|
||||||
|
// pending P2P messages
|
||||||
|
syncer_p2p.1.write().await.last_mut().unwrap().clear();
|
||||||
|
|
||||||
|
// Have it join the net
|
||||||
|
let syncer_key = Ristretto::generator() * *syncer_key;
|
||||||
|
let syncer_tributary = Arc::new(RwLock::new(syncer_tributary));
|
||||||
|
let syncer_tributaries = Arc::new(RwLock::new(HashMap::from([(
|
||||||
|
spec.genesis(),
|
||||||
|
ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() },
|
||||||
|
)])));
|
||||||
|
tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributaries.clone()));
|
||||||
|
|
||||||
|
// It shouldn't automatically catch up. If it somehow was, our test would be broken
|
||||||
|
// Sanity check this
|
||||||
|
let tip = tributaries[0].read().await.tip().await;
|
||||||
|
sleep(Duration::from_secs(2 * block_time)).await;
|
||||||
|
assert!(tributaries[0].read().await.tip().await != tip);
|
||||||
|
assert_eq!(syncer_tributary.read().await.tip().await, spec.genesis());
|
||||||
|
|
||||||
|
// Start the heartbeat protocol
|
||||||
|
tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_tributaries));
|
||||||
|
|
||||||
|
// The heartbeat is once every 10 blocks
|
||||||
|
sleep(Duration::from_secs(10 * block_time)).await;
|
||||||
|
assert!(syncer_tributary.read().await.tip().await != spec.genesis());
|
||||||
|
|
||||||
|
// Verify it synced to the tip
|
||||||
|
let syncer_tip = {
|
||||||
|
let tributary = tributaries[0].write().await;
|
||||||
|
let syncer_tributary = syncer_tributary.write().await;
|
||||||
|
|
||||||
|
let tip = tributary.tip().await;
|
||||||
|
let syncer_tip = syncer_tributary.tip().await;
|
||||||
|
// Allow a one block tolerance in case of race conditions
|
||||||
|
assert!(HashSet::from([tip, tributary.block(&tip).unwrap().parent()]).contains(&syncer_tip));
|
||||||
|
syncer_tip
|
||||||
|
};
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(block_time)).await;
|
||||||
|
|
||||||
|
// Verify it's now keeping up
|
||||||
|
assert!(syncer_tributary.read().await.tip().await != syncer_tip);
|
||||||
|
|
||||||
|
// Verify it's now participating in consensus
|
||||||
|
// Because only `t` validators are used in a commit, check several commits
|
||||||
|
// This should be biased in favor of the syncer since we're using the syncer's view of the commit
|
||||||
|
for _ in 0 .. 10 {
|
||||||
|
let syncer_tributary = syncer_tributary.read().await;
|
||||||
|
if syncer_tributary
|
||||||
|
.parsed_commit(&syncer_tributary.tip().await)
|
||||||
|
.unwrap()
|
||||||
|
.validators
|
||||||
|
.iter()
|
||||||
|
.any(|signer| signer == &syncer_key.to_bytes())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_secs(block_time)).await;
|
||||||
|
}
|
||||||
|
panic!("synced tributary didn't start participating in consensus");
|
||||||
|
}
|
Loading…
Reference in a new issue