fix build
Some checks failed
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-10-31 23:32:32 +00:00
parent d982a48b78
commit d0055f16bd
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
6 changed files with 71 additions and 15 deletions

2
Cargo.lock generated
View file

@ -963,7 +963,6 @@ dependencies = [
"monero-serai",
"rayon",
"serde",
"sha3",
"tempfile",
"thiserror",
"tokio",
@ -1064,7 +1063,6 @@ dependencies = [
"serde",
"serde_bytes",
"serde_json",
"sha3",
"thiserror",
"thread_local",
"tokio",

View file

@ -38,7 +38,6 @@ mod commands;
mod handler;
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
use cuprate_txpool::service::TxpoolWriteHandle;
/// Initialize the blockchain manager.
///

View file

@ -1,8 +1,51 @@
//! P2P
//!
//! Will handle initiating the P2P and contains a protocol request handler.
use crate::txpool::IncomingTxHandler;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::BlockChainContextService;
use cuprate_p2p::{NetworkInterface, P2PConfig};
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::TxpoolReadHandle;
use futures::{FutureExt, TryFutureExt};
use tokio::sync::oneshot;
use tower::ServiceExt;
mod core_sync_service;
mod network_address;
pub mod request_handler;
pub use network_address::CrossNetworkInternalPeerId;
pub async fn start_clearnet_p2p(
blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService,
txpool_read_handle: TxpoolReadHandle,
config: P2PConfig<ClearNet>,
) -> Result<
(
NetworkInterface<ClearNet>,
oneshot::Sender<IncomingTxHandler>,
),
tower::BoxError,
> {
let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel();
let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker {
blockchain_read_handle,
blockchain_context_service: blockchain_context_service.clone(),
txpool_read_handle,
incoming_tx_handler: None,
incoming_tx_handler_fut: incoming_tx_handler_rx.shared(),
};
Ok((
cuprate_p2p::initialize_network(
request_handler_maker.map_response(|s| s.map_err(Into::into)),
core_sync_service::CoreSyncService(blockchain_context_service),
config,
)
.await?,
incoming_tx_handler_tx,
))
}

View file

@ -42,6 +42,7 @@ use cuprate_wire::protocol::{
use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError};
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
use crate::p2p::CrossNetworkInternalPeerId;
use crate::txpool::{IncomingTxHandler, IncomingTxs};
/// The P2P protocol request handler [`MakeService`](tower::MakeService).
@ -60,8 +61,12 @@ pub struct P2pProtocolRequestHandlerMaker {
pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
}
impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
type Response = P2pProtocolRequestHandler<N>;
impl<A: NetZoneAddress> Service<PeerInformation<A>> for P2pProtocolRequestHandlerMaker
where
A: NetZoneAddress,
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
{
type Response = P2pProtocolRequestHandler<A>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
@ -79,7 +84,7 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle
Poll::Ready(Ok(()))
}
fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
fn call(&mut self, peer_information: PeerInformation<A>) -> Self::Future {
let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
panic!("poll_ready was not called or did not return `Poll::Ready`")
};
@ -114,7 +119,11 @@ pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
incoming_tx_handler: IncomingTxHandler,
}
impl<Z: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
impl<A: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<A>
where
A: NetZoneAddress,
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
{
type Response = ProtocolResponse;
type Error = anyhow::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@ -355,12 +364,16 @@ async fn new_fluffy_block<A: NetZoneAddress>(
}
/// [`ProtocolRequest::NewTransactions`]
async fn new_transactions<A: NetZoneAddress>(
async fn new_transactions<A>(
peer_information: PeerInformation<A>,
request: NewTransactions,
mut blockchain_context_service: BlockChainContextService,
mut incoming_tx_handler: IncomingTxHandler,
) -> anyhow::Result<ProtocolResponse> {
) -> anyhow::Result<ProtocolResponse>
where
A: NetZoneAddress,
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
{
let BlockChainContextResponse::Context(context) = blockchain_context_service
.ready()
.await
@ -387,11 +400,9 @@ async fn new_transactions<A: NetZoneAddress>(
let state = if request.dandelionpp_fluff {
TxState::Fluff
} else {
let InternalPeerID::KnownAddr(addr) = peer_information.id else {
todo!("Anonymity networks")
};
TxState::Stem { from: addr.into() }
TxState::Stem {
from: peer_information.id.into(),
}
};
drop(request.padding);

View file

@ -12,4 +12,4 @@ mod dandelion;
mod incoming_tx;
mod txs_being_handled;
pub use incoming_tx::IncomingTxHandler;
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};

View file

@ -43,9 +43,13 @@ use crate::{
};
/// An error that can happen handling an incoming tx.
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
#[error("Error parsing tx: {0}")]
Parse(std::io::Error),
#[error(transparent)]
Consensus(ExtendedConsensusError),
#[error("Duplicate tx in message")]
DuplicateTransaction,
}
@ -67,6 +71,7 @@ pub(super) type TxId = [u8; 32];
/// The service than handles incoming transaction pool transactions.
///
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
#[derive(Clone)]
pub struct IncomingTxHandler {
/// A store of txs currently being handled in incoming tx requests.
pub(super) txs_being_handled: TxsBeingHandled,