From 44c7ad4992c6a099079652a75a29ff03bae13ef1 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 1 Jun 2024 02:02:45 +0100 Subject: [PATCH] dynamic batch size changes --- p2p/cuprate-p2p/src/bin/test_init.rs | 40 ++++++++++++++++++++----- p2p/cuprate-p2p/src/block_downloader.rs | 23 ++++++++++++-- p2p/cuprate-p2p/src/lib.rs | 3 +- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/p2p/cuprate-p2p/src/bin/test_init.rs b/p2p/cuprate-p2p/src/bin/test_init.rs index 87c4c41c..3ad4c0f2 100644 --- a/p2p/cuprate-p2p/src/bin/test_init.rs +++ b/p2p/cuprate-p2p/src/bin/test_init.rs @@ -7,6 +7,7 @@ use monero_address_book::AddressBookConfig; use monero_p2p::network_zones::ClearNet; use monero_p2p::services::{CoreSyncDataRequest, CoreSyncDataResponse}; use monero_p2p::{PeerRequest, PeerResponse}; +use monero_wire::admin::TimedSyncResponse; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; @@ -64,8 +65,29 @@ impl Service<PeerRequest> for DummyPeerRequestHandlerSvc { Poll::Ready(Ok(())) } - fn call(&mut self, _: PeerRequest) -> Self::Future { - async move { Ok(PeerResponse::NA) }.boxed() + fn call(&mut self, req: PeerRequest) -> Self::Future { + async move { + Ok(match req { + PeerRequest::TimedSync(_) => PeerResponse::TimedSync(TimedSyncResponse { + payload_data: monero_wire::CoreSyncData { + cumulative_difficulty: 1, + cumulative_difficulty_top64: 0, + current_height: 1, + pruning_seed: 0, + top_id: hex::decode( + "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", + ) + .unwrap() + .try_into() + .unwrap(), + top_version: 1, + }, + local_peerlist_new: vec![], + }), + _ => PeerResponse::NA, + }) + } + .boxed() } } @@ -101,7 +123,7 @@ impl Service<ChainSvcRequest> for OurChainSvc { } } -#[tokio::main] +#[tokio::main(flavor = "multi_thread", worker_threads = 12)] async fn main() { tracing_subscriber::fmt() .with_max_level(Level::DEBUG) @@ -110,12 +132,12 @@ async fn main() { let config = P2PConfig::<ClearNet> { network: Default::default(), - outbound_connections: 64, + outbound_connections: 32, extra_outbound_connections: 32, max_inbound_connections: 128, gray_peers_percent: 0.7, server_config: None, - p2p_port: 18081, + p2p_port: 0, rpc_port: 0, address_book_config: AddressBookConfig { max_white_list_length: 1000, @@ -129,7 +151,7 @@ async fn main() { .await .unwrap(); - sleep(Duration::from_secs(55)).await; + sleep(Duration::from_secs(15)).await; loop { let mut buffer = download_blocks( @@ -139,8 +161,8 @@ async fn main() { BlockDownloaderConfig { buffer_size: 50_000_000, in_progress_queue_size: 30_000_000, - check_client_pool_interval: Duration::from_secs(15), - target_batch_size: 2_000_000, + check_client_pool_interval: Duration::from_secs(20), + target_batch_size: 1_000_000, initial_batch_size: 100, }, ); @@ -152,5 +174,7 @@ async fn main() { entry.blocks.len() ) } + + sleep(Duration::from_secs(2)).await; } } diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index bd997c84..a377a6a3 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -363,7 +363,7 @@ where client: ClientPoolDropGuard<N>, ) { if self.chain_entry_task.len() < 2 - && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15 + && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 30 && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed) { let history = chain_tracker.get_simple_history(); @@ -463,13 +463,30 @@ where }; if start_height > self.amount_of_blocks_to_request_updated_at { - let average_block_size = + let old_amount_of_blocks_to_request = self.amount_of_blocks_to_request; + + // The average block size of the last batch of blocks, multiplied by 2 as a safety margin for + // future blocks. + let adjusted_average_block_size = max((block_batch.size * 2) / block_batch.blocks.len(), 1); + // Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size. + // Capping the amount at the maximum allowed in a single request. self.amount_of_blocks_to_request = min( - max(self.config.target_batch_size / average_block_size, 1), + max( + self.config.target_batch_size / adjusted_average_block_size, + 1, + ), 100, ); + + // Make sure the amount does not increase too quickly if we get some small blocks so limit the growth to 1.5x the last + // batch size. + self.amount_of_blocks_to_request = min( + self.amount_of_blocks_to_request, + (old_amount_of_blocks_to_request * 3).div_ceil(2), + ); + self.amount_of_blocks_to_request_updated_at = start_height; } diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index 7ba02f79..59221a25 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -151,7 +151,8 @@ where #[derive(Clone)] pub struct NetworkInterface<N: NetworkZone> { /// A pool of free connected peers. - pool: Arc<client_pool::ClientPool<N>>, + // TODO: remove pub + pub pool: Arc<client_pool::ClientPool<N>>, /// A [`Service`] that allows broadcasting to all connected peers. broadcast_svc: BroadcastSvc<N>, /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info