From 69f9d84ae1706310a9bf581ba2d4a28603541ac4 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 3 Oct 2024 01:53:47 +0100 Subject: [PATCH] fix merge --- Cargo.lock | 48 ++++++++++++++ binaries/cuprated/src/blockchain.rs | 2 +- binaries/cuprated/src/blockchain/free.rs | 16 ++++- binaries/cuprated/src/blockchain/syncer.rs | 26 ++++---- binaries/cuprated/src/main.rs | 1 + binaries/cuprated/src/p2p/request_handler.rs | 68 ++++++++++++++------ consensus/src/lib.rs | 14 ++-- p2p/p2p-core/src/protocol/try_from.rs | 1 + p2p/p2p/src/client_pool.rs | 13 ++++ p2p/p2p/src/constants.rs | 4 +- p2p/p2p/src/lib.rs | 7 +- storage/blockchain/Cargo.toml | 1 - 12 files changed, 147 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca5c1546..dd303bf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1862,6 +1862,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1899,6 +1909,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.6.0" @@ -2514,6 +2530,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2894,6 +2919,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", ] [[package]] @@ -2902,7 +2939,12 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", "tracing-core", + "tracing-log", ] [[package]] @@ -2985,6 +3027,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.5" diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 2668dd8f..649dcef0 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -35,7 +35,7 @@ pub use free::{handle_incoming_block, IncomingBlockError}; pub async fn check_add_genesis( blockchain_read_handle: &mut BlockchainReadHandle, blockchain_write_handle: &mut BlockchainWriteHandle, - network: &Network, + network: Network, ) { // Try to get the chain height, will fail if the genesis block is not in the DB. if blockchain_read_handle diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/free.rs index d44d273c..56c68edf 100644 --- a/binaries/cuprated/src/blockchain/free.rs +++ b/binaries/cuprated/src/blockchain/free.rs @@ -68,7 +68,12 @@ pub async fn handle_incoming_block( return Ok(false); }; - if !BLOCKS_BEING_HANDLED.get_or_init(|| Mutex::new(HashSet::new())).lock().unwrap().insert(block_hash) { + if !BLOCKS_BEING_HANDLED + .get_or_init(|| Mutex::new(HashSet::new())) + .lock() + .unwrap() + .insert(block_hash) + { return Ok(false); } @@ -83,12 +88,17 @@ pub async fn handle_incoming_block( .await .expect("TODO: don't actually panic here"); - let res =response_rx + let res = response_rx .await .unwrap() .map_err(IncomingBlockError::InvalidBlock); - BLOCKS_BEING_HANDLED.get().unwrap().lock().unwrap().remove(&block_hash); + BLOCKS_BEING_HANDLED + .get() + .unwrap() + .lock() + .unwrap() + .remove(&block_hash); res } diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 286d8a50..4b286ceb 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use futures::StreamExt; +use tokio::time::interval; use tokio::{ sync::{mpsc, Notify}, time::sleep, @@ -17,6 +18,8 @@ use cuprate_p2p::{ }; use cuprate_p2p_core::ClearNet; +const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30); + /// An error returned from the [`syncer`]. #[derive(Debug, thiserror::Error)] pub enum SyncerError { @@ -50,6 +53,8 @@ where { tracing::info!("Starting blockchain syncer"); + let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY); + let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc .ready() .await? @@ -59,26 +64,21 @@ where panic!("Blockchain context service returned wrong response!"); }; - let mut peer_sync_watch = clearnet_interface.top_sync_stream(); + let client_pool = clearnet_interface.client_pool(); tracing::debug!("Waiting for new sync info in top sync channel"); - while let Some(top_sync_info) = peer_sync_watch.next().await { - tracing::info!( - "New sync info seen, top height: {}, top block hash: {}", - top_sync_info.chain_height, - hex::encode(top_sync_info.top_hash) - ); + loop { + check_sync_interval.tick().await; - // The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to - // be added to our blockchain. - sleep(Duration::from_secs(2)).await; + tracing::trace!("Checking connected peers to see if we are behind",); check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); - if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { - tracing::debug!("New peer sync info is not ahead, nothing to do."); + if !client_pool.contains_client_with_more_cumulative_difficulty( + raw_blockchain_context.cumulative_difficulty, + ) { continue; } @@ -103,8 +103,6 @@ where } } } - - Ok(()) } async fn check_update_blockchain_context( diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 775843df..ad7382d7 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -16,6 +16,7 @@ mod config; mod constants; mod p2p; mod rpc; +mod signals; mod statics; mod txpool; diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index e3245e7e..86507f1f 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; +use cuprate_p2p_core::{NetworkZone, ProtocolRequest, ProtocolResponse}; use futures::future::BoxFuture; use futures::FutureExt; use monero_serai::block::Block; @@ -16,7 +16,8 @@ use cuprate_fixed_bytes::ByteArray; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::map::split_u128_into_low_high_bits; -use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; +use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; +use cuprate_p2p_core::client::PeerInformation; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::BlockCompleteEntry; use cuprate_wire::protocol::{ @@ -25,11 +26,41 @@ use cuprate_wire::protocol::{ }; #[derive(Clone)] -pub struct P2pProtocolRequestHandler { - pub(crate) blockchain_read_handle: BlockchainReadHandle, +pub struct P2pProtocolRequestHandlerMaker { + pub blockchain_read_handle: BlockchainReadHandle, } -impl Service for P2pProtocolRequestHandler { +impl Service> for P2pProtocolRequestHandlerMaker { + type Response = P2pProtocolRequestHandler; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + // TODO: check peer info. + + let blockchain_read_handle = self.blockchain_read_handle.clone(); + + async { + Ok(P2pProtocolRequestHandler { + peer_information, + blockchain_read_handle, + }) + } + .boxed() + } +} + +#[derive(Clone)] +pub struct P2pProtocolRequestHandler { + peer_information: PeerInformation, + blockchain_read_handle: BlockchainReadHandle, +} + +impl Service for P2pProtocolRequestHandler { type Response = ProtocolResponse; type Error = tower::BoxError; type Future = BoxFuture<'static, Result>; @@ -38,22 +69,8 @@ impl Service for P2pProtocolRequestHandler { Poll::Ready(Ok(())) } - fn call(&mut self, req: ProtocolRequest) -> Self::Future { - match req { - ProtocolRequest::GetObjects(req) => { - get_objects(self.blockchain_read_handle.clone(), req).boxed() - } - ProtocolRequest::GetChain(req) => { - get_chain(self.blockchain_read_handle.clone(), req).boxed() - } - ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - ProtocolRequest::NewFluffyBlock(block) => { - new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed() - } - ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - } + fn call(&mut self, _: ProtocolRequest) -> Self::Future { + async { Ok(ProtocolResponse::NA) }.boxed() } } @@ -73,6 +90,9 @@ async fn get_objects( // de-allocate the backing [`Bytes`] drop(req); + return Ok(ProtocolResponse::NA); + /* + let res = blockchain_read_handle .oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids)) .await?; @@ -91,6 +111,8 @@ async fn get_objects( missed_ids: missed_ids.into(), current_blockchain_height: usize_to_u64(current_blockchain_height), })) + + */ } async fn get_chain( @@ -100,7 +122,9 @@ async fn get_chain( if req.block_ids.is_empty() { Err("No block hashes sent in a `ChainRequest`")?; } + return Ok(ProtocolResponse::NA); + /* if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN { Err("Too many block hashes in a `ChainRequest`")?; } @@ -136,6 +160,8 @@ async fn get_chain( m_block_weights: vec![], first_block: first_missing_block.map_or(Bytes::new(), Bytes::from), })) + + */ } async fn new_fluffy_block( diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index e104cec9..1e473158 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -37,6 +37,7 @@ pub use context::{ pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; // re-export. +pub use cuprate_consensus_rules::genesis::generate_genesis_block; pub use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, HardFork, @@ -68,13 +69,10 @@ pub enum ExtendedConsensusError { pub fn initialize_verifier( database: D, ctx_svc: Ctx, -) -> Result< - ( - BlockVerifierService, D>, - TxVerifierService, - ), - ConsensusError, -> +) -> ( + BlockVerifierService, D>, + TxVerifierService, +) where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, @@ -90,7 +88,7 @@ where { let tx_svc = TxVerifierService::new(database.clone()); let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database); - Ok((block_svc, tx_svc)) + (block_svc, tx_svc) } use __private::Database; diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index d3a7260f..b3c1ae3d 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -71,6 +71,7 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTxs(val) => Self::FluffyMissingTransactionsRequest(val), ProtocolResponse::NA => return Err(MessageConversionError), }) } diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 77d3b6e5..735be76d 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -153,6 +153,19 @@ impl ClientPool { self.borrow_clients(&peers).collect() } + + pub fn contains_client_with_more_cumulative_difficulty( + &self, + cumulative_difficulty: u128, + ) -> bool { + self.clients + .iter() + .find(|element| { + let sync_data = element.value().info.core_sync_data.lock().unwrap(); + sync_data.cumulative_difficulty() > cumulative_difficulty + }) + .is_some() + } } mod sealed { diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f2349600..f79bcbf0 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -23,7 +23,7 @@ pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); /// The durations of a long ban. -pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); +pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); /// The default amount of time between inbound diffusion flushes. pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); @@ -53,7 +53,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index dbff56d5..37416b2d 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -174,9 +174,8 @@ impl NetworkInterface { self.address_book.clone() } - /// Pulls a client from the client pool, returning it in a guard that will return it there when it's - /// dropped. - pub fn borrow_client(&self, peer: &InternalPeerID) -> Option> { - self.pool.borrow_client(peer) + /// TODO + pub fn client_pool(&self) -> &Arc> { + &self.pool } } diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index c1473b6d..6eecb892 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -35,7 +35,6 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } -bytes = "1.6.0" [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread", "cast"] }