diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index eb23224e..46cc6a43 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -16,6 +16,7 @@ use cuprate_types::{ VerifiedBlockInformation, }; +mod free; mod manager; mod syncer; mod types; @@ -120,7 +121,8 @@ pub async fn init_blockchain_manager( blockchain_read_handle, blockchain_context_service, block_verifier_service, - ).await; + ) + .await; tokio::spawn(manager.run(batch_rx)); } diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/free.rs new file mode 100644 index 00000000..becdf307 --- /dev/null +++ b/binaries/cuprated/src/blockchain/free.rs @@ -0,0 +1,93 @@ +use crate::blockchain::manager::IncomingBlock; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_helper::cast::usize_to_u64; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use cuprate_types::Chain; +use monero_serai::block::Block; +use monero_serai::transaction::Transaction; +use rayon::prelude::*; +use std::collections::HashMap; +use std::sync::OnceLock; +use tokio::sync::{mpsc, oneshot}; +use tower::{Service, ServiceExt}; + +static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); + +#[derive(thiserror::Error)] +pub enum IncomingBlockError { + #[error("Unknown transactions in block.")] + UnknownTransactions(Vec), + #[error("The block has an unknown parent.")] + Orphan, + #[error(transparent)] + InvalidBlock(anyhow::Error), +} + +pub async fn handle_incoming_block( + block: Block, + given_txs: Vec, + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result<(), IncomingBlockError> { + if !block_exists(block.header.previous, blockchain_read_handle).expect("TODO") { + return Err(IncomingBlockError::Orphan); + } + + let block_hash = block.hash(); + + if block_exists(block_hash, blockchain_read_handle) + .await + .expect("TODO") + { + return Ok(()); + } + + let prepped_txs = given_txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + Ok((tx.tx_hash, tx)) + }) + .collect::>() + .map_err(IncomingBlockError::InvalidBlock)?; + + // TODO: Get transactions from the tx pool first. + if given_txs.len() != block.transactions.len() { + return Err(IncomingBlockError::UnknownTransactions( + (0..usize_to_u64(block.transactions.len())).collect(), + )); + } + + let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else { + return Ok(()); + }; + + let (response_tx, response_rx) = oneshot::channel(); + + incoming_block_tx + .send(IncomingBlock { + block, + prepped_txs, + response_tx, + }) + .await + .expect("TODO: don't actually panic here"); + + response_rx.await.map_err(IncomingBlockError::InvalidBlock) +} + +async fn block_exists( + block_hash: [u8; 32], + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result { + let BlockchainResponse::FindBlock(chain) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::FindBlock(block_hash)) + .await? + else { + panic!("Invalid blockchain response!"); + }; + + Ok(chain.is_some()) +} diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index e1e78d46..69de3399 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,6 +1,5 @@ mod handler; -use std::collections::HashMap; use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::context::RawBlockChainContext; @@ -14,15 +13,16 @@ use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::{Chain, TransactionVerificationData}; use futures::StreamExt; use monero_serai::block::Block; +use std::collections::HashMap; use tokio::sync::mpsc; -use tokio::sync::{Notify, oneshot}; +use tokio::sync::{oneshot, Notify}; use tower::{Service, ServiceExt}; use tracing::error; pub struct IncomingBlock { - block: Block, - prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, - response_tx: oneshot::Sender>, + pub block: Block, + pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, + pub response_tx: oneshot::Sender>, } pub struct BlockchainManager { @@ -35,7 +35,6 @@ pub struct BlockchainManager { TxVerifierService, ConsensusBlockchainReadHandle, >, - // TODO: stop_current_block_downloader: Notify, } @@ -56,7 +55,8 @@ impl BlockchainManager { .expect("TODO") .call(BlockChainContextRequest::GetContext) .await - .expect("TODO") else { + .expect("TODO") + else { panic!("Blockchain context service returned wrong response!"); }; @@ -69,7 +69,11 @@ impl BlockchainManager { } } - pub async fn run(mut self, mut block_batch_rx: mpsc::Receiver, mut block_single_rx: mpsc::Receiver) { + pub async fn run( + mut self, + mut block_batch_rx: mpsc::Receiver, + mut block_single_rx: mpsc::Receiver, + ) { loop { tokio::select! { Some(batch) = block_batch_rx.recv() => { diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 6dcffef7..f9f6ce80 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -63,9 +63,13 @@ impl super::BlockchainManager { .expect("Block batch should not be empty"); if first_block.header.previous == self.cached_blockchain_context.top_hash { - self.handle_incoming_block_batch_main_chain(batch).await.expect("TODO"); + self.handle_incoming_block_batch_main_chain(batch) + .await + .expect("TODO"); } else { - self.handle_incoming_block_batch_alt_chain(batch).await.expect("TODO"); + self.handle_incoming_block_batch_alt_chain(batch) + .await + .expect("TODO"); } } @@ -295,7 +299,8 @@ impl super::BlockchainManager { .expect("TODO") .call(BlockChainContextRequest::GetContext) .await - .expect("TODO") else { + .expect("TODO") + else { panic!("Incorrect response!"); }; diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 0a885363..2c26c118 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -59,7 +59,8 @@ fn main() { context_svc, block_verifier, config.block_downloader_config(), - ).await; + ) + .await; // TODO: this can be removed as long as the main thread does not exit, so when command handling // is added diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index ae35a1bb..c0903485 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -98,5 +98,4 @@ mod sealed { { type BorshAddr = T::Addr; } - } diff --git a/p2p_state.bin b/p2p_state.bin index 281771ca..fc17e050 100644 Binary files a/p2p_state.bin and b/p2p_state.bin differ