From a01846954dc35ec2095a2cbbf5d5b12da188ed35 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 20 Aug 2024 16:39:23 +0100 Subject: [PATCH] add main chain batch handler --- Cargo.lock | 14 ++ binaries/cuprated/Cargo.toml | 13 ++ binaries/cuprated/src/blockchain.rs | 1 + binaries/cuprated/src/blockchain/manager.rs | 13 ++ .../src/blockchain/manager/batch_handler.rs | 143 ++++++++++++++++++ binaries/cuprated/src/blockchain/syncer.rs | 121 +++++++++++++++ binaries/cuprated/src/blockchain/types.rs | 27 ++++ consensus/fast-sync/src/fast_sync.rs | 3 +- consensus/src/block.rs | 4 +- p2p/p2p/src/lib.rs | 2 +- 10 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 binaries/cuprated/src/blockchain/manager/batch_handler.rs create mode 100644 binaries/cuprated/src/blockchain/types.rs diff --git a/Cargo.lock b/Cargo.lock index 052b1ee6..fbcd326a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -884,6 +884,20 @@ dependencies = [ [[package]] name = "cuprated" version = "0.1.0" +dependencies = [ + "cuprate-blockchain", + "cuprate-consensus", + "cuprate-p2p", + "cuprate-p2p-core", + "cuprate-types", + "futures", + "hex", + "rayon", + "thiserror", + "tokio", + "tower", + "tracing", +] [[package]] name = "curve25519-dalek" diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index b5243906..dd6cdc1d 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -8,6 +8,19 @@ authors = ["Boog900", "hinto-janai", "SyntheticBird45"] repository = "https://github.com/Cuprate/cuprate/tree/main/binaries/cuprated" [dependencies] +cuprate-consensus = { path = "../../consensus" } +cuprate-blockchain = { path = "../../storage/blockchain" } +cuprate-p2p = { path = "../../p2p/p2p" } +cuprate-p2p-core = { path = "../../p2p/p2p-core" } +cuprate-types = { path = "../../types" } + +rayon = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +tower = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +hex = { workspace = true } [lints] workspace = true diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 4abebeb6..0b6cd3b4 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -4,3 +4,4 @@ mod manager; mod syncer; +mod types; diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 8b137891..c9f9b06b 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1 +1,14 @@ +mod batch_handler; +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; + +struct BlockchainManager { + blockchain_write_handle: BlockchainWriteHandle, + blockchain_context_service: BlockChainContextService, + block_verifier_service: BlockVerifierService< + BlockChainContextService, + TxVerifierService, + BlockchainReadHandle, + >, +} diff --git a/binaries/cuprated/src/blockchain/manager/batch_handler.rs b/binaries/cuprated/src/blockchain/manager/batch_handler.rs new file mode 100644 index 00000000..f080f310 --- /dev/null +++ b/binaries/cuprated/src/blockchain/manager/batch_handler.rs @@ -0,0 +1,143 @@ +//! Block batch handling functions. + +use crate::blockchain::types::ConsensusBlockchainReadHandle; +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::context::NewBlockData; +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError, + VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, +}; +use cuprate_p2p::block_downloader::BlockBatch; +use cuprate_types::blockchain::BlockchainWriteRequest; +use cuprate_types::{Chain, HardFork}; +use tower::{Service, ServiceExt}; +use tracing::{debug, error, info}; + +pub async fn handle_incoming_block_batch( + batch: BlockBatch, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_read_handle: &mut BlockchainReadHandle, + blockchain_write_handle: &mut BlockchainWriteHandle, +) where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + let (first_block, _) = batch + .blocks + .first() + .expect("Block batch should not be empty"); + + match blockchain_read_handle + .oneshot(BlockchainReadRequest::FindBlock( + first_block.header.previous, + )) + .await + { + Err(_) | Ok(BlockchainResponse::FindBlock(None)) => { + // The block downloader shouldn't be downloading orphan blocks + error!("Failed to find parent block for first block in batch."); + return; + } + Ok(BlockchainResponse::FindBlock(Some((chain, _)))) => match chain { + Chain::Main => { + handle_incoming_block_batch_main_chain( + batch, + block_verifier_service, + blockchain_context_service, + blockchain_write_handle, + ) + .await; + } + Chain::Alt(_) => todo!(), + }, + + Ok(_) => panic!("Blockchain service returned incorrect response"), + } +} + +async fn handle_incoming_block_batch_main_chain( + batch: BlockBatch, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_write_handle: &mut BlockchainWriteHandle, +) where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + let Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped)) = block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { + blocks: batch.blocks, + }) + .await + else { + info!("Error verifying batch, banning peer"); + todo!() + }; + + for (block, txs) in prepped { + let Ok(VerifyBlockResponse::MainChain(verified_block)) = block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainPrepped { block, txs }) + .await + else { + info!("Error verifying batch, banning peer"); + todo!() + }; + + blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::Update(NewBlockData { + block_hash: verified_block.block_hash, + height: verified_block.height, + timestamp: verified_block.block.header.timestamp, + weight: verified_block.weight, + long_term_weight: verified_block.long_term_weight, + generated_coins: verified_block.generated_coins, + vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), + cumulative_difficulty: verified_block.cumulative_difficulty, + })) + .await + .expect("TODO"); + + blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::WriteBlock(verified_block)) + .await + .expect("TODO"); + } +} diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 8b137891..21c367b8 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1 +1,122 @@ +use std::time::Duration; +use futures::StreamExt; +use tokio::{sync::mpsc, time::sleep}; +use tower::{Service, ServiceExt}; +use tracing::instrument; + +use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_p2p::{ + block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, + NetworkInterface, +}; +use cuprate_p2p_core::ClearNet; + +/// An error returned from the [`syncer`]. +#[derive(Debug, thiserror::Error)] +enum SyncerError { + #[error("Incoming block channel closed.")] + IncomingBlockChannelClosed, + #[error("One of our services returned an error: {0}.")] + ServiceError(#[from] tower::BoxError), +} + +#[instrument(level = "debug", skip_all)] +pub async fn syncer( + mut context_svc: C, + our_chain: CN, + clearnet_interface: NetworkInterface, + incoming_block_batch_tx: mpsc::Sender, + block_downloader_config: BlockDownloaderConfig, +) -> Result<(), SyncerError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, + CN: Service + + Clone + + Send + + 'static, + CN::Future: Send + 'static, +{ + tracing::info!("Starting blockchain syncer"); + + let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + panic!("Blockchain context service returned wrong response!"); + }; + + let mut peer_sync_watch = clearnet_interface.top_sync_stream(); + + tracing::debug!("Waiting for new sync info in top sync channel"); + + while let Some(top_sync_info) = peer_sync_watch.next().await { + tracing::debug!( + "New sync info seen, top height: {}, top block hash: {}", + top_sync_info.chain_height, + hex::encode(top_sync_info.top_hash) + ); + + // The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to + // be added to our blockchain. + sleep(Duration::from_secs(2)).await; + + check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; + let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + + if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + tracing::debug!("New peer sync info is not ahead, nothing to do."); + continue; + } + + tracing::debug!( + "We are behind peers claimed cumulative difficulty, starting block downloader" + ); + let mut block_batch_stream = + clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); + + while let Some(batch) = block_batch_stream.next().await { + tracing::debug!("Got batch, len: {}", batch.blocks.len()); + if incoming_block_batch_tx.send(batch).await.is_err() { + return Err(SyncerError::IncomingBlockChannelClosed); + } + } + } + + Ok(()) +} + +async fn check_update_blockchain_context( + context_svc: C, + old_context: &mut BlockChainContext, +) -> Result<(), tower::BoxError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, +{ + if old_context.blockchain_context().is_ok() { + return Ok(()); + } + + let BlockChainContextResponse::Context(ctx) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await? + else { + panic!("Blockchain context service returned wrong response!"); + }; + + *old_context = ctx; + + Ok(()) +} diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs new file mode 100644 index 00000000..0387a7df --- /dev/null +++ b/binaries/cuprated/src/blockchain/types.rs @@ -0,0 +1,27 @@ +use cuprate_blockchain::cuprate_database::RuntimeError; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use futures::future::MapErr; +use futures::TryFutureExt; +use std::task::{Context, Poll}; +use tower::Service; + +#[derive(Clone)] +pub struct ConsensusBlockchainReadHandle(BlockchainReadHandle); + +impl Service for ConsensusBlockchainReadHandle { + type Response = BlockchainResponse; + type Error = tower::BoxError; + type Future = MapErr< + >::Future, + fn(RuntimeError) -> tower::BoxError, + >; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: BlockchainReadRequest) -> Self::Future { + self.0.call(req).map_err(Into::into) + } +} diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index 35fa6742..6baac8d3 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -40,11 +40,12 @@ static HASHES_OF_HASHES: &[HashOfHashes] = &[ const BATCH_SIZE: usize = 4; #[inline] -fn max_height() -> u64 { +pub fn max_height() -> u64 { (HASHES_OF_HASHES.len() * BATCH_SIZE) as u64 } #[derive(Debug, PartialEq)] +#[repr(transparent)] pub struct ValidBlockId(BlockId); fn valid_block_ids(block_ids: &[BlockId]) -> Vec { diff --git a/consensus/src/block.rs b/consensus/src/block.rs index e785a6b7..7297a5a6 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -250,7 +250,7 @@ where + Clone + Send + 'static, - D: Database + Clone + Send + Sync + 'static, + D: Database + Clone + Send + 'static, D::Future: Send + 'static, { /// Creates a new block verifier. @@ -284,7 +284,7 @@ where + 'static, TxV::Future: Send + 'static, - D: Database + Clone + Send + Sync + 'static, + D: Database + Clone + Send + 'static, D::Future: Send + 'static, { type Response = VerifyBlockResponse; diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index be18c2a3..82ecfce7 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -21,7 +21,7 @@ use cuprate_p2p_core::{ CoreSyncSvc, NetworkZone, ProtocolRequestHandler, }; -mod block_downloader; +pub mod block_downloader; mod broadcast; mod client_pool; pub mod config;