From e81593beffe891f9be2ab9bab71824f150edd8ef Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 11 Jun 2024 02:39:53 +0100 Subject: [PATCH] clean up API and add more docs --- p2p/cuprate-p2p/src/bin/test_init.rs | 4 +- p2p/cuprate-p2p/src/block_downloader.rs | 214 +++++++++++------- .../src/block_downloader/chain_tracker.rs | 14 +- p2p/cuprate-p2p/src/constants.rs | 3 + p2p/cuprate-p2p/src/lib.rs | 32 ++- 5 files changed, 170 insertions(+), 97 deletions(-) diff --git a/p2p/cuprate-p2p/src/bin/test_init.rs b/p2p/cuprate-p2p/src/bin/test_init.rs index 6f9153ec..95126df8 100644 --- a/p2p/cuprate-p2p/src/bin/test_init.rs +++ b/p2p/cuprate-p2p/src/bin/test_init.rs @@ -153,9 +153,7 @@ async fn main() { sleep(Duration::from_secs(15)).await; - let mut buffer = download_blocks( - net.pool.clone(), - net.sync_states_svc.clone(), + let mut buffer = net.block_downloader( OurChainSvc, BlockDownloaderConfig { buffer_size: 50_000_000, diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 89d137d7..2f11ab41 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -1,5 +1,9 @@ //! # Block Downloader //! +//! This module contains the block downloader, which finds a chain to download from our connected peers +//! and downloads it. +//! +//! The block downloader is started by [`download_blocks`]. use std::{ cmp::{max, min, Ordering, Reverse}, collections::{BTreeMap, BinaryHeap, HashSet}, @@ -11,10 +15,9 @@ use std::{ use monero_serai::{block::Block, transaction::Transaction}; use rand::prelude::*; use rayon::prelude::*; -use tokio::time::timeout; use tokio::{ task::JoinSet, - time::{interval, MissedTickBehavior}, + time::{interval, timeout, MissedTickBehavior}, }; use tower::{Service, ServiceExt}; @@ -32,14 +35,15 @@ use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest}; use crate::{ client_pool::{ClientPool, ClientPoolDropGuard}, - constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN, MEDIUM_BAN}, + constants::{ + CHAIN_ENTRY_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN, + MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, + MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN, + }, }; mod chain_tracker; -use crate::constants::{ - CHAIN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, - MAX_DOWNLOAD_FAILURES, MAX_TRANSACTION_BLOB_SIZE, -}; +use crate::constants::EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED; use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; /// A downloaded batch of blocks. @@ -59,7 +63,7 @@ pub struct BlockDownloaderConfig { /// The size of the buffer between the block downloader and the place which /// is consuming the downloaded blocks. pub buffer_size: usize, - /// The size of the in progress queue at which we stop requesting more blocks. + /// The size of the in progress queue (in bytes) at which we stop requesting more blocks. pub in_progress_queue_size: usize, /// The [`Duration`] between checking the client pool for free peers. pub check_client_pool_interval: Duration, @@ -110,8 +114,8 @@ pub enum ChainSvcResponse { cumulative_difficulty: u128, }, /// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown - /// block. - FindFirstUnknown(usize), + /// block and its expected height. + FindFirstUnknown(usize, u64), /// The current cumulative difficulty of our chain. 
 CumulativeDifficulty(u128),
 }
 
@@ -179,7 +183,7 @@ impl PartialOrd for ReadyQueueBatch {
 impl Ord for ReadyQueueBatch {
     fn cmp(&self, other: &Self) -> Ordering {
-        // reverse the ordering so earlier blocks come first in a [`BinaryHeap`]
+        // reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
         self.start_height.cmp(&other.start_height).reverse()
     }
 }
@@ -188,12 +192,31 @@ impl Ord for ReadyQueueBatch {
 ///
 /// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
 /// downloaded blocks to an [`async_buffer`].
+///
+/// ## Implementation Details
+///
+/// The first step to downloading blocks is to find a chain to follow; this is done by [`initial_chain_search`],
+/// see that function's docs for details on how this is done.
+///
+/// With an initial list of block IDs to follow, the block downloader will then look for available peers
+/// to download blocks from.
+///
+/// For each peer we will then allocate a batch of blocks for them to retrieve. As these batches come in,
+/// we add them to a queue of batches ready to be pushed into the [`async_buffer`]; whenever the oldest
+/// batch has been downloaded we send it into the buffer, stopping once the oldest batch is still in flight.
+///
+/// When a peer has finished downloading a batch we add it to our list of ready peers, so it can be used
+/// to request more data.
+///
+/// Ready peers will either:
+/// - download the next batch of blocks
+/// - request the next chain entry
+/// - download an already requested batch of blocks (this might happen due to an error in the previous
+///   request, or because the queue of ready batches is too large, so we need the oldest batch to clear it)
 struct BlockDownloader<N: NetworkZone, S, C> {
     /// The client pool.
     client_pool: Arc<ClientPool<N>>,
 
-    /// Peers that have pruned the blocks we are currently downloading.
-    ///
-    /// These peers are ready for when we reach blocks that they have not pruned.
+    /// Peers that are ready to handle requests.
     pending_peers: BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
 
     /// The service that holds the peers' sync states.
@@ -285,14 +308,16 @@ where
         }
     }
 
+    /// Checks if we can make use of any peers that are currently pending requests.
     async fn check_pending_peers(&mut self, chain_tracker: &mut ChainTracker<N>) {
-        // HACK: The borrow checker doesn't like the following code, if we don't do this.
+        // HACK: The borrow checker doesn't like the following code if we don't do this.
         let mut pending_peers = mem::take(&mut self.pending_peers);
 
         for (_, peers) in pending_peers.iter_mut() {
             while let Some(peer) = peers.pop() {
                 if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
-                    // This peer is ok however it does not have the data we currently need.
+                    // This peer is OK, however it does not have the data we currently need. This will only
+                    // happen because of its pruning seed, so just skip over all peers with this pruning seed.
                    peers.push(peer);
                    break;
                }
@@ -304,6 +329,12 @@ where
         self.pending_peers = pending_peers;
     }
 
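A note on the `ReadyQueueBatch` ordering above: reversing `Ord` is the standard way to use `std`'s `BinaryHeap` (a max-heap) as a min-heap, so the downloader always pops the ready batch with the lowest start height when feeding the buffer. A minimal, self-contained sketch of the trick (simplified stand-in types, not the downloader's actual structs):

    use std::{cmp::Ordering, collections::BinaryHeap};

    // Simplified stand-in for `ReadyQueueBatch`: only the field the ordering uses.
    struct ReadyBatch {
        start_height: u64,
    }

    impl PartialEq for ReadyBatch {
        fn eq(&self, other: &Self) -> bool {
            self.start_height == other.start_height
        }
    }
    impl Eq for ReadyBatch {}

    impl PartialOrd for ReadyBatch {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for ReadyBatch {
        fn cmp(&self, other: &Self) -> Ordering {
            // Reversed, so the batch with the *lowest* start height is the heap's "max".
            self.start_height.cmp(&other.start_height).reverse()
        }
    }

    fn main() {
        let mut ready = BinaryHeap::new();
        for height in [30u64, 10, 20] {
            ready.push(ReadyBatch { start_height: height });
        }
        // Pops 10, then 20, then 30: oldest (lowest height) first.
        while let Some(batch) = ready.pop() {
            println!("{}", batch.start_height);
        }
    }

Wrapping elements in `std::cmp::Reverse` would achieve the same thing; the hand-written `Ord` just keeps the heap's element type self-describing.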
+    /// Attempts to send another request for an inflight batch.
+    ///
+    /// This function will find the oldest batch that has had the fewest requests sent for it
+    /// and will then attempt to send another request for it.
+    ///
+    /// Returns the [`ClientPoolDropGuard`] back if the peer doesn't have the batch according to its pruning seed.
     async fn request_inflight_batch_again(
         &mut self,
         client: ClientPoolDropGuard<N>,
@@ -312,8 +343,33 @@ where
             panic!("We need requests inflight to be able to send the request again")
         }
 
-        // TODO: check pruning seeds.
+        // The closure that checks if a peer has a batch according to its pruning seed; it either
+        // requests the batch or returns the peer.
+        let try_request_batch = |batch: &mut BlocksToRetrieve<N>| {
+            if !client_has_block_in_range(
+                &client.info.pruning_seed,
+                batch.start_height,
+                batch.ids.len(),
+            ) {
+                return Some(client);
+            }
+            batch.requests_sent += 1;
+
+            let ids = batch.ids.clone();
+            let start_height = batch.start_height;
+
+            self.block_download_tasks.spawn(async move {
+                (
+                    start_height,
+                    request_batch_from_peer(client, ids, start_height).await,
+                )
+            });
+
+            None
+        };
+
+        // The number of requests sent for the oldest (lowest height) batch.
         let first_batch_requests_sent = self
            .inflight_requests
            .first_key_value()
            .unwrap()
            .1
            .requests_sent;
 
+        // If the oldest batch has had the same number of requests sent as the newest batch, every batch
+        // has been requested equally often, so send another request for the oldest batch.
         if first_batch_requests_sent
             == self
                 .inflight_requests
@@ -333,30 +391,10 @@ where
             let first_batch_mut = first_batch.get_mut();
 
-            if !client_has_block_in_range(
-                &client.info.pruning_seed,
-                first_batch_mut.start_height,
-                first_batch_mut.ids.len(),
-            ) {
-                return Some(client);
-            }
-
-            first_batch_mut.requests_sent += 1;
-
-            // They should have the blocks so send the re-request to this peer.
-            let ids = first_batch_mut.ids.clone();
-            let start_height = first_batch_mut.start_height;
-
-            self.block_download_tasks.spawn(async move {
-                (
-                    start_height,
-                    request_batch_from_peer(client, ids, start_height).await,
-                )
-            });
-
-            return None;
+            return try_request_batch(first_batch_mut);
         }
 
+        // Otherwise, find the first batch where the number of requests sent drops and request that one.
         let next_batch = self
            .inflight_requests
            .iter_mut()
            .find(|(_, next_batch)| next_batch.requests_sent < first_batch_requests_sent)
            .unwrap()
            .1;
 
-        if !client_has_block_in_range(
-            &client.info.pruning_seed,
-            next_batch.start_height,
-            next_batch.ids.len(),
-        ) {
-            return Some(client);
-        }
-
-        next_batch.requests_sent += 1;
-
-        // They should have the blocks so send the re-request to this peer.
-        let ids = next_batch.ids.clone();
-        let start_height = next_batch.start_height;
-
-        self.block_download_tasks.spawn(async move {
-            (
-                start_height,
-                request_batch_from_peer(client, ids, start_height).await,
-            )
-        });
-
-        None
+        try_request_batch(next_batch)
     }
 
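`try_request_batch` leans on `client_has_block_in_range`, which this patch never shows. Judging from the inlined pruning-seed checks the next hunk removes, it is plausibly implemented along these lines (a sketch of the assumed helper, not the exact code):

    use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};

    /// Sketch: does the peer, according to its pruning seed, hold the full
    /// blocks at both ends of the batch starting at `start_height`?
    fn client_has_block_in_range(seed: &PruningSeed, start_height: u64, length: usize) -> bool {
        seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
            && seed.has_full_block(
                start_height + u64::try_from(length).unwrap(),
                CRYPTONOTE_MAX_BLOCK_HEIGHT,
            )
    }

Checking only the two endpoints relies on batches being far smaller than a pruning stripe; if both ends are held, the blocks in between should be too.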
     /// Spawns a task to request blocks from the given peer.
+    ///
+    /// The batch requested will depend on our current state; failed batches will be prioritised.
+    ///
+    /// Returns the [`ClientPoolDropGuard`] back if the peer doesn't have the data we currently need
+    /// according to its pruning seed.
     async fn request_block_batch(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
         client: ClientPoolDropGuard<N>,
@@ -400,15 +422,11 @@ where
         // failure.
         if let Some(request) = self.inflight_requests.get(&failed_request.0) {
             // Check if this peer has the blocks according to their pruning seed.
-            if client
-                .info
-                .pruning_seed
-                .has_full_block(request.start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
-                && client.info.pruning_seed.has_full_block(
-                    request.start_height + u64::try_from(request.ids.len()).unwrap(),
-                    CRYPTONOTE_MAX_BLOCK_HEIGHT,
-                )
-            {
+            if client_has_block_in_range(
+                &client.info.pruning_seed,
+                request.start_height,
+                request.ids.len(),
+            ) {
                 // They should have the blocks so send the re-request to this peer.
                 let ids = request.ids.clone();
                 let start_height = request.start_height;
 
@@ -432,6 +450,7 @@ where
             }
         }
 
+        // If our ready queue is too large, send duplicate requests for the blocks we are waiting on.
         if self.ready_batches_size >= self.config.in_progress_queue_size {
             return self.request_inflight_batch_again(client).await;
         }
@@ -462,14 +481,26 @@ where
         None
     }
 
+    /// Attempts to give work to a free client.
+    ///
+    /// This function will use our current state to decide if we should send a request for a chain entry
+    /// or if we should request a batch of blocks.
+    ///
+    /// Returns the [`ClientPoolDropGuard`] back if the peer doesn't have the data we currently need
+    /// according to its pruning seed.
     async fn try_handle_free_client(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
         client: ClientPoolDropGuard<N>,
     ) -> Option<ClientPoolDropGuard<N>> {
+        // We send 2 requests, so if one is slow or doesn't have the next chain entry we still have a backup.
         if self.chain_entry_task.len() < 2
-            && self.amount_of_empty_chain_entries <= 5
+            // If we have had too many failures, assume the tip has been found and ask for no more chain entries.
+            && self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED
+            // Check we have a nice buffer of pending block IDs to retrieve; we don't want to be
+            // waiting around for a chain entry.
             && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500
+            // Make sure this peer actually has the chain.
            && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
        {
            let history = chain_tracker.get_simple_history();
 
@@ -486,9 +517,11 @@ where
             return None;
         }
 
+        // Request a batch of blocks instead.
         self.request_block_batch(chain_tracker, client).await
     }
 
+    /// Checks the [`ClientPool`] for free peers.
     async fn check_for_free_clients(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
@@ -667,7 +700,7 @@ where
                 _ = check_client_pool_interval.tick() => {
                     self.check_for_free_clients(&mut chain_tracker).await?;
 
-                    if self.block_download_tasks.is_empty() && self.amount_of_empty_chain_entries > 5 {
+                    if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED {
                         return Ok(())
                     }
                 }
@@ -676,7 +709,7 @@ where
 
                     self.handle_download_batch_res(start_height, res, &mut chain_tracker).await?;
 
-                    if self.block_download_tasks.is_empty() && self.amount_of_empty_chain_entries > 5 {
+                    if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED {
                         return Ok(())
                     }
                 }
@@ -687,7 +720,7 @@ where
                         self.amount_of_empty_chain_entries += 1;
                     }
 
-                    if chain_tracker.add_entry(entry).is_ok() {
+                    if chain_tracker.add_entry(entry).is_ok() {
                         self.amount_of_empty_chain_entries = 0;
                     }
 
@@ -791,6 +824,7 @@
     }
 
     // Check the height lines up as expected.
+    // This must happen after the hash check.
     if !block
         .number()
         .is_some_and(|height| height == expected_height)
@@ -908,6 +942,10 @@
 
     Ok((client, entry))
 }
 
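The next function, `initial_chain_search`, starts from the compact history returned by the caller's `ChainSvcRequest::CompactHistory` handler. How that history is chosen is outside this patch, but Monero's convention is to list recent block heights densely, then double the spacing as heights recede, always ending at the genesis block. Roughly (an illustrative sketch, not code from this repository):

    // Rough sketch of Monero-style compact history selection: the most recent
    // blocks densely, then exponentially spaced heights, ending at genesis.
    fn compact_history_heights(top_height: u64) -> Vec<u64> {
        let mut heights = Vec::new();
        let mut step = 1u64;
        let mut height = top_height;
        loop {
            heights.push(height);
            // After the most recent few blocks, start doubling the step size.
            if heights.len() > 10 {
                step *= 2;
            }
            if height <= step {
                break;
            }
            height -= step;
        }
        // Always end with the genesis block.
        if *heights.last().unwrap() != 0 {
            heights.push(0);
        }
        heights
    }

The exponential spacing keeps the history logarithmic in chain height while still guaranteeing overlap with any peer that shares some prefix of our chain.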
+/// Initial chain search; this function pulls [`INITIAL_CHAIN_REQUESTS_TO_SEND`] peers from the [`ClientPool`]
+/// and sends chain requests to all of them.
+///
+/// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
 async fn initial_chain_search<N: NetworkZone, S, C>(
     client_pool: &Arc<ClientPool<N>>,
     mut peer_sync_svc: S,
@@ -917,6 +955,7 @@ where
     S: PeerSyncSvc<N>,
     C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
 {
+    // Get our history.
     let ChainSvcResponse::CompactHistory {
         block_ids,
         cumulative_difficulty,
@@ -943,6 +982,7 @@ where
         panic!("peer sync service sent wrong response.");
     };
 
+    // Shuffle the list to remove any possibility of peers being able to prioritize getting picked.
     peers.shuffle(&mut thread_rng());
 
     let mut peers = client_pool.borrow_clients(&peers);
@@ -954,10 +994,12 @@ where
         prune: false,
     });
 
+    // Send the requests.
     while futs.len() < INITIAL_CHAIN_REQUESTS_TO_SEND {
         let Some(mut next_peer) = peers.next() else {
             break;
         };
+
         let cloned_req = req.clone();
         futs.spawn(timeout(CHAIN_ENTRY_REQUEST_TIMEOUT, async move {
             let PeerResponse::GetChain(chain_res) =
@@ -972,6 +1014,7 @@ where
 
     let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None;
 
+    // Wait for the peers' responses.
     while let Some(task_res) = futs.join_next().await {
        let Ok(Ok(task_res)) = task_res.unwrap() else {
            continue;
        };
 
        match &mut res {
            Some(res) => {
+                // `res` is already set; replace it if this peer claims a higher cumulative difficulty.
                if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
-                    let _ = std::mem::replace(res, task_res);
+                    let _ = mem::replace(res, task_res);
                }
            }
            None => {
-                let _ = std::mem::replace(&mut res, Some(task_res));
+                // `res` has not been set, set it now.
+                res = Some(task_res);
            }
        }
     }
 
     let Some((chain_res, peer_id, peer_handle)) = res else {
         return Err(BlockDownloadError::FailedToFindAChainToFollow);
     };
 
     let hashes: Vec<[u8; 32]> = (&chain_res.m_block_ids).into();
-    let start_height = chain_res.start_height;
     // drop this to deallocate the [`Bytes`].
     drop(chain_res);
 
-    let ChainSvcResponse::FindFirstUnknown(first_unknown) = our_chain_svc
+    // Find the first unknown block in the batch.
+    let ChainSvcResponse::FindFirstUnknown(first_unknown, expected_height) = our_chain_svc
        .ready()
        .await?
        .call(ChainSvcRequest::FindFirstUnknown(hashes.clone()))
        .await?
     else {
         panic!("chain service sent wrong response.");
     };
 
+    // The peer must send at least one block we already know.
     if first_unknown == 0 {
         peer_handle.ban_peer(MEDIUM_BAN);
         return Err(BlockDownloadError::PeerSentNoOverlappingBlocks);
     }
 
+    // We already know all the blocks.
+    // TODO: The peer could still be on a different chain; however, the split might just be too deep.
+    if first_unknown == hashes.len() {
+        return Err(BlockDownloadError::FailedToFindAChainToFollow);
+    }
+
     let first_entry = ChainEntry {
         ids: hashes[first_unknown..].to_vec(),
         peer: peer_id,
         handle: peer_handle,
     };
 
-    let tracker = ChainTracker::new(
-        first_entry,
-        start_height + u64::try_from(first_unknown).unwrap(),
-        our_genesis,
-    );
+    let tracker = ChainTracker::new(first_entry, expected_height, our_genesis);
 
     Ok(tracker)
 }
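The new two-field `FindFirstUnknown` response is what allowed dropping the `start_height` bookkeeping above: the chain service now reports the expected height of the first unknown block alongside its index. On the service side the pair might be computed like this (a hypothetical sketch assuming a simple hash-to-height map; the real `ChainSvc` implementation is left to the caller):

    use std::collections::HashMap;

    /// Hypothetical sketch: answer `ChainSvcRequest::FindFirstUnknown` given a
    /// lookup from block hash to height for the blocks we have.
    fn find_first_unknown(
        hashes: &[[u8; 32]],
        known_heights: &HashMap<[u8; 32], u64>,
    ) -> (usize, u64) {
        // Index of the first hash we don't know (== hashes.len() if we know them all).
        let first_unknown = hashes
            .iter()
            .position(|hash| !known_heights.contains_key(hash))
            .unwrap_or(hashes.len());

        // Expected height of that block: one above the last block we do know.
        // (For first_unknown == 0 the downloader bans the peer anyway.)
        let expected_height = first_unknown
            .checked_sub(1)
            .map_or(0, |i| known_heights[&hashes[i]] + 1);

        (first_unknown, expected_height)
    }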
diff --git a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs
index 5221dc81..19d70110 100644
--- a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs
+++ b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs
@@ -1,6 +1,7 @@
-use fixed_bytes::ByteArrayVec;
 use std::{cmp::min, collections::VecDeque};
 
+use fixed_bytes::ByteArrayVec;
+
 use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
 use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
 
@@ -55,6 +56,7 @@ pub struct ChainTracker<N: NetworkZone> {
 }
 
 impl<N: NetworkZone> ChainTracker<N> {
+    /// Creates a new chain tracker.
     pub fn new(new_entry: ChainEntry<N>, first_height: u64, our_genesis: [u8; 32]) -> Self {
         let top_seen_hash = *new_entry.ids.last().unwrap();
         let mut entries = VecDeque::with_capacity(1);
@@ -96,8 +98,8 @@ impl<N: NetworkZone> ChainTracker<N> {
             .sum()
     }
 
+    /// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
     pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
-        // TODO: check chain entries length.
         if chain_entry.ids.is_empty() {
             // The peer must send at least one overlapping block.
             chain_entry.handle.ban_peer(MEDIUM_BAN);
@@ -105,7 +107,6 @@ impl<N: NetworkZone> ChainTracker<N> {
         }
 
         if chain_entry.ids.len() == 1 {
-            // TODO: properly handle this
             return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
         }
 
@@ -117,8 +118,6 @@ impl<N: NetworkZone> ChainTracker<N> {
             return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
         }
 
-        tracing::warn!("len: {}", chain_entry.ids.len());
-
         let new_entry = ChainEntry {
             // ignore the first block - we already know it.
             ids: chain_entry.ids.split_off(1),
@@ -133,6 +132,9 @@ impl<N: NetworkZone> ChainTracker<N> {
         Ok(())
     }
 
+    /// Returns a batch of blocks to request.
+    ///
+    /// The returned batch's length will be less than or equal to `max_blocks`.
     pub fn blocks_to_get(
         &mut self,
         pruning_seed: &PruningSeed,
@@ -142,8 +144,6 @@ impl<N: NetworkZone> ChainTracker<N> {
             return None;
         }
 
-        // TODO: make sure max block height is enforced.
-
         let entry = self.entries.front_mut()?;
 
         // Calculate the ending index for us to get in this batch, will be the smallest out of `max_blocks`, the length of the batch or
diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs
index 883b40f1..ef46773e 100644
--- a/p2p/cuprate-p2p/src/constants.rs
+++ b/p2p/cuprate-p2p/src/constants.rs
@@ -65,6 +65,9 @@ pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
 
 pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 3;
 
+/// The number of empty chain entries to receive before we assume we have found the top of the chain.
+pub(crate) const EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED: usize = 5;
+
 #[cfg(test)]
 mod tests {
     use super::*;
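On the calling side this all comes together as in the `test_init.rs` change at the top of this patch. A config might be built like so; only the fields documented in this patch are shown, `50_000_000` for the buffer size comes from `test_init.rs`, the other values are illustrative, and any further fields of the struct are elided:

    use std::time::Duration;

    // Illustrative values; if `BlockDownloaderConfig` has further (elided)
    // fields, they must be filled in as well.
    let config = BlockDownloaderConfig {
        // Byte size of the buffer between the downloader and the consumer.
        buffer_size: 50_000_000,
        // Byte size of the in-progress queue at which we stop requesting more blocks.
        in_progress_queue_size: 50_000_000,
        // How often to check the client pool for free peers.
        check_client_pool_interval: Duration::from_secs(30),
    };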
diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs
index b55c216e..634a040e 100644
--- a/p2p/cuprate-p2p/src/lib.rs
+++ b/p2p/cuprate-p2p/src/lib.rs
@@ -4,13 +4,14 @@
 //! a certain [`NetworkZone`]
 use std::sync::Arc;
 
+use async_buffer::BufferStream;
 use futures::FutureExt;
 use tokio::{
     sync::{mpsc, watch},
     task::JoinSet,
 };
 use tokio_stream::wrappers::WatchStream;
-use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt};
+use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
 use tracing::{instrument, Instrument, Span};
 
 use monero_p2p::{
@@ -29,6 +30,9 @@ mod constants;
 mod inbound_server;
 mod sync_states;
 
+use crate::block_downloader::{
+    BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse,
+};
 pub use broadcast::{BroadcastRequest, BroadcastSvc};
 use client_pool::ClientPoolDropGuard;
 pub use config::P2PConfig;
@@ -146,8 +150,7 @@ where
 #[derive(Clone)]
 pub struct NetworkInterface<N: NetworkZone> {
     /// A pool of free connected peers.
-    // TODO: remove pub
-    pub pool: Arc<ClientPool<N>>,
+    pool: Arc<ClientPool<N>>,
     /// A [`Service`] that allows broadcasting to all connected peers.
     broadcast_svc: BroadcastSvc<N>,
     /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
@@ -158,7 +161,8 @@ pub struct NetworkInterface<N: NetworkZone> {
     make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
     /// The address book service.
     address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
-    pub sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
+    /// The peers' sync states service.
+    sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
     /// Background tasks that will be aborted when this interface is dropped.
     _background_tasks: Arc<JoinSet<()>>,
 }
@@ -169,6 +173,26 @@ impl<N: NetworkZone> NetworkInterface<N> {
         self.broadcast_svc.clone()
     }
 
+    /// Starts the block downloader and returns a stream that will yield sequentially downloaded blocks.
+    pub fn block_downloader<C>(
+        &self,
+        our_chain_service: C,
+        config: BlockDownloaderConfig,
+    ) -> BufferStream<BlockBatch>
+    where
+        C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+            + Send
+            + 'static,
+        C::Future: Send + 'static,
+    {
+        block_downloader::download_blocks(
+            self.pool.clone(),
+            self.sync_states_svc.clone(),
+            our_chain_service,
+            config,
+        )
+    }
+
     /// Returns a stream which yields the highest seen sync state from a connected peer.
     pub fn top_sync_stream(&self) -> WatchStream<NewSyncInfo> {
         WatchStream::from_changes(self.top_block_watch.clone())
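With `pool` and `sync_states_svc` now private, `NetworkInterface::block_downloader` is the only way to start a download. Assuming `BufferStream` implements `futures::Stream` (which its use as a stream of `BlockBatch` suggests), consumption might look like this (a sketch; `OurChainSvc` and `config` stand in for the caller's own chain service and `BlockDownloaderConfig`):

    use futures::StreamExt;

    // `net` is a `NetworkInterface<N>`.
    let mut block_stream = net.block_downloader(OurChainSvc, config);

    // Batches arrive oldest-first, as guaranteed by the `ReadyQueueBatch` ordering.
    while let Some(batch) = block_stream.next().await {
        // Verify the blocks and add them to our chain here.
        drop(batch);
    }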