From e8de295b57a33b39245c1ada1bbe9e2f00275531 Mon Sep 17 00:00:00 2001 From: "hinto.janai" Date: Tue, 24 Sep 2024 18:02:39 -0400 Subject: [PATCH] add txpool msgs --- binaries/cuprated/src/rpc/request.rs | 2 +- .../cuprated/src/rpc/request/address_book.rs | 120 ++++++++++++++++++ binaries/cuprated/src/rpc/request/p2p.rs | 119 ----------------- binaries/cuprated/src/rpc/request/txpool.rs | 48 ++++++- storage/txpool/src/service/interface.rs | 32 ++++- storage/txpool/src/service/read.rs | 1 + 6 files changed, 197 insertions(+), 125 deletions(-) create mode 100644 binaries/cuprated/src/rpc/request/address_book.rs delete mode 100644 binaries/cuprated/src/rpc/request/p2p.rs diff --git a/binaries/cuprated/src/rpc/request.rs b/binaries/cuprated/src/rpc/request.rs index ac26ebe..6e59a88 100644 --- a/binaries/cuprated/src/rpc/request.rs +++ b/binaries/cuprated/src/rpc/request.rs @@ -12,7 +12,7 @@ //! the [`blockchain`] modules contains methods for the //! blockchain database [`tower::Service`] API. +mod address_book; mod blockchain; mod blockchain_manager; -mod p2p; mod txpool; diff --git a/binaries/cuprated/src/rpc/request/address_book.rs b/binaries/cuprated/src/rpc/request/address_book.rs new file mode 100644 index 0000000..78ed5fb --- /dev/null +++ b/binaries/cuprated/src/rpc/request/address_book.rs @@ -0,0 +1,120 @@ +//! Functions for TODO: doc enum message. + +use std::{ + collections::{HashMap, HashSet}, + convert::Infallible, + sync::Arc, +}; + +use anyhow::{anyhow, Error}; +use futures::StreamExt; +use monero_serai::block::Block; +use tower::{Service, ServiceExt}; + +use cuprate_helper::{ + cast::{u64_to_usize, usize_to_u64}, + map::split_u128_into_low_high_bits, +}; +use cuprate_p2p_core::{ + client::handshaker::builder::DummyAddressBook, + services::{AddressBookRequest, AddressBookResponse}, + AddressBook, ClearNet, NetworkZone, +}; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainWriteRequest}, + Chain, ExtendedBlockHeader, HardFork, OutputOnChain, VerifiedBlockInformation, +}; + +use crate::rpc::{CupratedRpcHandler, CupratedRpcHandlerState}; + +/// [`AddressBookRequest::PeerlistSize`] +pub(super) async fn peerlist_size( + address_book: &mut impl AddressBook, +) -> Result<(u64, u64), Error> { + let AddressBookResponse::PeerlistSize { white, grey } = address_book + .ready() + .await + .expect("TODO") + .call(AddressBookRequest::PeerlistSize) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok((usize_to_u64(white), usize_to_u64(grey))) +} + +/// [`AddressBookRequest::ConnectionCount`] +pub(super) async fn connection_count( + address_book: &mut impl AddressBook, +) -> Result<(u64, u64), Error> { + let AddressBookResponse::ConnectionCount { incoming, outgoing } = address_book + .ready() + .await + .expect("TODO") + .call(AddressBookRequest::ConnectionCount) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok((usize_to_u64(incoming), usize_to_u64(outgoing))) +} + +/// [`AddressBookRequest::SetBan`] +pub(super) async fn set_ban( + address_book: &mut impl AddressBook, + peer: Infallible, +) -> Result<(), Error> { + let AddressBookResponse::Ok = address_book + .ready() + .await + .expect("TODO") + .call(AddressBookRequest::SetBan(peer)) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok(()) +} + +/// [`AddressBookRequest::GetBan`] +pub(super) async fn get_ban( + address_book: &mut impl AddressBook, + peer: Infallible, +) -> Result<(), Error> { + let AddressBookResponse::GetBan(ban) = address_book + .ready() + .await + .expect("TODO") + .call(AddressBookRequest::GetBan(peer)) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok(()) +} + +/// [`AddressBookRequest::GetBans`] +pub(super) async fn get_bans( + address_book: &mut impl AddressBook, +) -> Result<(), Error> { + let AddressBookResponse::GetBans(bans) = address_book + .ready() + .await + .expect("TODO") + .call(AddressBookRequest::GetBans) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok(todo!()) +} diff --git a/binaries/cuprated/src/rpc/request/p2p.rs b/binaries/cuprated/src/rpc/request/p2p.rs deleted file mode 100644 index 22f46d7..0000000 --- a/binaries/cuprated/src/rpc/request/p2p.rs +++ /dev/null @@ -1,119 +0,0 @@ -//! Functions for TODO: doc enum message. - -use std::{ - collections::{HashMap, HashSet}, - convert::Infallible, - sync::Arc, -}; - -use anyhow::{anyhow, Error}; -use futures::StreamExt; -use monero_serai::block::Block; -use tower::{Service, ServiceExt}; - -use cuprate_helper::{ - cast::{u64_to_usize, usize_to_u64}, - map::split_u128_into_low_high_bits, -}; -use cuprate_p2p_core::{ - client::handshaker::builder::DummyAddressBook, - services::{AddressBookRequest, AddressBookResponse}, - AddressBook, ClearNet, -}; -use cuprate_types::{ - blockchain::{BlockchainReadRequest, BlockchainWriteRequest}, - Chain, ExtendedBlockHeader, HardFork, OutputOnChain, VerifiedBlockInformation, -}; - -use crate::rpc::{CupratedRpcHandler, CupratedRpcHandlerState}; - -#[expect(clippy::needless_pass_by_ref_mut, reason = "TODO: remove after impl")] -impl CupratedRpcHandlerState { - /// [`AddressBookRequest::PeerlistSize`] - pub(super) async fn peerlist_size(&mut self) -> Result<(u64, u64), Error> { - let AddressBookResponse::::PeerlistSize { white, grey } = - >>::ready( - &mut DummyAddressBook, - ) - .await - .expect("TODO") - .call(AddressBookRequest::PeerlistSize) - .await - .expect("TODO") - else { - unreachable!(); - }; - - Ok((usize_to_u64(white), usize_to_u64(grey))) - } - - /// [`AddressBookRequest::ConnectionCount`] - pub(super) async fn connection_count(&mut self) -> Result<(u64, u64), Error> { - let AddressBookResponse::::ConnectionCount { incoming, outgoing } = - >>::ready( - &mut DummyAddressBook, - ) - .await - .expect("TODO") - .call(AddressBookRequest::ConnectionCount) - .await - .expect("TODO") - else { - unreachable!(); - }; - - Ok((usize_to_u64(incoming), usize_to_u64(outgoing))) - } - - /// [`AddressBookRequest::SetBan`] - pub(super) async fn set_ban(&mut self, peer: Infallible) -> Result<(), Error> { - let AddressBookResponse::::Ok = , - >>::ready(&mut DummyAddressBook) - .await - .expect("TODO") - .call(AddressBookRequest::SetBan(peer)) - .await - .expect("TODO") else { - unreachable!(); - }; - - Ok(()) - } - - /// [`AddressBookRequest::GetBan`] - pub(super) async fn get_ban(&mut self, peer: Infallible) -> Result<(), Error> { - let AddressBookResponse::::GetBan(ban) = - >>::ready( - &mut DummyAddressBook, - ) - .await - .expect("TODO") - .call(AddressBookRequest::GetBan(peer)) - .await - .expect("TODO") - else { - unreachable!(); - }; - - Ok(()) - } - - /// [`AddressBookRequest::GetBans`] - pub(super) async fn get_bans(&mut self) -> Result<(), Error> { - let AddressBookResponse::::GetBans(bans) = - >>::ready( - &mut DummyAddressBook, - ) - .await - .expect("TODO") - .call(AddressBookRequest::GetBans) - .await - .expect("TODO") - else { - unreachable!(); - }; - - Ok(todo!()) - } -} diff --git a/binaries/cuprated/src/rpc/request/txpool.rs b/binaries/cuprated/src/rpc/request/txpool.rs index b6c16b3..517661e 100644 --- a/binaries/cuprated/src/rpc/request/txpool.rs +++ b/binaries/cuprated/src/rpc/request/txpool.rs @@ -2,10 +2,12 @@ use std::{ collections::{HashMap, HashSet}, + convert::Infallible, sync::Arc, }; use anyhow::{anyhow, Error}; +use cuprate_database_service::DatabaseReadService; use futures::StreamExt; use monero_serai::block::Block; use tower::{Service, ServiceExt}; @@ -15,6 +17,10 @@ use cuprate_helper::{ cast::{u64_to_usize, usize_to_u64}, map::split_u128_into_low_high_bits, }; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse}, + TxpoolReadHandle, +}; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainWriteRequest}, Chain, ExtendedBlockHeader, HardFork, OutputOnChain, VerifiedBlockInformation, @@ -22,4 +28,44 @@ use cuprate_types::{ use crate::rpc::{CupratedRpcHandler, CupratedRpcHandlerState}; -impl CupratedRpcHandlerState {} +/// [`TxpoolReadRequest::Backlog`] +pub(super) async fn backlog(txpool_read: &mut TxpoolReadHandle) -> Result, Error> { + let TxpoolReadResponse::Backlog(backlog) = txpool_read + .ready() + .await + .expect("TODO") + .call(TxpoolReadRequest::Backlog) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok(backlog) +} + +/// [`TxpoolReadRequest::Size`] +pub(super) async fn size(txpool_read: &mut TxpoolReadHandle) -> Result { + let TxpoolReadResponse::Size(size) = txpool_read + .ready() + .await + .expect("TODO") + .call(TxpoolReadRequest::Size) + .await + .expect("TODO") + else { + unreachable!(); + }; + + Ok(usize_to_u64(size)) +} + +// [`::Flush`] +#[expect(clippy::needless_pass_by_ref_mut, reason = "TODO: remove after impl")] +pub(super) async fn flush( + txpool_read: &mut TxpoolReadHandle, + tx_hashes: Vec<[u8; 32]>, +) -> Result<(), Error> { + todo!(); + Ok(()) +} diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 411b784..b3f4f75 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -12,19 +12,39 @@ use crate::types::TransactionHash; pub enum TxpoolReadRequest { /// A request for the blob (raw bytes) of a transaction with the given hash. TxBlob(TransactionHash), + /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), + + /// TODO + Backlog, + + /// TODO + Size, } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse /// The transaction pool [`tower::Service`] read response type. #[expect(clippy::large_enum_variant)] pub enum TxpoolReadResponse { - /// A response containing the raw bytes of a transaction. + /// Response to [`TxpoolReadRequest::TxBlob`]. + /// + /// The inner value is the raw bytes of a transaction. // TODO: use bytes::Bytes. TxBlob(Vec), - /// A response of [`TransactionVerificationData`]. + + /// Response to [`TxpoolReadRequest::TxVerificationData`]. TxVerificationData(TransactionVerificationData), + + /// Response to [`TxpoolReadRequest::Backlog`]. + /// + /// TODO + Backlog(Vec), + + /// Response to [`TxpoolReadRequest::Size`]. + /// + /// TODO + Size(usize), } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest @@ -42,6 +62,7 @@ pub enum TxpoolWriteRequest { /// [`true`] if this tx is in the stem state. state_stem: bool, }, + /// Remove a transaction with the given hash from the pool. /// /// Returns [`TxpoolWriteResponse::Ok`]. @@ -52,9 +73,12 @@ pub enum TxpoolWriteRequest { /// The transaction pool [`tower::Service`] write response type. #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] pub enum TxpoolWriteResponse { - /// A [`TxpoolWriteRequest::AddTransaction`] response. + /// Response to: + /// - [`TxpoolWriteRequest::RemoveTransaction`] + Ok, + + /// Response to [`TxpoolWriteRequest::AddTransaction`]. /// /// If the inner value is [`Some`] the tx was not added to the pool as it double spends a tx with the given hash. AddTransaction(Option), - Ok, } diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index f006813..5e75529 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -58,6 +58,7 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), + TxpoolReadRequest::Backlog | TxpoolReadRequest::Size => todo!(), } }