mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-12-23 03:59:31 +00:00
Merge branch 'p2p-request-handler' into init
This commit is contained in:
commit
11468dc3b7
17 changed files with 875 additions and 41 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -580,6 +580,7 @@ version = "0.0.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.6.0",
|
"bitflags 2.6.0",
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
|
"bytes",
|
||||||
"cuprate-constants",
|
"cuprate-constants",
|
||||||
"cuprate-database",
|
"cuprate-database",
|
||||||
"cuprate-database-service",
|
"cuprate-database-service",
|
||||||
|
|
|
@ -1,8 +1,57 @@
|
||||||
//! P2P
|
//! P2P
|
||||||
//!
|
//!
|
||||||
//! Will handle initiating the P2P and contains a protocol request handler.
|
//! Will handle initiating the P2P and contains a protocol request handler.
|
||||||
|
use futures::{FutureExt, TryFutureExt};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tower::ServiceExt;
|
||||||
|
|
||||||
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
|
use cuprate_consensus::BlockChainContextService;
|
||||||
|
use cuprate_p2p::{NetworkInterface, P2PConfig};
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_txpool::service::TxpoolReadHandle;
|
||||||
|
|
||||||
|
use crate::txpool::IncomingTxHandler;
|
||||||
|
|
||||||
|
mod core_sync_service;
|
||||||
mod network_address;
|
mod network_address;
|
||||||
pub mod request_handler;
|
pub mod request_handler;
|
||||||
|
|
||||||
pub use network_address::CrossNetworkInternalPeerId;
|
pub use network_address::CrossNetworkInternalPeerId;
|
||||||
|
|
||||||
|
/// Starts the P2P clearnet network, returning a [`NetworkInterface`] to interact with it.
|
||||||
|
///
|
||||||
|
/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network
|
||||||
|
/// handshakes can not be completed.
|
||||||
|
pub async fn start_clearnet_p2p(
|
||||||
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
blockchain_context_service: BlockChainContextService,
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
config: P2PConfig<ClearNet>,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
NetworkInterface<ClearNet>,
|
||||||
|
oneshot::Sender<IncomingTxHandler>,
|
||||||
|
),
|
||||||
|
tower::BoxError,
|
||||||
|
> {
|
||||||
|
let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker {
|
||||||
|
blockchain_read_handle,
|
||||||
|
blockchain_context_service: blockchain_context_service.clone(),
|
||||||
|
txpool_read_handle,
|
||||||
|
incoming_tx_handler: None,
|
||||||
|
incoming_tx_handler_fut: incoming_tx_handler_rx.shared(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
cuprate_p2p::initialize_network(
|
||||||
|
request_handler_maker.map_response(|s| s.map_err(Into::into)),
|
||||||
|
core_sync_service::CoreSyncService(blockchain_context_service),
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
incoming_tx_handler_tx,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
49
binaries/cuprated/src/p2p/core_sync_service.rs
Normal file
49
binaries/cuprated/src/p2p/core_sync_service.rs
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use cuprate_consensus::{
|
||||||
|
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
|
||||||
|
};
|
||||||
|
use cuprate_helper::{cast::usize_to_u64, map::split_u128_into_low_high_bits};
|
||||||
|
use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse};
|
||||||
|
use cuprate_wire::CoreSyncData;
|
||||||
|
|
||||||
|
/// The core sync service.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct CoreSyncService(pub BlockChainContextService);
|
||||||
|
|
||||||
|
impl Service<CoreSyncDataRequest> for CoreSyncService {
|
||||||
|
type Response = CoreSyncDataResponse;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.0.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
|
||||||
|
self.0
|
||||||
|
.call(BlockChainContextRequest::Context)
|
||||||
|
.map_ok(|res| {
|
||||||
|
let BlockChainContextResponse::Context(context) = res else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
let (cumulative_difficulty, cumulative_difficulty_top64) =
|
||||||
|
split_u128_into_low_high_bits(context.cumulative_difficulty);
|
||||||
|
|
||||||
|
CoreSyncDataResponse(CoreSyncData {
|
||||||
|
cumulative_difficulty,
|
||||||
|
cumulative_difficulty_top64,
|
||||||
|
current_height: usize_to_u64(context.chain_height),
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: context.top_hash,
|
||||||
|
top_version: context.current_hf.as_u8(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1 +1,432 @@
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
future::{ready, Ready},
|
||||||
|
hash::Hash,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::{
|
||||||
|
future::{BoxFuture, Shared},
|
||||||
|
FutureExt,
|
||||||
|
};
|
||||||
|
use monero_serai::{block::Block, transaction::Transaction};
|
||||||
|
use tokio::sync::{broadcast, oneshot, watch};
|
||||||
|
use tokio_stream::wrappers::WatchStream;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
|
use cuprate_consensus::{
|
||||||
|
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
|
||||||
|
BlockChainContextService,
|
||||||
|
};
|
||||||
|
use cuprate_dandelion_tower::TxState;
|
||||||
|
use cuprate_fixed_bytes::ByteArrayVec;
|
||||||
|
use cuprate_helper::cast::u64_to_usize;
|
||||||
|
use cuprate_helper::{
|
||||||
|
asynch::rayon_spawn_async,
|
||||||
|
cast::usize_to_u64,
|
||||||
|
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
||||||
|
};
|
||||||
|
use cuprate_p2p::constants::{
|
||||||
|
MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN,
|
||||||
|
};
|
||||||
|
use cuprate_p2p_core::{
|
||||||
|
client::{InternalPeerID, PeerInformation},
|
||||||
|
NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
|
||||||
|
};
|
||||||
|
use cuprate_txpool::service::TxpoolReadHandle;
|
||||||
|
use cuprate_types::{
|
||||||
|
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||||
|
BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs,
|
||||||
|
};
|
||||||
|
use cuprate_wire::protocol::{
|
||||||
|
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
||||||
|
GetObjectsResponse, NewFluffyBlock, NewTransactions,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
blockchain::interface::{self as blockchain_interface, IncomingBlockError},
|
||||||
|
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||||
|
p2p::CrossNetworkInternalPeerId,
|
||||||
|
txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// The P2P protocol request handler [`MakeService`](tower::MakeService).
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct P2pProtocolRequestHandlerMaker {
|
||||||
|
pub blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
|
||||||
|
pub blockchain_context_service: BlockChainContextService,
|
||||||
|
|
||||||
|
pub txpool_read_handle: TxpoolReadHandle,
|
||||||
|
|
||||||
|
/// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`]
|
||||||
|
/// and the [`IncomingTxHandler`].
|
||||||
|
pub incoming_tx_handler: Option<IncomingTxHandler>,
|
||||||
|
|
||||||
|
/// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`].
|
||||||
|
pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A: NetZoneAddress> Service<PeerInformation<A>> for P2pProtocolRequestHandlerMaker
|
||||||
|
where
|
||||||
|
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
|
||||||
|
{
|
||||||
|
type Response = P2pProtocolRequestHandler<A>;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
if self.incoming_tx_handler.is_none() {
|
||||||
|
return self
|
||||||
|
.incoming_tx_handler_fut
|
||||||
|
.poll_unpin(cx)
|
||||||
|
.map(|incoming_tx_handler| {
|
||||||
|
self.incoming_tx_handler = Some(incoming_tx_handler?);
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, peer_information: PeerInformation<A>) -> Self::Future {
|
||||||
|
let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
|
||||||
|
panic!("poll_ready was not called or did not return `Poll::Ready`")
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: check sync info?
|
||||||
|
|
||||||
|
let blockchain_read_handle = self.blockchain_read_handle.clone();
|
||||||
|
let txpool_read_handle = self.txpool_read_handle.clone();
|
||||||
|
|
||||||
|
ready(Ok(P2pProtocolRequestHandler {
|
||||||
|
peer_information,
|
||||||
|
blockchain_read_handle,
|
||||||
|
blockchain_context_service: self.blockchain_context_service.clone(),
|
||||||
|
txpool_read_handle,
|
||||||
|
incoming_tx_handler,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The P2P protocol request handler.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
|
||||||
|
peer_information: PeerInformation<N>,
|
||||||
|
|
||||||
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
|
||||||
|
blockchain_context_service: BlockChainContextService,
|
||||||
|
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
|
||||||
|
incoming_tx_handler: IncomingTxHandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<A>
|
||||||
|
where
|
||||||
|
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
|
||||||
|
{
|
||||||
|
type Response = ProtocolResponse;
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: ProtocolRequest) -> Self::Future {
|
||||||
|
match request {
|
||||||
|
ProtocolRequest::GetObjects(r) => {
|
||||||
|
get_objects(r, self.blockchain_read_handle.clone()).boxed()
|
||||||
|
}
|
||||||
|
ProtocolRequest::GetChain(r) => {
|
||||||
|
get_chain(r, self.blockchain_read_handle.clone()).boxed()
|
||||||
|
}
|
||||||
|
ProtocolRequest::FluffyMissingTxs(r) => {
|
||||||
|
fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed()
|
||||||
|
}
|
||||||
|
ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!(
|
||||||
|
"Peer sent a full block when we support fluffy blocks"
|
||||||
|
)))
|
||||||
|
.boxed(),
|
||||||
|
ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block(
|
||||||
|
self.peer_information.clone(),
|
||||||
|
r,
|
||||||
|
self.blockchain_read_handle.clone(),
|
||||||
|
self.txpool_read_handle.clone(),
|
||||||
|
)
|
||||||
|
.boxed(),
|
||||||
|
ProtocolRequest::NewTransactions(r) => new_transactions(
|
||||||
|
self.peer_information.clone(),
|
||||||
|
r,
|
||||||
|
self.blockchain_context_service.clone(),
|
||||||
|
self.incoming_tx_handler.clone(),
|
||||||
|
)
|
||||||
|
.boxed(),
|
||||||
|
ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: should we support this?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------------------------------------------------------------------------------------------------- Handler functions
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::GetObjects`]
|
||||||
|
async fn get_objects(
|
||||||
|
request: GetObjectsRequest,
|
||||||
|
mut blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
if request.blocks.len() > MAX_BLOCK_BATCH_LEN {
|
||||||
|
anyhow::bail!("Peer requested more blocks than allowed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
let block_hashes: Vec<[u8; 32]> = (&request.blocks).into();
|
||||||
|
// deallocate the backing `Bytes`.
|
||||||
|
drop(request);
|
||||||
|
|
||||||
|
let BlockchainResponse::BlockCompleteEntries {
|
||||||
|
blocks,
|
||||||
|
missing_hashes,
|
||||||
|
blockchain_height,
|
||||||
|
} = blockchain_read_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockchainReadRequest::BlockCompleteEntries(block_hashes))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("blockchain returned wrong response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
|
||||||
|
blocks,
|
||||||
|
missed_ids: ByteArrayVec::from(missing_hashes),
|
||||||
|
current_blockchain_height: usize_to_u64(blockchain_height),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::GetChain`]
|
||||||
|
async fn get_chain(
|
||||||
|
request: ChainRequest,
|
||||||
|
mut blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
if request.block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY {
|
||||||
|
anyhow::bail!("Peer sent too many block hashes in chain request.")
|
||||||
|
}
|
||||||
|
|
||||||
|
let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into();
|
||||||
|
let want_pruned_data = request.prune;
|
||||||
|
// deallocate the backing `Bytes`.
|
||||||
|
drop(request);
|
||||||
|
|
||||||
|
let BlockchainResponse::NextChainEntry {
|
||||||
|
start_height,
|
||||||
|
chain_height,
|
||||||
|
block_ids,
|
||||||
|
block_weights,
|
||||||
|
cumulative_difficulty,
|
||||||
|
first_block_blob,
|
||||||
|
} = blockchain_read_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("blockchain returned wrong response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
if start_height == 0 {
|
||||||
|
anyhow::bail!("The peers chain has a different genesis block than ours.");
|
||||||
|
}
|
||||||
|
|
||||||
|
let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
|
||||||
|
split_u128_into_low_high_bits(cumulative_difficulty);
|
||||||
|
|
||||||
|
Ok(ProtocolResponse::GetChain(ChainResponse {
|
||||||
|
start_height: usize_to_u64(start_height),
|
||||||
|
total_height: usize_to_u64(chain_height),
|
||||||
|
cumulative_difficulty_low64,
|
||||||
|
cumulative_difficulty_top64,
|
||||||
|
m_block_ids: ByteArrayVec::from(block_ids),
|
||||||
|
first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
|
||||||
|
// only needed when pruned
|
||||||
|
m_block_weights: if want_pruned_data {
|
||||||
|
block_weights.into_iter().map(usize_to_u64).collect()
|
||||||
|
} else {
|
||||||
|
vec![]
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::FluffyMissingTxs`]
|
||||||
|
async fn fluffy_missing_txs(
|
||||||
|
mut request: FluffyMissingTransactionsRequest,
|
||||||
|
mut blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
let tx_indexes = std::mem::take(&mut request.missing_tx_indices);
|
||||||
|
let block_hash: [u8; 32] = *request.block_hash;
|
||||||
|
let current_blockchain_height = request.current_blockchain_height;
|
||||||
|
|
||||||
|
// deallocate the backing `Bytes`.
|
||||||
|
drop(request);
|
||||||
|
|
||||||
|
let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockchainReadRequest::MissingTxsInBlock {
|
||||||
|
block_hash,
|
||||||
|
tx_indexes,
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("blockchain returned wrong response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(MissingTxsInBlock { block, txs }) = res else {
|
||||||
|
anyhow::bail!("The peer requested txs out of range.");
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock {
|
||||||
|
b: BlockCompleteEntry {
|
||||||
|
block: Bytes::from(block),
|
||||||
|
txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()),
|
||||||
|
pruned: false,
|
||||||
|
// only needed for pruned blocks.
|
||||||
|
block_weight: 0,
|
||||||
|
},
|
||||||
|
current_blockchain_height,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::NewFluffyBlock`]
|
||||||
|
async fn new_fluffy_block<A: NetZoneAddress>(
|
||||||
|
peer_information: PeerInformation<A>,
|
||||||
|
request: NewFluffyBlock,
|
||||||
|
mut blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
mut txpool_read_handle: TxpoolReadHandle,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
// TODO: check context service here and ignore the block?
|
||||||
|
let current_blockchain_height = request.current_blockchain_height;
|
||||||
|
|
||||||
|
peer_information
|
||||||
|
.core_sync_data
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.current_height = current_blockchain_height;
|
||||||
|
|
||||||
|
let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
|
||||||
|
let block = Block::read(&mut request.b.block.as_ref())?;
|
||||||
|
|
||||||
|
let tx_blobs = request
|
||||||
|
.b
|
||||||
|
.txs
|
||||||
|
.take_normal()
|
||||||
|
.ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;
|
||||||
|
|
||||||
|
let txs = tx_blobs
|
||||||
|
.into_iter()
|
||||||
|
.map(|tx_blob| {
|
||||||
|
if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
|
||||||
|
anyhow::bail!("Peer sent a transaction over the size limit.");
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx = Transaction::read(&mut tx_blob.as_ref())?;
|
||||||
|
|
||||||
|
Ok((tx.hash(), tx))
|
||||||
|
})
|
||||||
|
.collect::<Result<_, anyhow::Error>>()?;
|
||||||
|
|
||||||
|
// The backing `Bytes` will be deallocated when this closure returns.
|
||||||
|
|
||||||
|
Ok((block, txs))
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let res = blockchain_interface::handle_incoming_block(
|
||||||
|
block,
|
||||||
|
txs,
|
||||||
|
&mut blockchain_read_handle,
|
||||||
|
&mut txpool_read_handle,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(_) => Ok(ProtocolResponse::NA),
|
||||||
|
Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok(
|
||||||
|
ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest {
|
||||||
|
block_hash: block_hash.into(),
|
||||||
|
current_blockchain_height,
|
||||||
|
missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(),
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Err(IncomingBlockError::Orphan) => {
|
||||||
|
// Block's parent was unknown, could be syncing?
|
||||||
|
Ok(ProtocolResponse::NA)
|
||||||
|
}
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`ProtocolRequest::NewTransactions`]
|
||||||
|
async fn new_transactions<A>(
|
||||||
|
peer_information: PeerInformation<A>,
|
||||||
|
request: NewTransactions,
|
||||||
|
mut blockchain_context_service: BlockChainContextService,
|
||||||
|
mut incoming_tx_handler: IncomingTxHandler,
|
||||||
|
) -> anyhow::Result<ProtocolResponse>
|
||||||
|
where
|
||||||
|
A: NetZoneAddress,
|
||||||
|
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
|
||||||
|
{
|
||||||
|
let BlockChainContextResponse::Context(context) = blockchain_context_service
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(BlockChainContextRequest::Context)
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
|
||||||
|
// If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing.
|
||||||
|
if usize_to_u64(context.chain_height + 2)
|
||||||
|
< peer_information
|
||||||
|
.core_sync_data
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.current_height
|
||||||
|
{
|
||||||
|
return Ok(ProtocolResponse::NA);
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = if request.dandelionpp_fluff {
|
||||||
|
TxState::Fluff
|
||||||
|
} else {
|
||||||
|
TxState::Stem {
|
||||||
|
from: peer_information.id.into(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Drop all the data except the stuff we still need.
|
||||||
|
let NewTransactions {
|
||||||
|
txs,
|
||||||
|
dandelionpp_fluff: _,
|
||||||
|
padding: _,
|
||||||
|
} = request;
|
||||||
|
|
||||||
|
let res = incoming_tx_handler
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(IncomingTxs { txs, state })
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(()) => Ok(ProtocolResponse::NA),
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -12,4 +12,4 @@ mod dandelion;
|
||||||
mod incoming_tx;
|
mod incoming_tx;
|
||||||
mod txs_being_handled;
|
mod txs_being_handled;
|
||||||
|
|
||||||
pub use incoming_tx::IncomingTxHandler;
|
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};
|
||||||
|
|
|
@ -43,9 +43,13 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An error that can happen handling an incoming tx.
|
/// An error that can happen handling an incoming tx.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum IncomingTxError {
|
pub enum IncomingTxError {
|
||||||
|
#[error("Error parsing tx: {0}")]
|
||||||
Parse(std::io::Error),
|
Parse(std::io::Error),
|
||||||
|
#[error(transparent)]
|
||||||
Consensus(ExtendedConsensusError),
|
Consensus(ExtendedConsensusError),
|
||||||
|
#[error("Duplicate tx in message")]
|
||||||
DuplicateTransaction,
|
DuplicateTransaction,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,6 +71,7 @@ pub(super) type TxId = [u8; 32];
|
||||||
/// The service than handles incoming transaction pool transactions.
|
/// The service than handles incoming transaction pool transactions.
|
||||||
///
|
///
|
||||||
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct IncomingTxHandler {
|
pub struct IncomingTxHandler {
|
||||||
/// A store of txs currently being handled in incoming tx requests.
|
/// A store of txs currently being handled in incoming tx requests.
|
||||||
pub(super) txs_being_handled: TxsBeingHandled,
|
pub(super) txs_being_handled: TxsBeingHandled,
|
||||||
|
|
|
@ -159,7 +159,7 @@ epee_object!(
|
||||||
current_blockchain_height: u64,
|
current_blockchain_height: u64,
|
||||||
);
|
);
|
||||||
|
|
||||||
/// A request for Txs we are missing from our `TxPool`
|
/// A request for txs we are missing from an incoming block.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct FluffyMissingTransactionsRequest {
|
pub struct FluffyMissingTransactionsRequest {
|
||||||
/// The Block we are missing the Txs in
|
/// The Block we are missing the Txs in
|
||||||
|
|
|
@ -116,6 +116,7 @@ pub enum ProtocolResponse {
|
||||||
GetChain(ChainResponse),
|
GetChain(ChainResponse),
|
||||||
NewFluffyBlock(NewFluffyBlock),
|
NewFluffyBlock(NewFluffyBlock),
|
||||||
NewTransactions(NewTransactions),
|
NewTransactions(NewTransactions),
|
||||||
|
FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest),
|
||||||
NA,
|
NA,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,6 +140,9 @@ impl PeerResponse {
|
||||||
ProtocolResponse::GetChain(_) => MessageID::GetChain,
|
ProtocolResponse::GetChain(_) => MessageID::GetChain,
|
||||||
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
|
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
|
||||||
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
|
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
|
||||||
|
ProtocolResponse::FluffyMissingTransactionsRequest(_) => {
|
||||||
|
MessageID::FluffyMissingTxs
|
||||||
|
}
|
||||||
|
|
||||||
ProtocolResponse::NA => return None,
|
ProtocolResponse::NA => return None,
|
||||||
},
|
},
|
||||||
|
|
|
@ -71,6 +71,9 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
|
||||||
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||||
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
|
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
|
||||||
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
|
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
|
||||||
|
ProtocolResponse::FluffyMissingTransactionsRequest(val) => {
|
||||||
|
Self::FluffyMissingTransactionsRequest(val)
|
||||||
|
}
|
||||||
ProtocolResponse::NA => return Err(MessageConversionError),
|
ProtocolResponse::NA => return Err(MessageConversionError),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
|
||||||
/// The enforced maximum amount of blocks to request in a batch.
|
/// The enforced maximum amount of blocks to request in a batch.
|
||||||
///
|
///
|
||||||
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
|
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
|
||||||
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
|
pub const MAX_BLOCK_BATCH_LEN: usize = 100;
|
||||||
|
|
||||||
/// The timeout that the block downloader will use for requests.
|
/// The timeout that the block downloader will use for requests.
|
||||||
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
@ -61,13 +61,13 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec
|
||||||
/// be less than.
|
/// be less than.
|
||||||
///
|
///
|
||||||
/// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
|
/// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
|
||||||
pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
|
pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
|
||||||
|
|
||||||
/// The maximum amount of block IDs allowed in a chain entry response.
|
/// The maximum amount of block IDs allowed in a chain entry response.
|
||||||
///
|
///
|
||||||
/// ref: <https://github.com/monero-project/monero/blob/cc73fe71162d564ffda8e549b79a350bca53c454/src/cryptonote_config.h#L97>
|
/// ref: <https://github.com/monero-project/monero/blob/cc73fe71162d564ffda8e549b79a350bca53c454/src/cryptonote_config.h#L97>
|
||||||
// TODO: link to the protocol book when this section is added.
|
// TODO: link to the protocol book when this section is added.
|
||||||
pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
|
pub const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
|
||||||
|
|
||||||
/// The amount of failures downloading a specific batch before we stop attempting to download it.
|
/// The amount of failures downloading a specific batch before we stop attempting to download it.
|
||||||
pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5;
|
pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5;
|
||||||
|
|
|
@ -34,6 +34,7 @@ serde = { workspace = true, optional = true }
|
||||||
tower = { workspace = true }
|
tower = { workspace = true }
|
||||||
thread_local = { workspace = true }
|
thread_local = { workspace = true }
|
||||||
rayon = { workspace = true }
|
rayon = { workspace = true }
|
||||||
|
bytes = "1.7.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
cuprate-constants = { workspace = true }
|
cuprate-constants = { workspace = true }
|
||||||
|
|
|
@ -2,21 +2,23 @@
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Import
|
//---------------------------------------------------------------------------------------------------- Import
|
||||||
use bytemuck::TransparentWrapper;
|
use bytemuck::TransparentWrapper;
|
||||||
|
use bytes::Bytes;
|
||||||
use monero_serai::{
|
use monero_serai::{
|
||||||
block::{Block, BlockHeader},
|
block::{Block, BlockHeader},
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
};
|
};
|
||||||
|
|
||||||
use cuprate_database::{
|
use cuprate_database::{
|
||||||
RuntimeError, StorableVec, {DatabaseRo, DatabaseRw},
|
RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw},
|
||||||
};
|
};
|
||||||
|
use cuprate_helper::cast::usize_to_u64;
|
||||||
use cuprate_helper::{
|
use cuprate_helper::{
|
||||||
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
||||||
tx::tx_fee,
|
tx::tx_fee,
|
||||||
};
|
};
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation,
|
AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork,
|
||||||
VerifiedTransactionInformation,
|
TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -27,7 +29,7 @@ use crate::{
|
||||||
output::get_rct_num_outputs,
|
output::get_rct_num_outputs,
|
||||||
tx::{add_tx, remove_tx},
|
tx::{add_tx, remove_tx},
|
||||||
},
|
},
|
||||||
tables::{BlockHeights, BlockInfos, Tables, TablesMut},
|
tables::{BlockHeights, BlockInfos, Tables, TablesIter, TablesMut},
|
||||||
types::{BlockHash, BlockHeight, BlockInfo},
|
types::{BlockHash, BlockHeight, BlockInfo},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -224,6 +226,66 @@ pub fn pop_block(
|
||||||
|
|
||||||
Ok((block_height, block_info.block_hash, block))
|
Ok((block_height, block_info.block_hash, block))
|
||||||
}
|
}
|
||||||
|
//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes`
|
||||||
|
/// Retrieve a block's raw bytes, the index of the miner transaction and the number of non miner-txs in the block.
|
||||||
|
///
|
||||||
|
#[doc = doc_error!()]
|
||||||
|
pub fn get_block_blob_with_tx_indexes(
|
||||||
|
block_height: &BlockHeight,
|
||||||
|
tables: &impl Tables,
|
||||||
|
) -> Result<(Vec<u8>, u64, usize), RuntimeError> {
|
||||||
|
use monero_serai::io::write_varint;
|
||||||
|
|
||||||
|
let block_info = tables.block_infos().get(block_height)?;
|
||||||
|
|
||||||
|
let miner_tx_idx = block_info.mining_tx_index;
|
||||||
|
let block_txs = tables.block_txs_hashes().get(block_height)?.0;
|
||||||
|
let numb_txs = block_txs.len();
|
||||||
|
|
||||||
|
// Get the block header
|
||||||
|
let mut block = tables.block_header_blobs().get(block_height)?.0;
|
||||||
|
|
||||||
|
// Add the miner tx to the blob.
|
||||||
|
let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0;
|
||||||
|
block.append(&mut miner_tx_blob);
|
||||||
|
|
||||||
|
// Add the blocks tx hashes.
|
||||||
|
write_varint(&block_txs.len(), &mut block)
|
||||||
|
.expect("The number of txs per block will not exceed u64::MAX");
|
||||||
|
|
||||||
|
let block_txs_bytes = bytemuck::cast_slice(&block_txs);
|
||||||
|
block.extend_from_slice(block_txs_bytes);
|
||||||
|
|
||||||
|
Ok((block, miner_tx_idx, numb_txs))
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
||||||
|
/// Retrieve a [`BlockCompleteEntry`] from the database.
|
||||||
|
///
|
||||||
|
#[doc = doc_error!()]
|
||||||
|
pub fn get_block_complete_entry(
|
||||||
|
block_hash: &BlockHash,
|
||||||
|
tables: &impl TablesIter,
|
||||||
|
) -> Result<BlockCompleteEntry, RuntimeError> {
|
||||||
|
let block_height = tables.block_heights().get(block_hash)?;
|
||||||
|
let (block_blob, miner_tx_idx, numb_non_miner_txs) =
|
||||||
|
get_block_blob_with_tx_indexes(&block_height, tables)?;
|
||||||
|
|
||||||
|
let first_tx_idx = miner_tx_idx + 1;
|
||||||
|
|
||||||
|
let tx_blobs = tables
|
||||||
|
.tx_blobs_iter()
|
||||||
|
.get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))?
|
||||||
|
.map(|tx_blob| Ok(Bytes::from(tx_blob?.0)))
|
||||||
|
.collect::<Result<_, RuntimeError>>()?;
|
||||||
|
|
||||||
|
Ok(BlockCompleteEntry {
|
||||||
|
block: Bytes::from(block_blob),
|
||||||
|
txs: TransactionBlobs::Normal(tx_blobs),
|
||||||
|
pruned: false,
|
||||||
|
block_weight: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*`
|
||||||
/// Retrieve a [`ExtendedBlockHeader`] from the database.
|
/// Retrieve a [`ExtendedBlockHeader`] from the database.
|
||||||
|
|
|
@ -4,9 +4,9 @@
|
||||||
use cuprate_database::{DatabaseRo, RuntimeError};
|
use cuprate_database::{DatabaseRo, RuntimeError};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
ops::macros::doc_error,
|
ops::{block::block_exists, macros::doc_error},
|
||||||
tables::{BlockHeights, BlockInfos},
|
tables::{BlockHeights, BlockInfos},
|
||||||
types::BlockHeight,
|
types::{BlockHash, BlockHeight},
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Free Functions
|
//---------------------------------------------------------------------------------------------------- Free Functions
|
||||||
|
@ -78,6 +78,45 @@ pub fn cumulative_generated_coins(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Find the split point between our chain and a list of [`BlockHash`]s from another chain.
|
||||||
|
///
|
||||||
|
/// This function accepts chains in chronological and reverse chronological order, however
|
||||||
|
/// if the wrong order is specified the return value is meaningless.
|
||||||
|
///
|
||||||
|
/// For chronologically ordered chains this will return the index of the first unknown, for reverse
|
||||||
|
/// chronologically ordered chains this will return the index of the fist known.
|
||||||
|
///
|
||||||
|
/// If all blocks are known for chronologically ordered chains or unknown for reverse chronologically
|
||||||
|
/// ordered chains then the length of the chain will be returned.
|
||||||
|
#[doc = doc_error!()]
|
||||||
|
#[inline]
|
||||||
|
pub fn find_split_point(
|
||||||
|
block_ids: &[BlockHash],
|
||||||
|
chronological_order: bool,
|
||||||
|
table_block_heights: &impl DatabaseRo<BlockHeights>,
|
||||||
|
) -> Result<usize, RuntimeError> {
|
||||||
|
let mut err = None;
|
||||||
|
|
||||||
|
// Do a binary search to find the first unknown/known block in the batch.
|
||||||
|
let idx =
|
||||||
|
block_ids.partition_point(
|
||||||
|
|block_id| match block_exists(block_id, table_block_heights) {
|
||||||
|
Ok(exists) => exists & chronological_order,
|
||||||
|
Err(e) => {
|
||||||
|
err.get_or_insert(e);
|
||||||
|
// if this happens the search is scrapped, just return `false` back.
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(e) = err {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(idx)
|
||||||
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Tests
|
//---------------------------------------------------------------------------------------------------- Tests
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
|
|
|
@ -10,23 +10,24 @@
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Import
|
//---------------------------------------------------------------------------------------------------- Import
|
||||||
use std::{
|
use std::{
|
||||||
|
cmp::min,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use rayon::{
|
use rayon::{
|
||||||
iter::{IntoParallelIterator, ParallelIterator},
|
iter::{Either, IntoParallelIterator, ParallelIterator},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
ThreadPool,
|
ThreadPool,
|
||||||
};
|
};
|
||||||
use thread_local::ThreadLocal;
|
use thread_local::ThreadLocal;
|
||||||
|
|
||||||
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
|
use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError};
|
||||||
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
||||||
use cuprate_helper::map::combine_low_high_bits_to_u128;
|
use cuprate_helper::map::combine_low_high_bits_to_u128;
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||||
Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain,
|
Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputHistogramInput, OutputOnChain,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -36,9 +37,10 @@ use crate::{
|
||||||
get_alt_chain_history_ranges,
|
get_alt_chain_history_ranges,
|
||||||
},
|
},
|
||||||
block::{
|
block::{
|
||||||
block_exists, get_block_extended_header_from_height, get_block_height, get_block_info,
|
block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry,
|
||||||
|
get_block_extended_header_from_height, get_block_height, get_block_info,
|
||||||
},
|
},
|
||||||
blockchain::{cumulative_generated_coins, top_block_height},
|
blockchain::{cumulative_generated_coins, find_split_point, top_block_height},
|
||||||
key_image::key_image_exists,
|
key_image::key_image_exists,
|
||||||
output::id_to_output_on_chain,
|
output::id_to_output_on_chain,
|
||||||
},
|
},
|
||||||
|
@ -46,7 +48,7 @@ use crate::{
|
||||||
free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
|
free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
|
||||||
types::{BlockchainReadHandle, ResponseResult},
|
types::{BlockchainReadHandle, ResponseResult},
|
||||||
},
|
},
|
||||||
tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables},
|
tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter},
|
||||||
types::{
|
types::{
|
||||||
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
|
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
|
||||||
},
|
},
|
||||||
|
@ -100,6 +102,7 @@ fn map_request(
|
||||||
/* SOMEDAY: pre-request handling, run some code for each request? */
|
/* SOMEDAY: pre-request handling, run some code for each request? */
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
|
R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
|
||||||
R::BlockExtendedHeader(block) => block_extended_header(env, block),
|
R::BlockExtendedHeader(block) => block_extended_header(env, block),
|
||||||
R::BlockHash(block, chain) => block_hash(env, block, chain),
|
R::BlockHash(block, chain) => block_hash(env, block, chain),
|
||||||
R::FindBlock(block_hash) => find_block(env, block_hash),
|
R::FindBlock(block_hash) => find_block(env, block_hash),
|
||||||
|
@ -113,7 +116,12 @@ fn map_request(
|
||||||
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
|
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
|
||||||
R::KeyImagesSpent(set) => key_images_spent(env, set),
|
R::KeyImagesSpent(set) => key_images_spent(env, set),
|
||||||
R::CompactChainHistory => compact_chain_history(env),
|
R::CompactChainHistory => compact_chain_history(env),
|
||||||
|
R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount),
|
||||||
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
|
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
|
||||||
|
R::MissingTxsInBlock {
|
||||||
|
block_hash,
|
||||||
|
tx_indexes,
|
||||||
|
} => missing_txs_in_block(env, block_hash, tx_indexes),
|
||||||
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
|
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
|
||||||
R::Block { height } => block(env, height),
|
R::Block { height } => block(env, height),
|
||||||
R::BlockByHash(hash) => block_by_hash(env, hash),
|
R::BlockByHash(hash) => block_by_hash(env, hash),
|
||||||
|
@ -198,6 +206,38 @@ macro_rules! get_tables {
|
||||||
// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal
|
// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal
|
||||||
// amount of parallelism.
|
// amount of parallelism.
|
||||||
|
|
||||||
|
/// [`BlockchainReadRequest::BlockCompleteEntries`].
|
||||||
|
fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
|
||||||
|
// Prepare tx/tables in `ThreadLocal`.
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_ro = thread_local(env);
|
||||||
|
let tables = thread_local(env);
|
||||||
|
|
||||||
|
let (missing_hashes, blocks) = block_hashes
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|block_hash| {
|
||||||
|
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
|
||||||
|
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
|
||||||
|
|
||||||
|
match get_block_complete_entry(&block_hash, tables) {
|
||||||
|
Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
|
||||||
|
res => res.map(Either::Right),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
|
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
|
||||||
|
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
|
||||||
|
|
||||||
|
let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
|
||||||
|
|
||||||
|
Ok(BlockchainResponse::BlockCompleteEntries {
|
||||||
|
blocks,
|
||||||
|
missing_hashes,
|
||||||
|
blockchain_height,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// [`BlockchainReadRequest::BlockExtendedHeader`].
|
/// [`BlockchainReadRequest::BlockExtendedHeader`].
|
||||||
#[inline]
|
#[inline]
|
||||||
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
|
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
|
||||||
|
@ -534,6 +574,76 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// [`BlockchainReadRequest::NextChainEntry`]
|
||||||
|
///
|
||||||
|
/// # Invariant
|
||||||
|
/// `block_ids` must be sorted in reverse chronological block order, or else
|
||||||
|
/// the returned result is unspecified and meaningless, as this function
|
||||||
|
/// performs a binary search.
|
||||||
|
fn next_chain_entry(
|
||||||
|
env: &ConcreteEnv,
|
||||||
|
block_ids: &[BlockHash],
|
||||||
|
next_entry_size: usize,
|
||||||
|
) -> ResponseResult {
|
||||||
|
// Single-threaded, no `ThreadLocal` required.
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_ro = env_inner.tx_ro()?;
|
||||||
|
|
||||||
|
let tables = env_inner.open_tables(&tx_ro)?;
|
||||||
|
let table_block_heights = tables.block_heights();
|
||||||
|
let table_block_infos = tables.block_infos_iter();
|
||||||
|
|
||||||
|
let idx = find_split_point(block_ids, false, table_block_heights)?;
|
||||||
|
|
||||||
|
// This will happen if we have a different genesis block.
|
||||||
|
if idx == block_ids.len() {
|
||||||
|
return Ok(BlockchainResponse::NextChainEntry {
|
||||||
|
start_height: 0,
|
||||||
|
chain_height: 0,
|
||||||
|
block_ids: vec![],
|
||||||
|
block_weights: vec![],
|
||||||
|
cumulative_difficulty: 0,
|
||||||
|
first_block_blob: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// The returned chain entry must overlap with one of the blocks we were told about.
|
||||||
|
let first_known_block_hash = block_ids[idx];
|
||||||
|
let first_known_height = table_block_heights.get(&first_known_block_hash)?;
|
||||||
|
|
||||||
|
let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?;
|
||||||
|
let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height);
|
||||||
|
|
||||||
|
let (block_ids, block_weights) = table_block_infos
|
||||||
|
.get_range(first_known_height..last_height_in_chain_entry)?
|
||||||
|
.map(|block_info| {
|
||||||
|
let block_info = block_info?;
|
||||||
|
|
||||||
|
Ok((block_info.block_hash, block_info.weight))
|
||||||
|
})
|
||||||
|
.collect::<Result<(Vec<_>, Vec<_>), RuntimeError>>()?;
|
||||||
|
|
||||||
|
let top_block_info = table_block_infos.get(&(chain_height - 1))?;
|
||||||
|
|
||||||
|
let first_block_blob = if block_ids.len() >= 2 {
|
||||||
|
Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(BlockchainResponse::NextChainEntry {
|
||||||
|
start_height: first_known_height,
|
||||||
|
chain_height,
|
||||||
|
block_ids,
|
||||||
|
block_weights,
|
||||||
|
cumulative_difficulty: combine_low_high_bits_to_u128(
|
||||||
|
top_block_info.cumulative_difficulty_low,
|
||||||
|
top_block_info.cumulative_difficulty_high,
|
||||||
|
),
|
||||||
|
first_block_blob,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// [`BlockchainReadRequest::FindFirstUnknown`]
|
/// [`BlockchainReadRequest::FindFirstUnknown`]
|
||||||
///
|
///
|
||||||
/// # Invariant
|
/// # Invariant
|
||||||
|
@ -546,24 +656,7 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes
|
||||||
|
|
||||||
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
|
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
|
||||||
|
|
||||||
let mut err = None;
|
let idx = find_split_point(block_ids, true, &table_block_heights)?;
|
||||||
|
|
||||||
// Do a binary search to find the first unknown block in the batch.
|
|
||||||
let idx =
|
|
||||||
block_ids.partition_point(
|
|
||||||
|block_id| match block_exists(block_id, &table_block_heights) {
|
|
||||||
Ok(exists) => exists,
|
|
||||||
Err(e) => {
|
|
||||||
err.get_or_insert(e);
|
|
||||||
// if this happens the search is scrapped, just return `false` back.
|
|
||||||
false
|
|
||||||
}
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some(e) = err {
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(if idx == block_ids.len() {
|
Ok(if idx == block_ids.len() {
|
||||||
BlockchainResponse::FindFirstUnknown(None)
|
BlockchainResponse::FindFirstUnknown(None)
|
||||||
|
@ -576,6 +669,36 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// [`BlockchainReadRequest::MissingTxsInBlock`]
|
||||||
|
fn missing_txs_in_block(
|
||||||
|
env: &ConcreteEnv,
|
||||||
|
block_hash: [u8; 32],
|
||||||
|
missing_txs: Vec<u64>,
|
||||||
|
) -> ResponseResult {
|
||||||
|
// Single-threaded, no `ThreadLocal` required.
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_ro = env_inner.tx_ro()?;
|
||||||
|
let tables = env_inner.open_tables(&tx_ro)?;
|
||||||
|
|
||||||
|
let block_height = tables.block_heights().get(&block_hash)?;
|
||||||
|
|
||||||
|
let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?;
|
||||||
|
let first_tx_index = miner_tx_index + 1;
|
||||||
|
|
||||||
|
if numb_txs < missing_txs.len() {
|
||||||
|
return Ok(BlockchainResponse::MissingTxsInBlock(None));
|
||||||
|
}
|
||||||
|
|
||||||
|
let txs = missing_txs
|
||||||
|
.into_iter()
|
||||||
|
.map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0))
|
||||||
|
.collect::<Result<_, RuntimeError>>()?;
|
||||||
|
|
||||||
|
Ok(BlockchainResponse::MissingTxsInBlock(Some(
|
||||||
|
MissingTxsInBlock { block, txs },
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
/// [`BlockchainReadRequest::AltBlocksInChain`]
|
/// [`BlockchainReadRequest::AltBlocksInChain`]
|
||||||
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
|
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
|
||||||
// Prepare tx/tables in `ThreadLocal`.
|
// Prepare tx/tables in `ThreadLocal`.
|
||||||
|
|
|
@ -11,9 +11,11 @@ use std::{
|
||||||
use monero_serai::block::Block;
|
use monero_serai::block::Block;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation},
|
types::{
|
||||||
AltBlockInformation, ChainId, ChainInfo, CoinbaseTxSum, OutputHistogramEntry,
|
Chain, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, VerifiedBlockInformation,
|
||||||
OutputHistogramInput,
|
},
|
||||||
|
AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum,
|
||||||
|
OutputHistogramEntry, OutputHistogramInput,
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- ReadRequest
|
//---------------------------------------------------------------------------------------------------- ReadRequest
|
||||||
|
@ -27,6 +29,11 @@ use crate::{
|
||||||
/// See `Response` for the expected responses per `Request`.
|
/// See `Response` for the expected responses per `Request`.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum BlockchainReadRequest {
|
pub enum BlockchainReadRequest {
|
||||||
|
/// Request [`BlockCompleteEntry`]s.
|
||||||
|
///
|
||||||
|
/// The input is the block hashes.
|
||||||
|
BlockCompleteEntries(Vec<[u8; 32]>),
|
||||||
|
|
||||||
/// Request a block's extended header.
|
/// Request a block's extended header.
|
||||||
///
|
///
|
||||||
/// The input is the block's height.
|
/// The input is the block's height.
|
||||||
|
@ -96,6 +103,16 @@ pub enum BlockchainReadRequest {
|
||||||
/// A request for the compact chain history.
|
/// A request for the compact chain history.
|
||||||
CompactChainHistory,
|
CompactChainHistory,
|
||||||
|
|
||||||
|
/// A request for the next chain entry.
|
||||||
|
///
|
||||||
|
/// Input is a list of block hashes and the amount of block hashes to return in the next chain entry.
|
||||||
|
///
|
||||||
|
/// # Invariant
|
||||||
|
/// The [`Vec`] containing the block IDs must be sorted in reverse chronological block
|
||||||
|
/// order, or else the returned response is unspecified and meaningless,
|
||||||
|
/// as this request performs a binary search
|
||||||
|
NextChainEntry(Vec<[u8; 32]>, usize),
|
||||||
|
|
||||||
/// A request to find the first unknown block ID in a list of block IDs.
|
/// A request to find the first unknown block ID in a list of block IDs.
|
||||||
///
|
///
|
||||||
/// # Invariant
|
/// # Invariant
|
||||||
|
@ -104,6 +121,16 @@ pub enum BlockchainReadRequest {
|
||||||
/// as this request performs a binary search.
|
/// as this request performs a binary search.
|
||||||
FindFirstUnknown(Vec<[u8; 32]>),
|
FindFirstUnknown(Vec<[u8; 32]>),
|
||||||
|
|
||||||
|
/// A request for transactions from a specific block.
|
||||||
|
MissingTxsInBlock {
|
||||||
|
/// The block to get transactions from.
|
||||||
|
block_hash: [u8; 32],
|
||||||
|
/// The indexes of the transactions from the block.
|
||||||
|
/// This is not the global index of the txs, instead it is the local index as they appear in
|
||||||
|
/// the block.
|
||||||
|
tx_indexes: Vec<u64>,
|
||||||
|
},
|
||||||
|
|
||||||
/// A request for all alt blocks in the chain with the given [`ChainId`].
|
/// A request for all alt blocks in the chain with the given [`ChainId`].
|
||||||
AltBlocksInChain(ChainId),
|
AltBlocksInChain(ChainId),
|
||||||
|
|
||||||
|
@ -182,6 +209,16 @@ pub enum BlockchainWriteRequest {
|
||||||
#[expect(clippy::large_enum_variant)]
|
#[expect(clippy::large_enum_variant)]
|
||||||
pub enum BlockchainResponse {
|
pub enum BlockchainResponse {
|
||||||
//------------------------------------------------------ Reads
|
//------------------------------------------------------ Reads
|
||||||
|
/// Response to [`BlockchainReadRequest::BlockCompleteEntries`].
|
||||||
|
BlockCompleteEntries {
|
||||||
|
/// The [`BlockCompleteEntry`]s that we had.
|
||||||
|
blocks: Vec<BlockCompleteEntry>,
|
||||||
|
/// The hashes of blocks that were requested, but we don't have.
|
||||||
|
missing_hashes: Vec<[u8; 32]>,
|
||||||
|
/// Our blockchain height.
|
||||||
|
blockchain_height: usize,
|
||||||
|
},
|
||||||
|
|
||||||
/// Response to [`BlockchainReadRequest::BlockExtendedHeader`].
|
/// Response to [`BlockchainReadRequest::BlockExtendedHeader`].
|
||||||
///
|
///
|
||||||
/// Inner value is the extended headed of the requested block.
|
/// Inner value is the extended headed of the requested block.
|
||||||
|
@ -248,6 +285,24 @@ pub enum BlockchainResponse {
|
||||||
cumulative_difficulty: u128,
|
cumulative_difficulty: u128,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Response to [`BlockchainReadRequest::NextChainEntry`].
|
||||||
|
///
|
||||||
|
/// If all blocks were unknown `start_height` will be `0`, the other fields will be meaningless.
|
||||||
|
NextChainEntry {
|
||||||
|
/// The start height of this entry, `0` if we could not find the split point.
|
||||||
|
start_height: usize,
|
||||||
|
/// The current chain height.
|
||||||
|
chain_height: usize,
|
||||||
|
/// The next block hashes in the entry.
|
||||||
|
block_ids: Vec<[u8; 32]>,
|
||||||
|
/// The block weights of the next blocks.
|
||||||
|
block_weights: Vec<usize>,
|
||||||
|
/// The current cumulative difficulty of our chain.
|
||||||
|
cumulative_difficulty: u128,
|
||||||
|
/// The block blob of the 2nd block in `block_ids`, if there is one.
|
||||||
|
first_block_blob: Option<Vec<u8>>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Response to [`BlockchainReadRequest::FindFirstUnknown`].
|
/// Response to [`BlockchainReadRequest::FindFirstUnknown`].
|
||||||
///
|
///
|
||||||
/// Contains the index of the first unknown block and its expected height.
|
/// Contains the index of the first unknown block and its expected height.
|
||||||
|
@ -255,7 +310,12 @@ pub enum BlockchainResponse {
|
||||||
/// This will be [`None`] if all blocks were known.
|
/// This will be [`None`] if all blocks were known.
|
||||||
FindFirstUnknown(Option<(usize, usize)>),
|
FindFirstUnknown(Option<(usize, usize)>),
|
||||||
|
|
||||||
/// Response to [`BlockchainReadRequest::AltBlocksInChain`].
|
/// The response for [`BlockchainReadRequest::MissingTxsInBlock`].
|
||||||
|
///
|
||||||
|
/// Will return [`None`] if the request contained an index out of range.
|
||||||
|
MissingTxsInBlock(Option<MissingTxsInBlock>),
|
||||||
|
|
||||||
|
/// The response for [`BlockchainReadRequest::AltBlocksInChain`].
|
||||||
///
|
///
|
||||||
/// Contains all the alt blocks in the alt-chain in chronological order.
|
/// Contains all the alt blocks in the alt-chain in chronological order.
|
||||||
AltBlocksInChain(Vec<AltBlockInformation>),
|
AltBlocksInChain(Vec<AltBlockInformation>),
|
||||||
|
|
|
@ -26,8 +26,8 @@ pub use transaction_verification_data::{
|
||||||
pub use types::{
|
pub use types::{
|
||||||
AddAuxPow, AltBlockInformation, AuxPow, Chain, ChainId, ChainInfo, CoinbaseTxSum,
|
AddAuxPow, AltBlockInformation, AuxPow, Chain, ChainId, ChainInfo, CoinbaseTxSum,
|
||||||
ExtendedBlockHeader, FeeEstimate, HardForkInfo, MinerData, MinerDataTxBacklogEntry,
|
ExtendedBlockHeader, FeeEstimate, HardForkInfo, MinerData, MinerDataTxBacklogEntry,
|
||||||
OutputHistogramEntry, OutputHistogramInput, OutputOnChain, VerifiedBlockInformation,
|
MissingTxsInBlock, OutputHistogramEntry, OutputHistogramInput, OutputOnChain,
|
||||||
VerifiedTransactionInformation,
|
VerifiedBlockInformation, VerifiedTransactionInformation,
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Feature-gated
|
//---------------------------------------------------------------------------------------------------- Feature-gated
|
||||||
|
|
|
@ -259,6 +259,13 @@ pub struct AddAuxPow {
|
||||||
pub aux_pow: Vec<AuxPow>,
|
pub aux_pow: Vec<AuxPow>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The inner response for a request for missing txs.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
|
pub struct MissingTxsInBlock {
|
||||||
|
pub block: Vec<u8>,
|
||||||
|
pub txs: Vec<Vec<u8>>,
|
||||||
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Tests
|
//---------------------------------------------------------------------------------------------------- Tests
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
|
|
Loading…
Reference in a new issue