From 08f1abb9df2e209fdef478e86a658a7655e77ee6 Mon Sep 17 00:00:00 2001
From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
Date: Sun, 9 Jun 2024 02:38:00 +0100
Subject: [PATCH] add more docs

---
 p2p/cuprate-p2p/src/block_downloader.rs | 151 ++++++++++++++++++------
 p2p/cuprate-p2p/src/constants.rs | 8 ++
 2 files changed, 120 insertions(+), 39 deletions(-)

diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs
index 0ee1b8bd..d2f559ce 100644
--- a/p2p/cuprate-p2p/src/block_downloader.rs
+++ b/p2p/cuprate-p2p/src/block_downloader.rs
@@ -10,6 +10,7 @@ use std::{
 use monero_serai::{block::Block, transaction::Transaction};
 use rand::prelude::*;
 use rayon::prelude::*;
+use tokio::time::timeout;
 use tokio::{
     task::JoinSet,
     time::{interval, MissedTickBehavior},
@@ -34,6 +35,7 @@ use crate::{
 };
 
 mod chain_tracker;
+use crate::constants::{CHIAN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCK_BATCH_LEN};
 use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
 
 /// A downloaded batch of blocks.
@@ -65,6 +67,8 @@ pub struct BlockDownloaderConfig {
 
 #[derive(Debug, thiserror::Error)]
 pub enum BlockDownloadError {
+    #[error("A request to a peer timed out.")]
+    TimedOut,
     #[error("The block buffer was closed.")]
     BufferWasClosed,
     #[error("The peers we requested data from did not have all the data.")]
@@ -87,7 +91,7 @@ pub enum ChainSvcRequest {
     CompactHistory,
     /// A request to find the first unknown
     FindFirstUnknown(Vec<[u8; 32]>),
-
+    /// A request for our current cumulative difficulty.
     CumulativeDifficulty,
 }
 
@@ -95,26 +99,27 @@ pub enum ChainSvcResponse {
     /// The response for [`ChainSvcRequest::CompactHistory`]
     CompactHistory {
+        /// A list of block IDs in our chain, starting with the most recent block, all the way to the genesis block.
+        ///
+        /// These blocks should be in reverse chronological order and not every block is needed.
         block_ids: Vec<[u8; 32]>,
         cumulative_difficulty: u128,
     },
     /// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown
     /// block.
     FindFirstUnknown(usize),
-
+    /// The current cumulative difficulty of our chain.
     CumulativeDifficulty(u128),
 }
 
-/// # Block Downloader
-///
 /// This function starts the block downloader and returns a [`BufferStream`] that will produce
 /// a sequential stream of blocks.
 ///
 /// The block downloader will pick the longest chain and will follow it for as long as possible,
 /// the blocks given from the [`BufferStream`] will be in order.
 ///
-/// The block downloader may fail before the whole chain is downloaded. If this is the case you are
-/// free to call this function again, so it can start the search again.
+/// The block downloader may fail before the whole chain is downloaded. If this is the case, you can
+/// call this function again, so it can start the search again.
 pub fn download_blocks(
     client_pool: Arc>,
     peer_sync_svc: S,
@@ -144,9 +149,13 @@ where
     buffer_stream
 }
 
+/// A batch of blocks in the ready queue, waiting for previous blocks to come in so they can
+/// be passed into the buffer.
 #[derive(Debug)]
 struct ReadyQueueBatch {
+    /// The start height of the batch.
     start_height: u64,
+    /// The batch of blocks.
     block_batch: BlockBatch,
 }
 
@@ -166,35 +175,64 @@ impl PartialOrd for ReadyQueueBatch {
 impl Ord for ReadyQueueBatch {
     fn cmp(&self, other: &Self) -> Ordering {
+        // Reverse the ordering so earlier blocks come first in a [`BinaryHeap`].
         self.start_height.cmp(&other.start_height).reverse()
     }
 }
 
+/// # Block Downloader
+///
+/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the blocks
+/// to an [`async_buffer`].
 struct BlockDownloader {
+    /// The client pool.
     client_pool: Arc>,
+    /// The service that holds the peers' sync states.
     peer_sync_svc: S,
+    /// The service that holds our current chain state.
     our_chain_svc: C,
 
+    /// The amount of blocks to request in the next batch.
     amount_of_blocks_to_request: usize,
+    /// The height at which `amount_of_blocks_to_request` was updated.
     amount_of_blocks_to_request_updated_at: u64,
 
+    /// The running block download tasks.
+    ///
+    /// Returns:
+    /// - The start height of the batch
+    /// - A result containing the batch or an error.
     #[allow(clippy::type_complexity)]
     block_download_tasks: JoinSet<(
         u64,
         Result<(ClientPoolDropGuard, BlockBatch), BlockDownloadError>,
     )>,
+    /// The running chain entry tasks.
+    ///
+    /// Returns a result of the chain entry or an error.
     #[allow(clippy::type_complexity)]
     chain_entry_task: JoinSet, ChainEntry), BlockDownloadError>>,
 
+    /// The current inflight requests.
+    ///
+    /// This is a map of batch start heights to the block IDs and related information of the batch.
     inflight_requests: BTreeMap>,
-    ready_batches: BinaryHeap,
-    failed_batches: BinaryHeap>,
 
+    /// A queue of ready batches.
+    ready_batches: BinaryHeap,
+    /// The size, in bytes, of all the batches in `ready_batches`.
     ready_batches_size: usize,
 
+    /// A queue of failed batches' start heights that should be retried.
+    ///
+    /// Wrapped in [`Reverse`] so we prioritize early batches.
+    failed_batches: BinaryHeap>,
+
+    /// The [`BufferAppender`] that gives blocks to Cuprate.
     buffer_appender: BufferAppender,
 
+    /// The [`BlockDownloaderConfig`].
     config: BlockDownloaderConfig,
 }
@@ -375,8 +413,14 @@ where
         {
             let history = chain_tracker.get_simple_history();
 
-            self.chain_entry_task
-                .spawn(request_chain_entry_from_peer(client, history));
+            self.chain_entry_task.spawn(async move {
+                timeout(
+                    CHIAN_ENTRY_REQUEST_TIMEOUT,
+                    request_chain_entry_from_peer(client, history),
+                )
+                .await
+                .map_err(|_| BlockDownloadError::TimedOut)?
+            });
 
             return;
         }
@@ -427,8 +471,14 @@ where
         Ok(())
     }
 
+    /// Checks if we have batches ready to send down the [`BufferAppender`].
+    ///
+    /// We guarantee that blocks sent down the buffer are sent in the correct order.
     async fn push_new_blocks(&mut self) -> Result<(), BlockDownloadError> {
         while let Some(ready_batch) = self.ready_batches.peek() {
+            // Check if this ready batch's start height is higher than the lowest in flight request.
+            // If there is a lower start height in the inflight requests then this is _not_ the next batch
+            // to send down the buffer.
             if self
                 .inflight_requests
                 .first_key_value()
@@ -439,6 +489,8 @@ where
                 break;
             }
 
+            // Our next ready batch is older (lower height) than the oldest in flight, push it down the
+            // buffer.
             let ready_batch = self.ready_batches.pop().unwrap();
             let size = ready_batch.block_batch.size;
@@ -448,11 +500,14 @@ where
                 .send(ready_batch.block_batch, size)
                 .await
                 .map_err(|_| BlockDownloadError::BufferWasClosed)?;
+
+            // Loops back to check the next oldest ready batch.
         }
 
         Ok(())
     }
 
+    /// Handles a response to a request to get blocks from a peer.
     async fn handle_download_batch_res(
         &mut self,
         start_height: u64,
@@ -461,7 +516,10 @@ where
     ) -> Result<(), BlockDownloadError> {
         match res {
             Err(e) => {
+                // TODO: (IMPORTANT) check if this failure is from the peer who told us about the batch, if so ban them.
+
                 if matches!(e, BlockDownloadError::ChainInvalid) {
+                    // If the chain was invalid ban the peer who told us about it.
                     self.inflight_requests
                         .get(&start_height)
                         .inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN));
@@ -469,6 +527,7 @@ where
                     return Err(e);
                 }
 
+                // Add the request to the failed list.
                 if self.inflight_requests.contains_key(&start_height) {
                     self.failed_batches.push(Reverse(start_height))
                 }
@@ -476,47 +535,37 @@ where
                 Ok(())
             }
             Ok((client, block_batch)) => {
+                // Remove the batch from the inflight batches.
                 if self.inflight_requests.remove(&start_height).is_none() {
+                    // If it was already retrieved then there is nothing else to do.
+                    // TODO: should we drop this peer for being slow?
                     self.handle_free_client(chain_tracker, client).await;
                     return Ok(());
                 };
 
+                // If this batch's start height is higher than the height at which we last updated
+                // `amount_of_blocks_to_request`, update it again.
                 if start_height > self.amount_of_blocks_to_request_updated_at {
-                    let old_amount_of_blocks_to_request = self.amount_of_blocks_to_request;
-
-                    // The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
-                    // future blocks.
-                    let adjusted_average_block_size =
-                        max((block_batch.size * 2) / block_batch.blocks.len(), 1);
-
-                    // Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
-                    // Capping the amount at the maximum allowed in a single request.
-                    self.amount_of_blocks_to_request = min(
-                        max(
-                            self.config.target_batch_size / adjusted_average_block_size,
-                            1,
-                        ),
-                        100,
-                    );
-
-                    // Make sure the amount does not increase too quickly if we get some small blocks so limit the growth to 1.5x the last
-                    // batch size.
-                    self.amount_of_blocks_to_request = min(
-                        self.amount_of_blocks_to_request,
-                        (old_amount_of_blocks_to_request * 3).div_ceil(2),
+                    self.amount_of_blocks_to_request = calculate_next_block_batch_size(
+                        block_batch.size,
+                        block_batch.blocks.len(),
+                        self.config.target_batch_size,
                     );
 
                     self.amount_of_blocks_to_request_updated_at = start_height;
                 }
 
+                // Add the batch to the queue of ready batches.
                 self.ready_batches_size += block_batch.size;
                 self.ready_batches.push(ReadyQueueBatch {
                     start_height,
                     block_batch,
                 });
 
+                // Attempt to push new batches to the buffer.
                 self.push_new_blocks().await?;
 
+                // Give more work to this client.
                 self.handle_free_client(chain_tracker, client).await;
 
                 Ok(())
             }
         }
     }
@@ -565,6 +614,32 @@ where
     }
 }
 
+/// Calculates the next amount of blocks to request in a batch.
+///
+/// Parameters:
+/// `previous_batch_size` is the size, in bytes, of the last batch,
+/// `previous_batch_len` is the amount of blocks in the last batch,
+/// `target_batch_size` is the target size, in bytes, of a batch.
+fn calculate_next_block_batch_size(
+    previous_batch_size: usize,
+    previous_batch_len: usize,
+    target_batch_size: usize,
+) -> usize {
+    // The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
+    // future blocks.
+    let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
+
+    // Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
+    let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
+
+    // Cap the amount of growth to 1.5x the previous batch len, to prevent a small block causing us to request
+    // a huge amount of blocks.
+    let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
+
+    // Cap the length to the maximum allowed.
+    min(next_batch_len, MAX_BLOCK_BATCH_LEN)
+}
+
 async fn request_batch_from_peer(
     mut client: ClientPoolDropGuard,
     ids: ByteArrayVec<32>,
@@ -615,6 +690,7 @@ async fn request_batch_from_peer(
             .number()
             .is_some_and(|height| height == expected_height)
         {
+            // TODO: remove this panic, I have hit this error though, which is why it's here.
             panic!("{} {}", expected_height, block.number().unwrap());
             // This peer probably did nothing wrong, it was the peer who told us this blockID which
             // is misbehaving.
@@ -760,24 +836,21 @@ where
             break;
         };
         let cloned_req = req.clone();
-        futs.spawn(async move {
+        futs.spawn(timeout(CHIAN_ENTRY_REQUEST_TIMEOUT, async move {
             let PeerResponse::GetChain(chain_res) =
                 next_peer.ready().await?.call(cloned_req).await?
             else {
                 panic!("connection task returned wrong response!");
             };
 
-            Ok((chain_res, next_peer.info.id, next_peer.info.handle.clone()))
-        });
+            Ok::<_, tower::BoxError>((chain_res, next_peer.info.id, next_peer.info.handle.clone()))
+        }));
     }
 
     let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None;
 
     while let Some(task_res) = futs.join_next().await {
-        let Ok(task_res): Result<
-            (ChainResponse, InternalPeerID<_>, ConnectionHandle),
-            tower::BoxError,
-        > = task_res.unwrap() else {
+        let Ok(Ok(task_res)) = task_res.unwrap() else {
            continue;
        };

diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs
index 6e6ce0b3..81d2b520 100644
--- a/p2p/cuprate-p2p/src/constants.rs
+++ b/p2p/cuprate-p2p/src/constants.rs
@@ -43,6 +43,14 @@ pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(
 /// The initial amount of chain requests to send to find the best chain to sync from.
 pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
 
+/// The enforced maximum amount of blocks to request in a batch.
+///
+/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
+pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
+
+/// The timeout for a chain entry request.
+pub(crate) const CHIAN_ENTRY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+
 #[cfg(test)]
 mod tests {
     use super::*;
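The batch-size heuristic this patch factors out into `calculate_next_block_batch_size` can be exercised on its own. Below is a minimal, self-contained sketch of the same logic with hypothetical numbers: the 2 MB / 20-block previous batch and the 10 MB target are made up for illustration, while `MAX_BLOCK_BATCH_LEN = 100` mirrors the new constant added above.

use std::cmp::{max, min};

// Mirror of the patch's `MAX_BLOCK_BATCH_LEN` constant.
const MAX_BLOCK_BATCH_LEN: usize = 100;

// Same calculation as the patch's `calculate_next_block_batch_size`.
fn calculate_next_block_batch_size(
    previous_batch_size: usize,
    previous_batch_len: usize,
    target_batch_size: usize,
) -> usize {
    // Average block size of the last batch, doubled as a safety margin for future blocks.
    let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
    // Target batch size divided by the adjusted average block size.
    let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
    // Limit growth to 1.5x the previous batch length.
    let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
    // Never exceed the protocol-enforced maximum.
    min(next_batch_len, MAX_BLOCK_BATCH_LEN)
}

fn main() {
    // Hypothetical numbers: the last batch was 20 blocks totalling 2 MB,
    // and we are aiming for roughly 10 MB batches.
    let next = calculate_next_block_batch_size(2_000_000, 20, 10_000_000);
    // adjusted average = 200_000 bytes, 10_000_000 / 200_000 = 50,
    // but the growth cap is ceil(20 * 3 / 2) = 30, so 30 blocks are requested next.
    assert_eq!(next, 30);
    println!("next batch length: {next}");
}

The growth cap is what keeps a single batch of unusually small blocks from ballooning the next request straight to the 100-block maximum.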