clean up test

This commit is contained in:
Boog900 2024-06-13 22:04:58 +01:00
parent 3d995c5af5
commit 6d83cda90f
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
3 changed files with 183 additions and 138 deletions

View file

@ -392,12 +392,19 @@ where
let ids = in_flight_batch.ids.clone();
let start_height = in_flight_batch.start_height;
self.block_download_tasks.spawn(async move {
(
self.block_download_tasks.spawn(
async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
}
.instrument(tracing::debug_span!(
"download_batch",
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
attempt = in_flight_batch.requests_sent
)),
);
return None;
}
@ -423,7 +430,7 @@ where
while let Some(failed_request) = self.failed_batches.peek() {
// Check if we still have the request that failed - another peer could have completed it after
// failure.
if let Some(request) = self.inflight_requests.get(&failed_request.0) {
if let Some(request) = self.inflight_requests.get_mut(&failed_request.0) {
// Check if this peer has the blocks according to their pruning seed.
if client_has_block_in_range(
&client.info.pruning_seed,
@ -435,12 +442,22 @@ where
let ids = request.ids.clone();
let start_height = request.start_height;
self.block_download_tasks.spawn(async move {
(
request.requests_sent += 1;
self.block_download_tasks.spawn(
async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
}
.instrument(tracing::debug_span!(
"download_batch",
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
attempt = request.requests_sent
)),
);
// Remove the failure, we have just handled it.
self.failed_batches.pop();
@ -473,17 +490,24 @@ where
self.inflight_requests
.insert(block_entry_to_get.start_height, block_entry_to_get.clone());
self.block_download_tasks.spawn(async move {
(
block_entry_to_get.start_height,
request_batch_from_peer(
client,
block_entry_to_get.ids,
self.block_download_tasks.spawn(
async move {
(
block_entry_to_get.start_height,
request_batch_from_peer(
client,
block_entry_to_get.ids,
block_entry_to_get.start_height,
)
.await,
)
.await,
)
});
}
.instrument(tracing::debug_span!(
"download_batch",
block_entry_to_get.start_height,
attempt = block_entry_to_get.requests_sent
)),
);
None
}
@ -523,7 +547,10 @@ where
.await
.map_err(|_| BlockDownloadError::TimedOut)?
}
.instrument(Span::current()),
.instrument(tracing::debug_span!(
"request_chain_entry",
current_height = chain_tracker.top_height()
)),
);
return None;

View file

@ -73,21 +73,23 @@ impl<N: NetworkZone> ChainTracker<N> {
/// Returns `true` if the peer is expected to have the next block after our highest seen block
/// according to their pruning seed.
pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT)
}
/// Returns the simple history, the highest seen block and the genesis block.
pub fn get_simple_history(&self) -> [[u8; 32]; 2] {
[self.top_seen_hash, self.our_genesis]
}
/// Returns the height of the highest block we are tracking.
pub fn top_height(&self) -> u64 {
let top_block_idx = self
.entries
.iter()
.map(|entry| entry.ids.len())
.sum::<usize>();
seed.has_full_block(
self.first_height + u64::try_from(top_block_idx).unwrap(),
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
}
/// Returns the simple history, the highest seen block and the genesis block.
pub fn get_simple_history(&self) -> [[u8; 32]; 2] {
[self.top_seen_hash, self.our_genesis]
self.first_height + u64::try_from(top_block_idx).unwrap()
}
/// Returns the total number of queued batches for a certain `batch_size`.

View file

@ -1,3 +1,12 @@
use std::{
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::{FutureExt, StreamExt};
use indexmap::IndexMap;
use monero_serai::{
@ -5,121 +14,33 @@ use monero_serai::{
ringct::{RctBase, RctPrunable, RctSignatures},
transaction::{Input, Timelock, Transaction, TransactionPrefix},
};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::block_downloader::{
download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse,
};
use crate::client_pool::ClientPool;
use fixed_bytes::ByteArrayVec;
use monero_p2p::client::{mock_client, Client, InternalPeerID, PeerInformation};
use monero_p2p::network_zones::ClearNet;
use monero_p2p::services::{PeerSyncRequest, PeerSyncResponse};
use monero_p2p::{ConnectionDirection, NetworkZone, PeerRequest, PeerResponse};
use monero_pruning::PruningSeed;
use monero_wire::common::{BlockCompleteEntry, TransactionBlobs};
use monero_wire::protocol::{ChainResponse, GetObjectsResponse};
use proptest::{collection::vec, prelude::*};
use tokio::sync::Semaphore;
use tower::{service_fn, Service};
prop_compose! {
fn dummy_transaction_stragtegy(height: u64)
(
extra in vec(any::<u8>(), 0..1_000),
timelock in any::<usize>(),
)
-> Transaction {
Transaction {
prefix: TransactionPrefix {
version: 1,
timelock: Timelock::Block(timelock),
inputs: vec![Input::Gen(height)],
outputs: vec![],
extra,
},
signatures: vec![],
rct_signatures: RctSignatures {
base: RctBase {
fee: 0,
pseudo_outs: vec![],
encrypted_amounts: vec![],
commitments: vec![],
},
prunable: RctPrunable::Null
},
}
}
}
use fixed_bytes::ByteArrayVec;
use monero_p2p::{
client::{mock_client, Client, InternalPeerID, PeerInformation},
network_zones::ClearNet,
services::{PeerSyncRequest, PeerSyncResponse},
ConnectionDirection, NetworkZone, PeerRequest, PeerResponse,
};
use monero_pruning::PruningSeed;
use monero_wire::{
common::{BlockCompleteEntry, TransactionBlobs},
protocol::{ChainResponse, GetObjectsResponse},
};
prop_compose! {
fn dummy_block_stragtegy(
height: u64,
previous: [u8; 32],
)
(
miner_tx in dummy_transaction_stragtegy(height),
txs in vec(dummy_transaction_stragtegy(height), 0..25)
)
-> (Block, Vec<Transaction>) {
(
Block {
header: BlockHeader {
major_version: 0,
minor_version: 0,
timestamp: 0,
previous,
nonce: 0,
},
miner_tx,
txs: txs.iter().map(Transaction::hash).collect(),
},
txs
)
}
}
struct MockBlockchain {
blocks: IndexMap<[u8; 32], (Block, Vec<Transaction>)>,
}
impl Debug for MockBlockchain {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("MockBlockchain")
}
}
prop_compose! {
fn dummy_blockchain_stragtegy()(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000),
) -> MockBlockchain {
let mut blockchain = IndexMap::new();
for (height, mut block) in blocks.into_iter().enumerate() {
if let Some(last) = blockchain.last() {
block.0.header.previous = *last.0;
block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)]
}
blockchain.insert(block.0.hash(), block);
}
MockBlockchain {
blocks: blockchain
}
}
}
use crate::{
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
client_pool::ClientPool,
};
proptest! {
#![proptest_config(ProptestConfig {
cases: 4,
max_shrink_iters: 10,
timeout: 600 * 1_000,
timeout: 60 * 1000,
.. ProptestConfig::default()
})]
@ -149,20 +70,115 @@ proptest! {
genesis: *blockchain.blocks.first().unwrap().0
},
BlockDownloaderConfig {
buffer_size: 100_000,
in_progress_queue_size: 100_000,
buffer_size: 1_000,
in_progress_queue_size: 10_000,
check_client_pool_interval: Duration::from_secs(5),
target_batch_size: 50_000,
target_batch_size: 5_000,
initial_batch_size: 1,
});
let blocks = stream.map(|blocks| blocks.blocks).concat().await;
assert_eq!(blocks.len() + 1, blockchain.blocks.len());
for (i, block) in blocks.into_iter().enumerate() {
assert_eq!(&block, blockchain.blocks.get_index(i + 1).unwrap().1);
}
});
}
}
prop_compose! {
/// Returns a strategy to generate a [`Transaction`] that is valid for the block downloader.
fn dummy_transaction_stragtegy(height: u64)
(
extra in vec(any::<u8>(), 0..1_000),
timelock in 0_usize..50_000_000,
)
-> Transaction {
Transaction {
prefix: TransactionPrefix {
version: 1,
timelock: Timelock::Block(timelock),
inputs: vec![Input::Gen(height)],
outputs: vec![],
extra,
},
signatures: vec![],
rct_signatures: RctSignatures {
base: RctBase {
fee: 0,
pseudo_outs: vec![],
encrypted_amounts: vec![],
commitments: vec![],
},
prunable: RctPrunable::Null
},
}
}
}
prop_compose! {
/// Returns a strategy to generate a [`Block`] that is valid for the block downloader.
fn dummy_block_stragtegy(
height: u64,
previous: [u8; 32],
)
(
miner_tx in dummy_transaction_stragtegy(height),
txs in vec(dummy_transaction_stragtegy(height), 0..25)
)
-> (Block, Vec<Transaction>) {
(
Block {
header: BlockHeader {
major_version: 0,
minor_version: 0,
timestamp: 0,
previous,
nonce: 0,
},
miner_tx,
txs: txs.iter().map(Transaction::hash).collect(),
},
txs
)
}
}
/// A mock blockchain.
struct MockBlockchain {
blocks: IndexMap<[u8; 32], (Block, Vec<Transaction>)>,
}
impl Debug for MockBlockchain {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("MockBlockchain")
}
}
prop_compose! {
/// Returns a strategy to generate a [`MockBlockchain`].
fn dummy_blockchain_stragtegy()(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000),
) -> MockBlockchain {
let mut blockchain = IndexMap::new();
for (height, mut block) in blocks.into_iter().enumerate() {
if let Some(last) = blockchain.last() {
block.0.header.previous = *last.0;
block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)]
}
blockchain.insert(block.0.hash(), block);
}
MockBlockchain {
blocks: blockchain
}
}
}
fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> {
let semaphore = Arc::new(Semaphore::new(1));