initial chain search

This commit is contained in:
Boog900 2024-05-24 02:11:49 +01:00
parent 5a0d4a4e94
commit 596fed775a
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 166 additions and 2 deletions

View file

@ -1,14 +1,24 @@
//! # Block Downloader //! # Block Downloader
//! //!
use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use monero_serai::{block::Block, transaction::Transaction}; use monero_serai::{block::Block, transaction::Transaction};
use rand::prelude::*;
use tokio::task::JoinSet;
use tower::{Service, ServiceExt};
use async_buffer::BufferStream; use async_buffer::{BufferAppender, BufferStream};
use monero_p2p::{handles::ConnectionHandle, NetworkZone}; use monero_p2p::{
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
};
use monero_wire::protocol::ChainRequest;
use crate::client_pool::ClientPool; use crate::client_pool::ClientPool;
use crate::constants::INITIAL_CHAIN_REQUESTS_TO_SEND;
/// A downloaded batch of blocks. /// A downloaded batch of blocks.
pub struct BlockBatch { pub struct BlockBatch {
@ -28,6 +38,34 @@ pub struct BlockDownloaderConfig {
in_progress_queue_size: usize, in_progress_queue_size: usize,
} }
#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, thiserror::Error)]
pub enum BlockDownloadError {
#[error("Failed to find a more advanced chain to follow")]
FailedToFindAChainToFollow,
#[error("Service error: {0}")]
ServiceError(#[from] tower::BoxError),
}
/// The request type for the chain service.
pub enum ChainSvcRequest {
/// A request for the current chain history.
CompactHistory,
/// A request to find the first unknown
FindFirstUnknown(Vec<[u8; 32]>),
}
/// The response type for the chain service.
pub enum ChainSvcResponse {
/// The response for [`ChainSvcRequest::CompactHistory`]
CompactHistory {
block_ids: Vec<[u8; 32]>,
cumulative_difficulty: u128,
},
/// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown
/// block.
FindFirstUnknown(usize),
}
/// # Block Downloader /// # Block Downloader
/// ///
/// This function starts the block downloader and returns a [`BufferStream`] that will produce /// This function starts the block downloader and returns a [`BufferStream`] that will produce
@ -43,5 +81,128 @@ pub fn download_blocks<N: NetworkZone, S>(
peer_sync_svc: S, peer_sync_svc: S,
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch> { ) -> BufferStream<BlockBatch> {
let (buffer_appender, buffer_stream) = async_buffer::new_buffer(config.buffer_size);
tokio::spawn(block_downloader(
client_pool,
peer_sync_svc,
config,
buffer_appender,
));
buffer_stream
}
async fn block_downloader<N: NetworkZone, S>(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
config: BlockDownloaderConfig,
buffer_appender: BufferAppender<BlockBatch>,
) -> Result<(), tower::BoxError> {
todo!() todo!()
} }
struct BestChainFound {
common_ancestor: [u8; 32],
next_hashes: VecDeque<[u8; 32]>,
from_peer: ConnectionHandle,
}
async fn initial_chain_search<N: NetworkZone, S, C>(
client_pool: &ClientPool<N>,
mut peer_sync_svc: S,
mut our_chain_svc: C,
) -> Result<BestChainFound, BlockDownloadError>
where
S: PeerSyncSvc<N>,
C: Service<ChainSvcRequest, Response = ChainSvcResponse> + Send + 'static,
C::Future: Send + 'static,
{
let ChainSvcResponse::CompactHistory {
block_ids,
cumulative_difficulty,
} = our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::CompactHistory)
.await?
else {
panic!("chain service sent wrong response.");
};
let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
block_needed: None,
current_cumulative_difficulty: cumulative_difficulty,
})
.await?
else {
panic!("peer sync service sent wrong response.");
};
peers.shuffle(&mut thread_rng());
let mut peers = client_pool.borrow_clients(&peers);
let mut futs = JoinSet::new();
let req = PeerRequest::GetChain(ChainRequest {
block_ids: block_ids.into(),
prune: false,
});
while futs.len() < INITIAL_CHAIN_REQUESTS_TO_SEND {
let Some(mut next_peer) = peers.next() else {
break;
};
let cloned_req = req.clone();
futs.spawn(async move {
let PeerResponse::GetChain(chain_res) =
next_peer.ready().await?.call(cloned_req).await?
else {
panic!("connection task returned wrong response!");
};
Ok((chain_res, next_peer.info.handle.clone()))
});
}
let mut res = None;
while let Some(task_res) = futs.join_next().await {
let Ok(task_res) = task_res.unwrap() else {
continue;
};
match &mut res {
Some(res) => {
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
let _ = std::mem::replace(res, task_res);
}
}
None => {
let _ = std::mem::replace(&mut res, Some(task_res));
}
}
}
let Some((chain_res, peer_handle)) = res else {
return Err(BlockDownloadError::FailedToFindAChainToFollow);
};
let hashes: Vec<[u8; 32]> = chain_res.m_block_ids.into();
let ChainSvcResponse::FindFirstUnknown(first_unknown) = our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::FindFirstUnknown(hashes.clone()))
.await?
else {
panic!("chain service sent wrong response.");
};
todo!()
}

View file

@ -34,6 +34,9 @@ pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
/// TODO: it might be a good idea to make this configurable. /// TODO: it might be a good idea to make this configurable.
pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500); pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500);
/// The initial amount of chain requests to send to find the best chain to sync from.
pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;