From fa99fd534379794ac4bdf5940b8957f9b84f7b7b Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 27 May 2024 01:00:55 +0100 Subject: [PATCH] move block downloader to struct --- p2p/cuprate-p2p/src/block_downloader.rs | 199 +++++++++++++++++++----- 1 file changed, 160 insertions(+), 39 deletions(-) diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 49618434..58424c5d 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -3,18 +3,20 @@ mod chain_tracker; -use std::collections::{BTreeMap, BinaryHeap, VecDeque}; +use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque}; use std::sync::Arc; use std::time::Duration; use monero_serai::{block::Block, transaction::Transaction}; use rand::prelude::*; +use rayon::prelude::*; use tokio::task::JoinSet; use tokio::time::{interval, MissedTickBehavior}; use tower::{Service, ServiceExt}; -use crate::block_downloader::chain_tracker::{ChainEntry, ChainTracker}; +use crate::block_downloader::chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; use async_buffer::{BufferAppender, BufferStream}; +use cuprate_helper::asynch::rayon_spawn_async; use fixed_bytes::ByteArrayVec; use monero_p2p::client::InternalPeerID; use monero_p2p::{ @@ -22,7 +24,7 @@ use monero_p2p::{ services::{PeerSyncRequest, PeerSyncResponse}, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, }; -use monero_wire::protocol::{ChainRequest, ChainResponse}; +use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest}; use crate::client_pool::{ClientPool, ClientPoolDropGuard}; use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN}; @@ -55,6 +57,10 @@ pub struct BlockDownloaderConfig { #[derive(Debug, thiserror::Error)] pub enum BlockDownloadError { + #[error("The peers we requested data from did not have all the data.")] + PeerDidNotHaveRequestedData, + #[error("The peers response to a request was invalid.")] + PeersResponseWasInvalid, #[error("Failed to find a more advanced chain to follow")] FailedToFindAChainToFollow, #[error("The peer did not send any overlapping blocks, unknown start height.")] @@ -123,21 +129,22 @@ struct BlockDownloader { peer_sync_svc: S, our_chain_svc: C, - block_download_tasks: JoinSet<()>, + amount_of_blocks_to_request: usize, + + block_download_tasks: JoinSet<( + u64, + Result<(ClientPoolDropGuard, BlockBatch), BlockDownloadError>, + )>, chain_entry_task: JoinSet<()>, + inflight_requests: BTreeMap>, + buffer_appender: BufferAppender, config: BlockDownloaderConfig, } -async fn block_downloader( - client_pool: Arc>, - mut peer_sync_svc: S, - mut our_chain_svc: C, - config: BlockDownloaderConfig, - buffer_appender: BufferAppender, -) -> Result<(), BlockDownloadError> +impl BlockDownloader where S: PeerSyncSvc + Clone, C: Service @@ -145,46 +152,160 @@ where + 'static, C::Future: Send + 'static, { - let mut best_chain_found = - initial_chain_search(&client_pool, peer_sync_svc.clone(), &mut our_chain_svc).await?; + fn new( + client_pool: Arc>, - let tasks = JoinSet::new(); + peer_sync_svc: S, + our_chain_svc: C, + buffer_appender: BufferAppender, - let mut ready_queue = BinaryHeap::new(); - let mut inflight_queue = BTreeMap::new(); + config: BlockDownloaderConfig, + ) -> Self { + BlockDownloader { + client_pool, + peer_sync_svc, + our_chain_svc, + amount_of_blocks_to_request: config.initial_batch_size, + block_download_tasks: JoinSet::new(), + chain_entry_task: JoinSet::new(), + inflight_requests: BTreeMap::new(), + buffer_appender, + config, + } + } - let mut next_request_id = 0; + async fn handle_free_client( + &mut self, + chain_tracker: &mut ChainTracker, + client: ClientPoolDropGuard, + ) { + if chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15 + && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed) + { + todo!("request chain entry") + } - // The request ID for which we updated `amount_of_blocks_to_request` - // `amount_of_blocks_to_request` will update for every new batch of blocks that come in. - let mut amount_of_blocks_to_request_updated_at = next_request_id; + let Some(block_entry_to_get) = chain_tracker + .blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request) + else { + return; + }; + } - // The amount of blocks to request in 1 batch, will dynamically update based on block size. - let mut amount_of_blocks_to_request = config.initial_batch_size; + async fn run(mut self) -> Result<(), BlockDownloadError> { + let mut chain_tracker = initial_chain_search( + &self.client_pool, + self.peer_sync_svc.clone(), + &mut self.our_chain_svc, + ) + .await?; - let mut check_client_pool_interval = interval(config.check_client_pool_interval); - check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut next_request_id = 0; - loop { - tokio::select! { - _ = check_client_pool_interval.tick() => { - todo!() + // The request ID for which we updated `self.amount_of_blocks_to_request` + // `amount_of_blocks_to_request` will update for every new batch of blocks that come in. + let mut amount_of_blocks_to_request_updated_at = next_request_id; + + let mut check_client_pool_interval = interval(self.config.check_client_pool_interval); + check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = check_client_pool_interval.tick() => { + todo!() + } } - - - } } } -async fn handle_free_peer( - peer: ClientPoolDropGuard, - chain_tracker: &mut ChainTracker, - next_batch_size: usize, -) { - if chain_tracker.block_requests_queued(next_batch_size) < 15 - && chain_tracker.should_ask_for_next_chain_entry(&peer.info.pruning_seed) - {} +async fn request_batch( + mut client: ClientPoolDropGuard, + ids: ByteArrayVec<32>, + expected_start_height: u64, +) -> Result<(ClientPoolDropGuard, BlockBatch), BlockDownloadError> { + let numb_requested = ids.len(); + + let PeerResponse::GetObjects(blocks_response) = client + .ready() + .await? + .call(PeerRequest::GetObjects(GetObjectsRequest { + blocks: ids, + pruned: false, + })) + .await? + else { + panic!("Connection task returned wrong response."); + }; + + if blocks_response.blocks.len() > numb_requested { + client.info.handle.ban_peer(MEDIUM_BAN); + return Err(BlockDownloadError::PeersResponseWasInvalid); + } + + if blocks_response.blocks.len() != numb_requested { + return Err(BlockDownloadError::PeerDidNotHaveRequestedData); + } + + let blocks = rayon_spawn_async(move || { + // TODO: size check the incoming blocks/ txs. + + let blocks = blocks_response + .blocks + .into_par_iter() + .map(|block_entry| { + let block = Block::read(&mut block_entry.block.as_ref()) + .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; + + if block.txs.len() != block_entry.txs.len() { + return Err(BlockDownloadError::PeersResponseWasInvalid); + } + + let txs = block_entry + .txs + .take_normal() + .ok_or(BlockDownloadError::PeersResponseWasInvalid)? + .into_par_iter() + .map(|tx_blob| Transaction::read(&mut tx_blob.as_ref())) + .collect::, _>>() + .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; + + let mut expected_txs = block.txs.iter().collect::>(); + + for tx in &txs { + if !expected_txs.remove(&tx.hash()) { + return Err(BlockDownloadError::PeersResponseWasInvalid); + } + } + + if !expected_txs.is_empty() { + return Err(BlockDownloadError::PeersResponseWasInvalid); + } + + Ok((block, txs)) + }) + .collect::, _>>(); + + blocks + }) + .await; + + let blocks = blocks.inspect_err(|e| { + if matches!(e, BlockDownloadError::PeersResponseWasInvalid) { + client.info.handle.ban_peer(MEDIUM_BAN); + } + })?; + + let peer_handle = client.info.handle.clone(); + + Ok(( + client, + BlockBatch { + blocks, + size: 0, + peer_handle, + }, + )) } async fn initial_chain_search(