From f909c262faf912647e7b800662f467d07a7636ce Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 21 Aug 2024 16:02:00 +0100 Subject: [PATCH] add blockchain init --- Cargo.lock | 70 +++++++++++++++++++ binaries/cuprated/Cargo.toml | 8 ++- binaries/cuprated/src/blockchain.rs | 67 ++++++++++++++++++ binaries/cuprated/src/blockchain/manager.rs | 49 +++++++++++++- binaries/cuprated/src/blockchain/syncer.rs | 4 +- binaries/cuprated/src/blockchain/types.rs | 75 ++++++++++++++++++++- binaries/cuprated/src/main.rs | 25 ++++++- consensus/src/lib.rs | 15 ++--- 8 files changed, 292 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbcd326a..58b66fe6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,12 +44,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +[[package]] +name = "anstyle-parse" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -352,8 +395,10 @@ version = "4.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", ] [[package]] @@ -374,6 +419,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +[[package]] +name = "colorchoice" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" + [[package]] name = "core-foundation" version = "0.9.4" @@ -885,6 +936,7 @@ dependencies = [ name = "cuprated" version = "0.1.0" dependencies = [ + "clap", "cuprate-blockchain", "cuprate-consensus", "cuprate-p2p", @@ -1590,6 +1642,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + [[package]] name = "itoa" version = "1.0.11" @@ -2623,6 +2681,12 @@ dependencies = [ "spin", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.5.0" @@ -2992,6 +3056,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "version_check" version = "0.9.4" diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index dd6cdc1d..74eb2d59 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -16,11 +16,13 @@ cuprate-types = { path = "../../types" } rayon = { workspace = true } futures = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tower = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } hex = { workspace = true } -[lints] -workspace = true +clap = { workspace = true, features = ["default", "derive"] } + +#[lints] +#workspace = true diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 0b6cd3b4..a9af7d72 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -2,6 +2,73 @@ //! //! Will contain the chain manager and syncer. +use crate::blockchain::manager::BlockchainManager; +use crate::blockchain::types::{ + ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, + ConsensusBlockchainReadHandle, +}; +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{ + BlockChainContextService, BlockVerifierService, ContextConfig, TxVerifierService, +}; +use cuprate_p2p::block_downloader::BlockDownloaderConfig; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::ClearNet; +use tokio::sync::mpsc; + mod manager; mod syncer; mod types; + +pub async fn init_consensus( + blockchain_read_handle: BlockchainReadHandle, + context_config: ContextConfig, +) -> Result< + ( + ConcreteBlockVerifierService, + ConcreteTxVerifierService, + BlockChainContextService, + ), + tower::BoxError, +> { + let ctx_service = cuprate_consensus::initialize_blockchain_context( + context_config, + ConsensusBlockchainReadHandle(blockchain_read_handle.clone()), + ) + .await?; + + let (block_verifier_svc, tx_verifier_svc) = cuprate_consensus::initialize_verifier( + ConsensusBlockchainReadHandle(blockchain_read_handle), + ctx_service.clone(), + ); + + Ok((block_verifier_svc, tx_verifier_svc, ctx_service)) +} + +pub fn init_blockchain_manager( + clearnet_interface: NetworkInterface, + block_downloader_config: BlockDownloaderConfig, + blockchain_write_handle: BlockchainWriteHandle, + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + block_verifier_service: ConcreteBlockVerifierService, +) { + let (batch_tx, batch_rx) = mpsc::channel(1); + + tokio::spawn(syncer::syncer( + blockchain_context_service.clone(), + ChainService(blockchain_read_handle.clone()), + clearnet_interface, + batch_tx, + block_downloader_config, + )); + + let manager = BlockchainManager::new( + blockchain_write_handle, + blockchain_read_handle, + blockchain_context_service, + block_verifier_service, + ); + + tokio::spawn(manager.run(batch_rx)); +} diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index c9f9b06b..5a526a59 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,14 +1,57 @@ mod batch_handler; +use crate::blockchain::manager::batch_handler::handle_incoming_block_batch; +use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; +use cuprate_p2p::block_downloader::BlockBatch; +use futures::StreamExt; +use tokio::sync::mpsc::Receiver; -struct BlockchainManager { +pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, + blockchain_read_handle: BlockchainReadHandle, blockchain_context_service: BlockChainContextService, block_verifier_service: BlockVerifierService< BlockChainContextService, - TxVerifierService, - BlockchainReadHandle, + TxVerifierService, + ConsensusBlockchainReadHandle, >, } + +impl BlockchainManager { + pub const fn new( + blockchain_write_handle: BlockchainWriteHandle, + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + block_verifier_service: BlockVerifierService< + BlockChainContextService, + TxVerifierService, + ConsensusBlockchainReadHandle, + >, + ) -> Self { + Self { + blockchain_write_handle, + blockchain_read_handle, + blockchain_context_service, + block_verifier_service, + } + } + + pub async fn run(mut self, mut batch_rx: Receiver) { + tokio::select! { + Some(batch) = batch_rx.recv() => { + handle_incoming_block_batch( + batch, + &mut self.block_verifier_service, + &mut self.blockchain_context_service, + &mut self.blockchain_read_handle, + &mut self.blockchain_write_handle + ).await; + } + else => { + todo!("Exit the BC manager") + } + } + } +} diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 21c367b8..dc738123 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -14,7 +14,7 @@ use cuprate_p2p_core::ClearNet; /// An error returned from the [`syncer`]. #[derive(Debug, thiserror::Error)] -enum SyncerError { +pub enum SyncerError { #[error("Incoming block channel closed.")] IncomingBlockChannelClosed, #[error("One of our services returned an error: {0}.")] @@ -58,7 +58,7 @@ where tracing::debug!("Waiting for new sync info in top sync channel"); while let Some(top_sync_info) = peer_sync_watch.next().await { - tracing::debug!( + tracing::info!( "New sync info seen, top height: {}, top block hash: {}", top_sync_info.chain_height, hex::encode(top_sync_info.top_hash) diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index 0387a7df..46576a46 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -1,13 +1,23 @@ use cuprate_blockchain::cuprate_database::RuntimeError; use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; +use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use futures::future::MapErr; -use futures::TryFutureExt; +use futures::future::{BoxFuture, MapErr}; +use futures::{FutureExt, TryFutureExt}; use std::task::{Context, Poll}; use tower::Service; +pub type ConcreteBlockVerifierService = BlockVerifierService< + BlockChainContextService, + TxVerifierService, + ConsensusBlockchainReadHandle, +>; + +pub type ConcreteTxVerifierService = TxVerifierService; + #[derive(Clone)] -pub struct ConsensusBlockchainReadHandle(BlockchainReadHandle); +pub struct ConsensusBlockchainReadHandle(pub BlockchainReadHandle); impl Service for ConsensusBlockchainReadHandle { type Response = BlockchainResponse; @@ -25,3 +35,62 @@ impl Service for ConsensusBlockchainReadHandle { self.0.call(req).map_err(Into::into) } } + +#[derive(Clone)] +pub struct ChainService(pub BlockchainReadHandle); + +impl Service for ChainService { + type Response = ChainSvcResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: ChainSvcRequest) -> Self::Future { + let map_res = |res: BlockchainResponse| match res { + BlockchainResponse::CompactChainHistory { + block_ids, + cumulative_difficulty, + } => ChainSvcResponse::CompactHistory { + block_ids, + cumulative_difficulty, + }, + BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res), + _ => panic!("Blockchain returned wrong response"), + }; + + match req { + ChainSvcRequest::CompactHistory => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::FindFirstUnknown(req) => self + .0 + .call(BlockchainReadRequest::FindFirstUnknown(req)) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::CumulativeDifficulty => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(|res| { + // TODO create a custom request instead of hijacking this one. + let BlockchainResponse::CompactChainHistory { + cumulative_difficulty, + .. + } = res + else { + panic!("Blockchain returned wrong response"); + }; + + ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty) + }) + .map_err(Into::into) + .boxed(), + } + } +} diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 918429c9..4d205a15 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -1,9 +1,32 @@ +use clap::Parser; + mod blockchain; mod config; mod p2p; mod rpc; mod txpool; +#[derive(Parser)] +struct Args {} fn main() { - todo!() + let _args = Args::parse(); + + let (bc_read_handle, bc_write_handle, _) = + cuprate_blockchain::service::init(cuprate_blockchain::config::Config::default()).unwrap(); + + let async_rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + async_rt.block_on(async move { + let (block_verifier, tx_verifier, context_svc) = blockchain::init_consensus( + bc_read_handle, + cuprate_consensus::ContextConfig::main_net(), + ) + .await + .unwrap(); + + //blockchain::init_blockchain_manager() + }); } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 3b7f2ae1..2c69e667 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -50,16 +50,13 @@ pub enum ExtendedConsensusError { } /// Initialize the 2 verifier [`tower::Service`]s (block and transaction). -pub async fn initialize_verifier( +pub fn initialize_verifier( database: D, ctx_svc: Ctx, -) -> Result< - ( - BlockVerifierService, D>, - TxVerifierService, - ), - ConsensusError, -> +) -> ( + BlockVerifierService, D>, + TxVerifierService, +) where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, @@ -75,7 +72,7 @@ where { let tx_svc = TxVerifierService::new(database.clone()); let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database); - Ok((block_svc, tx_svc)) + (block_svc, tx_svc) } use __private::Database;