diff --git a/Cargo.lock b/Cargo.lock index 2ad83bd3..4593984e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,18 +774,16 @@ dependencies = [ name = "cuprate-fast-sync" version = "0.1.0" dependencies = [ + "blake3", "clap", "cuprate-blockchain", "cuprate-consensus", "cuprate-consensus-context", - "cuprate-consensus-rules", - "cuprate-helper", + "cuprate-p2p", + "cuprate-p2p-core", "cuprate-types", "hex", - "hex-literal", "monero-serai", - "sha3", - "thiserror", "tokio", "tower 0.5.1", ] diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 34317084..a196a9f2 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -20,11 +20,13 @@ use cuprate_types::{ use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; mod chain_service; +mod fast_sync; pub mod interface; mod manager; mod syncer; mod types; +pub use fast_sync::set_fast_sync_hashes; pub use manager::init_blockchain_manager; pub use types::ConsensusBlockchainReadHandle; diff --git a/binaries/cuprated/src/blockchain/chain_service.rs b/binaries/cuprated/src/blockchain/chain_service.rs index af862d1d..4716ad4f 100644 --- a/binaries/cuprated/src/blockchain/chain_service.rs +++ b/binaries/cuprated/src/blockchain/chain_service.rs @@ -4,7 +4,9 @@ use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use tower::Service; use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_fast_sync::validate_entries; use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; +use cuprate_p2p_core::NetworkZone; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out @@ -14,8 +16,8 @@ use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; #[derive(Clone)] pub struct ChainService(pub BlockchainReadHandle); -impl Service<ChainSvcRequest> for ChainService { - type Response = ChainSvcResponse; 
+impl<N: NetworkZone> Service<ChainSvcRequest<N>> for ChainService { + type Response = ChainSvcResponse<N>; type Error = tower::BoxError; type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; @@ -23,7 +25,7 @@ impl Service<ChainSvcRequest> for ChainService { self.0.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, req: ChainSvcRequest) -> Self::Future { + fn call(&mut self, req: ChainSvcRequest<N>) -> Self::Future { let map_res = |res: BlockchainResponse| match res { BlockchainResponse::CompactChainHistory { block_ids, @@ -67,6 +69,18 @@ impl Service<ChainSvcRequest> for ChainService { }) .map_err(Into::into) .boxed(), + ChainSvcRequest::ValidateEntries(entries, start_height) => { + let mut blockchain_read_handle = self.0.clone(); + + async move { + let (valid, unknown) = + validate_entries(entries, start_height, &mut blockchain_read_handle) + .await?; + + Ok(ChainSvcResponse::ValidateEntries { valid, unknown }) + } + .boxed() + } } } } diff --git a/binaries/cuprated/src/blockchain/fast_sync.rs b/binaries/cuprated/src/blockchain/fast_sync.rs new file mode 100644 index 00000000..63b11df4 --- /dev/null +++ b/binaries/cuprated/src/blockchain/fast_sync.rs @@ -0,0 +1,24 @@ +use std::slice; + +use cuprate_helper::network::Network; + +/// The hashes of the compiled in fast sync file. +static FAST_SYNC_HASHES: &[[u8; 32]] = { + let bytes = include_bytes!("./fast_sync/fast_sync_hashes.bin"); + + if bytes.len() % 32 == 0 { + // SAFETY: The file byte length must be perfectly divisible by 32, checked above. + unsafe { slice::from_raw_parts(bytes.as_ptr().cast::<[u8; 32]>(), bytes.len() / 32) } + } else { + panic!(); + } +}; + +/// Set the fast-sync hashes according to the provided values. 
+pub fn set_fast_sync_hashes(fast_sync: bool, network: Network) { + cuprate_fast_sync::set_fast_sync_hashes(if fast_sync && network == Network::Mainnet { + FAST_SYNC_HASHES + } else { + &[] + }); +} diff --git a/binaries/cuprated/src/blockchain/fast_sync/fast_sync_hashes.bin b/binaries/cuprated/src/blockchain/fast_sync/fast_sync_hashes.bin new file mode 100644 index 00000000..5d94b7d8 Binary files /dev/null and b/binaries/cuprated/src/blockchain/fast_sync/fast_sync_hashes.bin differ diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 8dde7ac5..782e6efe 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use futures::StreamExt; use monero_serai::block::Block; -use tokio::sync::{mpsc, oneshot, Notify}; +use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit}; use tower::{BoxError, Service, ServiceExt}; use tracing::error; @@ -106,15 +106,17 @@ impl BlockchainManager { /// The [`BlockchainManager`] task. pub async fn run( mut self, - mut block_batch_rx: mpsc::Receiver<BlockBatch>, + mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>, mut command_rx: mpsc::Receiver<BlockchainManagerCommand>, ) { loop { tokio::select! { - Some(batch) = block_batch_rx.recv() => { + Some((batch, permit)) = block_batch_rx.recv() => { self.handle_incoming_block_batch( batch, ).await; + + drop(permit); } Some(incoming_command) = command_rx.recv() => { self.handle_command(incoming_command).await; diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 0c8264bf..a6583844 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,4 +1,6 @@ //! The blockchain manager handler functions. 
+use std::{collections::HashMap, sync::Arc}; + use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; use monero_serai::{ @@ -6,10 +8,8 @@ use monero_serai::{ transaction::{Input, Transaction}, }; use rayon::prelude::*; -use std::ops::ControlFlow; -use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; -use tracing::{info, instrument}; +use tracing::{info, instrument, Span}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ @@ -21,12 +21,13 @@ use cuprate_consensus::{ BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, }; use cuprate_consensus_context::NewBlockData; +use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height}; use cuprate_helper::cast::usize_to_u64; use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; use cuprate_txpool::service::interface::TxpoolWriteRequest; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, - AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, + AltBlockInformation, Chain, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; use crate::{ @@ -166,6 +167,11 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from or if the incoming batch contains no blocks. 
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { + if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() { + self.handle_incoming_block_batch_fast_sync(batch).await; + return; + } + let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks( batch.blocks, &mut self.blockchain_context_service, @@ -195,7 +201,32 @@ impl super::BlockchainManager { self.add_valid_block_to_main_chain(verified_block).await; } - info!("Successfully added block batch"); + info!(fast_sync = false, "Successfully added block batch"); + } + + /// Handles an incoming block batch while we are under the fast sync height. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) { + let mut valid_blocks = Vec::with_capacity(batch.blocks.len()); + for (block, txs) in batch.blocks { + let block = block_to_verified_block_information( + block, + txs, + self.blockchain_context_service.blockchain_context(), + ); + self.add_valid_block_to_blockchain_cache(&block).await; + + valid_blocks.push(block); + } + + self.batch_add_valid_block_to_blockchain_database(valid_blocks) + .await; + + info!(fast_sync = true, "Successfully added block batch"); } /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. @@ -212,7 +243,6 @@ impl super::BlockchainManager { /// recover from. async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) { // TODO: this needs testing (this whole section does but alt-blocks specifically). - let mut blocks = batch.blocks.into_iter(); while let Some((block, txs)) = blocks.next() { @@ -248,6 +278,8 @@ impl super::BlockchainManager { Ok(AddAltBlock::Cached) => (), } } + + info!(alt_chain = true, "Successfully added block batch"); } /// Handles an incoming alt [`Block`]. 
@@ -284,9 +316,10 @@ impl super::BlockchainManager { unreachable!(); }; - if chain.is_some() { - // The block could also be in the main-chain here under some circumstances. - return Ok(AddAltBlock::Cached); + match chain { + Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached), + Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"), + None => (), } let alt_block_info = @@ -458,22 +491,8 @@ impl super::BlockchainManager { }) .collect::<Vec<[u8; 32]>>(); - self.blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Update(NewBlockData { - block_hash: verified_block.block_hash, - height: verified_block.height, - timestamp: verified_block.block.header.timestamp, - weight: verified_block.weight, - long_term_weight: verified_block.long_term_weight, - generated_coins: verified_block.generated_coins, - vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), - cumulative_difficulty: verified_block.cumulative_difficulty, - })) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + self.add_valid_block_to_blockchain_cache(&verified_block) + .await; self.blockchain_write_handle .ready() @@ -491,11 +510,60 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR); } + + /// Adds a [`VerifiedBlockInformation`] to the blockchain context cache. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. 
+ async fn add_valid_block_to_blockchain_cache( + &mut self, + verified_block: &VerifiedBlockInformation, + ) { + self.blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Update(NewBlockData { + block_hash: verified_block.block_hash, + height: verified_block.height, + timestamp: verified_block.block.header.timestamp, + weight: verified_block.weight, + long_term_weight: verified_block.long_term_weight, + generated_coins: verified_block.generated_coins, + vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), + cumulative_difficulty: verified_block.cumulative_difficulty, + })) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } + + /// Batch writes the [`VerifiedBlockInformation`]s to the database. + /// + /// The blocks must be sequential. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn batch_add_valid_block_to_blockchain_database( + &mut self, + blocks: Vec<VerifiedBlockInformation>, + ) { + self.blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::BatchWriteBlocks(blocks)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } } /// The result from successfully adding an alt-block. enum AddAltBlock { - /// The alt-block was cached or was already present in the DB. + /// The alt-block was cached. Cached, /// The chain was reorged. 
Reorged, diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 106c3eed..8a2c218f 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use futures::StreamExt; use tokio::{ - sync::{mpsc, oneshot, Notify}, + sync::{mpsc, Notify, OwnedSemaphorePermit, Semaphore}, time::interval, }; use tower::{Service, ServiceExt}; @@ -15,7 +15,7 @@ use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, NetworkInterface, PeerSetRequest, PeerSetResponse, }; -use cuprate_p2p_core::ClearNet; +use cuprate_p2p_core::{ClearNet, NetworkZone}; const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30); @@ -30,17 +30,21 @@ pub enum SyncerError { /// The syncer tasks that makes sure we are fully synchronised with our connected peers. #[instrument(level = "debug", skip_all)] +#[expect(clippy::significant_drop_tightening)] pub async fn syncer<CN>( mut context_svc: BlockchainContextService, our_chain: CN, mut clearnet_interface: NetworkInterface<ClearNet>, - incoming_block_batch_tx: mpsc::Sender<BlockBatch>, + incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc<OwnedSemaphorePermit>)>, stop_current_block_downloader: Arc<Notify>, block_downloader_config: BlockDownloaderConfig, ) -> Result<(), SyncerError> where - CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError> - + Clone + CN: Service< + ChainSvcRequest<ClearNet>, + Response = ChainSvcResponse<ClearNet>, + Error = tower::BoxError, + > + Clone + Send + 'static, CN::Future: Send + 'static, @@ -51,6 +55,9 @@ where tracing::debug!("Waiting for new sync info in top sync channel"); + let semaphore = Arc::new(Semaphore::new(1)); + + let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); loop { check_sync_interval.tick().await; @@ -72,10 +79,19 @@ where tokio::select! 
{ () = stop_current_block_downloader.notified() => { tracing::info!("Received stop signal, stopping block downloader"); + + drop(sync_permit); + sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); + break; } batch = block_batch_stream.next() => { let Some(batch) = batch else { + // Wait until all references to the permit have been dropped (which means all blocks in the queue + // have been handled) before checking if we are synced. + drop(sync_permit); + sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); + let blockchain_context = context_svc.blockchain_context(); if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? { @@ -86,7 +102,7 @@ where }; tracing::debug!("Got batch, len: {}", batch.blocks.len()); - if incoming_block_batch_tx.send(batch).await.is_err() { + if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() { return Err(SyncerError::IncomingBlockChannelClosed); } } diff --git a/binaries/cuprated/src/commands.rs b/binaries/cuprated/src/commands.rs index e99282be..89e9cd71 100644 --- a/binaries/cuprated/src/commands.rs +++ b/binaries/cuprated/src/commands.rs @@ -47,6 +47,9 @@ pub enum Command { /// Print status information on `cuprated`. Status, + + /// Print the height of first block not contained in the fast sync hashes. + FastSyncStopHeight, } /// The log output target. @@ -123,6 +126,11 @@ pub async fn io_loop( println!("STATUS:\n uptime: {h}h {m}m {s}s,\n height: {height},\n top_hash: {top_hash}"); } + Command::FastSyncStopHeight => { + let stop_height = cuprate_fast_sync::fast_sync_stop_height(); + + println!("{stop_height}"); + } } } } diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs index 99758bfb..84bfabf7 100644 --- a/binaries/cuprated/src/config.rs +++ b/binaries/cuprated/src/config.rs @@ -83,6 +83,8 @@ pub struct Config { /// The network we should run on. 
network: Network, + pub no_fast_sync: bool, + /// [`tracing`] config. pub tracing: TracingConfig, diff --git a/binaries/cuprated/src/config/args.rs b/binaries/cuprated/src/config/args.rs index 58c144d7..3c27ec3e 100644 --- a/binaries/cuprated/src/config/args.rs +++ b/binaries/cuprated/src/config/args.rs @@ -20,6 +20,13 @@ pub struct Args { )] pub network: Network, + /// Disable fast sync, all past blocks will undergo full verification when syncing. + /// + /// This significantly increases initial sync time. This provides no extra security, you just + /// have to trust the devs to insert the correct hashes (which are verifiable). + #[arg(long)] + no_fast_sync: bool, + /// The amount of outbound clear-net connections to maintain. #[arg(long)] pub outbound_connections: Option<usize>, @@ -64,6 +71,7 @@ impl Args { /// This may exit the program if a config value was set that requires an early exit. pub const fn apply_args(&self, mut config: Config) -> Config { config.network = self.network; + config.no_fast_sync = config.no_fast_sync || self.no_fast_sync; if let Some(outbound_connections) = self.outbound_connections { config.p2p.clear_net.general.outbound_connections = outbound_connections; diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 9f607f32..6ac9a674 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -55,6 +55,8 @@ fn main() { let config = config::read_config_and_args(); + blockchain::set_fast_sync_hashes(!config.no_fast_sync, config.network()); + // Initialize logging. logging::init_logging(&config); @@ -82,6 +84,15 @@ fn main() { // Initialize async tasks. rt.block_on(async move { + // TODO: we could add an option for people to keep these like monerod? + blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::FlushAltBlocks) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + // Check add the genesis block to the blockchain. 
blockchain::check_add_genesis( &mut blockchain_read_handle, diff --git a/consensus/fast-sync/Cargo.toml b/consensus/fast-sync/Cargo.toml index 8e732a6f..4aa84e36 100644 --- a/consensus/fast-sync/Cargo.toml +++ b/consensus/fast-sync/Cargo.toml @@ -5,23 +5,21 @@ edition = "2021" license = "MIT" [[bin]] -name = "cuprate-fast-sync-create-hashes" +name = "create-fs-file" path = "src/create.rs" [dependencies] cuprate-blockchain = { workspace = true } cuprate-consensus = { workspace = true } -cuprate-consensus-rules = { workspace = true } cuprate-consensus-context = { workspace = true } cuprate-types = { workspace = true } -cuprate-helper = { workspace = true, features = ["cast"] } +cuprate-p2p = { workspace = true } +cuprate-p2p-core = { workspace = true } clap = { workspace = true, features = ["derive", "std"] } hex = { workspace = true } -hex-literal = { workspace = true } monero-serai = { workspace = true } -sha3 = { version = "0.10.8" } -thiserror = { workspace = true } +blake3 = { workspace = true } tokio = { workspace = true, features = ["full"] } tower = { workspace = true } diff --git a/consensus/fast-sync/src/create.rs b/consensus/fast-sync/src/create.rs index 9410f60a..349d6a29 100644 --- a/consensus/fast-sync/src/create.rs +++ b/consensus/fast-sync/src/create.rs @@ -3,7 +3,7 @@ reason = "binary shares same Cargo.toml as library" )] -use std::{fmt::Write, fs::write}; +use std::fs::write; use clap::Parser; use tower::{Service, ServiceExt}; @@ -16,48 +16,30 @@ use cuprate_types::{ Chain, }; -use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes}; - -const BATCH_SIZE: usize = 512; +use cuprate_fast_sync::FAST_SYNC_BATCH_LEN; async fn read_batch( handle: &mut BlockchainReadHandle, height_from: usize, -) -> DbResult<Vec<BlockId>> { - let mut block_ids = Vec::<BlockId>::with_capacity(BATCH_SIZE); +) -> DbResult<Vec<[u8; 32]>> { + let request = BlockchainReadRequest::BlockHashInRange( + height_from..(height_from + FAST_SYNC_BATCH_LEN), + Chain::Main, + ); + 
let response_channel = handle.ready().await?.call(request); + let response = response_channel.await?; - for height in height_from..(height_from + BATCH_SIZE) { - let request = BlockchainReadRequest::BlockHash(height, Chain::Main); - let response_channel = handle.ready().await?.call(request); - let response = response_channel.await?; - - match response { - BlockchainResponse::BlockHash(block_id) => block_ids.push(block_id), - _ => unreachable!(), - } - } + let BlockchainResponse::BlockHashInRange(block_ids) = response else { + unreachable!() + }; Ok(block_ids) } -fn generate_hex(hashes: &[HashOfHashes]) -> String { - let mut s = String::new(); - - writeln!(&mut s, "[").unwrap(); - - for hash in hashes { - writeln!(&mut s, "\thex!(\"{}\"),", hex::encode(hash)).unwrap(); - } - - writeln!(&mut s, "]").unwrap(); - - s -} - #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { - #[arg(short, long)] + #[arg(long)] height: usize, } @@ -74,7 +56,7 @@ async fn main() { let mut height = 0_usize; - while height < height_target { + while (height + FAST_SYNC_BATCH_LEN) < height_target { if let Ok(block_ids) = read_batch(&mut read_handle, height).await { let hash = hash_of_hashes(block_ids.as_slice()); hashes_of_hashes.push(hash); @@ -82,13 +64,22 @@ async fn main() { println!("Failed to read next batch from database"); break; } - height += BATCH_SIZE; + height += FAST_SYNC_BATCH_LEN; + + println!("height: {height}"); } drop(read_handle); - let generated = generate_hex(&hashes_of_hashes); - write("src/data/hashes_of_hashes", generated).expect("Could not write file"); + write( + "data/fast_sync_hashes.bin", + hashes_of_hashes.concat().as_slice(), + ) + .expect("Could not write file"); println!("Generated hashes up to block height {height}"); } + +pub fn hash_of_hashes(hashes: &[[u8; 32]]) -> [u8; 32] { + blake3::hash(hashes.concat().as_slice()).into() +} diff --git a/consensus/fast-sync/src/data/hashes_of_hashes 
b/consensus/fast-sync/src/data/hashes_of_hashes deleted file mode 100644 index 2e5e99aa..00000000 --- a/consensus/fast-sync/src/data/hashes_of_hashes +++ /dev/null @@ -1,12 +0,0 @@ -[ - hex_literal::hex!("1adffbaf832784406018009e07d3dc3a39da7edb6632523c119ed8acb32eb934"), - hex_literal::hex!("ae960265e3398d04f3cd4f949ed13c2689424887c71c1441a03d900a9d3a777f"), - hex_literal::hex!("938c72d267bbd3a17cdecbe02443d00012ee62d6e9f3524f5a914192110b1798"), - hex_literal::hex!("de0c82e51549b6514b42a591fd5440dddb5cc0118ec461459a99017bf06a0a0a"), - hex_literal::hex!("9a50f4586ec7e0fb58c6383048d3b334180235fd34bb714af20f1a3ebce4c911"), - hex_literal::hex!("5a3942f9bb318d65997bf57c40e045d62e7edbe35f3dae57499c2c5554896543"), - hex_literal::hex!("9dccee3b094cdd1b98e357c2c81bfcea798ea75efd94e67c6f5e86f428c5ec2c"), - hex_literal::hex!("620397540d44f21c3c57c20e9d47c6aaf0b1bf4302a4d43e75f2e33edd1a4032"), - hex_literal::hex!("ef6c612fb17bd70ac2ac69b2f85a421b138cc3a81daf622b077cb402dbf68377"), - hex_literal::hex!("6815ecb2bd73a3ba5f20558bfe1b714c30d6892b290e0d6f6cbf18237cedf75a"), -] diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index 6016bb0c..faacc2bd 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -1,225 +1,214 @@ use std::{ - cmp, - collections::HashMap, - future::Future, - pin::Pin, - task::{Context, Poll}, + cmp::min, + collections::{HashMap, VecDeque}, + sync::OnceLock, }; +use blake3::Hasher; use monero_serai::{ block::Block, transaction::{Input, Transaction}, }; -use tower::Service; +use tower::{Service, ServiceExt}; +use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus_context::BlockchainContextService; -use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError}; -use cuprate_helper::cast::u64_to_usize; -use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; +use 
cuprate_consensus_context::BlockchainContext; +use cuprate_p2p::block_downloader::ChainEntry; +use cuprate_p2p_core::NetworkZone; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, VerifiedBlockInformation, VerifiedTransactionInformation, +}; -use crate::{hash_of_hashes, BlockId, HashOfHashes}; +/// A [`OnceLock`] representing the fast sync hashes. +static FAST_SYNC_HASHES: OnceLock<&[[u8; 32]]> = OnceLock::new(); -#[cfg(not(test))] -static HASHES_OF_HASHES: &[HashOfHashes] = &include!("./data/hashes_of_hashes"); +/// The size of a batch of block hashes to hash to create a fast sync hash. +pub const FAST_SYNC_BATCH_LEN: usize = 512; -#[cfg(not(test))] -const BATCH_SIZE: usize = 512; - -#[cfg(test)] -static HASHES_OF_HASHES: &[HashOfHashes] = &[ - hex_literal::hex!("3fdc9032c16d440f6c96be209c36d3d0e1aed61a2531490fe0ca475eb615c40a"), - hex_literal::hex!("0102030405060708010203040506070801020304050607080102030405060708"), - hex_literal::hex!("0102030405060708010203040506070801020304050607080102030405060708"), -]; - -#[cfg(test)] -const BATCH_SIZE: usize = 4; - -#[inline] -fn max_height() -> u64 { - (HASHES_OF_HASHES.len() * BATCH_SIZE) as u64 +/// Returns the height of the first block not included in the embedded hashes. +/// +/// # Panics +/// +/// This function will panic if [`set_fast_sync_hashes`] has not been called. +pub fn fast_sync_stop_height() -> usize { + FAST_SYNC_HASHES.get().unwrap().len() * FAST_SYNC_BATCH_LEN } -#[derive(Debug, PartialEq, Eq)] -pub struct ValidBlockId(BlockId); - -fn valid_block_ids(block_ids: &[BlockId]) -> Vec<ValidBlockId> { - block_ids.iter().map(|b| ValidBlockId(*b)).collect() +/// Sets the hashes to use for fast-sync. +/// +/// # Panics +/// +/// This will panic if this is called more than once. 
+pub fn set_fast_sync_hashes(hashes: &'static [[u8; 32]]) { + FAST_SYNC_HASHES.set(hashes).unwrap(); } -#[expect(clippy::large_enum_variant)] -pub enum FastSyncRequest { - ValidateHashes { - start_height: u64, - block_ids: Vec<BlockId>, - }, - ValidateBlock { - block: Block, - txs: HashMap<[u8; 32], Transaction>, - token: ValidBlockId, - }, -} - -#[expect(clippy::large_enum_variant)] -#[derive(Debug, PartialEq, Eq)] -pub enum FastSyncResponse { - ValidateHashes { - validated_hashes: Vec<ValidBlockId>, - unknown_hashes: Vec<BlockId>, - }, - ValidateBlock(VerifiedBlockInformation), -} - -#[derive(thiserror::Error, Debug, PartialEq, Eq)] -pub enum FastSyncError { - #[error("Block does not match its expected hash")] - BlockHashMismatch, - - #[error("Start height must be a multiple of the batch size")] - InvalidStartHeight, - - #[error("Hash of hashes mismatch")] - Mismatch, - - #[error("Given range too small for fast sync (less than one batch)")] - NothingToDo, - - #[error("Start height too high for fast sync")] - OutOfRange, - - #[error("Block does not have the expected height entry")] - BlockHeightMismatch, - - #[error("Block does not contain the expected transaction list")] - TxsIncludedWithBlockIncorrect, - - #[error(transparent)] - Consensus(#[from] ConsensusError), - - #[error(transparent)] - MinerTx(#[from] MinerTxError), - - #[error("Database error: {0}")] - DbErr(String), -} - -impl From<tower::BoxError> for FastSyncError { - fn from(error: tower::BoxError) -> Self { - Self::DbErr(error.to_string()) - } -} - -pub struct FastSyncService { - context_svc: BlockchainContextService, -} - -impl FastSyncService { - #[expect(dead_code)] - pub(crate) const fn new(context_svc: BlockchainContextService) -> Self { - Self { context_svc } - } -} - -impl Service<FastSyncRequest> for FastSyncService { - type Response = FastSyncResponse; - type Error = FastSyncError; - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; - - fn 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) +/// Validates that the given [`ChainEntry`]s are in the fast-sync hashes. +/// +/// `entries` should be a list of sequential entries. +/// `start_height` should be the height of the first block in the first entry. +/// +/// Returns a tuple, the first element being the entries that are valid*, the second +/// the entries we do not know are valid and should be passed in again when we have more entries. +/// +/// *once we are past the fast sync blocks all entries will be returned as valid as +/// we can not check their validity here. +/// +/// There may be more entries returned than passed in as entries could be split. +/// +/// # Panics +/// +/// This will panic if [`set_fast_sync_hashes`] has not been called. +pub async fn validate_entries<N: NetworkZone>( + mut entries: VecDeque<ChainEntry<N>>, + start_height: usize, + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result<(VecDeque<ChainEntry<N>>, VecDeque<ChainEntry<N>>), tower::BoxError> { + // if we are past the top fast sync block return all entries as valid. + if start_height >= fast_sync_stop_height() { + return Ok((entries, VecDeque::new())); } - fn call(&mut self, req: FastSyncRequest) -> Self::Future { - let mut context_svc = self.context_svc.clone(); + /* + The algorithm used here needs to preserve which peer told us about which blocks, so we cannot + simply join all the hashes together and return all the ones that can be validated and the ones that + can't, we need to keep the batches separate. - Box::pin(async move { - match req { - FastSyncRequest::ValidateHashes { - start_height, - block_ids, - } => validate_hashes(start_height, &block_ids), - FastSyncRequest::ValidateBlock { block, txs, token } => { - validate_block(&mut context_svc, block, txs, &token) - } - } - }) - } -} + The first step is to calculate how many hashes we need from the blockchain to make up the first + fast-sync hash. 
-fn validate_hashes( - start_height: u64, - block_ids: &[BlockId], -) -> Result<FastSyncResponse, FastSyncError> { - let start_height_usize = u64_to_usize(start_height); + Then we will take out all the batches at the end for which we cannot make up a full fast-sync hash; + we will split a batch if it can only be partially validated. - if start_height_usize % BATCH_SIZE != 0 { - return Err(FastSyncError::InvalidStartHeight); - } + With the remaining hashes from the blockchain and the hashes in the batches we can validate we + work on calculating the fast sync hashes and comparing them to the ones in [`FAST_SYNC_HASHES`]. + */ - if start_height >= max_height() { - return Err(FastSyncError::OutOfRange); - } + // First calculate the start and stop for this range of hashes. + let hashes_start_height = (start_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN; + let amount_of_hashes = entries.iter().map(|e| e.ids.len()).sum::<usize>(); + let last_height = amount_of_hashes + start_height; - let stop_height = start_height_usize + block_ids.len(); + let hashes_stop_height = min( + (last_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN, + fast_sync_stop_height(), + ); - let batch_from = start_height_usize / BATCH_SIZE; - let batch_to = cmp::min(stop_height / BATCH_SIZE, HASHES_OF_HASHES.len()); - let n_batches = batch_to - batch_from; + let mut hashes_stop_diff_last_height = last_height - hashes_stop_height; - if n_batches == 0 { - return Err(FastSyncError::NothingToDo); - } + let mut unknown = VecDeque::new(); - for i in 0..n_batches { - let batch = &block_ids[BATCH_SIZE * i..BATCH_SIZE * (i + 1)]; - let actual = hash_of_hashes(batch); - let expected = HASHES_OF_HASHES[batch_from + i]; + // start moving from the back of the batches taking enough hashes out so we are only left with hashes + // that can be verified. 
+ while !entries.is_empty() && hashes_stop_diff_last_height != 0 { + let back = entries.back_mut().unwrap(); - if expected != actual { - return Err(FastSyncError::Mismatch); + if back.ids.len() >= hashes_stop_diff_last_height { + // This batch is partially valid so split it. + unknown.push_front(ChainEntry { + ids: back + .ids + .drain((back.ids.len() - hashes_stop_diff_last_height)..) + .collect(), + peer: back.peer, + handle: back.handle.clone(), + }); + + break; } + + // Add this batch to the front of the unknowns, we do not know its validity. + let back = entries.pop_back().unwrap(); + hashes_stop_diff_last_height -= back.ids.len(); + unknown.push_front(back); } - let validated_hashes = valid_block_ids(&block_ids[..n_batches * BATCH_SIZE]); - let unknown_hashes = block_ids[n_batches * BATCH_SIZE..].to_vec(); + // get the hashes we are missing to create the first fast-sync hash. + let BlockchainResponse::BlockHashInRange(hashes) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::BlockHashInRange( + hashes_start_height..start_height, + Chain::Main, + )) + .await? + else { + unreachable!() + }; - Ok(FastSyncResponse::ValidateHashes { - validated_hashes, - unknown_hashes, - }) + // Start verifying the hashes. + let mut hasher = Hasher::default(); + let mut last_i = 1; + for (i, hash) in hashes + .iter() + .chain(entries.iter().flat_map(|e| e.ids.iter())) + .enumerate() + { + hasher.update(hash); + + if (i + 1) % FAST_SYNC_BATCH_LEN == 0 { + let got_hash = hasher.finalize(); + + if got_hash + != FAST_SYNC_HASHES.get().unwrap() + [get_hash_index_for_height(hashes_start_height + i)] + { + return Err("Hashes do not match".into()); + } + hasher.reset(); + } + + last_i = i + 1; + } + // Make sure we actually checked all hashes. 
+ assert_eq!(last_i % FAST_SYNC_BATCH_LEN, 0); + + Ok((entries, unknown)) } -fn validate_block( - context_svc: &mut BlockchainContextService, - block: Block, - mut txs: HashMap<[u8; 32], Transaction>, - token: &ValidBlockId, -) -> Result<FastSyncResponse, FastSyncError> { - let block_chain_ctx = context_svc.blockchain_context().clone(); +/// Get the index of the hash that contains this block in the fast sync hashes. +const fn get_hash_index_for_height(height: usize) -> usize { + height / FAST_SYNC_BATCH_LEN +} +/// Creates a [`VerifiedBlockInformation`] from a block known to be valid. +/// +/// # Panics +/// +/// This may panic if used on an invalid block. +pub fn block_to_verified_block_information( + block: Block, + txs: Vec<Transaction>, + blockchin_ctx: &BlockchainContext, +) -> VerifiedBlockInformation { let block_hash = block.hash(); - if block_hash != token.0 { - return Err(FastSyncError::BlockHashMismatch); - } let block_blob = block.serialize(); let Some(Input::Gen(height)) = block.miner_transaction.prefix().inputs.first() else { - return Err(FastSyncError::MinerTx(MinerTxError::InputNotOfTypeGen)); + panic!("fast sync block invalid"); }; - if *height != block_chain_ctx.chain_height { - return Err(FastSyncError::BlockHeightMismatch); - } + + assert_eq!( + *height, blockchin_ctx.chain_height, + "fast sync block invalid" + ); + + let mut txs = txs + .into_iter() + .map(|tx| { + let data = new_tx_verification_data(tx).expect("fast sync block invalid"); + + (data.tx_hash, data) + }) + .collect::<HashMap<_, _>>(); let mut verified_txs = Vec::with_capacity(txs.len()); for tx in &block.transactions { - let tx = txs - .remove(tx) - .ok_or(FastSyncError::TxsIncludedWithBlockIncorrect)?; + let data = txs.remove(tx).expect("fast sync block invalid"); - let data = new_tx_verification_data(tx)?; verified_txs.push(VerifiedTransactionInformation { tx_blob: data.tx_blob, tx_weight: data.tx_weight, @@ -243,68 +232,16 @@ fn validate_block( let weight = 
block.miner_transaction.weight() + verified_txs.iter().map(|tx| tx.tx_weight).sum::<usize>(); - Ok(FastSyncResponse::ValidateBlock(VerifiedBlockInformation { + VerifiedBlockInformation { block_blob, txs: verified_txs, block_hash, - pow_hash: [0_u8; 32], + pow_hash: [u8::MAX; 32], height: *height, generated_coins, weight, - long_term_weight: block_chain_ctx.next_block_long_term_weight(weight), - cumulative_difficulty: block_chain_ctx.cumulative_difficulty - + block_chain_ctx.next_difficulty, + long_term_weight: blockchin_ctx.next_block_long_term_weight(weight), + cumulative_difficulty: blockchin_ctx.cumulative_difficulty + blockchin_ctx.next_difficulty, block, - })) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_validate_hashes_errors() { - let ids = [[1_u8; 32], [2_u8; 32], [3_u8; 32], [4_u8; 32], [5_u8; 32]]; - assert_eq!( - validate_hashes(3, &[]), - Err(FastSyncError::InvalidStartHeight) - ); - assert_eq!( - validate_hashes(3, &ids), - Err(FastSyncError::InvalidStartHeight) - ); - - assert_eq!(validate_hashes(20, &[]), Err(FastSyncError::OutOfRange)); - assert_eq!(validate_hashes(20, &ids), Err(FastSyncError::OutOfRange)); - - assert_eq!(validate_hashes(4, &[]), Err(FastSyncError::NothingToDo)); - assert_eq!( - validate_hashes(4, &ids[..3]), - Err(FastSyncError::NothingToDo) - ); - } - - #[test] - fn test_validate_hashes_success() { - let ids = [[1_u8; 32], [2_u8; 32], [3_u8; 32], [4_u8; 32], [5_u8; 32]]; - let validated_hashes = valid_block_ids(&ids[0..4]); - let unknown_hashes = ids[4..].to_vec(); - assert_eq!( - validate_hashes(0, &ids), - Ok(FastSyncResponse::ValidateHashes { - validated_hashes, - unknown_hashes - }) - ); - } - - #[test] - fn test_validate_hashes_mismatch() { - let ids = [ - [1_u8; 32], [2_u8; 32], [3_u8; 32], [5_u8; 32], [1_u8; 32], [2_u8; 32], [3_u8; 32], - [4_u8; 32], - ]; - assert_eq!(validate_hashes(0, &ids), Err(FastSyncError::Mismatch)); - assert_eq!(validate_hashes(4, &ids), Err(FastSyncError::Mismatch)); } } 
diff --git a/consensus/fast-sync/src/lib.rs b/consensus/fast-sync/src/lib.rs index 8dbdc649..c42ff4ca 100644 --- a/consensus/fast-sync/src/lib.rs +++ b/consensus/fast-sync/src/lib.rs @@ -4,7 +4,9 @@ use cuprate_blockchain as _; use hex as _; use tokio as _; -pub mod fast_sync; -pub mod util; +mod fast_sync; -pub use util::{hash_of_hashes, BlockId, HashOfHashes}; +pub use fast_sync::{ + block_to_verified_block_information, fast_sync_stop_height, set_fast_sync_hashes, + validate_entries, FAST_SYNC_BATCH_LEN, +}; diff --git a/consensus/fast-sync/src/util.rs b/consensus/fast-sync/src/util.rs deleted file mode 100644 index f8460f63..00000000 --- a/consensus/fast-sync/src/util.rs +++ /dev/null @@ -1,8 +0,0 @@ -use sha3::{Digest, Keccak256}; - -pub type BlockId = [u8; 32]; -pub type HashOfHashes = [u8; 32]; - -pub fn hash_of_hashes(hashes: &[BlockId]) -> HashOfHashes { - Keccak256::digest(hashes.concat().as_slice()).into() -} diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs index db103000..ddfc2beb 100644 --- a/p2p/p2p/src/block_downloader.rs +++ b/p2p/p2p/src/block_downloader.rs @@ -7,7 +7,7 @@ //! The block downloader is started by [`download_blocks`]. 
use std::{ cmp::{max, min, Reverse}, - collections::{BTreeMap, BinaryHeap}, + collections::{BTreeMap, BinaryHeap, VecDeque}, time::Duration, }; @@ -28,9 +28,9 @@ use cuprate_pruning::PruningSeed; use crate::{ constants::{ BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN, - MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, + MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE, }, - peer_set::ClientDropGuard, + peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse}, }; mod block_queue; @@ -40,9 +40,9 @@ mod request_chain; #[cfg(test)] mod tests; -use crate::peer_set::{PeerSetRequest, PeerSetResponse}; use block_queue::{BlockQueue, ReadyQueueBatch}; -use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; +pub use chain_tracker::ChainEntry; +use chain_tracker::{BlocksToRetrieve, ChainTracker}; use download_batch::download_batch_task; use request_chain::{initial_chain_search, request_chain_entry_from_peer}; @@ -95,17 +95,19 @@ pub(crate) enum BlockDownloadError { } /// The request type for the chain service. -pub enum ChainSvcRequest { +pub enum ChainSvcRequest<N: NetworkZone> { /// A request for the current chain history. CompactHistory, /// A request to find the first unknown block ID in a list of block IDs. FindFirstUnknown(Vec<[u8; 32]>), /// A request for our current cumulative difficulty. CumulativeDifficulty, + + ValidateEntries(VecDeque<ChainEntry<N>>, usize), } /// The response type for the chain service. -pub enum ChainSvcResponse { +pub enum ChainSvcResponse<N: NetworkZone> { /// The response for [`ChainSvcRequest::CompactHistory`]. CompactHistory { /// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block. @@ -123,6 +125,11 @@ pub enum ChainSvcResponse { /// /// The current cumulative difficulty of our chain. 
CumulativeDifficulty(u128), + + ValidateEntries { + valid: VecDeque<ChainEntry<N>>, + unknown: VecDeque<ChainEntry<N>>, + }, } /// This function starts the block downloader and returns a [`BufferStream`] that will produce @@ -140,7 +147,7 @@ pub fn download_blocks<N: NetworkZone, C>( config: BlockDownloaderConfig, ) -> BufferStream<BlockBatch> where - C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError> + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError> + Send + 'static, C::Future: Send + 'static, @@ -191,10 +198,7 @@ struct BlockDownloader<N: NetworkZone, C> { /// The service that holds our current chain state. our_chain_svc: C, - /// The amount of blocks to request in the next batch. - amount_of_blocks_to_request: usize, - /// The height at which [`Self::amount_of_blocks_to_request`] was updated. - amount_of_blocks_to_request_updated_at: usize, + most_recent_batch_sizes: BinaryHeap<Reverse<(usize, BatchSizeInformation)>>, /// The amount of consecutive empty chain entries we received. 
/// @@ -227,7 +231,7 @@ struct BlockDownloader<N: NetworkZone, C> { impl<N: NetworkZone, C> BlockDownloader<N, C> where - C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError> + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError> + Send + 'static, C::Future: Send + 'static, @@ -242,8 +246,7 @@ where Self { peer_set, our_chain_svc, - amount_of_blocks_to_request: config.initial_batch_len, - amount_of_blocks_to_request_updated_at: 0, + most_recent_batch_sizes: BinaryHeap::new(), amount_of_empty_chain_entries: 0, block_download_tasks: JoinSet::new(), chain_entry_task: JoinSet::new(), @@ -280,6 +283,28 @@ where } } + fn amount_of_blocks_to_request(&self) -> usize { + let biggest_batch = self + .most_recent_batch_sizes + .iter() + .max_by(|batch_1, batch_2| { + let av1 = batch_1.0 .1.byte_size / batch_1.0 .1.len; + let av2 = batch_2.0 .1.byte_size / batch_2.0 .1.len; + + av1.cmp(&av2) + }); + + let Some(biggest_batch) = biggest_batch else { + return self.config.initial_batch_len; + }; + + calculate_next_block_batch_size( + biggest_batch.0 .1.byte_size, + biggest_batch.0 .1.len, + self.config.target_batch_bytes, + ) + } + /// Attempts to send another request for an inflight batch /// /// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request @@ -387,9 +412,10 @@ where // No failed requests that we can handle, request some new blocks. 
- let Some(mut block_entry_to_get) = chain_tracker - .blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request) - else { + let Some(mut block_entry_to_get) = chain_tracker.blocks_to_get( + &client.info.pruning_seed, + self.amount_of_blocks_to_request(), + ) else { return Some(client); }; @@ -428,7 +454,7 @@ where && self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED // Check we have a big buffer of pending block IDs to retrieve, we don't want to be waiting around // for a chain entry. - && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500 + && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request()) < 500 // Make sure this peer actually has the chain. && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed) { @@ -561,19 +587,25 @@ where // If the batch is higher than the last time we updated `amount_of_blocks_to_request`, update it // again. - if start_height > self.amount_of_blocks_to_request_updated_at { - self.amount_of_blocks_to_request = calculate_next_block_batch_size( - block_batch.size, - block_batch.blocks.len(), - self.config.target_batch_bytes, - ); + if start_height + > self + .most_recent_batch_sizes + .peek() + .map(|Reverse((height, _))| *height) + .unwrap_or_default() + { + self.most_recent_batch_sizes.push(Reverse(( + start_height, + BatchSizeInformation { + len: block_batch.blocks.len(), + byte_size: block_batch.size, + }, + ))); - tracing::debug!( - "Updating batch size of new batches, new size: {}", - self.amount_of_blocks_to_request - ); - - self.amount_of_blocks_to_request_updated_at = start_height; + if self.most_recent_batch_sizes.len() > MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE + { + self.most_recent_batch_sizes.pop(); + } } self.block_queue @@ -642,12 +674,15 @@ where Some(Ok(res)) = self.chain_entry_task.join_next() => { match res { Ok((client, entry)) => { - if chain_tracker.add_entry(entry).is_ok() { - tracing::debug!("Successfully 
added chain entry to chain tracker."); - self.amount_of_empty_chain_entries = 0; - } else { - tracing::debug!("Failed to add incoming chain entry to chain tracker."); - self.amount_of_empty_chain_entries += 1; + match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await { + Ok(()) => { + tracing::debug!("Successfully added chain entry to chain tracker."); + self.amount_of_empty_chain_entries = 0; + } + Err(e) => { + tracing::debug!("Failed to add incoming chain entry to chain tracker: {e:?}"); + self.amount_of_empty_chain_entries += 1; + } } pending_peers @@ -683,6 +718,12 @@ const fn client_has_block_in_range( && pruning_seed.has_full_block(start_height + length, MAX_BLOCK_HEIGHT_USIZE) } +#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] +struct BatchSizeInformation { + len: usize, + byte_size: usize, +} + /// Calculates the next amount of blocks to request in a batch. /// /// Parameters: diff --git a/p2p/p2p/src/block_downloader/chain_tracker.rs b/p2p/p2p/src/block_downloader/chain_tracker.rs index df5aebb5..33f294f0 100644 --- a/p2p/p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/p2p/src/block_downloader/chain_tracker.rs @@ -1,16 +1,20 @@ -use std::{cmp::min, collections::VecDeque}; +use std::{cmp::min, collections::VecDeque, mem}; use cuprate_fixed_bytes::ByteArrayVec; +use tower::{Service, ServiceExt}; use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE; use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone}; use cuprate_pruning::PruningSeed; -use crate::constants::MEDIUM_BAN; +use crate::{ + block_downloader::{ChainSvcRequest, ChainSvcResponse}, + constants::MEDIUM_BAN, +}; /// A new chain entry to add to our chain tracker. #[derive(Debug)] -pub(crate) struct ChainEntry<N: NetworkZone> { +pub struct ChainEntry<N: NetworkZone> { /// A list of block IDs. pub ids: Vec<[u8; 32]>, /// The peer who told us about this chain entry. 
@@ -39,12 +43,15 @@ pub(crate) struct BlocksToRetrieve<N: NetworkZone> { } /// An error returned from the [`ChainTracker`]. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) enum ChainTrackerError { /// The new chain entry is invalid. NewEntryIsInvalid, + NewEntryIsEmpty, /// The new chain entry does not follow from the top of our chain tracker. NewEntryDoesNotFollowChain, + #[expect(dead_code)] // This is used for logging + ChainSvcError(tower::BoxError), } /// # Chain Tracker @@ -52,8 +59,10 @@ pub(crate) enum ChainTrackerError { /// This struct allows following a single chain. It takes in [`ChainEntry`]s and /// allows getting [`BlocksToRetrieve`]. pub(crate) struct ChainTracker<N: NetworkZone> { - /// A list of [`ChainEntry`]s, in order. - entries: VecDeque<ChainEntry<N>>, + /// A list of [`ChainEntry`]s, in order, that we should request. + valid_entries: VecDeque<ChainEntry<N>>, + /// A list of [`ChainEntry`]s that are pending more [`ChainEntry`]s to check validity. + unknown_entries: VecDeque<ChainEntry<N>>, /// The height of the first block, in the first entry in [`Self::entries`]. first_height: usize, /// The hash of the last block in the last entry. @@ -66,23 +75,39 @@ pub(crate) struct ChainTracker<N: NetworkZone> { impl<N: NetworkZone> ChainTracker<N> { /// Creates a new chain tracker. - pub(crate) fn new( + pub(crate) async fn new<C>( new_entry: ChainEntry<N>, first_height: usize, our_genesis: [u8; 32], previous_hash: [u8; 32], - ) -> Self { + our_chain_svc: &mut C, + ) -> Result<Self, ChainTrackerError> + where + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>, + { let top_seen_hash = *new_entry.ids.last().unwrap(); let mut entries = VecDeque::with_capacity(1); entries.push_back(new_entry); - Self { - entries, + let ChainSvcResponse::ValidateEntries { valid, unknown } = our_chain_svc + .ready() + .await + .map_err(ChainTrackerError::ChainSvcError)? 
+ .call(ChainSvcRequest::ValidateEntries(entries, first_height)) + .await + .map_err(ChainTrackerError::ChainSvcError)? + else { + unreachable!() + }; + + Ok(Self { + valid_entries: valid, + unknown_entries: unknown, first_height, top_seen_hash, previous_hash, our_genesis, - } + }) } /// Returns `true` if the peer is expected to have the next block after our highest seen block @@ -99,8 +124,9 @@ impl<N: NetworkZone> ChainTracker<N> { /// Returns the height of the highest block we are tracking. pub(crate) fn top_height(&self) -> usize { let top_block_idx = self - .entries + .valid_entries .iter() + .chain(self.unknown_entries.iter()) .map(|entry| entry.ids.len()) .sum::<usize>(); @@ -112,32 +138,32 @@ impl<N: NetworkZone> ChainTracker<N> { /// # Panics /// This function panics if `batch_size` is `0`. pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize { - self.entries + self.valid_entries .iter() .map(|entry| entry.ids.len().div_ceil(batch_size)) .sum() } /// Attempts to add an incoming [`ChainEntry`] to the chain tracker. - pub(crate) fn add_entry( + pub(crate) async fn add_entry<C>( &mut self, mut chain_entry: ChainEntry<N>, - ) -> Result<(), ChainTrackerError> { - if chain_entry.ids.is_empty() { + our_chain_svc: &mut C, + ) -> Result<(), ChainTrackerError> + where + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>, + { + if chain_entry.ids.len() == 1 { + return Err(ChainTrackerError::NewEntryIsEmpty); + } + + let Some(first) = chain_entry.ids.first() else { // The peer must send at lest one overlapping block. 
chain_entry.handle.ban_peer(MEDIUM_BAN); return Err(ChainTrackerError::NewEntryIsInvalid); - } + }; - if chain_entry.ids.len() == 1 { - return Err(ChainTrackerError::NewEntryDoesNotFollowChain); - } - - if self - .entries - .back() - .is_some_and(|last_entry| last_entry.ids.last().unwrap() != &chain_entry.ids[0]) - { + if *first != self.top_seen_hash { return Err(ChainTrackerError::NewEntryDoesNotFollowChain); } @@ -150,7 +176,29 @@ impl<N: NetworkZone> ChainTracker<N> { self.top_seen_hash = *new_entry.ids.last().unwrap(); - self.entries.push_back(new_entry); + self.unknown_entries.push_back(new_entry); + + let ChainSvcResponse::ValidateEntries { mut valid, unknown } = our_chain_svc + .ready() + .await + .map_err(ChainTrackerError::ChainSvcError)? + .call(ChainSvcRequest::ValidateEntries( + mem::take(&mut self.unknown_entries), + self.first_height + + self + .valid_entries + .iter() + .map(|e| e.ids.len()) + .sum::<usize>(), + )) + .await + .map_err(ChainTrackerError::ChainSvcError)? + else { + unreachable!() + }; + + self.valid_entries.append(&mut valid); + self.unknown_entries = unknown; Ok(()) } @@ -167,7 +215,7 @@ impl<N: NetworkZone> ChainTracker<N> { return None; } - let entry = self.entries.front_mut()?; + let entry = self.valid_entries.front_mut()?; // Calculate the ending index for us to get in this batch, it will be one of these: // - smallest out of `max_blocks` @@ -204,7 +252,7 @@ impl<N: NetworkZone> ChainTracker<N> { self.previous_hash = blocks.ids[blocks.ids.len() - 1]; if entry.ids.is_empty() { - self.entries.pop_front(); + self.valid_entries.pop_front(); } Some(blocks) diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs index 4e0f855b..9adbaaa8 100644 --- a/p2p/p2p/src/block_downloader/request_chain.rs +++ b/p2p/p2p/src/block_downloader/request_chain.rs @@ -84,7 +84,7 @@ pub async fn initial_chain_search<N: NetworkZone, C>( mut our_chain_svc: C, ) -> Result<ChainTracker<N>, BlockDownloadError> 
where - C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>, + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>, { tracing::debug!("Getting our chain history"); // Get our history. @@ -214,7 +214,15 @@ where first_entry.ids.len() ); - let tracker = ChainTracker::new(first_entry, expected_height, our_genesis, previous_id); + let tracker = ChainTracker::new( + first_entry, + expected_height, + our_genesis, + previous_id, + &mut our_chain_svc, + ) + .await + .map_err(|_| BlockDownloadError::ChainInvalid)?; Ok(tracker) } diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index 707b858d..3d5808ed 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, fmt::{Debug, Formatter}, future::Future, pin::Pin, @@ -269,8 +270,8 @@ struct OurChainSvc { genesis: [u8; 32], } -impl Service<ChainSvcRequest> for OurChainSvc { - type Response = ChainSvcResponse; +impl Service<ChainSvcRequest<ClearNet>> for OurChainSvc { + type Response = ChainSvcResponse<ClearNet>; type Error = tower::BoxError; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; @@ -279,7 +280,7 @@ impl Service<ChainSvcRequest> for OurChainSvc { Poll::Ready(Ok(())) } - fn call(&mut self, req: ChainSvcRequest) -> Self::Future { + fn call(&mut self, req: ChainSvcRequest<ClearNet>) -> Self::Future { let genesis = self.genesis; async move { @@ -292,6 +293,10 @@ impl Service<ChainSvcRequest> for OurChainSvc { ChainSvcResponse::FindFirstUnknown(Some((1, 1))) } ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1), + ChainSvcRequest::ValidateEntries(valid, _) => ChainSvcResponse::ValidateEntries { + valid, + unknown: VecDeque::new(), + }, }) } .boxed() diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index a81557c2..e37089f5 100644 --- 
a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -75,6 +75,9 @@ pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5; /// The amount of empty chain entries to receive before we assume we have found the top of the chain. pub(crate) const EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED: usize = 5; +/// The amount of most recent block batches we use to calculate batch size. +pub(crate) const MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE: usize = 100; + #[cfg(test)] mod tests { use super::*; diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index fb506582..e5bfc2cc 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -168,7 +168,7 @@ impl<N: NetworkZone> NetworkInterface<N> { config: BlockDownloaderConfig, ) -> BufferStream<BlockBatch> where - C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError> + C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError> + Send + 'static, C::Future: Send + 'static, diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 9d88e66e..c922465c 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -12,6 +12,7 @@ use std::{ cmp::min, collections::{HashMap, HashSet}, + ops::Range, sync::Arc, }; @@ -107,6 +108,7 @@ fn map_request( R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), + R::BlockHashInRange(blocks, chain) => block_hash_in_range(env, blocks, chain), R::FindBlock(block_hash) => find_block(env, block_hash), R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes), R::BlockExtendedHeaderInRange(range, chain) => { @@ -271,6 +273,34 @@ fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> Res Ok(BlockchainResponse::BlockHash(block_hash)) } +/// [`BlockchainReadRequest::BlockHashInRange`]. 
+#[inline] +fn block_hash_in_range(env: &ConcreteEnv, range: Range<usize>, chain: Chain) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + + let block_hash = range + .into_par_iter() + .map(|block_height| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + + let table_block_infos = env_inner.open_db_ro::<BlockInfos>(tx_ro)?; + + let block_hash = match chain { + Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash, + Chain::Alt(chain) => { + get_alt_block_hash(&block_height, chain, &env_inner.open_tables(tx_ro)?)? + } + }; + + Ok(block_hash) + }) + .collect::<Result<_, RuntimeError>>()?; + + Ok(BlockchainResponse::BlockHashInRange(block_hash)) +} + /// [`BlockchainReadRequest::FindBlock`] fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -330,7 +360,7 @@ fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> R #[inline] fn block_extended_header_in_range( env: &ConcreteEnv, - range: std::ops::Range<BlockHeight>, + range: Range<BlockHeight>, chain: Chain, ) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. 
diff --git a/storage/blockchain/src/service/write.rs b/storage/blockchain/src/service/write.rs index 84c2538f..c1e857c4 100644 --- a/storage/blockchain/src/service/write.rs +++ b/storage/blockchain/src/service/write.rs @@ -39,6 +39,7 @@ fn handle_blockchain_request( ) -> DbResult<BlockchainResponse> { match req { BlockchainWriteRequest::WriteBlock(block) => write_block(env, block), + BlockchainWriteRequest::BatchWriteBlocks(blocks) => write_blocks(env, blocks), BlockchainWriteRequest::WriteAltBlock(alt_block) => write_alt_block(env, alt_block), BlockchainWriteRequest::PopBlocks(numb_blocks) => pop_blocks(env, *numb_blocks), BlockchainWriteRequest::ReverseReorg(old_main_chain_id) => { @@ -80,6 +81,33 @@ fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseR } } +/// [`BlockchainWriteRequest::BatchWriteBlocks`]. +#[inline] +fn write_blocks(env: &ConcreteEnv, block: &Vec<VerifiedBlockInformation>) -> ResponseResult { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let result = { + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + for block in block { + crate::ops::block::add_block(block, &mut tables_mut)?; + } + + Ok(()) + }; + + match result { + Ok(()) => { + TxRw::commit(tx_rw)?; + Ok(BlockchainResponse::Ok) + } + Err(e) => { + TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL); + Err(e) + } + } +} + /// [`BlockchainWriteRequest::WriteAltBlock`]. #[inline] fn write_alt_block(env: &ConcreteEnv, block: &AltBlockInformation) -> ResponseResult { diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 370065c8..07cf925d 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -44,6 +44,11 @@ pub enum BlockchainReadRequest { /// The input is the block's height and the chain it is on. BlockHash(usize, Chain), + /// Request a range of block's hashes. + /// + /// The input is the range of block heights and the chain it is on. 
+ BlockHashInRange(Range<usize>, Chain), + /// Request to check if we have a block and which [`Chain`] it is on. /// /// The input is the block's hash. @@ -173,6 +178,11 @@ pub enum BlockchainWriteRequest { /// Input is an already verified block. WriteBlock(VerifiedBlockInformation), + /// Request that a batch of blocks be written to the database. + /// + /// Input is an already verified batch of blocks. + BatchWriteBlocks(Vec<VerifiedBlockInformation>), + /// Write an alternative block to the database, /// /// Input is the alternative block. @@ -229,6 +239,11 @@ pub enum BlockchainResponse { /// Inner value is the hash of the requested block. BlockHash([u8; 32]), + /// Response to [`BlockchainReadRequest::BlockHashInRange`]. + /// + /// Inner value is the hashes of the requested blocks, in order. + BlockHashInRange(Vec<[u8; 32]>), + /// Response to [`BlockchainReadRequest::FindBlock`]. /// /// Inner value is the chain and height of the block if found.