add some tests

This commit is contained in:
Boog900 2024-05-15 17:15:39 +01:00
parent df6d6ec187
commit aedf87ec5b
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2

View file

@ -2,10 +2,8 @@
//!
//! This module handles broadcasting messages to multiple peers with the [`BroadcastSvc`].
use std::{
collections::HashSet,
future::{ready, Future, Ready},
pin::{pin, Pin},
sync::Arc,
task::{ready, Context, Poll},
time::Duration,
};
@ -392,3 +390,152 @@ fn get_txs_to_broadcast<N: NetworkZone>(
}
}
}
#[cfg(test)]
mod tests {
use std::{pin::pin, time::Duration};
use bytes::Bytes;
use futures::StreamExt;
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use cuprate_test_utils::test_netzone::TestNetZone;
use monero_p2p::{client::InternalPeerID, BroadcastMessage, ConnectionDirection};
use super::{init_broadcast_channels, BroadcastConfig, BroadcastRequest};
const TEST_CONFIG: BroadcastConfig = BroadcastConfig {
diffusion_flush_average_seconds_outbound: Duration::from_millis(100),
diffusion_flush_average_seconds_inbound: Duration::from_millis(200),
};
#[tokio::test]
async fn tx_broadcast_direction_correct() {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true, true, true>>(&TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
// Outbound should get 1 and 3, inbound should get 2 and 3.
brcst
.ready()
.await
.unwrap()
.call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[1]),
direction: Some(ConnectionDirection::OutBound),
received_from: None,
})
.await
.unwrap();
brcst
.ready()
.await
.unwrap()
.call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[2]),
direction: Some(ConnectionDirection::InBound),
received_from: None,
})
.await
.unwrap();
brcst
.ready()
.await
.unwrap()
.call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[3]),
direction: None,
received_from: None,
})
.await
.unwrap();
let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
_ => panic!("Block broadcast?"),
};
let next = outbound_stream.next().await.unwrap();
let txs = [Bytes::from_static(&[1]), Bytes::from_static(&[3])];
match_tx(next, &txs);
let next = inbound_stream.next().await.unwrap();
match_tx(next, &[Bytes::from_static(&[2]), Bytes::from_static(&[3])]);
}
#[tokio::test]
async fn block_broadcast_sent_to_all() {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true, true, true>>(&TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
brcst
.ready()
.await
.unwrap()
.call(BroadcastRequest::Block {
block_bytes: Default::default(),
current_blockchain_height: 0,
})
.await
.unwrap();
let next = outbound_stream.next().await.unwrap();
assert!(matches!(next, BroadcastMessage::NewFluffyBlock(_)));
let next = inbound_stream.next().await.unwrap();
assert!(matches!(next, BroadcastMessage::NewFluffyBlock(_)));
}
#[tokio::test]
async fn tx_broadcast_skipped_for_received_from_peer() {
let (mut brcst, outbound_mkr, inbound_mkr) =
init_broadcast_channels::<TestNetZone<true, true, true>>(&TEST_CONFIG);
let mut outbound_stream = pin!(outbound_mkr(InternalPeerID::Unknown(1)));
let mut outbound_stream_from = pin!(outbound_mkr(InternalPeerID::Unknown(0)));
let mut inbound_stream = pin!(inbound_mkr(InternalPeerID::Unknown(1)));
let mut inbound_stream_from = pin!(inbound_mkr(InternalPeerID::Unknown(0)));
brcst
.ready()
.await
.unwrap()
.call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[1]),
direction: None,
received_from: Some(InternalPeerID::Unknown(0)),
})
.await
.unwrap();
let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
_ => panic!("Block broadcast?"),
};
let next = outbound_stream.next().await.unwrap();
let txs = [Bytes::from_static(&[1])];
match_tx(next, &txs);
let next = inbound_stream.next().await.unwrap();
match_tx(next, &[Bytes::from_static(&[1])]);
// Make sure the streams with the same id as the one we said sent the tx do not get the tx to broadcast.
assert!(timeout(
Duration::from_secs(2),
futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
)
.await
.is_err())
}
}