add cross network address

This commit is contained in:
Boog900 2024-10-15 16:23:08 +01:00
parent e790fa09f1
commit bffec38fbe
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
15 changed files with 103 additions and 76 deletions

View file

@ -8,19 +8,16 @@ use std::{
};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
use cuprate_txpool::service::TxpoolReadHandle;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
TxpoolReadHandle,
};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use crate::{
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
@ -114,6 +111,10 @@ pub async fn handle_incoming_block(
for needed_hash in needed_hashes {
let Some(tx) = given_txs.remove(&needed_hash) else {
// We return back the indexes of all txs missing from our pool, not taking into account the txs
// that were given with the block, as these txs will be dropped. It is not worth it to try to add
// these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
// the size limit.
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
};

View file

@ -17,6 +17,7 @@ use cuprate_p2p::{
BroadcastSvc, NetworkInterface,
};
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::TxpoolWriteHandle;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, TransactionVerificationData,
@ -36,7 +37,6 @@ mod commands;
mod handler;
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
use cuprate_txpool::service::TxpoolWriteHandle;
/// Initialize the blockchain manager.
///

View file

@ -1,8 +1,10 @@
//! The blockchain manager handler functions.
use bytes::Bytes;
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::transaction::Input;
use monero_serai::{block::Block, transaction::Transaction};
use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use rayon::prelude::*;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
@ -24,11 +26,8 @@ use cuprate_types::{
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
};
use crate::blockchain::manager::commands::IncomingBlockOk;
use crate::{
blockchain::{
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
},
blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
constants::PANIC_CRITICAL_SERVICE_ERROR,
signals::REORG_LOCK,
};

View file

@ -1,11 +1,10 @@
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
use std::{pin::pin, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use tokio::time::interval;
use tokio::{
sync::{mpsc, Notify},
time::sleep,
time::interval,
};
use tower::{Service, ServiceExt};
use tracing::instrument;

View file

@ -1,13 +1,7 @@
use std::task::{Context, Poll};
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use tower::{util::MapErr, Service};
use tower::util::MapErr;
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
/// The [`BlockVerifierService`] with all generic types defined.
pub type ConcreteBlockVerifierService = BlockVerifierService<

View file

@ -12,8 +12,6 @@
reason = "TODO: remove after v1.0.0"
)]
use crate::txpool::IncomingTxHandler;
mod blockchain;
mod config;
mod constants;

View file

@ -2,4 +2,7 @@
//!
//! Will handle initiating the P2P and contains a protocol request handler.
mod network_address;
pub mod request_handler;
pub use network_address::CrossNetworkInternalPeerId;

View file

@ -0,0 +1,16 @@
use std::net::SocketAddr;
use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone};
/// An identifier for a P2P peer on any network.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CrossNetworkInternalPeerId {
/// A clear-net peer.
ClearNet(InternalPeerID<<ClearNet as NetworkZone>::Addr>),
}
impl From<InternalPeerID<<ClearNet as NetworkZone>::Addr>> for CrossNetworkInternalPeerId {
fn from(addr: InternalPeerID<<ClearNet as NetworkZone>::Addr>) -> Self {
Self::ClearNet(addr)
}
}

View file

@ -1,7 +1,7 @@
//! Global `static`s used throughout `cuprated`.
use std::{
sync::{atomic::AtomicU64, LazyLock},
sync::LazyLock,
time::{SystemTime, UNIX_EPOCH},
};

View file

@ -1,7 +1,6 @@
//! Transaction Pool
//!
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
use cuprate_consensus::BlockChainContextService;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
@ -13,9 +12,11 @@ mod dandelion;
mod incoming_tx;
mod txs_being_handled;
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};
pub use incoming_tx::IncomingTxHandler;
pub fn init_incoming_tx_handler(
/// Initialize the [`IncomingTxHandler`].
#[expect(clippy::significant_drop_tightening)]
pub fn incoming_tx_handler(
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,

View file

@ -1,19 +1,21 @@
use std::time::Duration;
use bytes::Bytes;
use cuprate_dandelion_tower::pool::DandelionPoolService;
use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph};
use super::incoming_tx::{DandelionTx, TxId};
use crate::p2p::CrossNetworkInternalPeerId;
use cuprate_dandelion_tower::{
pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use cuprate_wire::NetworkAddress;
use super::incoming_tx::{DandelionTx, TxId};
mod diffuse_service;
mod stem_service;
mod tx_store;
/// The configuration used for [`cuprate_dandelion_tower`].
///
/// TODO: should we expose this? probably not.
const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
time_between_hop: Duration::from_millis(175),
epoch_duration: Duration::from_secs(10 * 60),
@ -21,20 +23,23 @@ const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
graph: Graph::FourRegular,
};
/// A [`DandelionRouter`] with all generic types defined.
type ConcreteDandelionRouter = DandelionRouter<
stem_service::OutboundPeerStream,
diffuse_service::DiffuseService,
NetworkAddress,
CrossNetworkInternalPeerId,
stem_service::StemPeerService<ClearNet>,
DandelionTx,
>;
/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
pub fn start_dandelion_pool_manager(
router: ConcreteDandelionRouter,
txpool_read_handle: TxpoolReadHandle,
txpool_write_handle: TxpoolWriteHandle,
) -> DandelionPoolService<DandelionTx, TxId, NetworkAddress> {
) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
// TODO: make this constant configurable?
32,
router,
tx_store::TxStoreService {
@ -45,6 +50,7 @@ pub fn start_dandelion_pool_manager(
)
}
/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
DandelionRouter::new(
diffuse_service::DiffuseService {

View file

@ -7,7 +7,7 @@ use futures::FutureExt;
use tower::Service;
use cuprate_dandelion_tower::traits::DiffuseRequest;
use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface};
use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
use cuprate_p2p_core::ClearNet;
use super::DandelionTx;
@ -29,7 +29,8 @@ impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
}
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
// TODO: Call `into_inner` when 1.82.0 stabilizes
// TODO: the dandelion crate should pass along where we got the tx from.
// TODO: Use `into_inner` when 1.82.0 stabilizes.
self.clear_net_broadcast_service
.call(BroadcastRequest::Transaction {
tx_bytes: req.0 .0,

View file

@ -3,19 +3,20 @@ use std::{
task::{Context, Poll},
};
use bytes::Bytes;
use futures::Stream;
use tower::Service;
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
};
use cuprate_wire::{protocol::NewTransactions, NetworkAddress};
use bytes::Bytes;
use futures::Stream;
use tower::Service;
use cuprate_wire::protocol::NewTransactions;
use super::DandelionTx;
use crate::p2p::CrossNetworkInternalPeerId;
/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {
@ -23,7 +24,10 @@ pub struct OutboundPeerStream {
}
impl Stream for OutboundPeerStream {
type Item = Result<OutboundPeer<NetworkAddress, StemPeerService<ClearNet>>, tower::BoxError>;
type Item = Result<
OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
tower::BoxError,
>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: make the outbound peer choice random.
@ -32,12 +36,10 @@ impl Stream for OutboundPeerStream {
.client_pool()
.outbound_client()
.map_or(OutboundPeer::Exhausted, |client| {
let addr = match client.info.id {
InternalPeerID::KnownAddr(addr) => addr,
InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"),
};
OutboundPeer::Peer(addr.into(), StemPeerService(client))
OutboundPeer::Peer(
CrossNetworkInternalPeerId::ClearNet(client.info.id),
StemPeerService(client),
)
}))))
}
}

View file

@ -1,11 +1,8 @@
use std::{
f32::consts::E,
task::{Context, Poll},
};
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};
use tower::{util::Oneshot, Service, ServiceExt};
use futures::{future::BoxFuture, FutureExt};
use tower::{Service, ServiceExt};
use cuprate_dandelion_tower::{
traits::{TxStoreRequest, TxStoreResponse},
@ -13,7 +10,7 @@ use cuprate_dandelion_tower::{
};
use cuprate_database::RuntimeError;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
TxpoolReadHandle, TxpoolWriteHandle,
};

View file

@ -1,24 +1,20 @@
use std::{
collections::HashSet,
future::ready,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use dashmap::DashSet;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use sha3::{Digest, Sha3_256};
use tower::{Service, ServiceExt};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest,
VerifyTxResponse,
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
pool::{DandelionPoolService, IncomingTxBuilder},
State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
@ -27,8 +23,8 @@ use cuprate_txpool::service::{
TxpoolReadHandle, TxpoolWriteHandle,
};
use cuprate_types::TransactionVerificationData;
use cuprate_wire::NetworkAddress;
use crate::p2p::CrossNetworkInternalPeerId;
use crate::{
blockchain::ConcreteTxVerifierService,
constants::PANIC_CRITICAL_SERVICE_ERROR,
@ -45,8 +41,10 @@ pub enum IncomingTxError {
/// Incoming transactions.
pub struct IncomingTxs {
/// The raw bytes of the transactions.
pub txs: Vec<Bytes>,
pub state: TxState<NetworkAddress>,
/// The routing state of the transactions.
pub state: TxState<CrossNetworkInternalPeerId>,
}
/// The transaction type used for dandelion++.
@ -65,7 +63,8 @@ pub struct IncomingTxHandler {
/// The blockchain context cache.
pub(super) blockchain_context_cache: BlockChainContextService,
/// The dandelion txpool manager.
pub(super) dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
pub(super) dandelion_pool_manager:
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
/// The transaction verifier service.
pub(super) tx_verifier_service: ConcreteTxVerifierService,
/// The txpool write handle.
@ -98,18 +97,19 @@ impl Service<IncomingTxs> for IncomingTxHandler {
}
}
/// Handles the incoming txs.
#[expect(clippy::too_many_arguments)]
async fn handle_incoming_txs(
txs: Vec<Bytes>,
state: TxState<NetworkAddress>,
state: TxState<CrossNetworkInternalPeerId>,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
let reorg_guard = REORG_LOCK.read().await;
let _reorg_guard = REORG_LOCK.read().await;
let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
@ -167,6 +167,8 @@ async fn handle_incoming_txs(
/// Prepares the incoming transactions for verification.
///
/// This will filter out all transactions already in the pool or txs already being handled in another request.
///
/// # Returns
async fn prepare_incoming_txs(
tx_blobs: Vec<Bytes>,
txs_being_handled: TxsBeingHandled,
@ -246,9 +248,13 @@ async fn prepare_incoming_txs(
async fn handle_valid_tx(
tx: Arc<TransactionVerificationData>,
state: TxState<NetworkAddress>,
state: TxState<CrossNetworkInternalPeerId>,
txpool_write_handle: &mut TxpoolWriteHandle,
dandelion_pool_manager: &mut DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
dandelion_pool_manager: &mut DandelionPoolService<
DandelionTx,
TxId,
CrossNetworkInternalPeerId,
>,
) {
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
@ -291,9 +297,13 @@ async fn handle_valid_tx(
async fn rerelay_stem_tx(
tx_hash: &TxId,
state: TxState<NetworkAddress>,
state: TxState<CrossNetworkInternalPeerId>,
txpool_read_handle: &mut TxpoolReadHandle,
dandelion_pool_manager: &mut DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
dandelion_pool_manager: &mut DandelionPoolService<
DandelionTx,
TxId,
CrossNetworkInternalPeerId,
>,
) {
let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle
.ready()