Add missing txpool requests

This commit is contained in:
Boog900 2024-10-13 02:20:45 +01:00
parent 4ba94f4bb7
commit 2d5567e47b
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
5 changed files with 67 additions and 16 deletions

View file

@ -65,9 +65,10 @@ impl Service<TxStoreRequest<TxId>> for TxStoreService {
.boxed(),
TxStoreRequest::Promote(tx_id) => self
.txpool_write_handle
.clone()
.oneshot(TxpoolWriteRequest::Promote(tx_id))
.map(|res| match res {
Ok(_) | Err(RuntimeError::KeyNotFound) => TxStoreResponse::Ok,
Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
Err(e) => Err(e.into()),
})
.boxed(),

View file

@ -84,11 +84,9 @@ impl Service<IncomingTxs> for IncomingTxHandler {
}
fn call(&mut self, req: IncomingTxs) -> Self::Future {
let IncomingTxs::Bytes { txs, state } = req;
handle_incoming_txs(
txs,
state,
req.txs,
req.state,
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.tx_verifier_service.clone(),

View file

@ -13,7 +13,7 @@ pub mod tables;
pub mod types;
pub use config::Config;
pub use free::open;
pub use free::{open, transaction_blob_hash};
//re-exports
pub use cuprate_database;

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use rayon::ThreadPool;
@ -11,8 +11,8 @@ use crate::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
types::{ReadResponseResult, TxpoolReadHandle},
},
tables::{OpenTables, TransactionBlobs},
types::TransactionHash,
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
types::{TransactionBlobHash, TransactionHash, TxStateFlags},
};
// TODO: update the docs here
@ -58,7 +58,9 @@ fn map_request(
match request {
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
_ => todo!(),
TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
filter_known_tx_blob_hashes(env, blob_hashes)
}
}
}
@ -86,10 +88,15 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
let tx_ro = inner_env.tx_ro()?;
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
tx_blobs_table
.get(tx_hash)
.map(|blob| TxpoolReadResponse::TxBlob(blob.0))
let tx_blob = tx_blobs_table.get(tx_hash)?.0;
let tx_info = tx_infos_table.get(tx_hash)?;
Ok(TxpoolReadResponse::TxBlob {
tx_blob,
state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM),
})
}
/// [`TxpoolReadRequest::TxVerificationData`].
@ -102,3 +109,29 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
}
/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
fn filter_known_tx_blob_hashes(
env: &ConcreteEnv,
mut blob_hashes: HashSet<TransactionBlobHash>,
) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
let mut err = None;
blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) {
Ok(exists) => !exists,
Err(e) => {
err.get_or_insert(e);
false
}
});
if let Some(e) = err {
return Err(e);
}
Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes))
}

View file

@ -1,6 +1,6 @@
use std::sync::Arc;
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
use cuprate_database::{ConcreteEnv, DatabaseRw, Env, EnvInner, RuntimeError, TxRw};
use cuprate_database_service::DatabaseWriteHandle;
use cuprate_types::TransactionVerificationData;
@ -10,8 +10,8 @@ use crate::{
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
types::TxpoolWriteHandle,
},
tables::OpenTables,
types::TransactionHash,
tables::{OpenTables, TransactionInfos},
types::{TransactionHash, TxStateFlags},
};
//---------------------------------------------------------------------------------------------------- init_write_service
@ -31,6 +31,7 @@ fn handle_txpool_request(
add_transaction(env, tx, *state_stem)
}
TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
}
}
@ -101,3 +102,21 @@ fn remove_transaction(
TxRw::commit(tx_rw)?;
Ok(TxpoolWriteResponse::Ok)
}
/// [`TxpoolWriteRequest::Promote`]
fn promote(
env: &ConcreteEnv,
tx_hash: &TransactionHash,
) -> Result<TxpoolWriteResponse, RuntimeError> {
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
tx_infos.update(tx_hash, |mut info| {
info.flags.remove(TxStateFlags::STATE_STEM);
Some(info)
})?;
Ok(TxpoolWriteResponse::Ok)
}