diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 46cc6a43..f05878ac 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -3,7 +3,8 @@ //! Will contain the chain manager and syncer. use futures::FutureExt; -use tokio::sync::mpsc; +use std::sync::Arc; +use tokio::sync::{mpsc, Notify}; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; @@ -26,6 +27,7 @@ use types::{ ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; +use crate::blockchain::free::INCOMING_BLOCK_TX; /// Checks if the genesis block is in the blockchain and adds it if not. pub async fn check_add_genesis( @@ -107,12 +109,17 @@ pub async fn init_blockchain_manager( block_downloader_config: BlockDownloaderConfig, ) { let (batch_tx, batch_rx) = mpsc::channel(1); + let stop_current_block_downloader = Arc::new(Notify::new()); + let (command_tx, command_rx) = mpsc::channel(1); + + INCOMING_BLOCK_TX.set(command_tx).unwrap(); tokio::spawn(syncer::syncer( blockchain_context_service.clone(), ChainService(blockchain_read_handle.clone()), - clearnet_interface, + clearnet_interface.clone(), batch_tx, + stop_current_block_downloader.clone(), block_downloader_config, )); @@ -121,8 +128,10 @@ pub async fn init_blockchain_manager( blockchain_read_handle, blockchain_context_service, block_verifier_service, + stop_current_block_downloader, + clearnet_interface.broadcast_svc(), ) .await; - tokio::spawn(manager.run(batch_rx)); + tokio::spawn(manager.run(batch_rx, command_rx)); } diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/free.rs index becdf307..eb80a31d 100644 --- a/binaries/cuprated/src/blockchain/free.rs +++ b/binaries/cuprated/src/blockchain/free.rs @@ -1,4 +1,3 @@ -use crate::blockchain::manager::IncomingBlock; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; @@ -11,10 +10,11 @@ use std::collections::HashMap; use std::sync::OnceLock; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; +use crate::blockchain::manager::commands::BlockchainManagerCommand; -static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); +pub static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); -#[derive(thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { #[error("Unknown transactions in block.")] UnknownTransactions(Vec), @@ -28,8 +28,8 @@ pub async fn handle_incoming_block( block: Block, given_txs: Vec, blockchain_read_handle: &mut BlockchainReadHandle, -) -> Result<(), IncomingBlockError> { - if !block_exists(block.header.previous, blockchain_read_handle).expect("TODO") { +) -> Result { + if !block_exists(block.header.previous, blockchain_read_handle).await.expect("TODO") { return Err(IncomingBlockError::Orphan); } @@ -39,7 +39,14 @@ pub async fn handle_incoming_block( .await .expect("TODO") { - return Ok(()); + return Ok(false); + } + + // TODO: Get transactions from the tx pool first. + if given_txs.len() != block.transactions.len() { + return Err(IncomingBlockError::UnknownTransactions( + (0..usize_to_u64(block.transactions.len())).collect(), + )); } let prepped_txs = given_txs @@ -51,21 +58,14 @@ pub async fn handle_incoming_block( .collect::>() .map_err(IncomingBlockError::InvalidBlock)?; - // TODO: Get transactions from the tx pool first. - if given_txs.len() != block.transactions.len() { - return Err(IncomingBlockError::UnknownTransactions( - (0..usize_to_u64(block.transactions.len())).collect(), - )); - } - let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else { - return Ok(()); + return Ok(false); }; let (response_tx, response_rx) = oneshot::channel(); incoming_block_tx - .send(IncomingBlock { + .send( BlockchainManagerCommand::AddBlock { block, prepped_txs, response_tx, @@ -73,7 +73,7 @@ pub async fn handle_incoming_block( .await .expect("TODO: don't actually panic here"); - response_rx.await.map_err(IncomingBlockError::InvalidBlock) + response_rx.await.unwrap().map_err(IncomingBlockError::InvalidBlock) } async fn block_exists( diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index ae5a1d3d..b208436c 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,4 +1,5 @@ mod handler; +pub(super) mod commands; use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; @@ -9,21 +10,20 @@ use cuprate_consensus::{ VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; use cuprate_p2p::block_downloader::BlockBatch; +use cuprate_p2p::BroadcastSvc; +use cuprate_p2p_core::ClearNet; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::{Chain, TransactionVerificationData}; use futures::StreamExt; use monero_serai::block::Block; use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::{oneshot, Notify}; use tower::{Service, ServiceExt}; use tracing::error; - -pub struct IncomingBlock { - pub block: Block, - pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, - pub response_tx: oneshot::Sender>, -} +use tracing_subscriber::fmt::time::FormatTime; +use crate::blockchain::manager::commands::BlockchainManagerCommand; pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, @@ -35,7 +35,8 @@ pub struct BlockchainManager { TxVerifierService, ConsensusBlockchainReadHandle, >, - stop_current_block_downloader: Notify, + stop_current_block_downloader: Arc, + broadcast_svc: BroadcastSvc, } impl BlockchainManager { @@ -48,6 +49,8 @@ impl BlockchainManager { TxVerifierService, ConsensusBlockchainReadHandle, >, + stop_current_block_downloader: Arc, + broadcast_svc: BroadcastSvc, ) -> Self { let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service .ready() @@ -66,13 +69,15 @@ impl BlockchainManager { blockchain_context_service, cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, + stop_current_block_downloader, + broadcast_svc, } } pub async fn run( mut self, mut block_batch_rx: mpsc::Receiver, - mut block_single_rx: mpsc::Receiver, + mut command_rx: mpsc::Receiver, ) { loop { tokio::select! { @@ -81,15 +86,8 @@ impl BlockchainManager { batch, ).await; } - Some(incoming_block) = block_single_rx.recv() => { - let IncomingBlock { - block, - prepped_txs, - response_tx - } = incoming_block; - - let res = self.handle_incoming_block(block, prepped_txs).await; - let _ = response_tx.send(res); + Some(incoming_command) = command_rx.recv() => { + self.handle_command(incoming_command).await; } else => { todo!("TODO: exit the BC manager") diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs new file mode 100644 index 00000000..1b6f4a48 --- /dev/null +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -0,0 +1,17 @@ +use std::collections::HashMap; + +use monero_serai::block::Block; +use tokio::sync::oneshot; + +use cuprate_types::TransactionVerificationData; + +pub enum BlockchainManagerCommand { + AddBlock { + block: Block, + prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, + response_tx: oneshot::Sender>, + }, + + PopBlocks, +} + diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 1bdae16c..a925e710 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,8 +1,8 @@ -use std::{collections::HashMap, sync::Arc}; - +use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; +use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; use tracing::info; @@ -13,15 +13,45 @@ use cuprate_consensus::{ ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; -use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN}; +use cuprate_helper::cast::usize_to_u64; +use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; +use crate::blockchain::manager::commands::BlockchainManagerCommand; impl super::BlockchainManager { + pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { + match command { + BlockchainManagerCommand::AddBlock { + block, + prepped_txs, + response_tx + } => { + let res = self.handle_incoming_block(block, prepped_txs).await; + + drop(response_tx.send(res)); + } + BlockchainManagerCommand::PopBlocks => todo!() + } + } + + async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) { + self.broadcast_svc + .ready() + .await + .expect("TODO") + .call(BroadcastRequest::Block { + block_bytes, + current_blockchain_height: usize_to_u64(blockchain_height), + }) + .await + .expect("TODO"); + } + /// Handle an incoming [`Block`]. /// /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow @@ -56,8 +86,12 @@ impl super::BlockchainManager { panic!("Incorrect response!"); }; + let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); self.add_valid_block_to_main_chain(verified_block).await; + self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) + .await; + Ok(true) } @@ -101,11 +135,6 @@ impl super::BlockchainManager { batch.blocks.first().unwrap().0.number().unwrap() ); - let ban_cancel_download = || { - batch.peer_handle.ban_peer(LONG_BAN); - self.stop_current_block_downloader.notify_one(); - }; - let batch_prep_res = self .block_verifier_service .ready() @@ -119,7 +148,8 @@ impl super::BlockchainManager { let prepped_blocks = match batch_prep_res { Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks, Err(_) => { - ban_cancel_download(); + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); return; } _ => panic!("Incorrect response!"), @@ -134,10 +164,11 @@ impl super::BlockchainManager { .call(VerifyBlockRequest::MainChainPrepped { block, txs }) .await; - let VerifyBlockResponse::MainChain(verified_block) = match verify_res { + let verified_block = match verify_res { Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block, Err(_) => { - ban_cancel_download(); + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); return; } _ => panic!("Incorrect response!"), @@ -145,8 +176,6 @@ impl super::BlockchainManager { self.add_valid_block_to_main_chain(verified_block).await; } - - Ok(()) } /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. @@ -175,7 +204,7 @@ impl super::BlockchainManager { self.handle_incoming_alt_block(block, txs).await?; - Ok(()) + Ok::<_, anyhow::Error>(()) } .await; diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index fbf4a88f..286d8a50 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1,4 +1,5 @@ use std::pin::pin; +use std::sync::Arc; use std::time::Duration; use futures::StreamExt; @@ -31,7 +32,7 @@ pub async fn syncer( our_chain: CN, clearnet_interface: NetworkInterface, incoming_block_batch_tx: mpsc::Sender, - stop_current_block_downloader: Notify, + stop_current_block_downloader: Arc, block_downloader_config: BlockDownloaderConfig, ) -> Result<(), SyncerError> where diff --git a/p2p_state.bin b/p2p_state.bin deleted file mode 100644 index 9faaaff0..00000000 Binary files a/p2p_state.bin and /dev/null differ