move block downloader to struct

This commit is contained in:
Boog900 2024-05-27 01:00:55 +01:00
parent 3270560711
commit fa99fd5343
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2

View file

@ -3,18 +3,20 @@
mod chain_tracker;
use std::collections::{BTreeMap, BinaryHeap, VecDeque};
use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use monero_serai::{block::Block, transaction::Transaction};
use rand::prelude::*;
use rayon::prelude::*;
use tokio::task::JoinSet;
use tokio::time::{interval, MissedTickBehavior};
use tower::{Service, ServiceExt};
use crate::block_downloader::chain_tracker::{ChainEntry, ChainTracker};
use crate::block_downloader::chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
use async_buffer::{BufferAppender, BufferStream};
use cuprate_helper::asynch::rayon_spawn_async;
use fixed_bytes::ByteArrayVec;
use monero_p2p::client::InternalPeerID;
use monero_p2p::{
@ -22,7 +24,7 @@ use monero_p2p::{
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
};
use monero_wire::protocol::{ChainRequest, ChainResponse};
use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest};
use crate::client_pool::{ClientPool, ClientPoolDropGuard};
use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN};
@ -55,6 +57,10 @@ pub struct BlockDownloaderConfig {
#[derive(Debug, thiserror::Error)]
pub enum BlockDownloadError {
#[error("The peers we requested data from did not have all the data.")]
PeerDidNotHaveRequestedData,
#[error("The peers response to a request was invalid.")]
PeersResponseWasInvalid,
#[error("Failed to find a more advanced chain to follow")]
FailedToFindAChainToFollow,
#[error("The peer did not send any overlapping blocks, unknown start height.")]
@ -123,21 +129,22 @@ struct BlockDownloader<N: NetworkZone, S, C> {
peer_sync_svc: S,
our_chain_svc: C,
block_download_tasks: JoinSet<()>,
amount_of_blocks_to_request: usize,
block_download_tasks: JoinSet<(
u64,
Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
)>,
chain_entry_task: JoinSet<()>,
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig,
}
async fn block_downloader<N: NetworkZone, S, C>(
client_pool: Arc<ClientPool<N>>,
mut peer_sync_svc: S,
mut our_chain_svc: C,
config: BlockDownloaderConfig,
buffer_appender: BufferAppender<BlockBatch>,
) -> Result<(), BlockDownloadError>
impl<N: NetworkZone, S, C> BlockDownloader<N, S, C>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
@ -145,46 +152,160 @@ where
+ 'static,
C::Future: Send + 'static,
{
let mut best_chain_found =
initial_chain_search(&client_pool, peer_sync_svc.clone(), &mut our_chain_svc).await?;
fn new(
client_pool: Arc<ClientPool<N>>,
let tasks = JoinSet::new();
peer_sync_svc: S,
our_chain_svc: C,
buffer_appender: BufferAppender<BlockBatch>,
let mut ready_queue = BinaryHeap::new();
let mut inflight_queue = BTreeMap::new();
config: BlockDownloaderConfig,
) -> Self {
BlockDownloader {
client_pool,
peer_sync_svc,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
buffer_appender,
config,
}
}
let mut next_request_id = 0;
async fn handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
if chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
todo!("request chain entry")
}
// The request ID for which we updated `amount_of_blocks_to_request`
// `amount_of_blocks_to_request` will update for every new batch of blocks that come in.
let mut amount_of_blocks_to_request_updated_at = next_request_id;
let Some(block_entry_to_get) = chain_tracker
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
else {
return;
};
}
// The amount of blocks to request in 1 batch, will dynamically update based on block size.
let mut amount_of_blocks_to_request = config.initial_batch_size;
async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = initial_chain_search(
&self.client_pool,
self.peer_sync_svc.clone(),
&mut self.our_chain_svc,
)
.await?;
let mut check_client_pool_interval = interval(config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut next_request_id = 0;
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
todo!()
// The request ID for which we updated `self.amount_of_blocks_to_request`
// `amount_of_blocks_to_request` will update for every new batch of blocks that come in.
let mut amount_of_blocks_to_request_updated_at = next_request_id;
let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
todo!()
}
}
}
}
}
async fn handle_free_peer<N: NetworkZone>(
peer: ClientPoolDropGuard<N>,
chain_tracker: &mut ChainTracker<N>,
next_batch_size: usize,
) {
if chain_tracker.block_requests_queued(next_batch_size) < 15
&& chain_tracker.should_ask_for_next_chain_entry(&peer.info.pruning_seed)
{}
async fn request_batch<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>,
expected_start_height: u64,
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> {
let numb_requested = ids.len();
let PeerResponse::GetObjects(blocks_response) = client
.ready()
.await?
.call(PeerRequest::GetObjects(GetObjectsRequest {
blocks: ids,
pruned: false,
}))
.await?
else {
panic!("Connection task returned wrong response.");
};
if blocks_response.blocks.len() > numb_requested {
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
if blocks_response.blocks.len() != numb_requested {
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
}
let blocks = rayon_spawn_async(move || {
// TODO: size check the incoming blocks/ txs.
let blocks = blocks_response
.blocks
.into_par_iter()
.map(|block_entry| {
let block = Block::read(&mut block_entry.block.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
if block.txs.len() != block_entry.txs.len() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
let txs = block_entry
.txs
.take_normal()
.ok_or(BlockDownloadError::PeersResponseWasInvalid)?
.into_par_iter()
.map(|tx_blob| Transaction::read(&mut tx_blob.as_ref()))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
let mut expected_txs = block.txs.iter().collect::<HashSet<_>>();
for tx in &txs {
if !expected_txs.remove(&tx.hash()) {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
}
if !expected_txs.is_empty() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
Ok((block, txs))
})
.collect::<Result<Vec<_>, _>>();
blocks
})
.await;
let blocks = blocks.inspect_err(|e| {
if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
client.info.handle.ban_peer(MEDIUM_BAN);
}
})?;
let peer_handle = client.info.handle.clone();
Ok((
client,
BlockBatch {
blocks,
size: 0,
peer_handle,
},
))
}
async fn initial_chain_search<N: NetworkZone, S, C>(