add fluffy_missing_txs handler

This commit is contained in:
Boog900 2024-10-04 21:45:40 +01:00
parent ec77a91241
commit aa274ebafe
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 132 additions and 20 deletions

1
Cargo.lock generated
View file

@ -548,6 +548,7 @@ version = "0.0.0"
dependencies = [
"bitflags 2.6.0",
"bytemuck",
"bytes",
"cuprate-constants",
"cuprate-database",
"cuprate-database-service",

View file

@ -1,10 +1,10 @@
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use futures::future::BoxFuture;
use futures::FutureExt;
use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
@ -15,7 +15,11 @@ use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_hig
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse};
use cuprate_types::{BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs};
use cuprate_wire::protocol::{
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
GetObjectsResponse, NewFluffyBlock,
};
#[derive(Clone)]
pub struct P2pProtocolRequestHandlerMaker {
@ -63,12 +67,20 @@ impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
ProtocolRequest::GetObjects(r) => {
get_objects(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::GetChain(_) => todo!(),
ProtocolRequest::FluffyMissingTxs(_) => todo!(),
ProtocolRequest::GetTxPoolCompliment(_) => todo!(),
ProtocolRequest::NewBlock(_) => todo!(),
ProtocolRequest::GetChain(r) => {
get_chain(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::FluffyMissingTxs(r) => {
fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed()
}
ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!(
"Peer sent a full block when we support fluffy blocks"
)))
.boxed(),
ProtocolRequest::NewFluffyBlock(_) => todo!(),
ProtocolRequest::NewTransactions(_) => todo!(),
ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => {
ready(Ok(ProtocolResponse::NA)).boxed()
} // TODO: tx-pool
}
}
}
@ -138,6 +150,10 @@ async fn get_chain(
panic!("blockchain returned wrong response!");
};
if start_height == 0 {
anyhow::bail!("The peers chain has a different genesis block than ours.");
}
let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(cumulative_difficulty);
@ -147,7 +163,7 @@ async fn get_chain(
cumulative_difficulty_low64,
cumulative_difficulty_top64,
m_block_ids: ByteArrayVec::from(block_ids),
first_block: Default::default(),
first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
// only needed when
m_block_weights: if want_pruned_data {
block_weights.into_iter().map(usize_to_u64).collect()
@ -156,3 +172,43 @@ async fn get_chain(
},
}))
}
/// [`ProtocolRequest::FluffyMissingTxs`]
async fn fluffy_missing_txs(
mut request: FluffyMissingTransactionsRequest,
mut blockchain_read_handle: BlockchainReadHandle,
) -> anyhow::Result<ProtocolResponse> {
let tx_indexes = std::mem::take(&mut request.missing_tx_indices);
let block_hash: [u8; 32] = *request.block_hash;
let current_blockchain_height = request.current_blockchain_height;
// de-allocate the backing `Bytes`.
drop(request);
let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::MissingTxsInBlock {
block_hash,
tx_indexes,
})
.await?
else {
panic!("blockchain returned wrong response!");
};
let Some(MissingTxsInBlock { block, txs }) = res else {
anyhow::bail!("The peer requested txs out of range.");
};
Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock {
b: BlockCompleteEntry {
block: Bytes::from(block),
txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()),
pruned: false,
// only needed for pruned blocks.
block_weight: 0,
},
current_blockchain_height,
}))
}

View file

@ -159,7 +159,7 @@ epee_object!(
current_blockchain_height: u64,
);
/// A request for Txs we are missing from our `TxPool`
/// A request for txs we are missing from an incoming block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FluffyMissingTransactionsRequest {
/// The Block we are missing the Txs in

View file

@ -14,14 +14,6 @@ use rayon::{
};
use thread_local::ThreadLocal;
use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
};
use crate::{
ops::{
alt_block::{
@ -45,6 +37,13 @@ use crate::{
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
},
};
use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain,
};
//---------------------------------------------------------------------------------------------------- init_read_service
/// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`].
@ -110,6 +109,10 @@ fn map_request(
R::CompactChainHistory => compact_chain_history(env),
R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount),
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
R::MissingTxsInBlock {
block_hash,
tx_indexes,
} => missing_txs_in_block(env, block_hash, tx_indexes),
R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
}
@ -649,6 +652,36 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes
})
}
/// [`BlockchainReadRequest::MissingTxsInBlock`]
fn missing_txs_in_block(
env: &ConcreteEnv,
block_hash: [u8; 32],
missing_txs: Vec<u64>,
) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
let block_height = tables.block_heights().get(&block_hash)?;
let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?;
let first_tx_index = miner_tx_index + 1;
if numb_txs < missing_txs.len() {
return Ok(BlockchainResponse::MissingTxsInBlock(None));
}
let txs = missing_txs
.into_iter()
.map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0))
.collect::<Result<_, RuntimeError>>()?;
Ok(BlockchainResponse::MissingTxsInBlock(Some(
MissingTxsInBlock { block, txs },
)))
}
/// [`BlockchainReadRequest::AltBlocksInChain`]
fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.

View file

@ -8,6 +8,7 @@ use std::{
ops::Range,
};
use crate::types::MissingTxsInBlock;
use crate::{
types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation},
AltBlockInformation, BlockCompleteEntry, ChainId,
@ -113,6 +114,16 @@ pub enum BlockchainReadRequest {
/// as this request performs a binary search.
FindFirstUnknown(Vec<[u8; 32]>),
/// A request for transactions from a specific block.
MissingTxsInBlock {
/// The block to get transactions from.
block_hash: [u8; 32],
/// The indexes of the transactions from the block.
/// This is not the global index of the txs, instead it is the local index as they appear in
/// the block/
tx_indexes: Vec<u64>,
},
/// A request for all alt blocks in the chain with the given [`ChainId`].
AltBlocksInChain(ChainId),
}
@ -252,6 +263,11 @@ pub enum BlockchainResponse {
/// This will be [`None`] if all blocks were known.
FindFirstUnknown(Option<(usize, usize)>),
/// The response for [`BlockchainReadRequest::MissingTxsInBlock`].
///
/// Will return [`None`] if the request contained an index out of range.
MissingTxsInBlock(Option<MissingTxsInBlock>),
/// The response for [`BlockchainReadRequest::AltBlocksInChain`].
///
/// Contains all the alt blocks in the alt-chain in chronological order.

View file

@ -20,7 +20,7 @@ pub use transaction_verification_data::{
CachedVerificationState, TransactionVerificationData, TxVersion,
};
pub use types::{
AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain,
VerifiedBlockInformation, VerifiedTransactionInformation,
};

View file

@ -155,6 +155,12 @@ pub struct OutputOnChain {
pub commitment: EdwardsPoint,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MissingTxsInBlock {
pub block: Vec<u8>,
pub txs: Vec<Vec<u8>>,
}
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {