From 1ae87734fec79d8619adf94229a807702c6f8783 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 29 May 2024 23:47:19 +0100 Subject: [PATCH] dynamic batch sizes --- p2p/cuprate-p2p/src/bin/test_init.rs | 44 ++--- p2p/cuprate-p2p/src/block_downloader.rs | 156 +++++++++++++++--- .../src/block_downloader/chain_tracker.rs | 8 + p2p/monero-p2p/src/client.rs | 1 + 4 files changed, 163 insertions(+), 46 deletions(-) diff --git a/p2p/cuprate-p2p/src/bin/test_init.rs b/p2p/cuprate-p2p/src/bin/test_init.rs index f62d973a..87c4c41c 100644 --- a/p2p/cuprate-p2p/src/bin/test_init.rs +++ b/p2p/cuprate-p2p/src/bin/test_init.rs @@ -16,6 +16,7 @@ use std::time::Duration; use tokio::time::sleep; use tower::Service; use tracing::Level; +use tracing_subscriber::fmt::time::Uptime; #[derive(Clone)] pub struct DummyCoreSyncSvc; @@ -104,6 +105,7 @@ impl Service for OurChainSvc { async fn main() { tracing_subscriber::fmt() .with_max_level(Level::DEBUG) + .with_timer(Uptime::default()) .init(); let config = P2PConfig:: { @@ -127,28 +129,28 @@ async fn main() { .await .unwrap(); - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(55)).await; - let mut buffer = download_blocks( - net.pool.clone(), - net.sync_states_svc.clone(), - OurChainSvc, - BlockDownloaderConfig { - buffer_size: 50_000_000, - in_progress_queue_size: 5_000_000, - check_client_pool_interval: Duration::from_secs(30), - target_batch_size: 100_000, - initial_batch_size: 100, - }, - ); + loop { + let mut buffer = download_blocks( + net.pool.clone(), + net.sync_states_svc.clone(), + OurChainSvc, + BlockDownloaderConfig { + buffer_size: 50_000_000, + in_progress_queue_size: 30_000_000, + check_client_pool_interval: Duration::from_secs(15), + target_batch_size: 2_000_000, + initial_batch_size: 100, + }, + ); - while let Some(entry) = buffer.next().await { - tracing::info!( - "height: {}, amount{}", - entry.blocks[0].0.number().unwrap(), - entry.blocks.len() - ) + while let Some(entry) = buffer.next().await { + tracing::info!( + "height: {}, amount{}", + entry.blocks[0].0.number().unwrap(), + entry.blocks.len() + ) + } } - - sleep(Duration::from_secs(999999999)).await; } diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index a1ba33ae..bd997c84 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -3,7 +3,7 @@ mod chain_tracker; -use std::cmp::{Ordering, Reverse}; +use std::cmp::{max, min, Ordering, Reverse}; use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque}; use std::sync::Arc; use std::time::Duration; @@ -170,6 +170,7 @@ struct BlockDownloader { our_chain_svc: C, amount_of_blocks_to_request: usize, + amount_of_blocks_to_request_updated_at: u64, #[allow(clippy::type_complexity)] block_download_tasks: JoinSet<( @@ -183,6 +184,8 @@ struct BlockDownloader { ready_batches: BinaryHeap, failed_batches: BinaryHeap>, + ready_batches_size: usize, + buffer_appender: BufferAppender, config: BlockDownloaderConfig, @@ -210,23 +213,89 @@ where peer_sync_svc, our_chain_svc, amount_of_blocks_to_request: config.initial_batch_size, + amount_of_blocks_to_request_updated_at: 0, block_download_tasks: JoinSet::new(), chain_entry_task: JoinSet::new(), inflight_requests: BTreeMap::new(), ready_batches: BinaryHeap::new(), + ready_batches_size: 0, failed_batches: BinaryHeap::new(), buffer_appender, config, } } + async fn request_inflight_batch_again(&mut self, client: ClientPoolDropGuard) { + if self.inflight_requests.is_empty() { + panic!("We need requests inflight to be able to send the request again") + } + + let first_batch_requests_sent = self + .inflight_requests + .first_key_value() + .unwrap() + .1 + .requests_sent; + + if first_batch_requests_sent + == self + .inflight_requests + .last_key_value() + .unwrap() + .1 + .requests_sent + { + let mut first_batch = self.inflight_requests.first_entry().unwrap(); + + first_batch.get_mut().requests_sent += 1; + + // They should have the blocks so send the re-request to this peer. + let ids = first_batch.get().ids.clone(); + let start_height = first_batch.get().start_height; + + self.block_download_tasks.spawn(async move { + ( + start_height, + request_batch_from_peer(client, ids, start_height).await, + ) + }); + + return; + } + + let next_batch = self + .inflight_requests + .iter_mut() + .find(|(_, next_batch)| next_batch.requests_sent != first_batch_requests_sent) + .unwrap() + .1; + + next_batch.requests_sent += 1; + + // They should have the blocks so send the re-request to this peer. + let ids = next_batch.ids.clone(); + let start_height = next_batch.start_height; + + self.block_download_tasks.spawn(async move { + ( + start_height, + request_batch_from_peer(client, ids, start_height).await, + ) + }); + } + + /// Spawns a task to request blocks from the given peer. async fn request_block_batch( &mut self, chain_tracker: &mut ChainTracker, client: ClientPoolDropGuard, ) { + // First look to see if we have any failed requests. while let Some(failed_request) = self.failed_batches.peek() { + // Check if we still have the request that failed - another peer could have completed it after + // failure. if let Some(request) = self.inflight_requests.get(&failed_request.0) { + // Check if this peer has the blocks according to their pruning seed. if client .info .pruning_seed @@ -236,6 +305,7 @@ where CRYPTONOTE_MAX_BLOCK_HEIGHT, ) { + // They should have the blocks so send the re-request to this peer. let ids = request.ids.clone(); let start_height = request.start_height; @@ -245,6 +315,7 @@ where request_batch_from_peer(client, ids, start_height).await, ) }); + // Remove the failure, we have just handled it. self.failed_batches.pop(); return; @@ -252,10 +323,18 @@ where break; } else { + // We don't have the request in flight so remove the failure. self.failed_batches.pop(); } } + if self.ready_batches_size >= self.config.in_progress_queue_size { + self.request_inflight_batch_again(client).await; + return; + } + + // No failed requests that we can handle, request some new blocks. + let Some(block_entry_to_get) = chain_tracker .blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request) else { @@ -337,6 +416,32 @@ where Ok(()) } + async fn push_new_blocks(&mut self) -> Result<(), BlockDownloadError> { + while let Some(ready_batch) = self.ready_batches.peek() { + if self + .inflight_requests + .first_key_value() + .is_some_and(|(&lowest_start_height, _)| { + ready_batch.start_height > lowest_start_height + }) + { + break; + } + + let ready_batch = self.ready_batches.pop().unwrap(); + + let size = ready_batch.block_batch.size; + self.ready_batches_size -= size; + + self.buffer_appender + .send(ready_batch.block_batch, size) + .await + .map_err(|_| BlockDownloadError::BufferWasClosed)?; + } + + Ok(()) + } + async fn handle_download_batch_res( &mut self, start_height: u64, @@ -357,28 +462,24 @@ where return Ok(()); }; + if start_height > self.amount_of_blocks_to_request_updated_at { + let average_block_size = + max((block_batch.size * 2) / block_batch.blocks.len(), 1); + + self.amount_of_blocks_to_request = min( + max(self.config.target_batch_size / average_block_size, 1), + 100, + ); + self.amount_of_blocks_to_request_updated_at = start_height; + } + + self.ready_batches_size += block_batch.size; self.ready_batches.push(ReadyQueueBatch { start_height, block_batch, }); - while let (Some(ready_batch), Some((&lowest_start_height, _))) = ( - self.ready_batches.peek(), - self.inflight_requests.first_key_value(), - ) { - if ready_batch.start_height > lowest_start_height { - break; - } - - let ready_batch = self.ready_batches.pop().unwrap(); - - let size = ready_batch.block_batch.size; - - self.buffer_appender - .send(ready_batch.block_batch, size) - .await - .map_err(|_| BlockDownloadError::BufferWasClosed)?; - } + self.push_new_blocks().await?; self.handle_free_client(chain_tracker, client).await; Ok(()) @@ -415,7 +516,7 @@ where match res { Ok((client, entry)) => { if chain_tracker.add_entry(entry).is_ok() { - self.chain_entry_task.abort_all(); + } self.handle_free_client(&mut chain_tracker, client).await; } @@ -465,6 +566,8 @@ async fn request_batch_from_peer( .blocks .into_par_iter() .map(|block_entry| { + let mut size = block_entry.block.len(); + let block = Block::read(&mut block_entry.block.as_ref()) .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; @@ -476,8 +579,11 @@ async fn request_batch_from_peer( .txs .take_normal() .ok_or(BlockDownloadError::PeersResponseWasInvalid)? - .into_par_iter() - .map(|tx_blob| Transaction::read(&mut tx_blob.as_ref())) + .into_iter() + .map(|tx_blob| { + size += tx_blob.len(); + Transaction::read(&mut tx_blob.as_ref()) + }) .collect::, _>>() .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; @@ -493,15 +599,15 @@ async fn request_batch_from_peer( return Err(BlockDownloadError::PeersResponseWasInvalid); } - Ok((block, txs)) + Ok(((block, txs), size)) }) - .collect::, _>>(); + .collect::, Vec<_>), _>>(); blocks }) .await; - let blocks = blocks.inspect_err(|e| { + let (blocks, sizes) = blocks.inspect_err(|e| { if matches!(e, BlockDownloadError::PeersResponseWasInvalid) { client.info.handle.ban_peer(MEDIUM_BAN); } @@ -513,7 +619,7 @@ async fn request_batch_from_peer( client, BlockBatch { blocks, - size: 0, + size: sizes.iter().sum(), peer_handle, }, )) diff --git a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs index fde712bd..7b9fa634 100644 --- a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs @@ -29,6 +29,8 @@ pub struct BlocksToRetrieve { pub peer_who_told_us: InternalPeerID, /// The peer who told us about this batch's handle. pub peer_who_told_us_handle: ConnectionHandle, + /// The number of requests sent for this batch. + pub requests_sent: usize, } pub enum ChainTrackerError { @@ -101,6 +103,11 @@ impl ChainTracker { return Err(ChainTrackerError::NewEntryIsInvalid); } + if chain_entry.ids.len() == 1 { + // TODO: properly handle this + return Err(ChainTrackerError::NewEntryDoesNotFollowChain); + } + if self .entries .back() @@ -165,6 +172,7 @@ impl ChainTracker { start_height: self.first_height, peer_who_told_us: entry.peer, peer_who_told_us_handle: entry.handle.clone(), + requests_sent: 0, }; self.first_height += u64::try_from(end_idx).unwrap(); diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 02deae51..506e728e 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -158,6 +158,7 @@ impl Service for Client { permit: Some(permit), }; + // TODO: this can panic if the channel was closed between poll_ready and this. self.connection_tx .try_send(req) .map_err(|_| ())