working block downloader

This commit is contained in:
Boog900 2024-05-29 15:22:44 +01:00
parent 6979dba92d
commit 7e19a59d94
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 122 additions and 22 deletions

View file

@ -25,6 +25,7 @@ use monero_p2p::{
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
};
use monero_pruning::CRYPTONOTE_MAX_BLOCK_HEIGHT;
use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest};
use crate::client_pool::{ClientPool, ClientPoolDropGuard};
@ -170,14 +171,17 @@ struct BlockDownloader<N: NetworkZone, S, C> {
amount_of_blocks_to_request: usize,
#[allow(clippy::type_complexity)]
block_download_tasks: JoinSet<(
u64,
Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
)>,
chain_entry_task: JoinSet<()>,
#[allow(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
ready_batches: BinaryHeap<ReadyQueueBatch>,
failed_batches: BinaryHeap<Reverse<u64>>,
buffer_appender: BufferAppender<BlockBatch>,
@ -210,20 +214,46 @@ where
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
ready_batches: BinaryHeap::new(),
failed_batches: BinaryHeap::new(),
buffer_appender,
config,
}
}
async fn handle_free_client(
async fn request_block_batch(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
if chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
todo!("request chain entry")
while let Some(failed_request) = self.failed_batches.peek() {
if let Some(request) = self.inflight_requests.get(&failed_request.0) {
if client
.info
.pruning_seed
.has_full_block(request.start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
&& client.info.pruning_seed.has_full_block(
request.start_height + u64::try_from(request.ids.len()).unwrap(),
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
{
let ids = request.ids.clone();
let start_height = request.start_height;
self.block_download_tasks.spawn(async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
self.failed_batches.pop();
return;
}
break;
} else {
self.failed_batches.pop();
}
}
let Some(block_entry_to_get) = chain_tracker
@ -248,12 +278,32 @@ where
});
}
async fn handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
if self.chain_entry_task.len() < 2
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 15
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
let history = chain_tracker.get_simple_history();
self.chain_entry_task
.spawn(request_chain_entry_from_peer(client, history));
return;
}
self.request_block_batch(chain_tracker, client).await;
}
async fn check_for_free_clients(
&mut self,
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
// This value might be slightly behind but thats ok.
let ChainSvcResponse::CumulativeDifficulty(cum_diff) = self
let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self
.our_chain_svc
.ready()
.await?
@ -268,7 +318,7 @@ where
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty: 0,
current_cumulative_difficulty,
block_needed: None,
})
.await?
@ -294,7 +344,13 @@ where
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
match res {
Err(_) => todo!("Check if block IDs was invalid"),
Err(_) => {
if self.inflight_requests.contains_key(&start_height) {
self.failed_batches.push(Reverse(start_height))
}
Ok(())
}
Ok((client, block_batch)) => {
if self.inflight_requests.remove(&start_height).is_none() {
self.handle_free_client(chain_tracker, client).await;
@ -355,6 +411,17 @@ where
self.handle_download_batch_res(start_height, res, &mut chain_tracker).await?;
}
Some(Ok(res)) = self.chain_entry_task.join_next() => {
match res {
Ok((client, entry)) => {
if chain_tracker.add_entry(entry).is_ok() {
self.chain_entry_task.abort_all();
}
self.handle_free_client(&mut chain_tracker, client).await;
}
Err(_) => {}
}
}
else => {
self.check_for_free_clients(&mut chain_tracker).await?;
}
@ -452,6 +519,44 @@ async fn request_batch_from_peer<N: NetworkZone>(
))
}
async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>,
short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
let PeerResponse::GetChain(chain_res) = client
.ready()
.await?
.call(PeerRequest::GetChain(ChainRequest {
block_ids: short_history.into(),
prune: false,
}))
.await?
else {
panic!("Connection task returned wrong response!");
};
if chain_res.m_block_ids.is_empty() {
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
// We must have at least one overlapping block.
if !(chain_res.m_block_ids[0] == short_history[0]
|| chain_res.m_block_ids[0] == short_history[1])
{
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
let entry = ChainEntry {
ids: (&chain_res.m_block_ids).into(),
peer: client.info.id,
handle: client.info.handle.clone(),
};
Ok((client, entry))
}
async fn initial_chain_search<N: NetworkZone, S, C>(
client_pool: &Arc<ClientPool<N>>,
mut peer_sync_svc: S,

View file

@ -93,34 +93,29 @@ impl<N: NetworkZone> ChainTracker<N> {
.sum()
}
pub fn add_entry(
&mut self,
mut chain_entry: ChainResponse,
peer: InternalPeerID<N::Addr>,
handle: ConnectionHandle,
) -> Result<(), ChainTrackerError> {
pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
// TODO: check chain entries length.
if chain_entry.m_block_ids.is_empty() {
if chain_entry.ids.is_empty() {
// The peer must send at lest one overlapping block.
handle.ban_peer(MEDIUM_BAN);
chain_entry.handle.ban_peer(MEDIUM_BAN);
return Err(ChainTrackerError::NewEntryIsInvalid);
}
if self
.entries
.back()
.is_some_and(|last_entry| last_entry.ids.last().unwrap() != &chain_entry.m_block_ids[0])
.is_some_and(|last_entry| last_entry.ids.last().unwrap() != &chain_entry.ids[0])
{
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
tracing::warn!("len: {}", chain_entry.m_block_ids.len());
tracing::warn!("len: {}", chain_entry.ids.len());
let new_entry = ChainEntry {
// ignore the first block - we already know it.
ids: (&chain_entry.m_block_ids.split_off(1)).into(),
peer,
handle,
ids: chain_entry.ids.split_off(1),
peer: chain_entry.peer,
handle: chain_entry.handle,
};
self.top_seen_hash = *new_entry.ids.last().unwrap();