handle more p2p requests + alt blocks
Some checks failed
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-09-12 02:24:07 +01:00
parent d4e0e30133
commit 915633fe70
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
15 changed files with 602 additions and 110 deletions

1
Cargo.lock generated
View file

@ -531,6 +531,7 @@ version = "0.0.0"
dependencies = [ dependencies = [
"bitflags 2.5.0", "bitflags 2.5.0",
"bytemuck", "bytemuck",
"bytes",
"cuprate-database", "cuprate-database",
"cuprate-database-service", "cuprate-database-service",
"cuprate-helper", "cuprate-helper",

View file

@ -23,7 +23,7 @@ cuprate-p2p-core = { path = "../../p2p/p2p-core" }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-async-buffer = { path = "../../p2p/async-buffer" }
cuprate-address-book = { path = "../../p2p/address-book" } cuprate-address-book = { path = "../../p2p/address-book" }
cuprate-blockchain = { path = "../../storage/blockchain" } cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] }
cuprate-database-service = { path = "../../storage/service" } cuprate-database-service = { path = "../../storage/service" }
cuprate-txpool = { path = "../../storage/txpool" } cuprate-txpool = { path = "../../storage/txpool" }
cuprate-database = { path = "../../storage/database" } cuprate-database = { path = "../../storage/database" }

View file

@ -1,17 +1,27 @@
mod batch_handler; mod batch_handler;
mod handler;
use crate::blockchain::manager::batch_handler::handle_incoming_block_batch;
use crate::blockchain::types::ConsensusBlockchainReadHandle; use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; use cuprate_consensus::context::RawBlockChainContext;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::block_downloader::BlockBatch; use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::Chain;
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tower::{Service, ServiceExt};
use tracing::error;
pub struct BlockchainManager { pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle, blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle, blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService, blockchain_context_service: BlockChainContextService,
cached_blockchain_context: RawBlockChainContext,
block_verifier_service: BlockVerifierService< block_verifier_service: BlockVerifierService<
BlockChainContextService, BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>, TxVerifierService<ConsensusBlockchainReadHandle>,
@ -34,20 +44,86 @@ impl BlockchainManager {
blockchain_write_handle, blockchain_write_handle,
blockchain_read_handle, blockchain_read_handle,
blockchain_context_service, blockchain_context_service,
cached_blockchain_context: todo!(),
block_verifier_service, block_verifier_service,
} }
} }
async fn handle_incoming_main_chain_batch(
&mut self,
batch: BlockBatch,
) -> Result<(), anyhow::Error> {
let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await?
else {
panic!("Incorrect response!");
};
for (block, txs) in prepped {
let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await
.unwrap()
else {
panic!("Incorrect response!");
};
blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
}
}
async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch
.blocks
.first()
.expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash {
todo!("Main chain")
} else {
todo!("Alt chain")
}
}
pub async fn run(mut self, mut batch_rx: Receiver<BlockBatch>) { pub async fn run(mut self, mut batch_rx: Receiver<BlockBatch>) {
loop { loop {
tokio::select! { tokio::select! {
Some(batch) = batch_rx.recv() => { Some(batch) = batch_rx.recv() => {
handle_incoming_block_batch( self.handle_incoming_block_batch(
batch, batch,
&mut self.block_verifier_service,
&mut self.blockchain_context_service,
&mut self.blockchain_read_handle,
&mut self.blockchain_write_handle
).await; ).await;
} }
else => { else => {

View file

@ -3,6 +3,7 @@
use crate::blockchain::types::ConsensusBlockchainReadHandle; use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::NewBlockData; use cuprate_consensus::context::NewBlockData;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus::{ use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError, BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError,
@ -11,82 +12,17 @@ use cuprate_consensus::{
use cuprate_p2p::block_downloader::BlockBatch; use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::BlockchainWriteRequest; use cuprate_types::blockchain::BlockchainWriteRequest;
use cuprate_types::{Chain, HardFork}; use cuprate_types::{Chain, HardFork};
use rayon::prelude::*;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
pub async fn handle_incoming_block_batch<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle,
) where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let (first_block, _) = batch
.blocks
.first()
.expect("Block batch should not be empty");
handle_incoming_block_batch_main_chain(
batch,
block_verifier_service,
blockchain_context_service,
blockchain_write_handle,
)
.await;
// TODO: alt block to the DB
/*
match blockchain_read_handle
.oneshot(BlockchainReadRequest::FindBlock(
first_block.header.previous,
))
.await
{
Err(_) | Ok(BlockchainResponse::FindBlock(None)) => {
// The block downloader shouldn't be downloading orphan blocks
error!("Failed to find parent block for first block in batch.");
return;
}
Ok(BlockchainResponse::FindBlock(Some((chain, _)))) => match chain {
Chain::Main => {
handle_incoming_block_batch_main_chain(
batch,
block_verifier_service,
blockchain_context_service,
blockchain_write_handle,
)
.await;
}
Chain::Alt(_) => todo!(),
},
Ok(_) => panic!("Blockchain service returned incorrect response"),
}
*/
}
async fn handle_incoming_block_batch_main_chain<C, TxV>( async fn handle_incoming_block_batch_main_chain<C, TxV>(
batch: BlockBatch, batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>, block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C, blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle, blockchain_write_handle: &mut BlockchainWriteHandle,
) where ) -> Result<(), anyhow::Error>
where
C: Service< C: Service<
BlockChainContextRequest, BlockChainContextRequest,
Response = BlockChainContextResponse, Response = BlockChainContextResponse,
@ -114,8 +50,7 @@ async fn handle_incoming_block_batch_main_chain<C, TxV>(
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks { .call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks, blocks: batch.blocks,
}) })
.await .await?
.unwrap()
else { else {
panic!("Incorrect response!"); panic!("Incorrect response!");
}; };
@ -126,8 +61,7 @@ async fn handle_incoming_block_batch_main_chain<C, TxV>(
.await .await
.expect("TODO") .expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs }) .call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await .await?
.unwrap()
else { else {
panic!("Incorrect response!"); panic!("Incorrect response!");
}; };
@ -158,3 +92,30 @@ async fn handle_incoming_block_batch_main_chain<C, TxV>(
.expect("TODO"); .expect("TODO");
} }
} }
async fn handle_incoming_block_batch_alt_chain<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
for (block, txs) in batch.blocks {
alt_block_info.cumulative_difficulty
}
}

View file

@ -0,0 +1,163 @@
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService,
ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest,
VerifyTxResponse,
};
use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::{
BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest,
};
use cuprate_types::AltBlockInformation;
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use tower::{Service, ServiceExt};
async fn handle_incoming_alt_block<C, TxV>(
block: Block,
txs: Vec<Transaction>,
current_cumulative_difficulty: u128,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let prepared_txs = txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
(tx.tx_hash, tx)
})
.collect::<Result<_, _>>()?;
let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::AltChain {
block,
prepared_txs,
})
.await?
else {
panic!("Incorrect response!");
};
if alt_block_info.cumulative_difficulty > current_cumulative_difficulty {
todo!("do re-org");
}
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))?;
Ok(())
}
async fn try_do_reorg<C, TxV>(
top_alt_block: AltBlockInformation,
chain_height: usize,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = blockchain_read_handle
.ready()
.await
.expect("TODO")
.call(BlockchainReadRequest::AltBlocksInChain(
top_alt_block.chain_id,
))
.await?
else {
panic!("Incorrect response!");
};
alt_blocks.push(top_alt_block);
let split_height = alt_blocks[0].height;
let BlockchainResponse::PopBlocks(old_main_chain_id) = blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::PopBlocks(
chain_height - split_height + 1,
))
.await?
else {
panic!("Incorrect response!");
};
todo!()
}
async fn verify_add_alt_blocks_to_main_chain<C, TxV>(
alt_blocks: Vec<AltBlockInformation>,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) -> Result<(), anyhow::Error>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await?
else {
panic!("Incorrect response!");
};
}

View file

@ -42,7 +42,9 @@ fn main() {
.unwrap(); .unwrap();
let net = cuprate_p2p::initialize_network( let net = cuprate_p2p::initialize_network(
p2p::request_handler::P2pProtocolRequestHandler, p2p::request_handler::P2pProtocolRequestHandler {
blockchain_read_handle: bc_read_handle.clone(),
},
p2p::core_sync_svc::CoreSyncService(context_svc.clone()), p2p::core_sync_svc::CoreSyncService(context_svc.clone()),
config.clearnet_config(), config.clearnet_config(),
) )

View file

@ -1,12 +1,23 @@
use bytes::Bytes;
use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::FutureExt; use futures::FutureExt;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tower::Service; use tower::{Service, ServiceExt};
use tracing::trace; use tracing::trace;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_helper::cast::usize_to_u64;
use cuprate_helper::map::split_u128_into_low_high_bits;
use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::BlockCompleteEntry;
use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse};
#[derive(Clone)] #[derive(Clone)]
pub struct P2pProtocolRequestHandler; pub struct P2pProtocolRequestHandler {
pub(crate) blockchain_read_handle: BlockchainReadHandle,
}
impl Service<ProtocolRequest> for P2pProtocolRequestHandler { impl Service<ProtocolRequest> for P2pProtocolRequestHandler {
type Response = ProtocolResponse; type Response = ProtocolResponse;
@ -19,15 +30,98 @@ impl Service<ProtocolRequest> for P2pProtocolRequestHandler {
fn call(&mut self, req: ProtocolRequest) -> Self::Future { fn call(&mut self, req: ProtocolRequest) -> Self::Future {
match req { match req {
ProtocolRequest::GetObjects(_) => trace!("TODO: GetObjects"), ProtocolRequest::GetObjects(req) => {
ProtocolRequest::GetChain(_) => trace!("TODO: GetChain"), get_objects(self.blockchain_read_handle.clone(), req).boxed()
ProtocolRequest::FluffyMissingTxs(_) => trace!("TODO: FluffyMissingTxs"), }
ProtocolRequest::GetTxPoolCompliment(_) => trace!("TODO: GetTxPoolCompliment"), ProtocolRequest::GetChain(req) => {
ProtocolRequest::NewBlock(_) => trace!("TODO: NewBlock"), get_chain(self.blockchain_read_handle.clone(), req).boxed()
ProtocolRequest::NewFluffyBlock(_) => trace!("TODO: NewFluffyBlock"), }
ProtocolRequest::NewTransactions(_) => trace!("TODO: NewTransactions"), ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewFluffyBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
} }
async { Ok(ProtocolResponse::NA) }.boxed()
} }
} }
async fn get_objects(
blockchain_read_handle: BlockchainReadHandle,
req: GetObjectsRequest,
) -> Result<ProtocolResponse, tower::BoxError> {
if req.blocks.is_empty() {
Err("No blocks requested in a GetObjectsRequest")?;
}
if req.blocks.len() > MAX_BLOCK_BATCH_LEN {
Err("Too many blocks requested in a GetObjectsRequest")?;
}
let block_ids: Vec<[u8; 32]> = (&req.blocks).into();
// de-allocate the backing [`Bytes`]
drop(req);
let res = blockchain_read_handle
.oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids))
.await?;
let BlockchainResponse::BlockCompleteEntries {
blocks,
missed_ids,
current_blockchain_height,
} = res
else {
panic!("Blockchain service returned wrong response!");
};
Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
blocks,
missed_ids: missed_ids.into(),
current_blockchain_height: usize_to_u64(current_blockchain_height),
}))
}
async fn get_chain(
blockchain_read_handle: BlockchainReadHandle,
req: ChainRequest,
) -> Result<ProtocolResponse, tower::BoxError> {
if req.block_ids.is_empty() {
Err("No block hashes sent in a `ChainRequest`")?;
}
if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN {
Err("Too many block hashes in a `ChainRequest`")?;
}
let block_ids: Vec<[u8; 32]> = (&req.block_ids).into();
// de-allocate the backing [`Bytes`]
drop(req);
let res = blockchain_read_handle
.oneshot(BlockchainReadRequest::NextMissingChainEntry(block_ids))
.await?;
let BlockchainResponse::NextMissingChainEntry {
next_entry,
first_missing_block,
start_height,
chain_height,
cumulative_difficulty,
} = res
else {
panic!("Blockchain service returned wrong response!");
};
let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(cumulative_difficulty);
Ok(ProtocolResponse::GetChain(ChainResponse {
start_height: usize_to_u64(start_height),
total_height: usize_to_u64(chain_height),
cumulative_difficulty_low64,
cumulative_difficulty_top64,
m_block_ids: next_entry.into(),
m_block_weights: vec![],
first_block: first_missing_block.map_or(Bytes::new(), Bytes::from),
}))
}

View file

@ -8,6 +8,7 @@ use std::{
}; };
use futures::FutureExt; use futures::FutureExt;
use monero_serai::generators::H;
use monero_serai::{ use monero_serai::{
block::Block, block::Block,
transaction::{Input, Transaction}, transaction::{Input, Transaction},
@ -183,6 +184,19 @@ impl PreparedBlock {
block: block.block, block: block.block,
}) })
} }
pub fn new_alt_block(block: AltBlockInformation) -> Result<PreparedBlock, ConsensusError> {
Ok(PreparedBlock {
block_blob: block.block_blob,
hf_vote: HardFork::from_version(block.block.header.hardfork_version)
.map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?,
hf_version: HardFork::from_vote(block.block.header.hardfork_signal),
block_hash: block.block_hash,
pow_hash: block.pow_hash,
miner_tx_weight: block.block.miner_transaction.weight(),
block: block.block,
})
}
} }
/// A request to verify a block. /// A request to verify a block.

View file

@ -88,14 +88,19 @@ mod sealed {
/// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits /// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits
/// onto the network address. /// onto the network address.
pub trait BorshNetworkZone: NetworkZone<Addr = Self::BorshAddr> { pub trait BorshNetworkZone:
type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize; NetworkZone<
Addr: NetZoneAddress<BanID: borsh::BorshDeserialize + borsh::BorshSerialize>
+ borsh::BorshDeserialize
+ borsh::BorshSerialize,
>
{
} }
impl<T: NetworkZone> BorshNetworkZone for T impl<T: NetworkZone> BorshNetworkZone for T
where where
T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
<T::Addr as NetZoneAddress>::BanID: borsh::BorshDeserialize + borsh::BorshSerialize,
{ {
type BorshAddr = T::Addr;
} }
} }

View file

@ -46,7 +46,12 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch. /// The enforced maximum amount of blocks to request in a batch.
/// ///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans. /// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; pub const MAX_BLOCK_BATCH_LEN: usize = 100;
/// The enforced maximum amount of block hashes in a blockchain supplement request.
///
/// Requesting more than this might cause the peer to disconnect and potentially lead to bans.
pub const MAX_BLOCKCHAIN_SUPPLEMENT_LEN: usize = 250;
/// The timeout that the block downloader will use for requests. /// The timeout that the block downloader will use for requests.
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

View file

@ -26,7 +26,7 @@ mod broadcast;
mod client_pool; mod client_pool;
pub mod config; pub mod config;
pub mod connection_maintainer; pub mod connection_maintainer;
mod constants; pub mod constants;
mod inbound_server; mod inbound_server;
mod sync_states; mod sync_states;

View file

@ -38,6 +38,7 @@ serde = { workspace = true, optional = true }
tower = { workspace = true } tower = { workspace = true }
thread_local = { workspace = true, optional = true } thread_local = { workspace = true, optional = true }
rayon = { workspace = true, optional = true } rayon = { workspace = true, optional = true }
bytes = "1.6.0"
[dev-dependencies] [dev-dependencies]
cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } cuprate-helper = { path = "../../helper", features = ["thread", "cast"] }

View file

@ -2,20 +2,23 @@
//---------------------------------------------------------------------------------------------------- Import //---------------------------------------------------------------------------------------------------- Import
use bytemuck::TransparentWrapper; use bytemuck::TransparentWrapper;
use monero_serai::block::{Block, BlockHeader}; use bytes::Bytes;
use cuprate_database::{ use cuprate_database::{
RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw},
}; };
use cuprate_helper::cast::usize_to_u64;
use cuprate_helper::{ use cuprate_helper::{
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
tx_utils::tx_fee, tx_utils::tx_fee,
}; };
use cuprate_types::{ use cuprate_types::{
AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork,
VerifiedTransactionInformation, TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation,
}; };
use monero_serai::block::{Block, BlockHeader};
use crate::tables::TablesIter;
use crate::{ use crate::{
ops::{ ops::{
alt_block, alt_block,
@ -256,6 +259,40 @@ pub fn get_block_extended_header_top(
Ok((header, height)) Ok((header, height))
} }
//---------------------------------------------------------------------------------------------------- `get_block_complete_entry`
pub fn get_block_complete_entry(
block_hash: &BlockHash,
tables: &impl TablesIter,
) -> Result<BlockCompleteEntry, RuntimeError> {
let height = tables.block_heights().get(block_hash)?;
let block_blob = tables.block_blobs().get(&height)?.0;
let block = Block::read(&mut block_blob.as_slice()).expect("Valid block failed to be read");
let txs = if let Some(first_tx) = block.transactions.first() {
let first_tx_idx = tables.tx_ids().get(first_tx)?;
let end_tx_idx = first_tx_idx + usize_to_u64(block.transactions.len());
let tx_blobs = tables.tx_blobs_iter().get_range(first_tx_idx..end_tx_idx)?;
tx_blobs
.map(|res| Ok(Bytes::from(res?.0)))
.collect::<Result<_, RuntimeError>>()?
} else {
vec![]
};
Ok(BlockCompleteEntry {
block: Bytes::from(block_blob),
txs: TransactionBlobs::Normal(txs),
pruned: false,
// This is only needed when pruned.
block_weight: 0,
})
}
//---------------------------------------------------------------------------------------------------- Misc //---------------------------------------------------------------------------------------------------- Misc
/// Retrieve a [`BlockInfo`] via its [`BlockHeight`]. /// Retrieve a [`BlockInfo`] via its [`BlockHeight`].
#[doc = doc_error!()] #[doc = doc_error!()]

View file

@ -1,19 +1,20 @@
//! Database reader thread-pool definitions and logic. //! Database reader thread-pool definitions and logic.
//---------------------------------------------------------------------------------------------------- Import //---------------------------------------------------------------------------------------------------- Import
use rayon::{
iter::{Either, IntoParallelIterator, ParallelIterator},
prelude::*,
ThreadPool,
};
use std::cmp::min;
use std::ops::Index;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::Arc, sync::Arc,
}; };
use rayon::{
iter::{IntoParallelIterator, ParallelIterator},
prelude::*,
ThreadPool,
};
use thread_local::ThreadLocal; use thread_local::ThreadLocal;
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{ use cuprate_types::{
@ -21,6 +22,8 @@ use cuprate_types::{
Chain, ChainId, ExtendedBlockHeader, OutputOnChain, Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
}; };
use crate::ops::block::get_block_complete_entry;
use crate::tables::{BlockBlobs, TxIds};
use crate::{ use crate::{
ops::{ ops::{
alt_block::{ alt_block::{
@ -92,6 +95,7 @@ fn map_request(
/* SOMEDAY: pre-request handling, run some code for each request? */ /* SOMEDAY: pre-request handling, run some code for each request? */
match request { match request {
R::BlockCompleteEntries(blocks) => block_complete_entries(env, blocks),
R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block, chain) => block_hash(env, block, chain), R::BlockHash(block, chain) => block_hash(env, block, chain),
R::FindBlock(block_hash) => find_block(env, block_hash), R::FindBlock(block_hash) => find_block(env, block_hash),
@ -106,6 +110,7 @@ fn map_request(
R::KeyImagesSpent(set) => key_images_spent(env, set), R::KeyImagesSpent(set) => key_images_spent(env, set),
R::CompactChainHistory => compact_chain_history(env), R::CompactChainHistory => compact_chain_history(env),
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
R::NextMissingChainEntry(block_hashes) => next_missing_chain_entry(env, block_hashes),
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
} }
@ -180,9 +185,41 @@ macro_rules! get_tables {
// FIXME: implement multi-transaction read atomicity. // FIXME: implement multi-transaction read atomicity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>. // <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // TODO: The overhead of parallelism may be too much for every request, performance test to find optimal
// amount of parallelism. // amount of parallelism.
/// [`BlockchainReadRequest::BlockCompleteEntries`].
fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
let (missed_ids, blocks) = block_hashes
.into_par_iter()
.map(|block_hash| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
match get_block_complete_entry(&block_hash, tables) {
Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
res => res.map(Either::Right),
}
})
.collect::<Result<_, _>>()?;
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let chain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
Ok(BlockchainResponse::BlockCompleteEntries {
blocks,
missed_ids,
current_blockchain_height: chain_height,
})
}
/// [`BlockchainReadRequest::BlockExtendedHeader`]. /// [`BlockchainReadRequest::BlockExtendedHeader`].
#[inline] #[inline]
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
@ -556,6 +593,62 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes
}) })
} }
/// [`BlockchainReadRequest::NextMissingChainEntry`]
fn next_missing_chain_entry(env: &ConcreteEnv, block_hashes: Vec<[u8; 32]>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let (top_block_height, top_block_info) = table_block_infos.last()?;
let mut start_height = 0;
for block_hash in block_hashes {
match table_block_heights.get(&block_hash) {
Ok(height) => {
start_height = height;
break;
}
Err(RuntimeError::KeyNotFound) => continue,
Err(e) => return Err(e),
}
}
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
const DEFAULT_CHAIN_ENTRY_SIZE: usize = 10_000;
let end_height = min(
start_height + DEFAULT_CHAIN_ENTRY_SIZE,
top_block_height + 1,
);
let block_hashes: Vec<_> = table_block_infos
.get_range(start_height..end_height)?
.map(|block_info| Ok(block_info?.block_hash))
.collect::<Result<_, _>>()?;
let first_missing_block = if block_hashes.len() > 1 {
let table_block_blobs = env_inner.open_db_ro::<BlockBlobs>(&tx_ro)?;
Some(table_block_blobs.get(&(start_height + 1))?.0)
} else {
None
};
Ok(BlockchainResponse::NextMissingChainEntry {
next_entry: block_hashes,
first_missing_block,
start_height,
chain_height: top_block_height + 1,
cumulative_difficulty: combine_low_high_bits_to_u128(
top_block_info.cumulative_difficulty_low,
top_block_info.cumulative_difficulty_high,
),
})
}
/// [`BlockchainReadRequest::AltBlocksInChain`] /// [`BlockchainReadRequest::AltBlocksInChain`]
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`. // Prepare tx/tables in `ThreadLocal`.

View file

@ -11,7 +11,7 @@ use std::{
use crate::{ use crate::{
types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation},
AltBlockInformation, ChainId, AltBlockInformation, BlockCompleteEntry, ChainId,
}; };
//---------------------------------------------------------------------------------------------------- ReadRequest //---------------------------------------------------------------------------------------------------- ReadRequest
@ -25,6 +25,11 @@ use crate::{
/// See `Response` for the expected responses per `Request`. /// See `Response` for the expected responses per `Request`.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockchainReadRequest { pub enum BlockchainReadRequest {
/// Request for [`BlockCompleteEntry`]s.
///
/// The input is the hashes of the blocks wanted.
BlockCompleteEntries(Vec<[u8; 32]>),
/// Request a block's extended header. /// Request a block's extended header.
/// ///
/// The input is the block's height. /// The input is the block's height.
@ -101,6 +106,13 @@ pub enum BlockchainReadRequest {
/// order, or else the returned response is unspecified and meaningless, /// order, or else the returned response is unspecified and meaningless,
/// as this request performs a binary search. /// as this request performs a binary search.
FindFirstUnknown(Vec<[u8; 32]>), FindFirstUnknown(Vec<[u8; 32]>),
/// A request for the next missing chain entry.
///
/// The input is a list of block hashes in reverse chronological order that do not necessarily
/// directly follow each other.
NextMissingChainEntry(Vec<[u8; 32]>),
/// A request for all alt blocks in the chain with the given [`ChainId`]. /// A request for all alt blocks in the chain with the given [`ChainId`].
AltBlocksInChain(ChainId), AltBlocksInChain(ChainId),
} }
@ -145,6 +157,16 @@ pub enum BlockchainWriteRequest {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockchainResponse { pub enum BlockchainResponse {
//------------------------------------------------------ Reads //------------------------------------------------------ Reads
/// Response to [`BlockchainReadRequest::BlockCompleteEntries`]
BlockCompleteEntries {
/// The blocks requested that we had.
blocks: Vec<BlockCompleteEntry>,
/// The hashes of the blocks we did not have.
missed_ids: Vec<[u8; 32]>,
/// The current height of our blockchain.
current_blockchain_height: usize,
},
/// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// Response to [`BlockchainReadRequest::BlockExtendedHeader`].
/// ///
/// Inner value is the extended headed of the requested block. /// Inner value is the extended headed of the requested block.
@ -218,6 +240,24 @@ pub enum BlockchainResponse {
/// This will be [`None`] if all blocks were known. /// This will be [`None`] if all blocks were known.
FindFirstUnknown(Option<(usize, usize)>), FindFirstUnknown(Option<(usize, usize)>),
/// The response for [`BlockchainReadRequest::NextMissingChainEntry`]
NextMissingChainEntry {
/// A list of block hashes that should be next from the requested chain.
///
/// The first block hash will overlap with one of the blocks in the request.
next_entry: Vec<[u8; 32]>,
/// The block blob of the second block in `next_entry`.
///
/// If there is only 1 block in `next_entry` then this will be [`None`].
first_missing_block: Option<Vec<u8>>,
/// The height of the first block in `next_entry`.
start_height: usize,
/// The current height of our chain.
chain_height: usize,
/// The cumulative difficulty of our chain.
cumulative_difficulty: u128,
},
/// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// The response for [`BlockchainReadRequest::AltBlocksInChain`].
/// ///
/// Contains all the alt blocks in the alt-chain in chronological order. /// Contains all the alt blocks in the alt-chain in chronological order.