diff --git a/Cargo.lock b/Cargo.lock index 6dcaba3..e55debe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-constants", "cuprate-database", "cuprate-database-service", diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index a109ae1..3b94770 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,10 +1,10 @@ +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; use std::{ future::{ready, Ready}, task::{Context, Poll}, }; - -use futures::future::BoxFuture; -use futures::FutureExt; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; @@ -15,7 +15,11 @@ use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_hig use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; +use cuprate_types::{BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs}; +use cuprate_wire::protocol::{ + ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, + GetObjectsResponse, NewFluffyBlock, +}; #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { @@ -63,12 +67,20 @@ impl Service for P2pProtocolRequestHandler { ProtocolRequest::GetObjects(r) => { get_objects(r, self.blockchain_read_handle.clone()).boxed() } - ProtocolRequest::GetChain(_) => todo!(), - ProtocolRequest::FluffyMissingTxs(_) => todo!(), - ProtocolRequest::GetTxPoolCompliment(_) => todo!(), - ProtocolRequest::NewBlock(_) => todo!(), + ProtocolRequest::GetChain(r) => { + get_chain(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::FluffyMissingTxs(r) => { + fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!( + "Peer sent a full block when we support fluffy blocks" + ))) + .boxed(), ProtocolRequest::NewFluffyBlock(_) => todo!(), - ProtocolRequest::NewTransactions(_) => todo!(), + ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { + ready(Ok(ProtocolResponse::NA)).boxed() + } // TODO: tx-pool } } } @@ -138,6 +150,10 @@ async fn get_chain( panic!("blockchain returned wrong response!"); }; + if start_height == 0 { + anyhow::bail!("The peers chain has a different genesis block than ours."); + } + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = split_u128_into_low_high_bits(cumulative_difficulty); @@ -147,7 +163,7 @@ async fn get_chain( cumulative_difficulty_low64, cumulative_difficulty_top64, m_block_ids: ByteArrayVec::from(block_ids), - first_block: Default::default(), + first_block: first_block_blob.map_or(Bytes::new(), Bytes::from), // only needed when m_block_weights: if want_pruned_data { block_weights.into_iter().map(usize_to_u64).collect() @@ -156,3 +172,43 @@ async fn get_chain( }, })) } + +/// [`ProtocolRequest::FluffyMissingTxs`] +async fn fluffy_missing_txs( + mut request: FluffyMissingTransactionsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let tx_indexes = std::mem::take(&mut request.missing_tx_indices); + let block_hash: [u8; 32] = *request.block_hash; + let current_blockchain_height = request.current_blockchain_height; + + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::MissingTxsInBlock { + block_hash, + tx_indexes, + }) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + let Some(MissingTxsInBlock { block, txs }) = res else { + anyhow::bail!("The peer requested txs out of range."); + }; + + Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock { + b: BlockCompleteEntry { + block: Bytes::from(block), + txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()), + pruned: false, + // only needed for pruned blocks. + block_weight: 0, + }, + current_blockchain_height, + })) +} diff --git a/net/wire/src/p2p/protocol.rs b/net/wire/src/p2p/protocol.rs index 1d1d45a..cc4b49d 100644 --- a/net/wire/src/p2p/protocol.rs +++ b/net/wire/src/p2p/protocol.rs @@ -159,7 +159,7 @@ epee_object!( current_blockchain_height: u64, ); -/// A request for Txs we are missing from our `TxPool` +/// A request for txs we are missing from an incoming block. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FluffyMissingTransactionsRequest { /// The Block we are missing the Txs in diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 10bafc2..7edf789 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -14,14 +14,6 @@ use rayon::{ }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; -use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; -use cuprate_helper::map::combine_low_high_bits_to_u128; -use cuprate_types::{ - blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputOnChain, -}; - use crate::{ ops::{ alt_block::{ @@ -45,6 +37,13 @@ use crate::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, }; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; +use cuprate_helper::map::combine_low_high_bits_to_u128; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, +}; //---------------------------------------------------------------------------------------------------- init_read_service /// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`]. @@ -110,6 +109,10 @@ fn map_request( R::CompactChainHistory => compact_chain_history(env), R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), + R::MissingTxsInBlock { + block_hash, + tx_indexes, + } => missing_txs_in_block(env, block_hash, tx_indexes), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), } @@ -649,6 +652,36 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes }) } +/// [`BlockchainReadRequest::MissingTxsInBlock`] +fn missing_txs_in_block( + env: &ConcreteEnv, + block_hash: [u8; 32], + missing_txs: Vec, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + + let block_height = tables.block_heights().get(&block_hash)?; + + let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?; + let first_tx_index = miner_tx_index + 1; + + if numb_txs < missing_txs.len() { + return Ok(BlockchainResponse::MissingTxsInBlock(None)); + } + + let txs = missing_txs + .into_iter() + .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0)) + .collect::>()?; + + Ok(BlockchainResponse::MissingTxsInBlock(Some( + MissingTxsInBlock { block, txs }, + ))) +} + /// [`BlockchainReadRequest::AltBlocksInChain`] fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 526dc43..37a04f7 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -8,6 +8,7 @@ use std::{ ops::Range, }; +use crate::types::MissingTxsInBlock; use crate::{ types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, AltBlockInformation, BlockCompleteEntry, ChainId, @@ -113,6 +114,16 @@ pub enum BlockchainReadRequest { /// as this request performs a binary search. FindFirstUnknown(Vec<[u8; 32]>), + /// A request for transactions from a specific block. + MissingTxsInBlock { + /// The block to get transactions from. + block_hash: [u8; 32], + /// The indexes of the transactions from the block. + /// This is not the global index of the txs, instead it is the local index as they appear in + /// the block/ + tx_indexes: Vec, + }, + /// A request for all alt blocks in the chain with the given [`ChainId`]. AltBlocksInChain(ChainId), } @@ -252,6 +263,11 @@ pub enum BlockchainResponse { /// This will be [`None`] if all blocks were known. FindFirstUnknown(Option<(usize, usize)>), + /// The response for [`BlockchainReadRequest::MissingTxsInBlock`]. + /// + /// Will return [`None`] if the request contained an index out of range. + MissingTxsInBlock(Option), + /// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// /// Contains all the alt blocks in the alt-chain in chronological order. diff --git a/types/src/lib.rs b/types/src/lib.rs index 0b0dbe6..e86cf1b 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -20,7 +20,7 @@ pub use transaction_verification_data::{ CachedVerificationState, TransactionVerificationData, TxVersion, }; pub use types::{ - AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, VerifiedBlockInformation, VerifiedTransactionInformation, }; diff --git a/types/src/types.rs b/types/src/types.rs index a60ce6c..f6f19c7 100644 --- a/types/src/types.rs +++ b/types/src/types.rs @@ -155,6 +155,12 @@ pub struct OutputOnChain { pub commitment: EdwardsPoint, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MissingTxsInBlock { + pub block: Vec, + pub txs: Vec>, +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test {