Add content-based deduplication to the tests' shimmed P2P

The tests have recently had their timing stilted, causing failures. The tests
are... fine. They're fragile, as obvious, yet they're logical. The simplest fix
is to unstilt their timing rather to make them non-fragile.

The recent change, which presumably caused said stilting, was the the
rebroadcasting added. This de-duplication prevents most of the impact of
rebroadcasting. While there's still the async task, and the lock acquisition on
attempt to rebroadcast, this hopefully is enough.
This commit is contained in:
Luke Parker 2023-10-13 19:47:58 -04:00
parent d5c6ed1a03
commit 96c397caa0
No known key found for this signature in database
2 changed files with 18 additions and 7 deletions

View file

@ -1,7 +1,7 @@
use core::fmt::Debug;
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
collections::{VecDeque, HashSet, HashMap},
};
use serai_client::primitives::NetworkId;
@ -45,11 +45,11 @@ impl Processors for MemProcessors {
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(usize, pub Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);
impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> {
let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators]));
let shared = Arc::new(RwLock::new((HashSet::new(), vec![VecDeque::new(); validators])));
let mut res = vec![];
for i in 0 .. validators {
res.push(LocalP2p(i, shared.clone()));
@ -63,11 +63,22 @@ impl P2p for LocalP2p {
type Id = usize;
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
self.1.write().await[to].push_back((self.0, msg));
self.1.write().await.1[to].push_back((self.0, msg));
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() {
// Content-based deduplication
let mut lock = self.1.write().await;
{
let already_sent = &mut lock.0;
if already_sent.contains(&msg) {
return;
}
already_sent.insert(msg.clone());
}
let queues = &mut lock.1;
for (i, msg_queue) in queues.iter_mut().enumerate() {
if i == self.0 {
continue;
}
@ -78,7 +89,7 @@ impl P2p for LocalP2p {
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
// This is a cursed way to implement an async read from a Vec
loop {
if let Some(res) = self.1.write().await[self.0].pop_front() {
if let Some(res) = self.1.write().await.1[self.0].pop_front() {
return res;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

View file

@ -70,7 +70,7 @@ async fn sync_test() {
// Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's
// pending P2P messages
syncer_p2p.1.write().await.last_mut().unwrap().clear();
syncer_p2p.1.write().await.1.last_mut().unwrap().clear();
// Have it join the net
let syncer_key = Ristretto::generator() * *syncer_key;