diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index e1f5765..eb23224 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -1,6 +1,8 @@ //! Blockchain //! //! Will contain the chain manager and syncer. + +use futures::FutureExt; use tokio::sync::mpsc; use tower::{Service, ServiceExt}; @@ -95,7 +97,7 @@ pub async fn init_consensus( } /// Initializes the blockchain manager task and syncer. -pub fn init_blockchain_manager( +pub async fn init_blockchain_manager( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, @@ -118,7 +120,7 @@ pub fn init_blockchain_manager( blockchain_read_handle, blockchain_context_service, block_verifier_service, - ); + ).await; tokio::spawn(manager.run(batch_rx)); } diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index eaf8669..e1e78d4 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,6 +1,6 @@ -mod batch_handler; mod handler; +use std::collections::HashMap; use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::context::RawBlockChainContext; @@ -11,12 +11,20 @@ use cuprate_consensus::{ }; use cuprate_p2p::block_downloader::BlockBatch; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_types::Chain; +use cuprate_types::{Chain, TransactionVerificationData}; use futures::StreamExt; -use tokio::sync::mpsc::Receiver; +use monero_serai::block::Block; +use tokio::sync::mpsc; +use tokio::sync::{Notify, oneshot}; use tower::{Service, ServiceExt}; use tracing::error; +pub struct IncomingBlock { + block: Block, + prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, + response_tx: oneshot::Sender>, +} + pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, @@ -27,105 +35,58 @@ pub struct BlockchainManager { TxVerifierService, ConsensusBlockchainReadHandle, >, + + // TODO: stop_current_block_downloader: Notify, } impl BlockchainManager { - pub const fn new( + pub async fn new( blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, - blockchain_context_service: BlockChainContextService, + mut blockchain_context_service: BlockChainContextService, block_verifier_service: BlockVerifierService< BlockChainContextService, TxVerifierService, ConsensusBlockchainReadHandle, >, ) -> Self { + let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::GetContext) + .await + .expect("TODO") else { + panic!("Blockchain context service returned wrong response!"); + }; + Self { blockchain_write_handle, blockchain_read_handle, blockchain_context_service, - cached_blockchain_context: todo!(), + cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, } } - async fn handle_incoming_main_chain_batch( - &mut self, - batch: BlockBatch, - ) -> Result<(), anyhow::Error> { - let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self - .block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { - blocks: batch.blocks, - }) - .await? - else { - panic!("Incorrect response!"); - }; - - for (block, txs) in prepped { - let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await - .unwrap() - else { - panic!("Incorrect response!"); - }; - - blockchain_context_service - .ready() - .await - .expect("TODO") - .call(BlockChainContextRequest::Update(NewBlockData { - block_hash: verified_block.block_hash, - height: verified_block.height, - timestamp: verified_block.block.header.timestamp, - weight: verified_block.weight, - long_term_weight: verified_block.long_term_weight, - generated_coins: verified_block.generated_coins, - vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), - cumulative_difficulty: verified_block.cumulative_difficulty, - })) - .await - .expect("TODO"); - - blockchain_write_handle - .ready() - .await - .expect("TODO") - .call(BlockchainWriteRequest::WriteBlock(verified_block)) - .await - .expect("TODO"); - } - } - - async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { - let (first_block, _) = batch - .blocks - .first() - .expect("Block batch should not be empty"); - - if first_block.header.previous == self.cached_blockchain_context.top_hash { - todo!("Main chain") - } else { - todo!("Alt chain") - } - } - - pub async fn run(mut self, mut batch_rx: Receiver) { + pub async fn run(mut self, mut block_batch_rx: mpsc::Receiver, mut block_single_rx: mpsc::Receiver) { loop { tokio::select! { - Some(batch) = batch_rx.recv() => { + Some(batch) = block_batch_rx.recv() => { self.handle_incoming_block_batch( batch, ).await; } + Some(incoming_block) = block_single_rx.recv() => { + let IncomingBlock { + block, + prepped_txs, + response_tx + } = incoming_block; + + let res = self.handle_incoming_block(block, prepped_txs).await; + let _ = response_tx.send(res); + } else => { todo!("TODO: exit the BC manager") } diff --git a/binaries/cuprated/src/blockchain/manager/batch_handler.rs b/binaries/cuprated/src/blockchain/manager/batch_handler.rs deleted file mode 100644 index ea08af4..0000000 --- a/binaries/cuprated/src/blockchain/manager/batch_handler.rs +++ /dev/null @@ -1,121 +0,0 @@ -//! Block batch handling functions. - -use crate::blockchain::types::ConsensusBlockchainReadHandle; -use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::context::NewBlockData; -use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, - BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError, - VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, -}; -use cuprate_p2p::block_downloader::BlockBatch; -use cuprate_types::blockchain::BlockchainWriteRequest; -use cuprate_types::{Chain, HardFork}; -use rayon::prelude::*; -use tower::{Service, ServiceExt}; -use tracing::{debug, error, info}; - -async fn handle_incoming_block_batch_main_chain( - batch: BlockBatch, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_write_handle: &mut BlockchainWriteHandle, -) -> Result<(), anyhow::Error> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - info!( - "Handling batch to main chain height: {}", - batch.blocks.first().unwrap().0.number().unwrap() - ); - - let VerifyBlockResponse::MainChainBatchPrepped(prepped) = block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { - blocks: batch.blocks, - }) - .await? - else { - panic!("Incorrect response!"); - }; - - for (block, txs) in prepped { - let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await? - else { - panic!("Incorrect response!"); - }; - - blockchain_context_service - .ready() - .await - .expect("TODO") - .call(BlockChainContextRequest::Update(NewBlockData { - block_hash: verified_block.block_hash, - height: verified_block.height, - timestamp: verified_block.block.header.timestamp, - weight: verified_block.weight, - long_term_weight: verified_block.long_term_weight, - generated_coins: verified_block.generated_coins, - vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), - cumulative_difficulty: verified_block.cumulative_difficulty, - })) - .await - .expect("TODO"); - - blockchain_write_handle - .ready() - .await - .expect("TODO") - .call(BlockchainWriteRequest::WriteBlock(verified_block)) - .await - .expect("TODO"); - } -} - -async fn handle_incoming_block_batch_alt_chain( - batch: BlockBatch, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_write_handle: &mut BlockchainWriteHandle, -) -> Result<(), anyhow::Error> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - for (block, txs) in batch.blocks { - alt_block_info.cumulative_difficulty - } -} diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 221acb8..6dcffef 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,5 +1,8 @@ use crate::blockchain::types::ConsensusBlockchainReadHandle; +use crate::signals::REORG_LOCK; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::block::PreparedBlock; +use cuprate_consensus::context::NewBlockData; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::{ BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, @@ -10,154 +13,292 @@ use cuprate_p2p::block_downloader::BlockBatch; use cuprate_types::blockchain::{ BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest, }; -use cuprate_types::AltBlockInformation; +use cuprate_types::{ + AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, +}; +use futures::{TryFutureExt, TryStreamExt}; use monero_serai::block::Block; use monero_serai::transaction::Transaction; use rayon::prelude::*; +use std::collections::HashMap; +use std::sync::Arc; use tower::{Service, ServiceExt}; +use tracing::info; -async fn handle_incoming_alt_block( - block: Block, - txs: Vec, - current_cumulative_difficulty: u128, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_write_handle: &mut BlockchainWriteHandle, -) -> Result<(), anyhow::Error> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, +impl super::BlockchainManager { + pub async fn handle_incoming_block( + &mut self, + block: Block, + prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, + ) -> Result<(), anyhow::Error> { + if block.header.previous != self.cached_blockchain_context.top_hash { + self.handle_incoming_alt_block(block, prepared_txs).await?; - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - let prepared_txs = txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - (tx.tx_hash, tx) - }) - .collect::>()?; + return Ok(()); + } - let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::AltChain { - block, - prepared_txs, - }) - .await? - else { - panic!("Incorrect response!"); - }; + let VerifyBlockResponse::MainChain(verified_block) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChain { + block, + prepared_txs, + }) + .await? + else { + panic!("Incorrect response!"); + }; - if alt_block_info.cumulative_difficulty > current_cumulative_difficulty { - todo!("do re-org"); + self.add_valid_block_to_main_chain(verified_block).await; + + Ok(()) } - blockchain_write_handle - .ready() - .await - .expect("TODO") - .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))?; + pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { + let (first_block, _) = batch + .blocks + .first() + .expect("Block batch should not be empty"); - Ok(()) -} - -async fn try_do_reorg( - top_alt_block: AltBlockInformation, - chain_height: usize, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_write_handle: &mut BlockchainWriteHandle, - blockchain_read_handle: &mut BlockchainReadHandle, -) -> Result<(), anyhow::Error> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = blockchain_read_handle - .ready() - .await - .expect("TODO") - .call(BlockchainReadRequest::AltBlocksInChain( - top_alt_block.chain_id, - )) - .await? - else { - panic!("Incorrect response!"); - }; - - alt_blocks.push(top_alt_block); - - let split_height = alt_blocks[0].height; - - let BlockchainResponse::PopBlocks(old_main_chain_id) = blockchain_write_handle - .ready() - .await - .expect("TODO") - .call(BlockchainWriteRequest::PopBlocks( - chain_height - split_height + 1, - )) - .await? - else { - panic!("Incorrect response!"); - }; - - todo!() -} - -async fn verify_add_alt_blocks_to_main_chain( - alt_blocks: Vec, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_write_handle: &mut BlockchainWriteHandle, -) -> Result<(), anyhow::Error> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service - .ready() - .await - .expect("TODO") - .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await? - else { - panic!("Incorrect response!"); - }; + if first_block.header.previous == self.cached_blockchain_context.top_hash { + self.handle_incoming_block_batch_main_chain(batch).await.expect("TODO"); + } else { + self.handle_incoming_block_batch_alt_chain(batch).await.expect("TODO"); + } + } + + async fn handle_incoming_block_batch_main_chain( + &mut self, + batch: BlockBatch, + ) -> Result<(), anyhow::Error> { + info!( + "Handling batch to main chain height: {}", + batch.blocks.first().unwrap().0.number().unwrap() + ); + + let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { + blocks: batch.blocks, + }) + .await? + else { + panic!("Incorrect response!"); + }; + + for (block, txs) in prepped { + let VerifyBlockResponse::MainChain(verified_block) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainPrepped { block, txs }) + .await? + else { + panic!("Incorrect response!"); + }; + + self.add_valid_block_to_main_chain(verified_block).await; + } + + Ok(()) + } + + async fn handle_incoming_block_batch_alt_chain( + &mut self, + batch: BlockBatch, + ) -> Result<(), anyhow::Error> { + for (block, txs) in batch.blocks { + let txs = txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + Ok((tx.tx_hash, tx)) + }) + .collect::>()?; + + self.handle_incoming_alt_block(block, txs).await?; + } + + Ok(()) + } + + pub async fn handle_incoming_alt_block( + &mut self, + block: Block, + prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, + ) -> Result<(), anyhow::Error> { + let VerifyBlockResponse::AltChain(alt_block_info) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::AltChain { + block, + prepared_txs, + }) + .await? + else { + panic!("Incorrect response!"); + }; + + // TODO: check in consensus crate if alt block already exists. + + if alt_block_info.cumulative_difficulty + > self.cached_blockchain_context.cumulative_difficulty + { + self.try_do_reorg(alt_block_info).await?; + // TODO: ban the peer if the reorg failed. + + return Ok(()); + } + + self.blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) + .await?; + + Ok(()) + } + + async fn try_do_reorg( + &mut self, + top_alt_block: AltBlockInformation, + ) -> Result<(), anyhow::Error> { + let _guard = REORG_LOCK.write().await; + + let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self + .blockchain_read_handle + .ready() + .await + .expect("TODO") + .call(BlockchainReadRequest::AltBlocksInChain( + top_alt_block.chain_id, + )) + .await? + else { + panic!("Incorrect response!"); + }; + + alt_blocks.push(top_alt_block); + + let split_height = alt_blocks[0].height; + let current_main_chain_height = self.cached_blockchain_context.chain_height; + + let BlockchainResponse::PopBlocks(old_main_chain_id) = self + .blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::PopBlocks( + current_main_chain_height - split_height + 1, + )) + .await + .expect("TODO") + else { + panic!("Incorrect response!"); + }; + + self.blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::PopBlocks { + numb_blocks: current_main_chain_height - split_height + 1, + }) + .await + .expect("TODO"); + + let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await; + + match reorg_res { + Ok(()) => Ok(()), + Err(e) => { + todo!("Reverse reorg") + } + } + } + + async fn verify_add_alt_blocks_to_main_chain( + &mut self, + alt_blocks: Vec, + ) -> Result<(), anyhow::Error> { + for mut alt_block in alt_blocks { + let prepped_txs = alt_block + .txs + .drain(..) + .map(|tx| Ok(Arc::new(tx.try_into()?))) + .collect::>()?; + + let prepped_block = PreparedBlock::new_alt_block(alt_block)?; + + let VerifyBlockResponse::MainChain(verified_block) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainPrepped { + block: prepped_block, + txs: prepped_txs, + }) + .await? + else { + panic!("Incorrect response!"); + }; + + self.add_valid_block_to_main_chain(verified_block).await; + } + + Ok(()) + } + + pub async fn add_valid_block_to_main_chain( + &mut self, + verified_block: VerifiedBlockInformation, + ) { + self.blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::Update(NewBlockData { + block_hash: verified_block.block_hash, + height: verified_block.height, + timestamp: verified_block.block.header.timestamp, + weight: verified_block.weight, + long_term_weight: verified_block.long_term_weight, + generated_coins: verified_block.generated_coins, + vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), + cumulative_difficulty: verified_block.cumulative_difficulty, + })) + .await + .expect("TODO"); + + self.blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::WriteBlock(verified_block)) + .await + .expect("TODO"); + + let BlockChainContextResponse::Context(blockchain_context) = self + .blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::GetContext) + .await + .expect("TODO") else { + panic!("Incorrect response!"); + }; + + self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); + } } diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 0e706b9..0a88536 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -18,6 +18,7 @@ mod blockchain; mod config; mod p2p; mod rpc; +mod signals; mod txpool; use blockchain::check_add_genesis; @@ -58,7 +59,7 @@ fn main() { context_svc, block_verifier, config.block_downloader_config(), - ); + ).await; // TODO: this can be removed as long as the main thread does not exit, so when command handling // is added diff --git a/binaries/cuprated/src/signals.rs b/binaries/cuprated/src/signals.rs new file mode 100644 index 0000000..cafd8cd --- /dev/null +++ b/binaries/cuprated/src/signals.rs @@ -0,0 +1,3 @@ +use tokio::sync::RwLock; + +pub static REORG_LOCK: RwLock<()> = RwLock::const_new(()); diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index 8c531e7..ae35a1b 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -88,19 +88,15 @@ mod sealed { /// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits /// onto the network address. - pub trait BorshNetworkZone: - NetworkZone< - Addr: NetZoneAddress - + borsh::BorshDeserialize - + borsh::BorshSerialize, - > - { + pub trait BorshNetworkZone: NetworkZone { + type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize; } impl BorshNetworkZone for T where T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, - ::BanID: borsh::BorshDeserialize + borsh::BorshSerialize, { + type BorshAddr = T::Addr; } + } diff --git a/p2p_state.bin b/p2p_state.bin index bf2d673..281771c 100644 Binary files a/p2p_state.bin and b/p2p_state.bin differ diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 07fa825..e502c9f 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -628,7 +628,7 @@ fn next_missing_chain_entry(env: &ConcreteEnv, block_hashes: Vec<[u8; 32]>) -> R let block_hashes: Vec<_> = table_block_infos .get_range(start_height..end_height)? .map(|block_info| Ok(block_info?.block_hash)) - .collect::>()?; + .collect::>()?; let first_missing_block = if block_hashes.len() > 1 { let table_block_blobs = env_inner.open_db_ro::(&tx_ro)?;