check for free peers and handle batch response

This commit is contained in:
Boog900 2024-05-29 01:40:42 +01:00
parent eb69895278
commit 759369dd0a
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2

View file

@ -3,6 +3,7 @@
mod chain_tracker;
use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
@ -30,6 +31,7 @@ use crate::client_pool::{ClientPool, ClientPoolDropGuard};
use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN};
/// A downloaded batch of blocks.
#[derive(Debug)]
pub struct BlockBatch {
/// The blocks.
blocks: Vec<(Block, Vec<Transaction>)>,
@ -57,6 +59,8 @@ pub struct BlockDownloaderConfig {
#[derive(Debug, thiserror::Error)]
pub enum BlockDownloadError {
#[error("The block buffer was closed.")]
BufferWasClosed,
#[error("The peers we requested data from did not have all the data.")]
PeerDidNotHaveRequestedData,
#[error("The peers response to a request was invalid.")]
@ -103,26 +107,61 @@ pub enum ChainSvcResponse {
///
/// The block downloader may fail before the whole chain is downloaded. If this is the case you are
/// free to call this function again, so it can start the search again.
pub fn download_blocks<N: NetworkZone, S>(
pub fn download_blocks<N: NetworkZone, S, C>(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch> {
) -> BufferStream<BlockBatch>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
let (buffer_appender, buffer_stream) = async_buffer::new_buffer(config.buffer_size);
/*
tokio::spawn(block_downloader(
let block_downloader = BlockDownloader::new(
client_pool,
peer_sync_svc,
config,
our_chain_svc,
buffer_appender,
));
config,
);
*/
tokio::spawn(block_downloader.run());
buffer_stream
}
#[derive(Debug)]
struct ReadyQueueBatch {
start_height: u64,
block_batch: BlockBatch,
}
impl Eq for ReadyQueueBatch {}
impl PartialEq<Self> for ReadyQueueBatch {
fn eq(&self, other: &Self) -> bool {
self.start_height.eq(&other.start_height)
}
}
impl PartialOrd<Self> for ReadyQueueBatch {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ReadyQueueBatch {
fn cmp(&self, other: &Self) -> Ordering {
self.start_height.cmp(&other.start_height).reverse()
}
}
struct BlockDownloader<N: NetworkZone, S, C> {
client_pool: Arc<ClientPool<N>>,
@ -138,6 +177,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
chain_entry_task: JoinSet<()>,
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
ready_batches: BinaryHeap<ReadyQueueBatch>,
buffer_appender: BufferAppender<BlockBatch>,
@ -169,6 +209,7 @@ where
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
ready_batches: BinaryHeap::new(),
buffer_appender,
config,
}
@ -191,18 +232,12 @@ where
return;
};
let next_request_id = self
.inflight_requests
.last_key_value()
.map(|(id, _)| *id + 1)
.unwrap_or(0);
self.inflight_requests
.insert(next_request_id, block_entry_to_get.clone());
.insert(block_entry_to_get.start_height, block_entry_to_get.clone());
self.block_download_tasks.spawn(async move {
(
next_request_id,
block_entry_to_get.start_height,
request_batch_from_peer(
client,
block_entry_to_get.ids,
@ -213,6 +248,88 @@ where
});
}
async fn check_for_free_clients(
&mut self,
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
// This value might be slightly behind but thats ok.
let ChainSvcResponse::CumulativeDifficulty(cum_diff) = self
.our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::CumulativeDifficulty)
.await?
else {
panic!("Chain service returned ")
};
let PeerSyncResponse::PeersToSyncFrom(peers) = self
.peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty: 0,
block_needed: None,
})
.await?
else {
panic!("Chain service returned ")
};
// Rust borrow rules mean we have to build a vec here.
let mut clients = Vec::with_capacity(peers.len());
clients.extend(self.client_pool.borrow_clients(&peers));
for peer in clients {
self.handle_free_client(chain_tracker, peer).await;
}
Ok(())
}
async fn handle_download_batch_res(
&mut self,
start_height: u64,
res: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
match res {
Err(_) => todo!("Check if block IDs was invalid"),
Ok((client, block_batch)) => {
if self.inflight_requests.remove(&start_height).is_none() {
self.handle_free_client(chain_tracker, client).await;
return Ok(());
};
self.ready_batches.push(ReadyQueueBatch {
start_height,
block_batch,
});
while let (Some(ready_batch), Some((&lowest_start_height, _))) = (
self.ready_batches.peek(),
self.inflight_requests.first_key_value(),
) {
if ready_batch.start_height > lowest_start_height {
break;
}
let ready_batch = self.ready_batches.pop().unwrap();
let size = ready_batch.block_batch.size;
self.buffer_appender
.send(ready_batch.block_batch, size)
.await
.map_err(|_| BlockDownloadError::BufferWasClosed)?;
}
self.handle_free_client(chain_tracker, client).await;
Ok(())
}
}
}
async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = initial_chain_search(
&self.client_pool,
@ -221,11 +338,9 @@ where
)
.await?;
let mut next_request_id = 0;
// The request ID for which we updated `self.amount_of_blocks_to_request`
// `amount_of_blocks_to_request` will update for every new batch of blocks that come in.
let mut amount_of_blocks_to_request_updated_at = next_request_id;
let mut amount_of_blocks_to_request_updated_at = 0;
let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
@ -233,7 +348,15 @@ where
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
todo!()
self.check_for_free_clients(&mut chain_tracker).await?;
}
Some(res) = self.block_download_tasks.join_next() => {
let (start_height, res) = res.expect("Download batch future panicked");
self.handle_download_batch_res(start_height, res, &mut chain_tracker).await?;
}
else => {
self.check_for_free_clients(&mut chain_tracker).await?;
}
}
}