From 596fed775a6430826347e32a1a932fb561ae6b14 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 24 May 2024 02:11:49 +0100 Subject: [PATCH] initial chain search --- p2p/cuprate-p2p/src/block_downloader.rs | 165 +++++++++++++++++++++++- p2p/cuprate-p2p/src/constants.rs | 3 + 2 files changed, 166 insertions(+), 2 deletions(-) diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 51acaae0..0cae8a74 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -1,14 +1,24 @@ //! # Block Downloader //! +use std::collections::VecDeque; use std::sync::Arc; use monero_serai::{block::Block, transaction::Transaction}; +use rand::prelude::*; +use tokio::task::JoinSet; +use tower::{Service, ServiceExt}; -use async_buffer::BufferStream; -use monero_p2p::{handles::ConnectionHandle, NetworkZone}; +use async_buffer::{BufferAppender, BufferStream}; +use monero_p2p::{ + handles::ConnectionHandle, + services::{PeerSyncRequest, PeerSyncResponse}, + NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, +}; +use monero_wire::protocol::ChainRequest; use crate::client_pool::ClientPool; +use crate::constants::INITIAL_CHAIN_REQUESTS_TO_SEND; /// A downloaded batch of blocks. pub struct BlockBatch { @@ -28,6 +38,34 @@ pub struct BlockDownloaderConfig { in_progress_queue_size: usize, } +#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, thiserror::Error)] +pub enum BlockDownloadError { + #[error("Failed to find a more advanced chain to follow")] + FailedToFindAChainToFollow, + #[error("Service error: {0}")] + ServiceError(#[from] tower::BoxError), +} + +/// The request type for the chain service. +pub enum ChainSvcRequest { + /// A request for the current chain history. + CompactHistory, + /// A request to find the first unknown + FindFirstUnknown(Vec<[u8; 32]>), +} + +/// The response type for the chain service. +pub enum ChainSvcResponse { + /// The response for [`ChainSvcRequest::CompactHistory`] + CompactHistory { + block_ids: Vec<[u8; 32]>, + cumulative_difficulty: u128, + }, + /// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown + /// block. + FindFirstUnknown(usize), +} + /// # Block Downloader /// /// This function starts the block downloader and returns a [`BufferStream`] that will produce @@ -43,5 +81,128 @@ pub fn download_blocks( peer_sync_svc: S, config: BlockDownloaderConfig, ) -> BufferStream { + let (buffer_appender, buffer_stream) = async_buffer::new_buffer(config.buffer_size); + + tokio::spawn(block_downloader( + client_pool, + peer_sync_svc, + config, + buffer_appender, + )); + + buffer_stream +} + +async fn block_downloader( + client_pool: Arc>, + peer_sync_svc: S, + config: BlockDownloaderConfig, + buffer_appender: BufferAppender, +) -> Result<(), tower::BoxError> { todo!() } + +struct BestChainFound { + common_ancestor: [u8; 32], + next_hashes: VecDeque<[u8; 32]>, + from_peer: ConnectionHandle, +} + +async fn initial_chain_search( + client_pool: &ClientPool, + mut peer_sync_svc: S, + mut our_chain_svc: C, +) -> Result +where + S: PeerSyncSvc, + C: Service + Send + 'static, + C::Future: Send + 'static, +{ + let ChainSvcResponse::CompactHistory { + block_ids, + cumulative_difficulty, + } = our_chain_svc + .ready() + .await? + .call(ChainSvcRequest::CompactHistory) + .await? + else { + panic!("chain service sent wrong response."); + }; + + let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc + .ready() + .await? + .call(PeerSyncRequest::PeersToSyncFrom { + block_needed: None, + current_cumulative_difficulty: cumulative_difficulty, + }) + .await? + else { + panic!("peer sync service sent wrong response."); + }; + + peers.shuffle(&mut thread_rng()); + + let mut peers = client_pool.borrow_clients(&peers); + + let mut futs = JoinSet::new(); + + let req = PeerRequest::GetChain(ChainRequest { + block_ids: block_ids.into(), + prune: false, + }); + + while futs.len() < INITIAL_CHAIN_REQUESTS_TO_SEND { + let Some(mut next_peer) = peers.next() else { + break; + }; + let cloned_req = req.clone(); + futs.spawn(async move { + let PeerResponse::GetChain(chain_res) = + next_peer.ready().await?.call(cloned_req).await? + else { + panic!("connection task returned wrong response!"); + }; + + Ok((chain_res, next_peer.info.handle.clone())) + }); + } + + let mut res = None; + + while let Some(task_res) = futs.join_next().await { + let Ok(task_res) = task_res.unwrap() else { + continue; + }; + + match &mut res { + Some(res) => { + if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() { + let _ = std::mem::replace(res, task_res); + } + } + None => { + let _ = std::mem::replace(&mut res, Some(task_res)); + } + } + } + + let Some((chain_res, peer_handle)) = res else { + return Err(BlockDownloadError::FailedToFindAChainToFollow); + }; + + let hashes: Vec<[u8; 32]> = chain_res.m_block_ids.into(); + + let ChainSvcResponse::FindFirstUnknown(first_unknown) = our_chain_svc + .ready() + .await? + .call(ChainSvcRequest::FindFirstUnknown(hashes.clone())) + .await? + else { + panic!("chain service sent wrong response."); + }; + + todo!() + +} diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 0c65386b..9bf9abc9 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -34,6 +34,9 @@ pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; /// TODO: it might be a good idea to make this configurable. pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500); +/// The initial amount of chain requests to send to find the best chain to sync from. +pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; + #[cfg(test)] mod tests { use super::*;