From 6d83cda90f31e19670e2bd4cd1d7256953c0dd70 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 13 Jun 2024 22:04:58 +0100 Subject: [PATCH] clean up test --- p2p/cuprate-p2p/src/block_downloader.rs | 69 ++++-- .../src/block_downloader/chain_tracker.rs | 20 +- p2p/cuprate-p2p/src/block_downloader/tests.rs | 232 ++++++++++-------- 3 files changed, 183 insertions(+), 138 deletions(-) diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 2101df47..9b607716 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -392,12 +392,19 @@ where let ids = in_flight_batch.ids.clone(); let start_height = in_flight_batch.start_height; - self.block_download_tasks.spawn(async move { - ( + self.block_download_tasks.spawn( + async move { + ( + start_height, + request_batch_from_peer(client, ids, start_height).await, + ) + } + .instrument(tracing::debug_span!( + "download_batch", start_height, - request_batch_from_peer(client, ids, start_height).await, - ) - }); + attempt = in_flight_batch.requests_sent + )), + ); return None; } @@ -423,7 +430,7 @@ where while let Some(failed_request) = self.failed_batches.peek() { // Check if we still have the request that failed - another peer could have completed it after // failure. - if let Some(request) = self.inflight_requests.get(&failed_request.0) { + if let Some(request) = self.inflight_requests.get_mut(&failed_request.0) { // Check if this peer has the blocks according to their pruning seed. if client_has_block_in_range( &client.info.pruning_seed, @@ -435,12 +442,22 @@ where let ids = request.ids.clone(); let start_height = request.start_height; - self.block_download_tasks.spawn(async move { - ( + request.requests_sent += 1; + + self.block_download_tasks.spawn( + async move { + ( + start_height, + request_batch_from_peer(client, ids, start_height).await, + ) + } + .instrument(tracing::debug_span!( + "download_batch", start_height, - request_batch_from_peer(client, ids, start_height).await, - ) - }); + attempt = request.requests_sent + )), + ); + // Remove the failure, we have just handled it. self.failed_batches.pop(); @@ -473,17 +490,24 @@ where self.inflight_requests .insert(block_entry_to_get.start_height, block_entry_to_get.clone()); - self.block_download_tasks.spawn(async move { - ( - block_entry_to_get.start_height, - request_batch_from_peer( - client, - block_entry_to_get.ids, + self.block_download_tasks.spawn( + async move { + ( block_entry_to_get.start_height, + request_batch_from_peer( + client, + block_entry_to_get.ids, + block_entry_to_get.start_height, + ) + .await, ) - .await, - ) - }); + } + .instrument(tracing::debug_span!( + "download_batch", + block_entry_to_get.start_height, + attempt = block_entry_to_get.requests_sent + )), + ); None } @@ -523,7 +547,10 @@ where .await .map_err(|_| BlockDownloadError::TimedOut)? } - .instrument(Span::current()), + .instrument(tracing::debug_span!( + "request_chain_entry", + current_height = chain_tracker.top_height() + )), ); return None; diff --git a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs index 19d70110..d116f6bf 100644 --- a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs @@ -73,21 +73,23 @@ impl ChainTracker { /// Returns `true` if the peer is expected to have the next block after our highest seen block /// according to their pruning seed. pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool { + seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT) + } + + /// Returns the simple history, the highest seen block and the genesis block. + pub fn get_simple_history(&self) -> [[u8; 32]; 2] { + [self.top_seen_hash, self.our_genesis] + } + + /// Returns the height of the highest block we are tracking. + pub fn top_height(&self) -> u64 { let top_block_idx = self .entries .iter() .map(|entry| entry.ids.len()) .sum::(); - seed.has_full_block( - self.first_height + u64::try_from(top_block_idx).unwrap(), - CRYPTONOTE_MAX_BLOCK_HEIGHT, - ) - } - - /// Returns the simple history, the highest seen block and the genesis block. - pub fn get_simple_history(&self) -> [[u8; 32]; 2] { - [self.top_seen_hash, self.our_genesis] + self.first_height + u64::try_from(top_block_idx).unwrap() } /// Returns the total number of queued batches for a certain `batch_size`. diff --git a/p2p/cuprate-p2p/src/block_downloader/tests.rs b/p2p/cuprate-p2p/src/block_downloader/tests.rs index fda524de..56c8f864 100644 --- a/p2p/cuprate-p2p/src/block_downloader/tests.rs +++ b/p2p/cuprate-p2p/src/block_downloader/tests.rs @@ -1,3 +1,12 @@ +use std::{ + fmt::{Debug, Formatter}, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + use futures::{FutureExt, StreamExt}; use indexmap::IndexMap; use monero_serai::{ @@ -5,121 +14,33 @@ use monero_serai::{ ringct::{RctBase, RctPrunable, RctSignatures}, transaction::{Input, Timelock, Transaction, TransactionPrefix}, }; -use std::fmt::{Debug, Formatter}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; - -use crate::block_downloader::{ - download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse, -}; -use crate::client_pool::ClientPool; -use fixed_bytes::ByteArrayVec; -use monero_p2p::client::{mock_client, Client, InternalPeerID, PeerInformation}; -use monero_p2p::network_zones::ClearNet; -use monero_p2p::services::{PeerSyncRequest, PeerSyncResponse}; -use monero_p2p::{ConnectionDirection, NetworkZone, PeerRequest, PeerResponse}; -use monero_pruning::PruningSeed; -use monero_wire::common::{BlockCompleteEntry, TransactionBlobs}; -use monero_wire::protocol::{ChainResponse, GetObjectsResponse}; use proptest::{collection::vec, prelude::*}; use tokio::sync::Semaphore; use tower::{service_fn, Service}; -prop_compose! { - fn dummy_transaction_stragtegy(height: u64) - ( - extra in vec(any::(), 0..1_000), - timelock in any::(), - ) - -> Transaction { - Transaction { - prefix: TransactionPrefix { - version: 1, - timelock: Timelock::Block(timelock), - inputs: vec![Input::Gen(height)], - outputs: vec![], - extra, - }, - signatures: vec![], - rct_signatures: RctSignatures { - base: RctBase { - fee: 0, - pseudo_outs: vec![], - encrypted_amounts: vec![], - commitments: vec![], - }, - prunable: RctPrunable::Null - }, - } - } -} +use fixed_bytes::ByteArrayVec; +use monero_p2p::{ + client::{mock_client, Client, InternalPeerID, PeerInformation}, + network_zones::ClearNet, + services::{PeerSyncRequest, PeerSyncResponse}, + ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, +}; +use monero_pruning::PruningSeed; +use monero_wire::{ + common::{BlockCompleteEntry, TransactionBlobs}, + protocol::{ChainResponse, GetObjectsResponse}, +}; -prop_compose! { - fn dummy_block_stragtegy( - height: u64, - previous: [u8; 32], - ) - ( - miner_tx in dummy_transaction_stragtegy(height), - txs in vec(dummy_transaction_stragtegy(height), 0..25) - ) - -> (Block, Vec) { - ( - Block { - header: BlockHeader { - major_version: 0, - minor_version: 0, - timestamp: 0, - previous, - nonce: 0, - }, - miner_tx, - txs: txs.iter().map(Transaction::hash).collect(), - }, - txs - ) - } -} - -struct MockBlockchain { - blocks: IndexMap<[u8; 32], (Block, Vec)>, -} - -impl Debug for MockBlockchain { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str("MockBlockchain") - } -} - -prop_compose! { - fn dummy_blockchain_stragtegy()( - blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000), - ) -> MockBlockchain { - let mut blockchain = IndexMap::new(); - - for (height, mut block) in blocks.into_iter().enumerate() { - if let Some(last) = blockchain.last() { - block.0.header.previous = *last.0; - block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)] - } - - blockchain.insert(block.0.hash(), block); - } - - MockBlockchain { - blocks: blockchain - } - } -} +use crate::{ + block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, + client_pool::ClientPool, +}; proptest! { #![proptest_config(ProptestConfig { cases: 4, max_shrink_iters: 10, - timeout: 600 * 1_000, + timeout: 60 * 1000, .. ProptestConfig::default() })] @@ -149,20 +70,115 @@ proptest! { genesis: *blockchain.blocks.first().unwrap().0 }, BlockDownloaderConfig { - buffer_size: 100_000, - in_progress_queue_size: 100_000, + buffer_size: 1_000, + in_progress_queue_size: 10_000, check_client_pool_interval: Duration::from_secs(5), - target_batch_size: 50_000, + target_batch_size: 5_000, initial_batch_size: 1, }); let blocks = stream.map(|blocks| blocks.blocks).concat().await; assert_eq!(blocks.len() + 1, blockchain.blocks.len()); + + for (i, block) in blocks.into_iter().enumerate() { + assert_eq!(&block, blockchain.blocks.get_index(i + 1).unwrap().1); + } }); } } +prop_compose! { + /// Returns a strategy to generate a [`Transaction`] that is valid for the block downloader. + fn dummy_transaction_stragtegy(height: u64) + ( + extra in vec(any::(), 0..1_000), + timelock in 0_usize..50_000_000, + ) + -> Transaction { + Transaction { + prefix: TransactionPrefix { + version: 1, + timelock: Timelock::Block(timelock), + inputs: vec![Input::Gen(height)], + outputs: vec![], + extra, + }, + signatures: vec![], + rct_signatures: RctSignatures { + base: RctBase { + fee: 0, + pseudo_outs: vec![], + encrypted_amounts: vec![], + commitments: vec![], + }, + prunable: RctPrunable::Null + }, + } + } +} + +prop_compose! { + /// Returns a strategy to generate a [`Block`] that is valid for the block downloader. + fn dummy_block_stragtegy( + height: u64, + previous: [u8; 32], + ) + ( + miner_tx in dummy_transaction_stragtegy(height), + txs in vec(dummy_transaction_stragtegy(height), 0..25) + ) + -> (Block, Vec) { + ( + Block { + header: BlockHeader { + major_version: 0, + minor_version: 0, + timestamp: 0, + previous, + nonce: 0, + }, + miner_tx, + txs: txs.iter().map(Transaction::hash).collect(), + }, + txs + ) + } +} + +/// A mock blockchain. +struct MockBlockchain { + blocks: IndexMap<[u8; 32], (Block, Vec)>, +} + +impl Debug for MockBlockchain { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("MockBlockchain") + } +} + +prop_compose! { + /// Returns a strategy to generate a [`MockBlockchain`]. + fn dummy_blockchain_stragtegy()( + blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000), + ) -> MockBlockchain { + let mut blockchain = IndexMap::new(); + + for (height, mut block) in blocks.into_iter().enumerate() { + if let Some(last) = blockchain.last() { + block.0.header.previous = *last.0; + block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)] + } + + blockchain.insert(block.0.hash(), block); + } + + MockBlockchain { + blocks: blockchain + } + } +} + fn mock_block_downloader_client(blockchain: Arc) -> Client { let semaphore = Arc::new(Semaphore::new(1));