mirror of
https://github.com/hinto-janai/cuprate.git
synced 2025-01-22 02:34:29 +00:00
add get_objects
handler
This commit is contained in:
parent
403964bc22
commit
6e8fbf04f6
7 changed files with 214 additions and 6 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -547,6 +547,7 @@ version = "0.0.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.6.0",
|
"bitflags 2.6.0",
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
|
"bytes",
|
||||||
"cuprate-database",
|
"cuprate-database",
|
||||||
"cuprate-database-service",
|
"cuprate-database-service",
|
||||||
"cuprate-helper",
|
"cuprate-helper",
|
||||||
|
|
|
@ -1 +1,107 @@
|
||||||
|
use std::{
|
||||||
|
future::{ready, Ready},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use futures::FutureExt;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
|
use cuprate_consensus::BlockChainContextService;
|
||||||
|
use cuprate_fixed_bytes::ByteArrayVec;
|
||||||
|
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
|
||||||
|
use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse};
|
||||||
|
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||||
|
use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct P2pProtocolRequestHandlerMaker {
|
||||||
|
pub blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
|
||||||
|
type Response = P2pProtocolRequestHandler<N>;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
|
||||||
|
// TODO: check sync info?
|
||||||
|
|
||||||
|
let blockchain_read_handle = self.blockchain_read_handle.clone();
|
||||||
|
|
||||||
|
ready(Ok(P2pProtocolRequestHandler {
|
||||||
|
peer_information,
|
||||||
|
blockchain_read_handle,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct P2pProtocolRequestHandler<N: NetworkZone> {
|
||||||
|
peer_information: PeerInformation<N>,
|
||||||
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
|
||||||
|
type Response = ProtocolResponse;
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: ProtocolRequest) -> Self::Future {
|
||||||
|
match request {
|
||||||
|
ProtocolRequest::GetObjects(r) => {
|
||||||
|
get_objects(r, self.blockchain_read_handle.clone()).boxed()
|
||||||
|
}
|
||||||
|
ProtocolRequest::GetChain(_) => todo!(),
|
||||||
|
ProtocolRequest::FluffyMissingTxs(_) => todo!(),
|
||||||
|
ProtocolRequest::GetTxPoolCompliment(_) => todo!(),
|
||||||
|
ProtocolRequest::NewBlock(_) => todo!(),
|
||||||
|
ProtocolRequest::NewFluffyBlock(_) => todo!(),
|
||||||
|
ProtocolRequest::NewTransactions(_) => todo!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------------------------------------------------------------------------------------------------- Handler functions
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::GetObjects`]
|
||||||
|
async fn get_objects(
|
||||||
|
request: GetObjectsRequest,
|
||||||
|
mut blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
if request.blocks.len() > MAX_BLOCK_BATCH_LEN {
|
||||||
|
anyhow::bail!("Peer requested more blocks than allowed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
let block_ids: Vec<[u8; 32]> = (&request.blocks).into();
|
||||||
|
// de-allocate the backing `Bytes`.
|
||||||
|
drop(request);
|
||||||
|
|
||||||
|
let BlockchainResponse::BlockCompleteEntries {
|
||||||
|
blocks,
|
||||||
|
missing_hashes,
|
||||||
|
blockchain_height,
|
||||||
|
} = blockchain_read_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockchainReadRequest::BlockCompleteEntries(block_ids))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("blockchain returned wrong response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
|
||||||
|
blocks,
|
||||||
|
missed_ids: ByteArrayVec::from(missing_hashes),
|
||||||
|
current_blockchain_height,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
|
||||||
/// The enforced maximum amount of blocks to request in a batch.
|
/// The enforced maximum amount of blocks to request in a batch.
|
||||||
///
|
///
|
||||||
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
|
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
|
||||||
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
|
pub const MAX_BLOCK_BATCH_LEN: usize = 100;
|
||||||
|
|
||||||
/// The timeout that the block downloader will use for requests.
|
/// The timeout that the block downloader will use for requests.
|
||||||
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
|
@ -35,6 +35,7 @@ serde = { workspace = true, optional = true }
|
||||||
tower = { workspace = true }
|
tower = { workspace = true }
|
||||||
thread_local = { workspace = true, optional = true }
|
thread_local = { workspace = true, optional = true }
|
||||||
rayon = { workspace = true, optional = true }
|
rayon = { workspace = true, optional = true }
|
||||||
|
bytes = "1.7.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
cuprate-helper = { path = "../../helper", features = ["thread", "cast"] }
|
cuprate-helper = { path = "../../helper", features = ["thread", "cast"] }
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
//! Block functions.
|
//! Block functions.
|
||||||
|
|
||||||
|
use std::slice;
|
||||||
//---------------------------------------------------------------------------------------------------- Import
|
//---------------------------------------------------------------------------------------------------- Import
|
||||||
use bytemuck::TransparentWrapper;
|
use bytemuck::TransparentWrapper;
|
||||||
|
use bytes::{Bytes, BytesMut};
|
||||||
use monero_serai::{
|
use monero_serai::{
|
||||||
block::{Block, BlockHeader},
|
block::{Block, BlockHeader},
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
|
@ -10,15 +12,17 @@ use monero_serai::{
|
||||||
use cuprate_database::{
|
use cuprate_database::{
|
||||||
RuntimeError, StorableVec, {DatabaseRo, DatabaseRw},
|
RuntimeError, StorableVec, {DatabaseRo, DatabaseRw},
|
||||||
};
|
};
|
||||||
|
use cuprate_helper::cast::usize_to_u64;
|
||||||
use cuprate_helper::{
|
use cuprate_helper::{
|
||||||
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
||||||
tx::tx_fee,
|
tx::tx_fee,
|
||||||
};
|
};
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation,
|
AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork,
|
||||||
VerifiedTransactionInformation,
|
TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::tables::TablesIter;
|
||||||
use crate::{
|
use crate::{
|
||||||
ops::{
|
ops::{
|
||||||
alt_block,
|
alt_block,
|
||||||
|
@ -224,6 +228,60 @@ pub fn pop_block(
|
||||||
|
|
||||||
Ok((block_height, block_info.block_hash, block))
|
Ok((block_height, block_info.block_hash, block))
|
||||||
}
|
}
|
||||||
|
//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes`
|
||||||
|
pub fn get_block_blob_with_tx_indexes(
|
||||||
|
block_height: &BlockHeight,
|
||||||
|
tables: &impl Tables,
|
||||||
|
) -> Result<(Vec<u8>, u64, usize), RuntimeError> {
|
||||||
|
use monero_serai::io::write_varint;
|
||||||
|
|
||||||
|
let block_info = tables.block_infos().get(&block_height)?;
|
||||||
|
|
||||||
|
let miner_tx_idx = block_info.mining_tx_index;
|
||||||
|
let mut block_txs = tables.block_txs_hashes().get(&block_height)?.0;
|
||||||
|
let numb_txs = block_txs.len();
|
||||||
|
|
||||||
|
// Get the block header
|
||||||
|
let mut block = tables.block_header_blobs().get(&block_height)?.0;
|
||||||
|
|
||||||
|
// Add the miner tx to the blob.
|
||||||
|
let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0;
|
||||||
|
block.append(&mut miner_tx_blob);
|
||||||
|
|
||||||
|
// Add the blocks tx hashes.
|
||||||
|
write_varint(&block_txs.len(), &mut block)
|
||||||
|
.expect("The number of txs per block will not exceed u64::MAX");
|
||||||
|
for tx in block_txs {
|
||||||
|
block.extend_from_slice(&tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((block, miner_tx_idx, numb_txs))
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
||||||
|
pub fn get_block_complete_entry(
|
||||||
|
block_hash: &BlockHash,
|
||||||
|
tables: &impl TablesIter,
|
||||||
|
) -> Result<BlockCompleteEntry, RuntimeError> {
|
||||||
|
let block_height = tables.block_heights().get(block_hash)?;
|
||||||
|
let (block_blob, miner_tx_idx, numb_non_miner_txs) =
|
||||||
|
get_block_blob_with_tx_indexes(&block_height, tables)?;
|
||||||
|
|
||||||
|
let first_tx_idx = miner_tx_idx + 1;
|
||||||
|
|
||||||
|
let tx_blobs = tables
|
||||||
|
.tx_blobs_iter()
|
||||||
|
.get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))?
|
||||||
|
.map(|tx_blob| Ok(Bytes::from(tx_blob?.0)))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
|
Ok(BlockCompleteEntry {
|
||||||
|
block: Bytes::from(block_blob),
|
||||||
|
txs: TransactionBlobs::Normal(tx_blobs),
|
||||||
|
pruned: false,
|
||||||
|
block_weight: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
||||||
/// Retrieve a [`ExtendedBlockHeader`] from the database.
|
/// Retrieve a [`ExtendedBlockHeader`] from the database.
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use rayon::{
|
use rayon::{
|
||||||
iter::{IntoParallelIterator, ParallelIterator},
|
iter::{IntoParallelIterator, ParallelIterator, Either},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
ThreadPool,
|
ThreadPool,
|
||||||
};
|
};
|
||||||
|
@ -18,9 +18,10 @@ use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThre
|
||||||
use cuprate_helper::map::combine_low_high_bits_to_u128;
|
use cuprate_helper::map::combine_low_high_bits_to_u128;
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||||
Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
|
BlockCompleteEntry, Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::ops::block::get_block_complete_entry;
|
||||||
use crate::{
|
use crate::{
|
||||||
ops::{
|
ops::{
|
||||||
alt_block::{
|
alt_block::{
|
||||||
|
@ -92,6 +93,7 @@ fn map_request(
|
||||||
/* SOMEDAY: pre-request handling, run some code for each request? */
|
/* SOMEDAY: pre-request handling, run some code for each request? */
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
|
R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
|
||||||
R::BlockExtendedHeader(block) => block_extended_header(env, block),
|
R::BlockExtendedHeader(block) => block_extended_header(env, block),
|
||||||
R::BlockHash(block, chain) => block_hash(env, block, chain),
|
R::BlockHash(block, chain) => block_hash(env, block, chain),
|
||||||
R::FindBlock(block_hash) => find_block(env, block_hash),
|
R::FindBlock(block_hash) => find_block(env, block_hash),
|
||||||
|
@ -182,6 +184,38 @@ macro_rules! get_tables {
|
||||||
// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal
|
// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal
|
||||||
// amount of parallelism.
|
// amount of parallelism.
|
||||||
|
|
||||||
|
/// [`BlockchainReadRequest::BlockCompleteEntries`].
|
||||||
|
fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
|
||||||
|
// Prepare tx/tables in `ThreadLocal`.
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_ro = thread_local(env);
|
||||||
|
let tables = thread_local(env);
|
||||||
|
|
||||||
|
let (missing_hashes, blocks) = block_hashes
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|block_hash| {
|
||||||
|
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
|
||||||
|
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
|
||||||
|
|
||||||
|
match get_block_complete_entry(&block_hash, tables) {
|
||||||
|
Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
|
||||||
|
res => res.map(Either::Right),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
|
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
|
||||||
|
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
|
||||||
|
|
||||||
|
let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
|
||||||
|
|
||||||
|
Ok(BlockchainResponse::BlockCompleteEntries {
|
||||||
|
blocks,
|
||||||
|
missing_hashes,
|
||||||
|
blockchain_height,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// [`BlockchainReadRequest::BlockExtendedHeader`].
|
/// [`BlockchainReadRequest::BlockExtendedHeader`].
|
||||||
#[inline]
|
#[inline]
|
||||||
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
|
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
|
||||||
|
|
|
@ -10,7 +10,7 @@ use std::{
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation},
|
types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation},
|
||||||
AltBlockInformation, ChainId,
|
AltBlockInformation, BlockCompleteEntry, ChainId,
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- ReadRequest
|
//---------------------------------------------------------------------------------------------------- ReadRequest
|
||||||
|
@ -24,6 +24,8 @@ use crate::{
|
||||||
/// See `Response` for the expected responses per `Request`.
|
/// See `Response` for the expected responses per `Request`.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum BlockchainReadRequest {
|
pub enum BlockchainReadRequest {
|
||||||
|
BlockCompleteEntries(Vec<[u8; 32]>),
|
||||||
|
|
||||||
/// Request a block's extended header.
|
/// Request a block's extended header.
|
||||||
///
|
///
|
||||||
/// The input is the block's height.
|
/// The input is the block's height.
|
||||||
|
@ -149,6 +151,12 @@ pub enum BlockchainWriteRequest {
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum BlockchainResponse {
|
pub enum BlockchainResponse {
|
||||||
//------------------------------------------------------ Reads
|
//------------------------------------------------------ Reads
|
||||||
|
BlockCompleteEntries {
|
||||||
|
blocks: Vec<BlockCompleteEntry>,
|
||||||
|
missing_hashes: Vec<[u8; 32]>,
|
||||||
|
blockchain_height: usize,
|
||||||
|
},
|
||||||
|
|
||||||
/// Response to [`BlockchainReadRequest::BlockExtendedHeader`].
|
/// Response to [`BlockchainReadRequest::BlockExtendedHeader`].
|
||||||
///
|
///
|
||||||
/// Inner value is the extended headed of the requested block.
|
/// Inner value is the extended headed of the requested block.
|
||||||
|
|
Loading…
Reference in a new issue