dynamic batch size changes

This commit is contained in:
Boog900 2024-06-01 02:02:45 +01:00
parent 334a62115b
commit 44c7ad4992
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
3 changed files with 54 additions and 12 deletions

View file

@ -7,6 +7,7 @@ use monero_address_book::AddressBookConfig;
use monero_p2p::network_zones::ClearNet;
use monero_p2p::services::{CoreSyncDataRequest, CoreSyncDataResponse};
use monero_p2p::{PeerRequest, PeerResponse};
use monero_wire::admin::TimedSyncResponse;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
@ -64,8 +65,29 @@ impl Service<PeerRequest> for DummyPeerRequestHandlerSvc {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerRequest) -> Self::Future {
async move { Ok(PeerResponse::NA) }.boxed()
fn call(&mut self, req: PeerRequest) -> Self::Future {
async move {
Ok(match req {
PeerRequest::TimedSync(_) => PeerResponse::TimedSync(TimedSyncResponse {
payload_data: monero_wire::CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex::decode(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3",
)
.unwrap()
.try_into()
.unwrap(),
top_version: 1,
},
local_peerlist_new: vec![],
}),
_ => PeerResponse::NA,
})
}
.boxed()
}
}
@ -101,7 +123,7 @@ impl Service<ChainSvcRequest> for OurChainSvc {
}
}
#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 12)]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
@ -110,12 +132,12 @@ async fn main() {
let config = P2PConfig::<ClearNet> {
network: Default::default(),
outbound_connections: 64,
outbound_connections: 32,
extra_outbound_connections: 32,
max_inbound_connections: 128,
gray_peers_percent: 0.7,
server_config: None,
p2p_port: 18081,
p2p_port: 0,
rpc_port: 0,
address_book_config: AddressBookConfig {
max_white_list_length: 1000,
@ -129,7 +151,7 @@ async fn main() {
.await
.unwrap();
sleep(Duration::from_secs(55)).await;
sleep(Duration::from_secs(15)).await;
loop {
let mut buffer = download_blocks(
@ -139,8 +161,8 @@ async fn main() {
BlockDownloaderConfig {
buffer_size: 50_000_000,
in_progress_queue_size: 30_000_000,
check_client_pool_interval: Duration::from_secs(15),
target_batch_size: 2_000_000,
check_client_pool_interval: Duration::from_secs(20),
target_batch_size: 1_000_000,
initial_batch_size: 100,
},
);
@ -152,5 +174,7 @@ async fn main() {
entry.blocks.len()
)
}
sleep(Duration::from_secs(2)).await;
}
}

View file

@ -363,7 +363,7 @@ where
client: ClientPoolDropGuard<N>,
) {
if self.chain_entry_task.len() < 2
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 30
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
let history = chain_tracker.get_simple_history();
@ -463,13 +463,30 @@ where
};
if start_height > self.amount_of_blocks_to_request_updated_at {
let average_block_size =
let old_amount_of_blocks_to_request = self.amount_of_blocks_to_request;
// The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
// future blocks.
let adjusted_average_block_size =
max((block_batch.size * 2) / block_batch.blocks.len(), 1);
// Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
// Capping the amount at the maximum allowed in a single request.
self.amount_of_blocks_to_request = min(
max(self.config.target_batch_size / average_block_size, 1),
max(
self.config.target_batch_size / adjusted_average_block_size,
1,
),
100,
);
// Make sure the amount does not increase too quickly if we get some small blocks so limit the growth to 1.5x the last
// batch size.
self.amount_of_blocks_to_request = min(
self.amount_of_blocks_to_request,
(old_amount_of_blocks_to_request * 3).div_ceil(2),
);
self.amount_of_blocks_to_request_updated_at = start_height;
}

View file

@ -151,7 +151,8 @@ where
#[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
// TODO: remove pub
pub pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info