clean up handler code

This commit is contained in:
Boog900 2024-09-12 22:17:44 +01:00
parent 915633fe70
commit 01a3065cc8
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
9 changed files with 332 additions and 349 deletions

View file

@ -1,6 +1,8 @@
//! Blockchain
//!
//! Will contain the chain manager and syncer.
use futures::FutureExt;
use tokio::sync::mpsc;
use tower::{Service, ServiceExt};
@ -95,7 +97,7 @@ pub async fn init_consensus(
}
/// Initializes the blockchain manager task and syncer.
pub fn init_blockchain_manager(
pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
@ -118,7 +120,7 @@ pub fn init_blockchain_manager(
blockchain_read_handle,
blockchain_context_service,
block_verifier_service,
);
).await;
tokio::spawn(manager.run(batch_rx));
}

View file

@ -1,6 +1,6 @@
mod batch_handler;
mod handler;
use std::collections::HashMap;
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::RawBlockChainContext;
@ -11,12 +11,20 @@ use cuprate_consensus::{
};
use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::Chain;
use cuprate_types::{Chain, TransactionVerificationData};
use futures::StreamExt;
use tokio::sync::mpsc::Receiver;
use monero_serai::block::Block;
use tokio::sync::mpsc;
use tokio::sync::{Notify, oneshot};
use tower::{Service, ServiceExt};
use tracing::error;
pub struct IncomingBlock {
block: Block,
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
response_tx: oneshot::Sender<Result<(), anyhow::Error>>,
}
pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
@ -27,105 +35,58 @@ pub struct BlockchainManager {
TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle,
>,
// TODO: stop_current_block_downloader: Notify,
}
impl BlockchainManager {
pub const fn new(
pub async fn new(
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: BlockVerifierService<
BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle,
>,
) -> Self {
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO") else {
panic!("Blockchain context service returned wrong response!");
};
Self {
blockchain_write_handle,
blockchain_read_handle,
blockchain_context_service,
cached_blockchain_context: todo!(),
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
}
}
async fn handle_incoming_main_chain_batch(
&mut self,
batch: BlockBatch,
) -> Result<(), anyhow::Error> {
let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await?
else {
panic!("Incorrect response!");
};
for (block, txs) in prepped {
let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await
.unwrap()
else {
panic!("Incorrect response!");
};
blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
}
}
async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch
.blocks
.first()
.expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash {
todo!("Main chain")
} else {
todo!("Alt chain")
}
}
pub async fn run(mut self, mut batch_rx: Receiver<BlockBatch>) {
pub async fn run(mut self, mut block_batch_rx: mpsc::Receiver<BlockBatch>, mut block_single_rx: mpsc::Receiver<IncomingBlock>) {
loop {
tokio::select! {
Some(batch) = batch_rx.recv() => {
Some(batch) = block_batch_rx.recv() => {
self.handle_incoming_block_batch(
batch,
).await;
}
Some(incoming_block) = block_single_rx.recv() => {
let IncomingBlock {
block,
prepped_txs,
response_tx
} = incoming_block;
let res = self.handle_incoming_block(block, prepped_txs).await;
let _ = response_tx.send(res);
}
else => {
todo!("TODO: exit the BC manager")
}

View file

@ -1,121 +0,0 @@
//! Block batch handling functions.
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::NewBlockData;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError,
VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::BlockchainWriteRequest;
use cuprate_types::{Chain, HardFork};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::{debug, error, info};
async fn handle_incoming_block_batch_main_chain<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
info!(
"Handling batch to main chain height: {}",
batch.blocks.first().unwrap().0.number().unwrap()
);
let VerifyBlockResponse::MainChainBatchPrepped(prepped) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await?
else {
panic!("Incorrect response!");
};
for (block, txs) in prepped {
let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await?
else {
panic!("Incorrect response!");
};
blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
}
}
async fn handle_incoming_block_batch_alt_chain<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
for (block, txs) in batch.blocks {
alt_block_info.cumulative_difficulty
}
}

View file

@ -1,5 +1,8 @@
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use crate::signals::REORG_LOCK;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::block::PreparedBlock;
use cuprate_consensus::context::NewBlockData;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService,
@ -10,154 +13,292 @@ use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::{
BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest,
};
use cuprate_types::AltBlockInformation;
use cuprate_types::{
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
};
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tower::{Service, ServiceExt};
use tracing::info;
async fn handle_incoming_alt_block<C, TxV>(
block: Block,
txs: Vec<Transaction>,
current_cumulative_difficulty: u128,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
impl super::BlockchainManager {
pub async fn handle_incoming_block(
&mut self,
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<(), anyhow::Error> {
if block.header.previous != self.cached_blockchain_context.top_hash {
self.handle_incoming_alt_block(block, prepared_txs).await?;
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let prepared_txs = txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
(tx.tx_hash, tx)
})
.collect::<Result<_, _>>()?;
return Ok(());
}
let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::AltChain {
block,
prepared_txs,
})
.await?
else {
panic!("Incorrect response!");
};
let VerifyBlockResponse::MainChain(verified_block) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChain {
block,
prepared_txs,
})
.await?
else {
panic!("Incorrect response!");
};
if alt_block_info.cumulative_difficulty > current_cumulative_difficulty {
todo!("do re-org");
self.add_valid_block_to_main_chain(verified_block).await;
Ok(())
}
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))?;
pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch
.blocks
.first()
.expect("Block batch should not be empty");
Ok(())
}
async fn try_do_reorg<C, TxV>(
top_alt_block: AltBlockInformation,
chain_height: usize,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = blockchain_read_handle
.ready()
.await
.expect("TODO")
.call(BlockchainReadRequest::AltBlocksInChain(
top_alt_block.chain_id,
))
.await?
else {
panic!("Incorrect response!");
};
alt_blocks.push(top_alt_block);
let split_height = alt_blocks[0].height;
let BlockchainResponse::PopBlocks(old_main_chain_id) = blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::PopBlocks(
chain_height - split_height + 1,
))
.await?
else {
panic!("Incorrect response!");
};
todo!()
}
async fn verify_add_alt_blocks_to_main_chain<C, TxV>(
alt_blocks: Vec<AltBlockInformation>,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await?
else {
panic!("Incorrect response!");
};
if first_block.header.previous == self.cached_blockchain_context.top_hash {
self.handle_incoming_block_batch_main_chain(batch).await.expect("TODO");
} else {
self.handle_incoming_block_batch_alt_chain(batch).await.expect("TODO");
}
}
async fn handle_incoming_block_batch_main_chain(
&mut self,
batch: BlockBatch,
) -> Result<(), anyhow::Error> {
info!(
"Handling batch to main chain height: {}",
batch.blocks.first().unwrap().0.number().unwrap()
);
let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await?
else {
panic!("Incorrect response!");
};
for (block, txs) in prepped {
let VerifyBlockResponse::MainChain(verified_block) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await?
else {
panic!("Incorrect response!");
};
self.add_valid_block_to_main_chain(verified_block).await;
}
Ok(())
}
async fn handle_incoming_block_batch_alt_chain(
&mut self,
batch: BlockBatch,
) -> Result<(), anyhow::Error> {
for (block, txs) in batch.blocks {
let txs = txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()?;
self.handle_incoming_alt_block(block, txs).await?;
}
Ok(())
}
pub async fn handle_incoming_alt_block(
&mut self,
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<(), anyhow::Error> {
let VerifyBlockResponse::AltChain(alt_block_info) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::AltChain {
block,
prepared_txs,
})
.await?
else {
panic!("Incorrect response!");
};
// TODO: check in consensus crate if alt block already exists.
if alt_block_info.cumulative_difficulty
> self.cached_blockchain_context.cumulative_difficulty
{
self.try_do_reorg(alt_block_info).await?;
// TODO: ban the peer if the reorg failed.
return Ok(());
}
self.blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
.await?;
Ok(())
}
async fn try_do_reorg(
&mut self,
top_alt_block: AltBlockInformation,
) -> Result<(), anyhow::Error> {
let _guard = REORG_LOCK.write().await;
let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
.blockchain_read_handle
.ready()
.await
.expect("TODO")
.call(BlockchainReadRequest::AltBlocksInChain(
top_alt_block.chain_id,
))
.await?
else {
panic!("Incorrect response!");
};
alt_blocks.push(top_alt_block);
let split_height = alt_blocks[0].height;
let current_main_chain_height = self.cached_blockchain_context.chain_height;
let BlockchainResponse::PopBlocks(old_main_chain_id) = self
.blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::PopBlocks(
current_main_chain_height - split_height + 1,
))
.await
.expect("TODO")
else {
panic!("Incorrect response!");
};
self.blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::PopBlocks {
numb_blocks: current_main_chain_height - split_height + 1,
})
.await
.expect("TODO");
let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
match reorg_res {
Ok(()) => Ok(()),
Err(e) => {
todo!("Reverse reorg")
}
}
}
async fn verify_add_alt_blocks_to_main_chain(
&mut self,
alt_blocks: Vec<AltBlockInformation>,
) -> Result<(), anyhow::Error> {
for mut alt_block in alt_blocks {
let prepped_txs = alt_block
.txs
.drain(..)
.map(|tx| Ok(Arc::new(tx.try_into()?)))
.collect::<Result<_, anyhow::Error>>()?;
let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
let VerifyBlockResponse::MainChain(verified_block) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped {
block: prepped_block,
txs: prepped_txs,
})
.await?
else {
panic!("Incorrect response!");
};
self.add_valid_block_to_main_chain(verified_block).await;
}
Ok(())
}
pub async fn add_valid_block_to_main_chain(
&mut self,
verified_block: VerifiedBlockInformation,
) {
self.blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
self.blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
let BlockChainContextResponse::Context(blockchain_context) = self
.blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO") else {
panic!("Incorrect response!");
};
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
}
}

View file

@ -18,6 +18,7 @@ mod blockchain;
mod config;
mod p2p;
mod rpc;
mod signals;
mod txpool;
use blockchain::check_add_genesis;
@ -58,7 +59,7 @@ fn main() {
context_svc,
block_verifier,
config.block_downloader_config(),
);
).await;
// TODO: this can be removed as long as the main thread does not exit, so when command handling
// is added

View file

@ -0,0 +1,3 @@
use tokio::sync::RwLock;
pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());

View file

@ -88,19 +88,15 @@ mod sealed {
/// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits
/// onto the network address.
pub trait BorshNetworkZone:
NetworkZone<
Addr: NetZoneAddress<BanID: borsh::BorshDeserialize + borsh::BorshSerialize>
+ borsh::BorshDeserialize
+ borsh::BorshSerialize,
>
{
pub trait BorshNetworkZone: NetworkZone<Addr = Self::BorshAddr> {
type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize;
}
impl<T: NetworkZone> BorshNetworkZone for T
where
T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
<T::Addr as NetZoneAddress>::BanID: borsh::BorshDeserialize + borsh::BorshSerialize,
{
type BorshAddr = T::Addr;
}
}

Binary file not shown.

View file

@ -628,7 +628,7 @@ fn next_missing_chain_entry(env: &ConcreteEnv, block_hashes: Vec<[u8; 32]>) -> R
let block_hashes: Vec<_> = table_block_infos
.get_range(start_height..end_height)?
.map(|block_info| Ok(block_info?.block_hash))
.collect::<Result<_, _>>()?;
.collect::<Result<_, RuntimeError>>()?;
let first_missing_block = if block_hashes.len() > 1 {
let table_block_blobs = env_inner.open_db_ro::<BlockBlobs>(&tx_ro)?;