add a test

This commit is contained in:
Boog900 2024-06-13 03:15:47 +01:00
parent cefd8a7a3f
commit dd9e63d408
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
5 changed files with 368 additions and 54 deletions
Cargo.lock
p2p
cuprate-p2p
monero-p2p/src

50
Cargo.lock generated
View file

@ -609,6 +609,7 @@ dependencies = [
"monero-serai",
"monero-wire",
"pin-project",
"proptest",
"rand",
"rand_distr",
"rayon",
@ -618,7 +619,6 @@ dependencies = [
"tokio-util",
"tower",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -1563,16 +1563,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.18"
@ -1620,12 +1610,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "page_size"
version = "0.6.0"
@ -2252,15 +2236,6 @@ dependencies = [
"keccak",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@ -2614,18 +2589,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
@ -2634,12 +2597,7 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2698,12 +2656,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"

View file

@ -27,13 +27,12 @@ dashmap = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] }
indexmap = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-subscriber = "0.3.18"
[dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" }
indexmap = { workspace = true }
proptest = { workspace = true }

View file

@ -4,6 +4,7 @@
//! and downloads it.
//!
//! The block downloader is started by [`download_blocks`].
use futures::FutureExt;
use std::{
cmp::{max, min, Ordering, Reverse},
collections::{BTreeMap, BinaryHeap, HashSet},
@ -46,6 +47,9 @@ use crate::{
mod chain_tracker;
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
#[cfg(test)]
mod tests;
/// A downloaded batch of blocks.
#[derive(Debug)]
pub struct BlockBatch {
@ -154,7 +158,12 @@ where
config,
);
tokio::spawn(block_downloader.run().instrument(Span::current()));
tokio::spawn(
block_downloader
.run()
.instrument(Span::current())
.map(|res| panic!("{res:?}")),
);
buffer_stream
}

View file

@ -0,0 +1,303 @@
use futures::{FutureExt, StreamExt};
use indexmap::IndexMap;
use monero_serai::{
block::{Block, BlockHeader},
ringct::{RctBase, RctPrunable, RctSignatures},
transaction::{Input, Timelock, Transaction, TransactionPrefix},
};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::block_downloader::{
download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse,
};
use crate::client_pool::ClientPool;
use fixed_bytes::ByteArrayVec;
use monero_p2p::client::{mock_client, Client, InternalPeerID, PeerInformation};
use monero_p2p::network_zones::ClearNet;
use monero_p2p::services::{PeerSyncRequest, PeerSyncResponse};
use monero_p2p::{ConnectionDirection, NetworkZone, PeerRequest, PeerResponse};
use monero_pruning::PruningSeed;
use monero_wire::common::{BlockCompleteEntry, TransactionBlobs};
use monero_wire::protocol::{ChainResponse, GetObjectsResponse};
use proptest::{collection::vec, prelude::*};
use tokio::sync::Semaphore;
use tower::{service_fn, Service};
prop_compose! {
fn dummy_transaction_stragtegy
(height: u64)
(extra in vec(any::<u8>(), 0..10_000))
-> Transaction {
Transaction {
prefix: TransactionPrefix {
version: 1,
timelock: Timelock::None,
inputs: vec![Input::Gen(height)],
outputs: vec![],
extra,
},
signatures: vec![],
rct_signatures: RctSignatures {
base: RctBase {
fee: 0,
pseudo_outs: vec![],
encrypted_amounts: vec![],
commitments: vec![],
},
prunable: RctPrunable::Null
},
}
}
}
prop_compose! {
fn dummy_block_stragtegy(
height: u64,
previous: [u8; 32],
)
(
miner_tx in dummy_transaction_stragtegy(height),
txs in vec(dummy_transaction_stragtegy(height), 0..25)
)
-> (Block, Vec<Transaction>) {
(
Block {
header: BlockHeader {
major_version: 0,
minor_version: 0,
timestamp: 0,
previous,
nonce: 0,
},
miner_tx,
txs: txs.iter().map(Transaction::hash).collect(),
},
txs
)
}
}
struct MockBlockchain {
blocks: IndexMap<[u8; 32], (Block, Vec<Transaction>)>,
}
impl Debug for MockBlockchain {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("MockBlockchain")
}
}
prop_compose! {
fn dummy_blockchain_stragtegy()(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..1_000),
) -> MockBlockchain {
let mut blockchain = IndexMap::new();
for (height, mut block) in blocks.into_iter().enumerate() {
if let Some(last) = blockchain.last() {
block.0.header.previous = *last.0;
block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)]
}
blockchain.insert(block.0.hash(), block);
}
MockBlockchain {
blocks: blockchain
}
}
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 2,
max_shrink_iters: 0,
timeout: 600 * 1_000,
.. ProptestConfig::default()
})]
#[test]
fn test_block_downloader(blockchain in dummy_blockchain_stragtegy(), peers in 1_usize..128) {
let blockchain = Arc::new(blockchain);
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
tokio_pool.block_on(async move {
let client_pool = ClientPool::new();
let mut peer_ids = Vec::with_capacity(peers);
for _ in 0..peers {
let client = mock_block_downloader_client(blockchain.clone());
peer_ids.push(client.info.id);
client_pool.add_new_client(client);
}
let stream = download_blocks(
client_pool,
SyncStateSvc(peer_ids) ,
OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0
},
BlockDownloaderConfig {
buffer_size: 100_000,
in_progress_queue_size: 100_000,
check_client_pool_interval: Duration::from_secs(5),
target_batch_size: 50_000,
initial_batch_size: 1,
});
let blocks = stream.map(|blocks| blocks.blocks).concat().await;
assert_eq!(blocks.len() + 1, blockchain.blocks.len());
});
}
}
fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> {
let semaphore = Arc::new(Semaphore::new(1));
let (connection_guard, connection_handle) = monero_p2p::handles::HandleBuilder::new()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
let request_handler = service_fn(move |req: PeerRequest| {
let bc = blockchain.clone();
async move {
match req {
PeerRequest::GetChain(chain_req) => {
let mut i = 0;
while !bc.blocks.contains_key(&chain_req.block_ids[i]) {
i += 1;
if i == chain_req.block_ids.len() {
i -= 1;
break;
}
}
let block_index = bc.blocks.get_index_of(&chain_req.block_ids[i]).unwrap();
let block_ids = bc
.blocks
.get_range(block_index..)
.unwrap()
.iter()
.map(|(id, _)| *id)
.take(200)
.collect::<Vec<_>>();
Ok(PeerResponse::GetChain(ChainResponse {
start_height: 0,
total_height: 0,
cumulative_difficulty_low64: 1,
cumulative_difficulty_top64: 0,
m_block_ids: block_ids.into(),
m_block_weights: vec![],
first_block: Default::default(),
}))
}
PeerRequest::GetObjects(obj) => {
let mut res = Vec::with_capacity(obj.blocks.len());
for i in 0..obj.blocks.len() {
let block = bc.blocks.get(&obj.blocks[i]).unwrap();
let block_entry = BlockCompleteEntry {
pruned: false,
block: block.0.serialize().into(),
txs: TransactionBlobs::Normal(
block
.1
.iter()
.map(Transaction::serialize)
.map(Into::into)
.collect(),
),
block_weight: 0,
};
res.push(block_entry);
}
Ok(PeerResponse::GetObjects(GetObjectsResponse {
blocks: res,
missed_ids: ByteArrayVec::from([]),
current_blockchain_height: 0,
}))
}
_ => panic!(),
}
}
.boxed()
});
let info = PeerInformation {
id: InternalPeerID::Unknown(rand::random()),
handle: connection_handle,
direction: ConnectionDirection::InBound,
pruning_seed: PruningSeed::NotPruned,
};
mock_client(info, connection_guard, request_handler)
}
#[derive(Clone)]
struct SyncStateSvc<Z: NetworkZone>(Vec<InternalPeerID<Z::Addr>>);
impl Service<PeerSyncRequest<ClearNet>> for SyncStateSvc<ClearNet> {
type Response = PeerSyncResponse<ClearNet>;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerSyncRequest<ClearNet>) -> Self::Future {
let peers = self.0.clone();
async move { Ok(PeerSyncResponse::PeersToSyncFrom(peers)) }.boxed()
}
}
struct OurChainSvc {
genesis: [u8; 32],
}
impl Service<ChainSvcRequest> for OurChainSvc {
type Response = ChainSvcResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
let genesis = self.genesis;
async move {
Ok(match req {
ChainSvcRequest::CompactHistory => ChainSvcResponse::CompactHistory {
block_ids: vec![genesis],
cumulative_difficulty: 1,
},
ChainSvcRequest::FindFirstUnknown(_) => ChainSvcResponse::FindFirstUnknown(1, 1),
ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1),
})
}
.boxed()
}
}

View file

@ -10,7 +10,8 @@ use tokio::{
task::JoinHandle,
};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use tower::{Service, ServiceExt};
use tracing::Instrument;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
@ -24,6 +25,7 @@ mod connector;
pub mod handshaker;
mod timeout_monitor;
use crate::handles::ConnectionGuard;
pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
use monero_pruning::PruningSeed;
@ -163,6 +165,8 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let _ = req
.response_channel
.send(Err(PeerError::ClientChannelClosed.into()));
@ -173,3 +177,50 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
rx.into()
}
}
/// Creates a mock [`Client`] for testing purposes.
///
/// `request_handler` will be used to handle requests sent to the [`Client`]
pub fn mock_client<Z: NetworkZone, S>(
info: PeerInformation<Z::Addr>,
connection_guard: ConnectionGuard,
mut request_handler: S,
) -> Client<Z>
where
S: crate::PeerRequestHandler,
{
let (tx, mut rx) = mpsc::channel(1);
let task_span = tracing::error_span!("mock_connection", addr = %info.id);
let task_handle = tokio::spawn(
async move {
let _guard = connection_guard;
let Some(req): Option<connection::ConnectionTaskRequest> = rx.recv().await else {
tracing::debug!("Channel closed, closing mock connection");
return;
};
tracing::debug!("Received new request: {:?}", req.request.id());
let res = request_handler
.ready()
.await
.unwrap()
.call(req.request)
.await
.unwrap();
tracing::debug!("Sending back response");
let _ = req.response_channel.send(Ok(res));
}
.instrument(task_span),
);
let timeout_task = tokio::spawn(futures::future::pending());
let semaphore = Arc::new(Semaphore::new(1));
let error_slot = SharedError::new();
Client::new(info, tx, task_handle, timeout_task, semaphore, error_slot)
}