cuprated: Add fast sync ()

* add specific method for context

* add new statemachine for tx verification

* fix consensus crates build

* working builds

* fix CI

* add docs

* fix CI

* fix docs

* fix clippy

* cleanup

* add docs to `blockchain_context`

* fix doc tests

* add output cache

* new monero-serai

* todo

* todo

* Revert "new monero-serai"

This reverts commit fe3f6acc67.

* use indexmap to request outputs

* clean up

* fix typos

* fix CI

* fix cargo hack

* fix reorgs

* check if a block is already present before adding it to the alt block cache

* fmt

* update to new monero oxide API

* fmt & fix cache

* update config values

* fix tests

* add fast sync

* fix start_height check

* disable kill switch for now

* add fast sync config options

* add docs

* wait for all blocks to download before starting the syncer again.

* fix permit

* typo

* fix import order

* fmt

* add docs + order imports

* fix clippy

* review fixes

* rename top -> stop
Boog900 2025-03-08 01:54:34 +00:00 committed by GitHub
parent 5c2b56c78e
commit 1549a88618
27 changed files with 653 additions and 414 deletions

Cargo.lock generated

@ -774,18 +774,16 @@ dependencies = [
name = "cuprate-fast-sync"
version = "0.1.0"
dependencies = [
"blake3",
"clap",
"cuprate-blockchain",
"cuprate-consensus",
"cuprate-consensus-context",
"cuprate-consensus-rules",
"cuprate-helper",
"cuprate-p2p",
"cuprate-p2p-core",
"cuprate-types",
"hex",
"hex-literal",
"monero-serai",
"sha3",
"thiserror",
"tokio",
"tower 0.5.1",
]


@ -20,11 +20,13 @@ use cuprate_types::{
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
mod chain_service;
mod fast_sync;
pub mod interface;
mod manager;
mod syncer;
mod types;
pub use fast_sync::set_fast_sync_hashes;
pub use manager::init_blockchain_manager;
pub use types::ConsensusBlockchainReadHandle;


@ -4,7 +4,9 @@ use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::Service;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_fast_sync::validate_entries;
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
use cuprate_p2p_core::NetworkZone;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
/// The service that allows retrieving the chain state to give to the P2P crates, so we can figure out
@ -14,8 +16,8 @@ use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
#[derive(Clone)]
pub struct ChainService(pub BlockchainReadHandle);
impl Service<ChainSvcRequest> for ChainService {
type Response = ChainSvcResponse;
impl<N: NetworkZone> Service<ChainSvcRequest<N>> for ChainService {
type Response = ChainSvcResponse<N>;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@ -23,7 +25,7 @@ impl Service<ChainSvcRequest> for ChainService {
self.0.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
fn call(&mut self, req: ChainSvcRequest<N>) -> Self::Future {
let map_res = |res: BlockchainResponse| match res {
BlockchainResponse::CompactChainHistory {
block_ids,
@ -67,6 +69,18 @@ impl Service<ChainSvcRequest> for ChainService {
})
.map_err(Into::into)
.boxed(),
ChainSvcRequest::ValidateEntries(entries, start_height) => {
let mut blockchain_read_handle = self.0.clone();
async move {
let (valid, unknown) =
validate_entries(entries, start_height, &mut blockchain_read_handle)
.await?;
Ok(ChainSvcResponse::ValidateEntries { valid, unknown })
}
.boxed()
}
}
}
}


@ -0,0 +1,24 @@
use std::slice;
use cuprate_helper::network::Network;
/// The hashes of the compiled-in fast sync file.
static FAST_SYNC_HASHES: &[[u8; 32]] = {
let bytes = include_bytes!("./fast_sync/fast_sync_hashes.bin");
if bytes.len() % 32 == 0 {
// SAFETY: The file byte length must be perfectly divisible by 32, checked above.
unsafe { slice::from_raw_parts(bytes.as_ptr().cast::<[u8; 32]>(), bytes.len() / 32) }
} else {
panic!();
}
};
/// Set the fast-sync hashes according to the provided values.
pub fn set_fast_sync_hashes(fast_sync: bool, network: Network) {
cuprate_fast_sync::set_fast_sync_hashes(if fast_sync && network == Network::Mainnet {
FAST_SYNC_HASHES
} else {
&[]
});
}
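The embedded file is simply the fast-sync hashes laid out back to back, which is why the length check above is all that guards the pointer cast. For illustration, a safe-code sketch of the same parsing (the `split_hashes` helper is hypothetical and not part of this change):

/// Splits a blob of concatenated 32-byte hashes into owned `[u8; 32]` values,
/// returning `None` if the blob length is not a multiple of 32.
fn split_hashes(bytes: &[u8]) -> Option<Vec<[u8; 32]>> {
    if bytes.len() % 32 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(32)
            .map(|chunk| chunk.try_into().expect("chunk is exactly 32 bytes"))
            .collect(),
    )
}

fn main() {
    // Two dummy "hashes" concatenated together.
    let blob = [[1_u8; 32], [2_u8; 32]].concat();
    assert_eq!(split_hashes(&blob).unwrap().len(), 2);
}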


@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
use futures::StreamExt;
use monero_serai::block::Block;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit};
use tower::{BoxError, Service, ServiceExt};
use tracing::error;
@ -106,15 +106,17 @@ impl BlockchainManager {
/// The [`BlockchainManager`] task.
pub async fn run(
mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>,
mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
) {
loop {
tokio::select! {
Some(batch) = block_batch_rx.recv() => {
Some((batch, permit)) = block_batch_rx.recv() => {
self.handle_incoming_block_batch(
batch,
).await;
drop(permit);
}
Some(incoming_command) = command_rx.recv() => {
self.handle_command(incoming_command).await;


@ -1,4 +1,6 @@
//! The blockchain manager handler functions.
use std::{collections::HashMap, sync::Arc};
use bytes::Bytes;
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::{
@ -6,10 +8,8 @@ use monero_serai::{
transaction::{Input, Transaction},
};
use rayon::prelude::*;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
use tower::{Service, ServiceExt};
use tracing::{info, instrument};
use tracing::{info, instrument, Span};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
@ -21,12 +21,13 @@ use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
};
use cuprate_consensus_context::NewBlockData;
use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height};
use cuprate_helper::cast::usize_to_u64;
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
use cuprate_txpool::service::interface::TxpoolWriteRequest;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
AltBlockInformation, Chain, HardFork, TransactionVerificationData, VerifiedBlockInformation,
};
use crate::{
@ -166,6 +167,11 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
if batch.blocks.last().unwrap().0.number().unwrap() < fast_sync_stop_height() {
self.handle_incoming_block_batch_fast_sync(batch).await;
return;
}
let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
batch.blocks,
&mut self.blockchain_context_service,
@ -195,7 +201,32 @@ impl super::BlockchainManager {
self.add_valid_block_to_main_chain(verified_block).await;
}
info!("Successfully added block batch");
info!(fast_sync = false, "Successfully added block batch");
}
/// Handles an incoming block batch while we are under the fast sync height.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) {
let mut valid_blocks = Vec::with_capacity(batch.blocks.len());
for (block, txs) in batch.blocks {
let block = block_to_verified_block_information(
block,
txs,
self.blockchain_context_service.blockchain_context(),
);
self.add_valid_block_to_blockchain_cache(&block).await;
valid_blocks.push(block);
}
self.batch_add_valid_block_to_blockchain_database(valid_blocks)
.await;
info!(fast_sync = true, "Successfully added block batch");
}
/// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
@ -212,7 +243,6 @@ impl super::BlockchainManager {
/// recover from.
async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
// TODO: this needs testing (this whole section does but alt-blocks specifically).
let mut blocks = batch.blocks.into_iter();
while let Some((block, txs)) = blocks.next() {
@ -248,6 +278,8 @@ impl super::BlockchainManager {
Ok(AddAltBlock::Cached) => (),
}
}
info!(alt_chain = true, "Successfully added block batch");
}
/// Handles an incoming alt [`Block`].
@ -284,9 +316,10 @@ impl super::BlockchainManager {
unreachable!();
};
if chain.is_some() {
// The block could also be in the main-chain here under some circumstances.
return Ok(AddAltBlock::Cached);
match chain {
Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::Cached),
Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"),
None => (),
}
let alt_block_info =
@ -458,22 +491,8 @@ impl super::BlockchainManager {
})
.collect::<Vec<[u8; 32]>>();
self.blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
self.add_valid_block_to_blockchain_cache(&verified_block)
.await;
self.blockchain_write_handle
.ready()
@ -491,11 +510,60 @@ impl super::BlockchainManager {
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Adds a [`VerifiedBlockInformation`] to the blockchain context cache.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn add_valid_block_to_blockchain_cache(
&mut self,
verified_block: &VerifiedBlockInformation,
) {
self.blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Batch writes the [`VerifiedBlockInformation`]s to the database.
///
/// The blocks must be sequential.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn batch_add_valid_block_to_blockchain_database(
&mut self,
blocks: Vec<VerifiedBlockInformation>,
) {
self.blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::BatchWriteBlocks(blocks))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
}
/// The result from successfully adding an alt-block.
enum AddAltBlock {
/// The alt-block was cached or was already present in the DB.
/// The alt-block was cached.
Cached,
/// The chain was reorged.
Reorged,


@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use tokio::{
sync::{mpsc, oneshot, Notify},
sync::{mpsc, Notify, OwnedSemaphorePermit, Semaphore},
time::interval,
};
use tower::{Service, ServiceExt};
@ -15,7 +15,7 @@ use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, PeerSetRequest, PeerSetResponse,
};
use cuprate_p2p_core::ClearNet;
use cuprate_p2p_core::{ClearNet, NetworkZone};
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
@ -30,17 +30,21 @@ pub enum SyncerError {
/// The syncer task that makes sure we are fully synchronised with our connected peers.
#[instrument(level = "debug", skip_all)]
#[expect(clippy::significant_drop_tightening)]
pub async fn syncer<CN>(
mut context_svc: BlockchainContextService,
our_chain: CN,
mut clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
stop_current_block_downloader: Arc<Notify>,
block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError>
where
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Clone
CN: Service<
ChainSvcRequest<ClearNet>,
Response = ChainSvcResponse<ClearNet>,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
CN::Future: Send + 'static,
@ -51,6 +55,9 @@ where
tracing::debug!("Waiting for new sync info in top sync channel");
let semaphore = Arc::new(Semaphore::new(1));
let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
loop {
check_sync_interval.tick().await;
@ -72,10 +79,19 @@ where
tokio::select! {
() = stop_current_block_downloader.notified() => {
tracing::info!("Received stop signal, stopping block downloader");
drop(sync_permit);
sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
break;
}
batch = block_batch_stream.next() => {
let Some(batch) = batch else {
// Wait for all references to the permit to be dropped (which means all blocks in the queue
// have been handled) before checking if we are synced.
drop(sync_permit);
sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());
let blockchain_context = context_svc.blockchain_context();
if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
@ -86,7 +102,7 @@ where
};
tracing::debug!("Got batch, len: {}", batch.blocks.len());
if incoming_block_batch_tx.send(batch).await.is_err() {
if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() {
return Err(SyncerError::IncomingBlockChannelClosed);
}
}
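The permit plumbing here is the heart of the "wait for all blocks to download before starting the syncer again" commit: the syncer holds one `OwnedSemaphorePermit`, sends a clone of the `Arc` wrapping it along with every batch, and re-acquires the semaphore to wait until the manager has dropped every clone, i.e. handled every queued batch. A self-contained sketch of that pattern (assumes tokio with the `rt-multi-thread`, `macros` and `sync` features; the handler bodies are placeholders):

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));
    let permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap());

    // Hand a clone of the permit to each "batch handler" task, like the syncer
    // does when sending `(BlockBatch, Arc<OwnedSemaphorePermit>)` to the manager.
    let handlers: Vec<_> = (0..3)
        .map(|i| {
            let permit = Arc::clone(&permit);
            tokio::spawn(async move {
                println!("handled batch {i}");
                drop(permit); // The manager drops its clone once the batch is handled.
            })
        })
        .collect();

    // Drop our own reference, then wait for the permit to become free again:
    // this only resolves once every handler has dropped its clone.
    drop(permit);
    let _all_batches_handled = semaphore.acquire().await.unwrap();

    for handler in handlers {
        handler.await.unwrap();
    }
}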


@ -47,6 +47,9 @@ pub enum Command {
/// Print status information on `cuprated`.
Status,
/// Print the height of the first block not contained in the fast sync hashes.
FastSyncStopHeight,
}
/// The log output target.
@ -123,6 +126,11 @@ pub async fn io_loop(
println!("STATUS:\n uptime: {h}h {m}m {s}s,\n height: {height},\n top_hash: {top_hash}");
}
Command::FastSyncStopHeight => {
let stop_height = cuprate_fast_sync::fast_sync_stop_height();
println!("{stop_height}");
}
}
}
}


@ -83,6 +83,8 @@ pub struct Config {
/// The network we should run on.
network: Network,
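/// Disable fast sync; all past blocks will undergo full verification when syncing.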
pub no_fast_sync: bool,
/// [`tracing`] config.
pub tracing: TracingConfig,


@ -20,6 +20,13 @@ pub struct Args {
)]
pub network: Network,
/// Disable fast sync; all past blocks will undergo full verification when syncing.
///
/// This significantly increases initial sync time and provides no extra security; you just
/// have to trust the devs to insert the correct hashes (which are verifiable).
#[arg(long)]
no_fast_sync: bool,
/// The amount of outbound clear-net connections to maintain.
#[arg(long)]
pub outbound_connections: Option<usize>,
@ -64,6 +71,7 @@ impl Args {
/// This may exit the program if a config value was set that requires an early exit.
pub const fn apply_args(&self, mut config: Config) -> Config {
config.network = self.network;
config.no_fast_sync = config.no_fast_sync || self.no_fast_sync;
if let Some(outbound_connections) = self.outbound_connections {
config.p2p.clear_net.general.outbound_connections = outbound_connections;


@ -55,6 +55,8 @@ fn main() {
let config = config::read_config_and_args();
blockchain::set_fast_sync_hashes(!config.no_fast_sync, config.network());
// Initialize logging.
logging::init_logging(&config);
@ -82,6 +84,15 @@ fn main() {
// Initialize async tasks.
rt.block_on(async move {
// TODO: we could add an option for people to keep these like monerod?
blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::FlushAltBlocks)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
// Check for and add the genesis block to the blockchain.
blockchain::check_add_genesis(
&mut blockchain_read_handle,


@ -5,23 +5,21 @@ edition = "2021"
license = "MIT"
[[bin]]
name = "cuprate-fast-sync-create-hashes"
name = "create-fs-file"
path = "src/create.rs"
[dependencies]
cuprate-blockchain = { workspace = true }
cuprate-consensus = { workspace = true }
cuprate-consensus-rules = { workspace = true }
cuprate-consensus-context = { workspace = true }
cuprate-types = { workspace = true }
cuprate-helper = { workspace = true, features = ["cast"] }
cuprate-p2p = { workspace = true }
cuprate-p2p-core = { workspace = true }
clap = { workspace = true, features = ["derive", "std"] }
hex = { workspace = true }
hex-literal = { workspace = true }
monero-serai = { workspace = true }
sha3 = { version = "0.10.8" }
thiserror = { workspace = true }
blake3 = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tower = { workspace = true }


@ -3,7 +3,7 @@
reason = "binary shares same Cargo.toml as library"
)]
use std::{fmt::Write, fs::write};
use std::fs::write;
use clap::Parser;
use tower::{Service, ServiceExt};
@ -16,48 +16,30 @@ use cuprate_types::{
Chain,
};
use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes};
const BATCH_SIZE: usize = 512;
use cuprate_fast_sync::FAST_SYNC_BATCH_LEN;
async fn read_batch(
handle: &mut BlockchainReadHandle,
height_from: usize,
) -> DbResult<Vec<BlockId>> {
let mut block_ids = Vec::<BlockId>::with_capacity(BATCH_SIZE);
) -> DbResult<Vec<[u8; 32]>> {
let request = BlockchainReadRequest::BlockHashInRange(
height_from..(height_from + FAST_SYNC_BATCH_LEN),
Chain::Main,
);
let response_channel = handle.ready().await?.call(request);
let response = response_channel.await?;
for height in height_from..(height_from + BATCH_SIZE) {
let request = BlockchainReadRequest::BlockHash(height, Chain::Main);
let response_channel = handle.ready().await?.call(request);
let response = response_channel.await?;
match response {
BlockchainResponse::BlockHash(block_id) => block_ids.push(block_id),
_ => unreachable!(),
}
}
let BlockchainResponse::BlockHashInRange(block_ids) = response else {
unreachable!()
};
Ok(block_ids)
}
fn generate_hex(hashes: &[HashOfHashes]) -> String {
let mut s = String::new();
writeln!(&mut s, "[").unwrap();
for hash in hashes {
writeln!(&mut s, "\thex!(\"{}\"),", hex::encode(hash)).unwrap();
}
writeln!(&mut s, "]").unwrap();
s
}
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(short, long)]
#[arg(long)]
height: usize,
}
@ -74,7 +56,7 @@ async fn main() {
let mut height = 0_usize;
while height < height_target {
while (height + FAST_SYNC_BATCH_LEN) < height_target {
if let Ok(block_ids) = read_batch(&mut read_handle, height).await {
let hash = hash_of_hashes(block_ids.as_slice());
hashes_of_hashes.push(hash);
@ -82,13 +64,22 @@ async fn main() {
println!("Failed to read next batch from database");
break;
}
height += BATCH_SIZE;
height += FAST_SYNC_BATCH_LEN;
println!("height: {height}");
}
drop(read_handle);
let generated = generate_hex(&hashes_of_hashes);
write("src/data/hashes_of_hashes", generated).expect("Could not write file");
write(
"data/fast_sync_hashes.bin",
hashes_of_hashes.concat().as_slice(),
)
.expect("Could not write file");
println!("Generated hashes up to block height {height}");
}
pub fn hash_of_hashes(hashes: &[[u8; 32]]) -> [u8; 32] {
blake3::hash(hashes.concat().as_slice()).into()
}
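Both sides of the scheme must agree on how a batch of block IDs is hashed: the create tool above concatenates the IDs and hashes them in one call, while `validate_entries` (further down) feeds them into an incremental `blake3::Hasher`. A small sketch checking the two give the same digest (assumes the `blake3` crate as a dependency; the batch is dummy data):

fn main() {
    // Four dummy 32-byte block IDs.
    let batch: Vec<[u8; 32]> = (0..4_u8).map(|i| [i; 32]).collect();

    // Create-tool style: hash the concatenation in one call, as `hash_of_hashes` does.
    let one_shot: [u8; 32] = blake3::hash(batch.concat().as_slice()).into();

    // Validation style: feed each ID into an incremental hasher, as `validate_entries` does.
    let mut hasher = blake3::Hasher::new();
    for id in &batch {
        hasher.update(id);
    }
    let incremental: [u8; 32] = hasher.finalize().into();

    assert_eq!(one_shot, incremental);
}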


@ -1,12 +0,0 @@
[
hex_literal::hex!("1adffbaf832784406018009e07d3dc3a39da7edb6632523c119ed8acb32eb934"),
hex_literal::hex!("ae960265e3398d04f3cd4f949ed13c2689424887c71c1441a03d900a9d3a777f"),
hex_literal::hex!("938c72d267bbd3a17cdecbe02443d00012ee62d6e9f3524f5a914192110b1798"),
hex_literal::hex!("de0c82e51549b6514b42a591fd5440dddb5cc0118ec461459a99017bf06a0a0a"),
hex_literal::hex!("9a50f4586ec7e0fb58c6383048d3b334180235fd34bb714af20f1a3ebce4c911"),
hex_literal::hex!("5a3942f9bb318d65997bf57c40e045d62e7edbe35f3dae57499c2c5554896543"),
hex_literal::hex!("9dccee3b094cdd1b98e357c2c81bfcea798ea75efd94e67c6f5e86f428c5ec2c"),
hex_literal::hex!("620397540d44f21c3c57c20e9d47c6aaf0b1bf4302a4d43e75f2e33edd1a4032"),
hex_literal::hex!("ef6c612fb17bd70ac2ac69b2f85a421b138cc3a81daf622b077cb402dbf68377"),
hex_literal::hex!("6815ecb2bd73a3ba5f20558bfe1b714c30d6892b290e0d6f6cbf18237cedf75a"),
]


@ -1,225 +1,214 @@
use std::{
cmp,
collections::HashMap,
future::Future,
pin::Pin,
task::{Context, Poll},
cmp::min,
collections::{HashMap, VecDeque},
sync::OnceLock,
};
use blake3::Hasher;
use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use tower::Service;
use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus_context::BlockchainContextService;
use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError};
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation};
use cuprate_consensus_context::BlockchainContext;
use cuprate_p2p::block_downloader::ChainEntry;
use cuprate_p2p_core::NetworkZone;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, VerifiedBlockInformation, VerifiedTransactionInformation,
};
use crate::{hash_of_hashes, BlockId, HashOfHashes};
/// A [`OnceLock`] representing the fast sync hashes.
static FAST_SYNC_HASHES: OnceLock<&[[u8; 32]]> = OnceLock::new();
#[cfg(not(test))]
static HASHES_OF_HASHES: &[HashOfHashes] = &include!("./data/hashes_of_hashes");
/// The size of a batch of block hashes to hash to create a fast sync hash.
pub const FAST_SYNC_BATCH_LEN: usize = 512;
#[cfg(not(test))]
const BATCH_SIZE: usize = 512;
#[cfg(test)]
static HASHES_OF_HASHES: &[HashOfHashes] = &[
hex_literal::hex!("3fdc9032c16d440f6c96be209c36d3d0e1aed61a2531490fe0ca475eb615c40a"),
hex_literal::hex!("0102030405060708010203040506070801020304050607080102030405060708"),
hex_literal::hex!("0102030405060708010203040506070801020304050607080102030405060708"),
];
#[cfg(test)]
const BATCH_SIZE: usize = 4;
#[inline]
fn max_height() -> u64 {
(HASHES_OF_HASHES.len() * BATCH_SIZE) as u64
/// Returns the height of the first block not included in the embedded hashes.
///
/// # Panics
///
/// This function will panic if [`set_fast_sync_hashes`] has not been called.
pub fn fast_sync_stop_height() -> usize {
FAST_SYNC_HASHES.get().unwrap().len() * FAST_SYNC_BATCH_LEN
}
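A quick worked example of the arithmetic above (the hash count is made up; only `FAST_SYNC_BATCH_LEN = 512` comes from this change):

fn main() {
    const FAST_SYNC_BATCH_LEN: usize = 512;
    // A hypothetical fast-sync file embedding 10 hashes covers blocks 0..5120,
    // so 5120 is the first height that still needs full verification.
    let embedded_hashes = 10;
    assert_eq!(embedded_hashes * FAST_SYNC_BATCH_LEN, 5_120);
}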
#[derive(Debug, PartialEq, Eq)]
pub struct ValidBlockId(BlockId);
fn valid_block_ids(block_ids: &[BlockId]) -> Vec<ValidBlockId> {
block_ids.iter().map(|b| ValidBlockId(*b)).collect()
/// Sets the hashes to use for fast-sync.
///
/// # Panics
///
/// This will panic if this is called more than once.
pub fn set_fast_sync_hashes(hashes: &'static [[u8; 32]]) {
FAST_SYNC_HASHES.set(hashes).unwrap();
}
#[expect(clippy::large_enum_variant)]
pub enum FastSyncRequest {
ValidateHashes {
start_height: u64,
block_ids: Vec<BlockId>,
},
ValidateBlock {
block: Block,
txs: HashMap<[u8; 32], Transaction>,
token: ValidBlockId,
},
}
#[expect(clippy::large_enum_variant)]
#[derive(Debug, PartialEq, Eq)]
pub enum FastSyncResponse {
ValidateHashes {
validated_hashes: Vec<ValidBlockId>,
unknown_hashes: Vec<BlockId>,
},
ValidateBlock(VerifiedBlockInformation),
}
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum FastSyncError {
#[error("Block does not match its expected hash")]
BlockHashMismatch,
#[error("Start height must be a multiple of the batch size")]
InvalidStartHeight,
#[error("Hash of hashes mismatch")]
Mismatch,
#[error("Given range too small for fast sync (less than one batch)")]
NothingToDo,
#[error("Start height too high for fast sync")]
OutOfRange,
#[error("Block does not have the expected height entry")]
BlockHeightMismatch,
#[error("Block does not contain the expected transaction list")]
TxsIncludedWithBlockIncorrect,
#[error(transparent)]
Consensus(#[from] ConsensusError),
#[error(transparent)]
MinerTx(#[from] MinerTxError),
#[error("Database error: {0}")]
DbErr(String),
}
impl From<tower::BoxError> for FastSyncError {
fn from(error: tower::BoxError) -> Self {
Self::DbErr(error.to_string())
}
}
pub struct FastSyncService {
context_svc: BlockchainContextService,
}
impl FastSyncService {
#[expect(dead_code)]
pub(crate) const fn new(context_svc: BlockchainContextService) -> Self {
Self { context_svc }
}
}
impl Service<FastSyncRequest> for FastSyncService {
type Response = FastSyncResponse;
type Error = FastSyncError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
/// Validates that the given [`ChainEntry`]s are in the fast-sync hashes.
///
/// `entries` should be a list of sequential entries.
/// `start_height` should be the height of the first block in the first entry.
///
/// Returns a tuple, the first element being the entries that are valid*, the second being
/// the entries we do not know are valid and that should be passed in again when we have more entries.
///
/// *Once we are past the fast-sync blocks, all entries will be returned as valid, as
/// we cannot check their validity here.
///
/// There may be more entries returned than passed in as entries could be split.
///
/// # Panics
///
/// This will panic if [`set_fast_sync_hashes`] has not been called.
pub async fn validate_entries<N: NetworkZone>(
mut entries: VecDeque<ChainEntry<N>>,
start_height: usize,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<(VecDeque<ChainEntry<N>>, VecDeque<ChainEntry<N>>), tower::BoxError> {
// if we are past the top fast sync block return all entries as valid.
if start_height >= fast_sync_stop_height() {
return Ok((entries, VecDeque::new()));
}
fn call(&mut self, req: FastSyncRequest) -> Self::Future {
let mut context_svc = self.context_svc.clone();
/*
The algorithm used here needs to preserve which peer told us about which blocks, so we cannot
simply join all the hashes together and return the ones that can be validated and the ones that
can't; we need to keep the batches separate.
Box::pin(async move {
match req {
FastSyncRequest::ValidateHashes {
start_height,
block_ids,
} => validate_hashes(start_height, &block_ids),
FastSyncRequest::ValidateBlock { block, txs, token } => {
validate_block(&mut context_svc, block, txs, &token)
}
}
})
}
}
The first step is to calculate how many hashes we need from the blockchain to make up the first
fast-sync hash.
fn validate_hashes(
start_height: u64,
block_ids: &[BlockId],
) -> Result<FastSyncResponse, FastSyncError> {
let start_height_usize = u64_to_usize(start_height);
Then we take out all the batches at the end for which we cannot make up a full fast-sync hash;
we will split a batch if it can only be partially validated.
if start_height_usize % BATCH_SIZE != 0 {
return Err(FastSyncError::InvalidStartHeight);
}
With the remaining hashes from the blockchain and the hashes in the batches we can validate, we
work on calculating the fast-sync hashes and comparing them to the ones in [`FAST_SYNC_HASHES`].
*/
if start_height >= max_height() {
return Err(FastSyncError::OutOfRange);
}
// First calculate the start and stop for this range of hashes.
let hashes_start_height = (start_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN;
let amount_of_hashes = entries.iter().map(|e| e.ids.len()).sum::<usize>();
let last_height = amount_of_hashes + start_height;
let stop_height = start_height_usize + block_ids.len();
let hashes_stop_height = min(
(last_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN,
fast_sync_stop_height(),
);
let batch_from = start_height_usize / BATCH_SIZE;
let batch_to = cmp::min(stop_height / BATCH_SIZE, HASHES_OF_HASHES.len());
let n_batches = batch_to - batch_from;
let mut hashes_stop_diff_last_height = last_height - hashes_stop_height;
if n_batches == 0 {
return Err(FastSyncError::NothingToDo);
}
let mut unknown = VecDeque::new();
for i in 0..n_batches {
let batch = &block_ids[BATCH_SIZE * i..BATCH_SIZE * (i + 1)];
let actual = hash_of_hashes(batch);
let expected = HASHES_OF_HASHES[batch_from + i];
// start moving from the back of the batches taking enough hashes out so we are only left with hashes
// that can be verified.
while !entries.is_empty() && hashes_stop_diff_last_height != 0 {
let back = entries.back_mut().unwrap();
if expected != actual {
return Err(FastSyncError::Mismatch);
if back.ids.len() >= hashes_stop_diff_last_height {
// This batch is partially valid so split it.
unknown.push_front(ChainEntry {
ids: back
.ids
.drain((back.ids.len() - hashes_stop_diff_last_height)..)
.collect(),
peer: back.peer,
handle: back.handle.clone(),
});
break;
}
// Add this batch to the front of the unknowns, we do not know its validity.
let back = entries.pop_back().unwrap();
hashes_stop_diff_last_height -= back.ids.len();
unknown.push_front(back);
}
let validated_hashes = valid_block_ids(&block_ids[..n_batches * BATCH_SIZE]);
let unknown_hashes = block_ids[n_batches * BATCH_SIZE..].to_vec();
// get the hashes we are missing to create the first fast-sync hash.
let BlockchainResponse::BlockHashInRange(hashes) = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::BlockHashInRange(
hashes_start_height..start_height,
Chain::Main,
))
.await?
else {
unreachable!()
};
Ok(FastSyncResponse::ValidateHashes {
validated_hashes,
unknown_hashes,
})
// Start verifying the hashes.
let mut hasher = Hasher::default();
let mut last_i = 1;
for (i, hash) in hashes
.iter()
.chain(entries.iter().flat_map(|e| e.ids.iter()))
.enumerate()
{
hasher.update(hash);
if (i + 1) % FAST_SYNC_BATCH_LEN == 0 {
let got_hash = hasher.finalize();
if got_hash
!= FAST_SYNC_HASHES.get().unwrap()
[get_hash_index_for_height(hashes_start_height + i)]
{
return Err("Hashes do not match".into());
}
hasher.reset();
}
last_i = i + 1;
}
// Make sure we actually checked all hashes.
assert_eq!(last_i % FAST_SYNC_BATCH_LEN, 0);
Ok((entries, unknown))
}
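The trickiest step in `validate_entries` is the trimming loop above, which peels whole entries (and possibly splits one) off the back of the queue so that only IDs covered by a complete fast-sync batch remain to be checked. A plain-types sketch of just that step, with `Vec<u32>` standing in for an entry's block IDs, peer/handle bookkeeping omitted, and `split_checkable` being a hypothetical helper:

use std::collections::VecDeque;

fn split_checkable(
    mut entries: VecDeque<Vec<u32>>,
    start_height: usize,
    hashes_stop_height: usize, // assumed to not be past the last ID
) -> (VecDeque<Vec<u32>>, VecDeque<Vec<u32>>) {
    let last_height = start_height + entries.iter().map(Vec::len).sum::<usize>();
    let mut over = last_height - hashes_stop_height;

    let mut unknown = VecDeque::new();
    while !entries.is_empty() && over != 0 {
        let back = entries.back_mut().unwrap();
        if back.len() >= over {
            // This entry straddles the boundary, so split it.
            unknown.push_front(back.drain(back.len() - over..).collect());
            break;
        }
        // The whole entry is past the boundary, move it to `unknown`.
        over -= back.len();
        unknown.push_front(entries.pop_back().unwrap());
    }
    (entries, unknown)
}

fn main() {
    // Two entries of 3 IDs each starting at height 0, with the checkable boundary at 4.
    let entries = VecDeque::from([vec![0, 1, 2], vec![3, 4, 5]]);
    let (checkable, unknown) = split_checkable(entries, 0, 4);
    assert_eq!(checkable, VecDeque::from([vec![0, 1, 2], vec![3]]));
    assert_eq!(unknown, VecDeque::from([vec![4, 5]]));
}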
fn validate_block(
context_svc: &mut BlockchainContextService,
block: Block,
mut txs: HashMap<[u8; 32], Transaction>,
token: &ValidBlockId,
) -> Result<FastSyncResponse, FastSyncError> {
let block_chain_ctx = context_svc.blockchain_context().clone();
/// Get the index of the hash that contains this block in the fast sync hashes.
const fn get_hash_index_for_height(height: usize) -> usize {
height / FAST_SYNC_BATCH_LEN
}
/// Creates a [`VerifiedBlockInformation`] from a block known to be valid.
///
/// # Panics
///
/// This may panic if used on an invalid block.
pub fn block_to_verified_block_information(
block: Block,
txs: Vec<Transaction>,
blockchain_ctx: &BlockchainContext,
) -> VerifiedBlockInformation {
let block_hash = block.hash();
if block_hash != token.0 {
return Err(FastSyncError::BlockHashMismatch);
}
let block_blob = block.serialize();
let Some(Input::Gen(height)) = block.miner_transaction.prefix().inputs.first() else {
return Err(FastSyncError::MinerTx(MinerTxError::InputNotOfTypeGen));
panic!("fast sync block invalid");
};
if *height != block_chain_ctx.chain_height {
return Err(FastSyncError::BlockHeightMismatch);
}
assert_eq!(
*height, blockchain_ctx.chain_height,
"fast sync block invalid"
);
let mut txs = txs
.into_iter()
.map(|tx| {
let data = new_tx_verification_data(tx).expect("fast sync block invalid");
(data.tx_hash, data)
})
.collect::<HashMap<_, _>>();
let mut verified_txs = Vec::with_capacity(txs.len());
for tx in &block.transactions {
let tx = txs
.remove(tx)
.ok_or(FastSyncError::TxsIncludedWithBlockIncorrect)?;
let data = txs.remove(tx).expect("fast sync block invalid");
let data = new_tx_verification_data(tx)?;
verified_txs.push(VerifiedTransactionInformation {
tx_blob: data.tx_blob,
tx_weight: data.tx_weight,
@ -243,68 +232,16 @@ fn validate_block(
let weight = block.miner_transaction.weight()
+ verified_txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
Ok(FastSyncResponse::ValidateBlock(VerifiedBlockInformation {
VerifiedBlockInformation {
block_blob,
txs: verified_txs,
block_hash,
pow_hash: [0_u8; 32],
pow_hash: [u8::MAX; 32],
height: *height,
generated_coins,
weight,
long_term_weight: block_chain_ctx.next_block_long_term_weight(weight),
cumulative_difficulty: block_chain_ctx.cumulative_difficulty
+ block_chain_ctx.next_difficulty,
long_term_weight: blockchain_ctx.next_block_long_term_weight(weight),
cumulative_difficulty: blockchain_ctx.cumulative_difficulty + blockchain_ctx.next_difficulty,
block,
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_hashes_errors() {
let ids = [[1_u8; 32], [2_u8; 32], [3_u8; 32], [4_u8; 32], [5_u8; 32]];
assert_eq!(
validate_hashes(3, &[]),
Err(FastSyncError::InvalidStartHeight)
);
assert_eq!(
validate_hashes(3, &ids),
Err(FastSyncError::InvalidStartHeight)
);
assert_eq!(validate_hashes(20, &[]), Err(FastSyncError::OutOfRange));
assert_eq!(validate_hashes(20, &ids), Err(FastSyncError::OutOfRange));
assert_eq!(validate_hashes(4, &[]), Err(FastSyncError::NothingToDo));
assert_eq!(
validate_hashes(4, &ids[..3]),
Err(FastSyncError::NothingToDo)
);
}
#[test]
fn test_validate_hashes_success() {
let ids = [[1_u8; 32], [2_u8; 32], [3_u8; 32], [4_u8; 32], [5_u8; 32]];
let validated_hashes = valid_block_ids(&ids[0..4]);
let unknown_hashes = ids[4..].to_vec();
assert_eq!(
validate_hashes(0, &ids),
Ok(FastSyncResponse::ValidateHashes {
validated_hashes,
unknown_hashes
})
);
}
#[test]
fn test_validate_hashes_mismatch() {
let ids = [
[1_u8; 32], [2_u8; 32], [3_u8; 32], [5_u8; 32], [1_u8; 32], [2_u8; 32], [3_u8; 32],
[4_u8; 32],
];
assert_eq!(validate_hashes(0, &ids), Err(FastSyncError::Mismatch));
assert_eq!(validate_hashes(4, &ids), Err(FastSyncError::Mismatch));
}
}


@ -4,7 +4,9 @@ use cuprate_blockchain as _;
use hex as _;
use tokio as _;
pub mod fast_sync;
pub mod util;
mod fast_sync;
pub use util::{hash_of_hashes, BlockId, HashOfHashes};
pub use fast_sync::{
block_to_verified_block_information, fast_sync_stop_height, set_fast_sync_hashes,
validate_entries, FAST_SYNC_BATCH_LEN,
};


@ -1,8 +0,0 @@
use sha3::{Digest, Keccak256};
pub type BlockId = [u8; 32];
pub type HashOfHashes = [u8; 32];
pub fn hash_of_hashes(hashes: &[BlockId]) -> HashOfHashes {
Keccak256::digest(hashes.concat().as_slice()).into()
}


@ -7,7 +7,7 @@
//! The block downloader is started by [`download_blocks`].
use std::{
cmp::{max, min, Reverse},
collections::{BTreeMap, BinaryHeap},
collections::{BTreeMap, BinaryHeap, VecDeque},
time::Duration,
};
@ -28,9 +28,9 @@ use cuprate_pruning::PruningSeed;
use crate::{
constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES,
MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE,
},
peer_set::ClientDropGuard,
peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse},
};
mod block_queue;
@ -40,9 +40,9 @@ mod request_chain;
#[cfg(test)]
mod tests;
use crate::peer_set::{PeerSetRequest, PeerSetResponse};
use block_queue::{BlockQueue, ReadyQueueBatch};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
pub use chain_tracker::ChainEntry;
use chain_tracker::{BlocksToRetrieve, ChainTracker};
use download_batch::download_batch_task;
use request_chain::{initial_chain_search, request_chain_entry_from_peer};
@ -95,17 +95,19 @@ pub(crate) enum BlockDownloadError {
}
/// The request type for the chain service.
pub enum ChainSvcRequest {
pub enum ChainSvcRequest<N: NetworkZone> {
/// A request for the current chain history.
CompactHistory,
/// A request to find the first unknown block ID in a list of block IDs.
FindFirstUnknown(Vec<[u8; 32]>),
/// A request for our current cumulative difficulty.
CumulativeDifficulty,
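/// A request to validate a list of sequential [`ChainEntry`]s against the fast-sync hashes.
///
/// The `usize` is the height of the first block in the first entry.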
ValidateEntries(VecDeque<ChainEntry<N>>, usize),
}
/// The response type for the chain service.
pub enum ChainSvcResponse {
pub enum ChainSvcResponse<N: NetworkZone> {
/// The response for [`ChainSvcRequest::CompactHistory`].
CompactHistory {
/// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block.
@ -123,6 +125,11 @@ pub enum ChainSvcResponse {
///
/// The current cumulative difficulty of our chain.
CumulativeDifficulty(u128),
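/// The response for [`ChainSvcRequest::ValidateEntries`].
///
/// Contains the entries known to be valid and the entries whose validity is still unknown.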
ValidateEntries {
valid: VecDeque<ChainEntry<N>>,
unknown: VecDeque<ChainEntry<N>>,
},
}
/// This function starts the block downloader and returns a [`BufferStream`] that will produce
@ -140,7 +147,7 @@ pub fn download_blocks<N: NetworkZone, C>(
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
@ -191,10 +198,7 @@ struct BlockDownloader<N: NetworkZone, C> {
/// The service that holds our current chain state.
our_chain_svc: C,
/// The amount of blocks to request in the next batch.
amount_of_blocks_to_request: usize,
/// The height at which [`Self::amount_of_blocks_to_request`] was updated.
amount_of_blocks_to_request_updated_at: usize,
most_recent_batch_sizes: BinaryHeap<Reverse<(usize, BatchSizeInformation)>>,
/// The amount of consecutive empty chain entries we received.
///
@ -227,7 +231,7 @@ struct BlockDownloader<N: NetworkZone, C> {
impl<N: NetworkZone, C> BlockDownloader<N, C>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
@ -242,8 +246,7 @@ where
Self {
peer_set,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_len,
amount_of_blocks_to_request_updated_at: 0,
most_recent_batch_sizes: BinaryHeap::new(),
amount_of_empty_chain_entries: 0,
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
@ -280,6 +283,28 @@ where
}
}
fn amount_of_blocks_to_request(&self) -> usize {
let biggest_batch = self
.most_recent_batch_sizes
.iter()
.max_by(|batch_1, batch_2| {
let av1 = batch_1.0 .1.byte_size / batch_1.0 .1.len;
let av2 = batch_2.0 .1.byte_size / batch_2.0 .1.len;
av1.cmp(&av2)
});
let Some(biggest_batch) = biggest_batch else {
return self.config.initial_batch_len;
};
calculate_next_block_batch_size(
biggest_batch.0 .1.byte_size,
biggest_batch.0 .1.len,
self.config.target_batch_bytes,
)
}
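The new sizing logic keeps the most recent batches (bounded by `MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE`) and sizes the next request from the one with the largest average block size. A rough, self-contained sketch of that idea; `next_batch_len` and its byte-budget division are illustrative stand-ins for `calculate_next_block_batch_size`, whose exact formula lives elsewhere in the block downloader:

struct Batch {
    len: usize,       // number of blocks in the batch
    byte_size: usize, // serialized size of the batch in bytes
}

fn next_batch_len(recent: &[Batch], target_batch_bytes: usize, initial: usize) -> usize {
    // Use the batch with the largest average block size as the pessimistic estimate
    // (assumes batches are never empty).
    let Some(biggest) = recent.iter().max_by_key(|b| b.byte_size / b.len) else {
        return initial;
    };
    // Ask for however many "worst case" blocks fit in the target byte budget.
    (target_batch_bytes / (biggest.byte_size / biggest.len)).max(1)
}

fn main() {
    let recent = [
        Batch { len: 100, byte_size: 1_000_000 }, // 10 kB average block
        Batch { len: 100, byte_size: 2_500_000 }, // 25 kB average block
    ];
    // With a 10 MB target and a 25 kB pessimistic block size, request ~400 blocks.
    assert_eq!(next_batch_len(&recent, 10_000_000, 10), 400);
}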
/// Attempts to send another request for an inflight batch
///
/// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request
@ -387,9 +412,10 @@ where
// No failed requests that we can handle, request some new blocks.
let Some(mut block_entry_to_get) = chain_tracker
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
else {
let Some(mut block_entry_to_get) = chain_tracker.blocks_to_get(
&client.info.pruning_seed,
self.amount_of_blocks_to_request(),
) else {
return Some(client);
};
@ -428,7 +454,7 @@ where
&& self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
// Check we have a big buffer of pending block IDs to retrieve, we don't want to be waiting around
// for a chain entry.
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request()) < 500
// Make sure this peer actually has the chain.
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
@ -561,19 +587,25 @@ where
// If the batch is higher than the last time we updated `amount_of_blocks_to_request`, update it
// again.
if start_height > self.amount_of_blocks_to_request_updated_at {
self.amount_of_blocks_to_request = calculate_next_block_batch_size(
block_batch.size,
block_batch.blocks.len(),
self.config.target_batch_bytes,
);
if start_height
> self
.most_recent_batch_sizes
.peek()
.map(|Reverse((height, _))| *height)
.unwrap_or_default()
{
self.most_recent_batch_sizes.push(Reverse((
start_height,
BatchSizeInformation {
len: block_batch.blocks.len(),
byte_size: block_batch.size,
},
)));
tracing::debug!(
"Updating batch size of new batches, new size: {}",
self.amount_of_blocks_to_request
);
self.amount_of_blocks_to_request_updated_at = start_height;
if self.most_recent_batch_sizes.len() > MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE
{
self.most_recent_batch_sizes.pop();
}
}
self.block_queue
@ -642,12 +674,15 @@ where
Some(Ok(res)) = self.chain_entry_task.join_next() => {
match res {
Ok((client, entry)) => {
if chain_tracker.add_entry(entry).is_ok() {
tracing::debug!("Successfully added chain entry to chain tracker.");
self.amount_of_empty_chain_entries = 0;
} else {
tracing::debug!("Failed to add incoming chain entry to chain tracker.");
self.amount_of_empty_chain_entries += 1;
match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await {
Ok(()) => {
tracing::debug!("Successfully added chain entry to chain tracker.");
self.amount_of_empty_chain_entries = 0;
}
Err(e) => {
tracing::debug!("Failed to add incoming chain entry to chain tracker: {e:?}");
self.amount_of_empty_chain_entries += 1;
}
}
pending_peers
@ -683,6 +718,12 @@ const fn client_has_block_in_range(
&& pruning_seed.has_full_block(start_height + length, MAX_BLOCK_HEIGHT_USIZE)
}
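/// The block count and byte size of a previously downloaded batch, used when sizing the next batch request.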
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct BatchSizeInformation {
len: usize,
byte_size: usize,
}
/// Calculates the next amount of blocks to request in a batch.
///
/// Parameters:


@ -1,16 +1,20 @@
use std::{cmp::min, collections::VecDeque};
use std::{cmp::min, collections::VecDeque, mem};
use cuprate_fixed_bytes::ByteArrayVec;
use tower::{Service, ServiceExt};
use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use cuprate_pruning::PruningSeed;
use crate::constants::MEDIUM_BAN;
use crate::{
block_downloader::{ChainSvcRequest, ChainSvcResponse},
constants::MEDIUM_BAN,
};
/// A new chain entry to add to our chain tracker.
#[derive(Debug)]
pub(crate) struct ChainEntry<N: NetworkZone> {
pub struct ChainEntry<N: NetworkZone> {
/// A list of block IDs.
pub ids: Vec<[u8; 32]>,
/// The peer who told us about this chain entry.
@ -39,12 +43,15 @@ pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
}
/// An error returned from the [`ChainTracker`].
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum ChainTrackerError {
/// The new chain entry is invalid.
NewEntryIsInvalid,
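/// The new chain entry is empty.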
NewEntryIsEmpty,
/// The new chain entry does not follow from the top of our chain tracker.
NewEntryDoesNotFollowChain,
#[expect(dead_code)] // This is used for logging
ChainSvcError(tower::BoxError),
}
/// # Chain Tracker
@ -52,8 +59,10 @@ pub(crate) enum ChainTrackerError {
/// This struct allows following a single chain. It takes in [`ChainEntry`]s and
/// allows getting [`BlocksToRetrieve`].
pub(crate) struct ChainTracker<N: NetworkZone> {
/// A list of [`ChainEntry`]s, in order.
entries: VecDeque<ChainEntry<N>>,
/// A list of [`ChainEntry`]s, in order, that we should request.
valid_entries: VecDeque<ChainEntry<N>>,
/// A list of [`ChainEntry`]s that are pending more [`ChainEntry`]s to check validity.
unknown_entries: VecDeque<ChainEntry<N>>,
/// The height of the first block, in the first entry in [`Self::entries`].
first_height: usize,
/// The hash of the last block in the last entry.
@ -66,23 +75,39 @@ pub(crate) struct ChainTracker<N: NetworkZone> {
impl<N: NetworkZone> ChainTracker<N> {
/// Creates a new chain tracker.
pub(crate) fn new(
pub(crate) async fn new<C>(
new_entry: ChainEntry<N>,
first_height: usize,
our_genesis: [u8; 32],
previous_hash: [u8; 32],
) -> Self {
our_chain_svc: &mut C,
) -> Result<Self, ChainTrackerError>
where
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
{
let top_seen_hash = *new_entry.ids.last().unwrap();
let mut entries = VecDeque::with_capacity(1);
entries.push_back(new_entry);
Self {
entries,
let ChainSvcResponse::ValidateEntries { valid, unknown } = our_chain_svc
.ready()
.await
.map_err(ChainTrackerError::ChainSvcError)?
.call(ChainSvcRequest::ValidateEntries(entries, first_height))
.await
.map_err(ChainTrackerError::ChainSvcError)?
else {
unreachable!()
};
Ok(Self {
valid_entries: valid,
unknown_entries: unknown,
first_height,
top_seen_hash,
previous_hash,
our_genesis,
}
})
}
/// Returns `true` if the peer is expected to have the next block after our highest seen block
@ -99,8 +124,9 @@ impl<N: NetworkZone> ChainTracker<N> {
/// Returns the height of the highest block we are tracking.
pub(crate) fn top_height(&self) -> usize {
let top_block_idx = self
.entries
.valid_entries
.iter()
.chain(self.unknown_entries.iter())
.map(|entry| entry.ids.len())
.sum::<usize>();
@ -112,32 +138,32 @@ impl<N: NetworkZone> ChainTracker<N> {
/// # Panics
/// This function panics if `batch_size` is `0`.
pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
self.entries
self.valid_entries
.iter()
.map(|entry| entry.ids.len().div_ceil(batch_size))
.sum()
}
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
pub(crate) fn add_entry(
pub(crate) async fn add_entry<C>(
&mut self,
mut chain_entry: ChainEntry<N>,
) -> Result<(), ChainTrackerError> {
if chain_entry.ids.is_empty() {
our_chain_svc: &mut C,
) -> Result<(), ChainTrackerError>
where
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
{
if chain_entry.ids.len() == 1 {
return Err(ChainTrackerError::NewEntryIsEmpty);
}
let Some(first) = chain_entry.ids.first() else {
// The peer must send at least one overlapping block.
chain_entry.handle.ban_peer(MEDIUM_BAN);
return Err(ChainTrackerError::NewEntryIsInvalid);
}
};
if chain_entry.ids.len() == 1 {
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
if self
.entries
.back()
.is_some_and(|last_entry| last_entry.ids.last().unwrap() != &chain_entry.ids[0])
{
if *first != self.top_seen_hash {
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
@ -150,7 +176,29 @@ impl<N: NetworkZone> ChainTracker<N> {
self.top_seen_hash = *new_entry.ids.last().unwrap();
self.entries.push_back(new_entry);
self.unknown_entries.push_back(new_entry);
let ChainSvcResponse::ValidateEntries { mut valid, unknown } = our_chain_svc
.ready()
.await
.map_err(ChainTrackerError::ChainSvcError)?
.call(ChainSvcRequest::ValidateEntries(
mem::take(&mut self.unknown_entries),
self.first_height
+ self
.valid_entries
.iter()
.map(|e| e.ids.len())
.sum::<usize>(),
))
.await
.map_err(ChainTrackerError::ChainSvcError)?
else {
unreachable!()
};
self.valid_entries.append(&mut valid);
self.unknown_entries = unknown;
Ok(())
}
@ -167,7 +215,7 @@ impl<N: NetworkZone> ChainTracker<N> {
return None;
}
let entry = self.entries.front_mut()?;
let entry = self.valid_entries.front_mut()?;
// Calculate the ending index for us to get in this batch, it will be one of these:
// - smallest out of `max_blocks`
@ -204,7 +252,7 @@ impl<N: NetworkZone> ChainTracker<N> {
self.previous_hash = blocks.ids[blocks.ids.len() - 1];
if entry.ids.is_empty() {
self.entries.pop_front();
self.valid_entries.pop_front();
}
Some(blocks)


@ -84,7 +84,7 @@ pub async fn initial_chain_search<N: NetworkZone, C>(
mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
{
tracing::debug!("Getting our chain history");
// Get our history.
@ -214,7 +214,15 @@ where
first_entry.ids.len()
);
let tracker = ChainTracker::new(first_entry, expected_height, our_genesis, previous_id);
let tracker = ChainTracker::new(
first_entry,
expected_height,
our_genesis,
previous_id,
&mut our_chain_svc,
)
.await
.map_err(|_| BlockDownloadError::ChainInvalid)?;
Ok(tracker)
}


@ -1,4 +1,5 @@
use std::{
collections::VecDeque,
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
@ -269,8 +270,8 @@ struct OurChainSvc {
genesis: [u8; 32],
}
impl Service<ChainSvcRequest> for OurChainSvc {
type Response = ChainSvcResponse;
impl Service<ChainSvcRequest<ClearNet>> for OurChainSvc {
type Response = ChainSvcResponse<ClearNet>;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@ -279,7 +280,7 @@ impl Service<ChainSvcRequest> for OurChainSvc {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
fn call(&mut self, req: ChainSvcRequest<ClearNet>) -> Self::Future {
let genesis = self.genesis;
async move {
@ -292,6 +293,10 @@ impl Service<ChainSvcRequest> for OurChainSvc {
ChainSvcResponse::FindFirstUnknown(Some((1, 1)))
}
ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1),
ChainSvcRequest::ValidateEntries(valid, _) => ChainSvcResponse::ValidateEntries {
valid,
unknown: VecDeque::new(),
},
})
}
.boxed()


@ -75,6 +75,9 @@ pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5;
/// The amount of empty chain entries to receive before we assume we have found the top of the chain.
pub(crate) const EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED: usize = 5;
/// The amount of most recent block batches we use to calculate batch size.
pub(crate) const MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE: usize = 100;
#[cfg(test)]
mod tests {
use super::*;


@ -168,7 +168,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,


@ -12,6 +12,7 @@
use std::{
cmp::min,
collections::{HashMap, HashSet},
ops::Range,
sync::Arc,
};
@ -107,6 +108,7 @@ fn map_request(
R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block, chain) => block_hash(env, block, chain),
R::BlockHashInRange(blocks, chain) => block_hash_in_range(env, blocks, chain),
R::FindBlock(block_hash) => find_block(env, block_hash),
R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
R::BlockExtendedHeaderInRange(range, chain) => {
@ -271,6 +273,34 @@ fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> Res
Ok(BlockchainResponse::BlockHash(block_hash))
}
/// [`BlockchainReadRequest::BlockHashInRange`].
#[inline]
fn block_hash_in_range(env: &ConcreteEnv, range: Range<usize>, chain: Chain) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let block_hash = range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(tx_ro)?;
let block_hash = match chain {
Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
Chain::Alt(chain) => {
get_alt_block_hash(&block_height, chain, &env_inner.open_tables(tx_ro)?)?
}
};
Ok(block_hash)
})
.collect::<Result<_, RuntimeError>>()?;
Ok(BlockchainResponse::BlockHashInRange(block_hash))
}
/// [`BlockchainReadRequest::FindBlock`]
fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
@ -330,7 +360,7 @@ fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> R
#[inline]
fn block_extended_header_in_range(
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
range: Range<BlockHeight>,
chain: Chain,
) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.


@ -39,6 +39,7 @@ fn handle_blockchain_request(
) -> DbResult<BlockchainResponse> {
match req {
BlockchainWriteRequest::WriteBlock(block) => write_block(env, block),
BlockchainWriteRequest::BatchWriteBlocks(blocks) => write_blocks(env, blocks),
BlockchainWriteRequest::WriteAltBlock(alt_block) => write_alt_block(env, alt_block),
BlockchainWriteRequest::PopBlocks(numb_blocks) => pop_blocks(env, *numb_blocks),
BlockchainWriteRequest::ReverseReorg(old_main_chain_id) => {
@ -80,6 +81,33 @@ fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseR
}
}
/// [`BlockchainWriteRequest::BatchWriteBlocks`].
#[inline]
fn write_blocks(env: &ConcreteEnv, block: &Vec<VerifiedBlockInformation>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let result = {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
for block in block {
crate::ops::block::add_block(block, &mut tables_mut)?;
}
Ok(())
};
match result {
Ok(()) => {
TxRw::commit(tx_rw)?;
Ok(BlockchainResponse::Ok)
}
Err(e) => {
TxRw::abort(tx_rw).expect(TX_RW_ABORT_FAIL);
Err(e)
}
}
}
/// [`BlockchainWriteRequest::WriteAltBlock`].
#[inline]
fn write_alt_block(env: &ConcreteEnv, block: &AltBlockInformation) -> ResponseResult {


@ -44,6 +44,11 @@ pub enum BlockchainReadRequest {
/// The input is the block's height and the chain it is on.
BlockHash(usize, Chain),
/// Request a range of block hashes.
///
/// The input is the range of block heights and the chain it is on.
BlockHashInRange(Range<usize>, Chain),
/// Request to check if we have a block and which [`Chain`] it is on.
///
/// The input is the block's hash.
@ -173,6 +178,11 @@ pub enum BlockchainWriteRequest {
/// Input is an already verified block.
WriteBlock(VerifiedBlockInformation),
/// Request that a batch of blocks be written to the database.
///
/// Input is an already verified batch of blocks.
BatchWriteBlocks(Vec<VerifiedBlockInformation>),
/// Write an alternative block to the database,
///
/// Input is the alternative block.
@ -229,6 +239,11 @@ pub enum BlockchainResponse {
/// Inner value is the hash of the requested block.
BlockHash([u8; 32]),
/// Response to [`BlockchainReadRequest::BlockHashInRange`].
///
/// Inner value is the hashes of the requested blocks, in order.
BlockHashInRange(Vec<[u8; 32]>),
/// Response to [`BlockchainReadRequest::FindBlock`].
///
/// Inner value is the chain and height of the block if found.