add function for incoming blocks

This commit is contained in:
Boog900 2024-09-13 00:03:03 +01:00
parent 01a3065cc8
commit 6ec5bc37a9
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 118 additions and 14 deletions

View file

@ -16,6 +16,7 @@ use cuprate_types::{
VerifiedBlockInformation, VerifiedBlockInformation,
}; };
mod free;
mod manager; mod manager;
mod syncer; mod syncer;
mod types; mod types;
@ -120,7 +121,8 @@ pub async fn init_blockchain_manager(
blockchain_read_handle, blockchain_read_handle,
blockchain_context_service, blockchain_context_service,
block_verifier_service, block_verifier_service,
).await; )
.await;
tokio::spawn(manager.run(batch_rx)); tokio::spawn(manager.run(batch_rx));
} }

View file

@ -0,0 +1,93 @@
use crate::blockchain::manager::IncomingBlock;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::Chain;
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::OnceLock;
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};
static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<IncomingBlock>> = OnceLock::new();
#[derive(thiserror::Error)]
pub enum IncomingBlockError {
#[error("Unknown transactions in block.")]
UnknownTransactions(Vec<u64>),
#[error("The block has an unknown parent.")]
Orphan,
#[error(transparent)]
InvalidBlock(anyhow::Error),
}
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<(), IncomingBlockError> {
if !block_exists(block.header.previous, blockchain_read_handle).expect("TODO") {
return Err(IncomingBlockError::Orphan);
}
let block_hash = block.hash();
if block_exists(block_hash, blockchain_read_handle)
.await
.expect("TODO")
{
return Ok(());
}
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;
// TODO: Get transactions from the tx pool first.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
(0..usize_to_u64(block.transactions.len())).collect(),
));
}
let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else {
return Ok(());
};
let (response_tx, response_rx) = oneshot::channel();
incoming_block_tx
.send(IncomingBlock {
block,
prepped_txs,
response_tx,
})
.await
.expect("TODO: don't actually panic here");
response_rx.await.map_err(IncomingBlockError::InvalidBlock)
}
async fn block_exists(
block_hash: [u8; 32],
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, anyhow::Error> {
let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::FindBlock(block_hash))
.await?
else {
panic!("Invalid blockchain response!");
};
Ok(chain.is_some())
}

View file

@ -1,6 +1,5 @@
mod handler; mod handler;
use std::collections::HashMap;
use crate::blockchain::types::ConsensusBlockchainReadHandle; use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::RawBlockChainContext; use cuprate_consensus::context::RawBlockChainContext;
@ -14,15 +13,16 @@ use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::{Chain, TransactionVerificationData}; use cuprate_types::{Chain, TransactionVerificationData};
use futures::StreamExt; use futures::StreamExt;
use monero_serai::block::Block; use monero_serai::block::Block;
use std::collections::HashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::{Notify, oneshot}; use tokio::sync::{oneshot, Notify};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::error; use tracing::error;
pub struct IncomingBlock { pub struct IncomingBlock {
block: Block, pub block: Block,
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
response_tx: oneshot::Sender<Result<(), anyhow::Error>>, pub response_tx: oneshot::Sender<Result<(), anyhow::Error>>,
} }
pub struct BlockchainManager { pub struct BlockchainManager {
@ -35,7 +35,6 @@ pub struct BlockchainManager {
TxVerifierService<ConsensusBlockchainReadHandle>, TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
>, >,
// TODO: stop_current_block_downloader: Notify, // TODO: stop_current_block_downloader: Notify,
} }
@ -56,7 +55,8 @@ impl BlockchainManager {
.expect("TODO") .expect("TODO")
.call(BlockChainContextRequest::GetContext) .call(BlockChainContextRequest::GetContext)
.await .await
.expect("TODO") else { .expect("TODO")
else {
panic!("Blockchain context service returned wrong response!"); panic!("Blockchain context service returned wrong response!");
}; };
@ -69,7 +69,11 @@ impl BlockchainManager {
} }
} }
pub async fn run(mut self, mut block_batch_rx: mpsc::Receiver<BlockBatch>, mut block_single_rx: mpsc::Receiver<IncomingBlock>) { pub async fn run(
mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>,
mut block_single_rx: mpsc::Receiver<IncomingBlock>,
) {
loop { loop {
tokio::select! { tokio::select! {
Some(batch) = block_batch_rx.recv() => { Some(batch) = block_batch_rx.recv() => {

View file

@ -63,9 +63,13 @@ impl super::BlockchainManager {
.expect("Block batch should not be empty"); .expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash { if first_block.header.previous == self.cached_blockchain_context.top_hash {
self.handle_incoming_block_batch_main_chain(batch).await.expect("TODO"); self.handle_incoming_block_batch_main_chain(batch)
.await
.expect("TODO");
} else { } else {
self.handle_incoming_block_batch_alt_chain(batch).await.expect("TODO"); self.handle_incoming_block_batch_alt_chain(batch)
.await
.expect("TODO");
} }
} }
@ -295,7 +299,8 @@ impl super::BlockchainManager {
.expect("TODO") .expect("TODO")
.call(BlockChainContextRequest::GetContext) .call(BlockChainContextRequest::GetContext)
.await .await
.expect("TODO") else { .expect("TODO")
else {
panic!("Incorrect response!"); panic!("Incorrect response!");
}; };

View file

@ -59,7 +59,8 @@ fn main() {
context_svc, context_svc,
block_verifier, block_verifier,
config.block_downloader_config(), config.block_downloader_config(),
).await; )
.await;
// TODO: this can be removed as long as the main thread does not exit, so when command handling // TODO: this can be removed as long as the main thread does not exit, so when command handling
// is added // is added

View file

@ -98,5 +98,4 @@ mod sealed {
{ {
type BorshAddr = T::Addr; type BorshAddr = T::Addr;
} }
} }

Binary file not shown.