diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index f13a646..c8fda16 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -65,9 +65,10 @@ impl Service> for TxStoreService { .boxed(), TxStoreRequest::Promote(tx_id) => self .txpool_write_handle + .clone() .oneshot(TxpoolWriteRequest::Promote(tx_id)) .map(|res| match res { - Ok(_) | Err(RuntimeError::KeyNotFound) => TxStoreResponse::Ok, + Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok), Err(e) => Err(e.into()), }) .boxed(), diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index abac69d..addcad5 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -84,11 +84,9 @@ impl Service for IncomingTxHandler { } fn call(&mut self, req: IncomingTxs) -> Self::Future { - let IncomingTxs::Bytes { txs, state } = req; - handle_incoming_txs( - txs, - state, + req.txs, + req.state, self.txs_being_handled.clone(), self.blockchain_context_cache.clone(), self.tx_verifier_service.clone(), diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs index 243dc4d..04d7bb9 100644 --- a/storage/txpool/src/lib.rs +++ b/storage/txpool/src/lib.rs @@ -13,7 +13,7 @@ pub mod tables; pub mod types; pub use config::Config; -pub use free::open; +pub use free::{open, transaction_blob_hash}; //re-exports pub use cuprate_database; diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 20a4bcd..4bcb32a 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use rayon::ThreadPool; @@ -11,8 +11,8 @@ use crate::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, types::{ReadResponseResult, TxpoolReadHandle}, }, - tables::{OpenTables, TransactionBlobs}, - types::TransactionHash, + tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos}, + types::{TransactionBlobHash, TransactionHash, TxStateFlags}, }; // TODO: update the docs here @@ -58,7 +58,9 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), - _ => todo!(), + TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => { + filter_known_tx_blob_hashes(env, blob_hashes) + } } } @@ -86,10 +88,15 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult { let tx_ro = inner_env.tx_ro()?; let tx_blobs_table = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos_table = inner_env.open_db_ro::(&tx_ro)?; - tx_blobs_table - .get(tx_hash) - .map(|blob| TxpoolReadResponse::TxBlob(blob.0)) + let tx_blob = tx_blobs_table.get(tx_hash)?.0; + let tx_info = tx_infos_table.get(tx_hash)?; + + Ok(TxpoolReadResponse::TxBlob { + tx_blob, + state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM), + }) } /// [`TxpoolReadRequest::TxVerificationData`]. @@ -102,3 +109,29 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData) } + +/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. +fn filter_known_tx_blob_hashes( + env: &ConcreteEnv, + mut blob_hashes: HashSet, +) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tx_blob_hashes = inner_env.open_db_ro::(&tx_ro)?; + + let mut err = None; + blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) { + Ok(exists) => !exists, + Err(e) => { + err.get_or_insert(e); + false + } + }); + + if let Some(e) = err { + return Err(e); + } + + Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes)) +} diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 8a3b1bf..84916c8 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw}; +use cuprate_database::{ConcreteEnv, DatabaseRw, Env, EnvInner, RuntimeError, TxRw}; use cuprate_database_service::DatabaseWriteHandle; use cuprate_types::TransactionVerificationData; @@ -10,8 +10,8 @@ use crate::{ interface::{TxpoolWriteRequest, TxpoolWriteResponse}, types::TxpoolWriteHandle, }, - tables::OpenTables, - types::TransactionHash, + tables::{OpenTables, TransactionInfos}, + types::{TransactionHash, TxStateFlags}, }; //---------------------------------------------------------------------------------------------------- init_write_service @@ -31,6 +31,7 @@ fn handle_txpool_request( add_transaction(env, tx, *state_stem) } TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), + TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash), } } @@ -101,3 +102,21 @@ fn remove_transaction( TxRw::commit(tx_rw)?; Ok(TxpoolWriteResponse::Ok) } + +/// [`TxpoolWriteRequest::Promote`] +fn promote( + env: &ConcreteEnv, + tx_hash: &TransactionHash, +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let mut tx_infos = env_inner.open_db_rw::(&tx_rw)?; + + tx_infos.update(tx_hash, |mut info| { + info.flags.remove(TxStateFlags::STATE_STEM); + Some(info) + })?; + + Ok(TxpoolWriteResponse::Ok) +}