mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-11-16 15:58:17 +00:00
change p2p
to not use the peer sync service
This commit is contained in:
parent
01e23258c0
commit
7ae07da6a7
12 changed files with 199 additions and 560 deletions
131
Cargo.lock
generated
131
Cargo.lock
generated
|
@ -736,6 +736,39 @@ dependencies = [
|
|||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-p2p"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"borsh",
|
||||
"bytes",
|
||||
"cuprate-address-book",
|
||||
"cuprate-async-buffer",
|
||||
"cuprate-fixed-bytes",
|
||||
"cuprate-helper",
|
||||
"cuprate-p2p-core",
|
||||
"cuprate-pruning",
|
||||
"cuprate-test-utils",
|
||||
"cuprate-types",
|
||||
"cuprate-wire",
|
||||
"dashmap",
|
||||
"futures",
|
||||
"indexmap",
|
||||
"monero-serai",
|
||||
"pin-project",
|
||||
"proptest",
|
||||
"rand",
|
||||
"rand_distr",
|
||||
"rayon",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-p2p-core"
|
||||
version = "0.1.0"
|
||||
|
@ -877,6 +910,72 @@ dependencies = [
|
|||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprated"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bitflags 2.6.0",
|
||||
"borsh",
|
||||
"bytemuck",
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crossbeam",
|
||||
"crypto-bigint",
|
||||
"cuprate-address-book",
|
||||
"cuprate-async-buffer",
|
||||
"cuprate-blockchain",
|
||||
"cuprate-consensus",
|
||||
"cuprate-consensus-rules",
|
||||
"cuprate-cryptonight",
|
||||
"cuprate-dandelion-tower",
|
||||
"cuprate-database",
|
||||
"cuprate-database-service",
|
||||
"cuprate-epee-encoding",
|
||||
"cuprate-fast-sync",
|
||||
"cuprate-fixed-bytes",
|
||||
"cuprate-helper",
|
||||
"cuprate-json-rpc",
|
||||
"cuprate-levin",
|
||||
"cuprate-p2p",
|
||||
"cuprate-p2p-core",
|
||||
"cuprate-pruning",
|
||||
"cuprate-rpc-interface",
|
||||
"cuprate-rpc-types",
|
||||
"cuprate-test-utils",
|
||||
"cuprate-txpool",
|
||||
"cuprate-types",
|
||||
"cuprate-wire",
|
||||
"curve25519-dalek",
|
||||
"dashmap",
|
||||
"dirs",
|
||||
"futures",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"indexmap",
|
||||
"monero-serai",
|
||||
"paste",
|
||||
"pin-project",
|
||||
"rand",
|
||||
"rand_distr",
|
||||
"randomx-rs",
|
||||
"rayon",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek"
|
||||
version = "4.1.3"
|
||||
|
@ -922,6 +1021,19 @@ dependencies = [
|
|||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "diff"
|
||||
version = "0.1.13"
|
||||
|
@ -2306,6 +2418,15 @@ dependencies = [
|
|||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_bytes"
|
||||
version = "0.11.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.210"
|
||||
|
@ -2679,6 +2800,7 @@ dependencies = [
|
|||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
|
@ -2728,6 +2850,15 @@ dependencies = [
|
|||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
|
||||
dependencies = [
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.5"
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
resolver = "2"
|
||||
|
||||
members = [
|
||||
#"binaries/cuprated",
|
||||
"binaries/cuprated",
|
||||
"consensus",
|
||||
"consensus/fast-sync",
|
||||
"consensus/rules",
|
||||
|
@ -12,7 +12,7 @@ members = [
|
|||
"net/fixed-bytes",
|
||||
"net/levin",
|
||||
"net/wire",
|
||||
#"p2p/p2p",
|
||||
"p2p/p2p",
|
||||
"p2p/p2p-core",
|
||||
"p2p/dandelion-tower",
|
||||
"p2p/async-buffer",
|
||||
|
|
|
@ -27,6 +27,7 @@ mod request_handler;
|
|||
mod timeout_monitor;
|
||||
|
||||
pub use connector::{ConnectRequest, Connector};
|
||||
use cuprate_pruning::PruningSeed;
|
||||
use cuprate_wire::CoreSyncData;
|
||||
pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
|
||||
|
||||
|
@ -59,6 +60,8 @@ pub struct PeerInformation<A> {
|
|||
pub handle: ConnectionHandle,
|
||||
/// The direction of this connection (inbound|outbound).
|
||||
pub direction: ConnectionDirection,
|
||||
/// The peer's [`PruningSeed`].
|
||||
pub pruning_seed: PruningSeed,
|
||||
/// The [`CoreSyncData`] of this peer.
|
||||
///
|
||||
/// Data across fields are not necessarily related, so [`CoreSyncData::top_id`] is not always the
|
||||
|
|
|
@ -455,6 +455,7 @@ where
|
|||
id: addr,
|
||||
handle,
|
||||
direction,
|
||||
pruning_seed: PruningSeed::decompress_p2p_rules(peer_core_sync.pruning_seed)?,
|
||||
core_sync_data: Arc::new(Mutex::new(peer_core_sync)),
|
||||
};
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ thiserror = { workspace = true }
|
|||
bytes = { workspace = true, features = ["std"] }
|
||||
rand = { workspace = true, features = ["std", "std_rng"] }
|
||||
rand_distr = { workspace = true, features = ["std"] }
|
||||
hex = { workspace = true, features = ["std"] }
|
||||
##hex = { workspace = true, features = ["std"] }
|
||||
tracing = { workspace = true, features = ["std", "attributes"] }
|
||||
borsh = { workspace = true, features = ["derive", "std"] }
|
||||
|
||||
|
|
|
@ -22,11 +22,7 @@ use tower::{Service, ServiceExt};
|
|||
use tracing::{instrument, Instrument, Span};
|
||||
|
||||
use cuprate_async_buffer::{BufferAppender, BufferStream};
|
||||
use cuprate_p2p_core::{
|
||||
handles::ConnectionHandle,
|
||||
services::{PeerSyncRequest, PeerSyncResponse},
|
||||
NetworkZone, PeerSyncSvc,
|
||||
};
|
||||
use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
|
||||
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
|
||||
|
||||
use crate::{
|
||||
|
@ -137,14 +133,12 @@ pub enum ChainSvcResponse {
|
|||
/// The block downloader may fail before the whole chain is downloaded. If this is the case you can
|
||||
/// call this function again, so it can start the search again.
|
||||
#[instrument(level = "error", skip_all, name = "block_downloader")]
|
||||
pub fn download_blocks<N: NetworkZone, S, C>(
|
||||
pub fn download_blocks<N: NetworkZone, C>(
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
peer_sync_svc: S,
|
||||
our_chain_svc: C,
|
||||
config: BlockDownloaderConfig,
|
||||
) -> BufferStream<BlockBatch>
|
||||
where
|
||||
S: PeerSyncSvc<N> + Clone,
|
||||
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
|
||||
+ Send
|
||||
+ 'static,
|
||||
|
@ -152,13 +146,8 @@ where
|
|||
{
|
||||
let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size);
|
||||
|
||||
let block_downloader = BlockDownloader::new(
|
||||
client_pool,
|
||||
peer_sync_svc,
|
||||
our_chain_svc,
|
||||
buffer_appender,
|
||||
config,
|
||||
);
|
||||
let block_downloader =
|
||||
BlockDownloader::new(client_pool, our_chain_svc, buffer_appender, config);
|
||||
|
||||
tokio::spawn(
|
||||
block_downloader
|
||||
|
@ -195,12 +184,10 @@ where
|
|||
/// - request the next chain entry
|
||||
/// - download an already requested batch of blocks (this might happen due to an error in the previous request
|
||||
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
|
||||
struct BlockDownloader<N: NetworkZone, S, C> {
|
||||
struct BlockDownloader<N: NetworkZone, C> {
|
||||
/// The client pool.
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
|
||||
/// The service that holds the peer's sync states.
|
||||
peer_sync_svc: S,
|
||||
/// The service that holds our current chain state.
|
||||
our_chain_svc: C,
|
||||
|
||||
|
@ -238,9 +225,8 @@ struct BlockDownloader<N: NetworkZone, S, C> {
|
|||
config: BlockDownloaderConfig,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone, S, C> BlockDownloader<N, S, C>
|
||||
impl<N: NetworkZone, C> BlockDownloader<N, C>
|
||||
where
|
||||
S: PeerSyncSvc<N> + Clone,
|
||||
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
|
||||
+ Send
|
||||
+ 'static,
|
||||
|
@ -249,16 +235,12 @@ where
|
|||
/// Creates a new [`BlockDownloader`]
|
||||
fn new(
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
|
||||
peer_sync_svc: S,
|
||||
our_chain_svc: C,
|
||||
buffer_appender: BufferAppender<BlockBatch>,
|
||||
|
||||
config: BlockDownloaderConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
client_pool,
|
||||
peer_sync_svc,
|
||||
our_chain_svc,
|
||||
amount_of_blocks_to_request: config.initial_batch_size,
|
||||
amount_of_blocks_to_request_updated_at: 0,
|
||||
|
@ -495,22 +477,10 @@ where
|
|||
panic!("Chain service returned wrong response.");
|
||||
};
|
||||
|
||||
let PeerSyncResponse::PeersToSyncFrom(peers) = self
|
||||
.peer_sync_svc
|
||||
.ready()
|
||||
.await?
|
||||
.call(PeerSyncRequest::PeersToSyncFrom {
|
||||
current_cumulative_difficulty,
|
||||
block_needed: None,
|
||||
})
|
||||
.await?
|
||||
else {
|
||||
panic!("Peer sync service returned wrong response.");
|
||||
};
|
||||
|
||||
tracing::debug!("Response received from peer sync service");
|
||||
|
||||
for client in self.client_pool.borrow_clients(&peers) {
|
||||
for client in self
|
||||
.client_pool
|
||||
.clients_with_more_cumulative_difficulty(current_cumulative_difficulty)
|
||||
{
|
||||
pending_peers
|
||||
.entry(client.info.pruning_seed)
|
||||
.or_default()
|
||||
|
@ -621,12 +591,8 @@ where
|
|||
|
||||
/// Starts the main loop of the block downloader.
|
||||
async fn run(mut self) -> Result<(), BlockDownloadError> {
|
||||
let mut chain_tracker = initial_chain_search(
|
||||
&self.client_pool,
|
||||
self.peer_sync_svc.clone(),
|
||||
&mut self.our_chain_svc,
|
||||
)
|
||||
.await?;
|
||||
let mut chain_tracker =
|
||||
initial_chain_search(&self.client_pool, &mut self.our_chain_svc).await?;
|
||||
|
||||
let mut pending_peers = BTreeMap::new();
|
||||
|
||||
|
|
|
@ -1,16 +1,12 @@
|
|||
use std::{mem, sync::Arc};
|
||||
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio::{task::JoinSet, time::timeout};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::{instrument, Instrument, Span};
|
||||
|
||||
use cuprate_p2p_core::{
|
||||
client::InternalPeerID,
|
||||
handles::ConnectionHandle,
|
||||
services::{PeerSyncRequest, PeerSyncResponse},
|
||||
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, ProtocolRequest, ProtocolResponse,
|
||||
client::InternalPeerID, handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse,
|
||||
ProtocolRequest, ProtocolResponse,
|
||||
};
|
||||
use cuprate_wire::protocol::{ChainRequest, ChainResponse};
|
||||
|
||||
|
@ -83,13 +79,11 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
|
|||
///
|
||||
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
|
||||
#[instrument(level = "error", skip_all)]
|
||||
pub async fn initial_chain_search<N: NetworkZone, S, C>(
|
||||
pub async fn initial_chain_search<N: NetworkZone, C>(
|
||||
client_pool: &Arc<ClientPool<N>>,
|
||||
mut peer_sync_svc: S,
|
||||
mut our_chain_svc: C,
|
||||
) -> Result<ChainTracker<N>, BlockDownloadError>
|
||||
where
|
||||
S: PeerSyncSvc<N>,
|
||||
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
|
||||
{
|
||||
tracing::debug!("Getting our chain history");
|
||||
|
@ -108,29 +102,9 @@ where
|
|||
|
||||
let our_genesis = *block_ids.last().expect("Blockchain had no genesis block.");
|
||||
|
||||
tracing::debug!("Getting a list of peers with higher cumulative difficulty");
|
||||
|
||||
let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc
|
||||
.ready()
|
||||
.await?
|
||||
.call(PeerSyncRequest::PeersToSyncFrom {
|
||||
block_needed: None,
|
||||
current_cumulative_difficulty: cumulative_difficulty,
|
||||
})
|
||||
.await?
|
||||
else {
|
||||
panic!("peer sync service sent wrong response.");
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"{} peers claim they have a higher cumulative difficulty",
|
||||
peers.len()
|
||||
);
|
||||
|
||||
// Shuffle the list to remove any possibility of peers being able to prioritize getting picked.
|
||||
peers.shuffle(&mut thread_rng());
|
||||
|
||||
let mut peers = client_pool.borrow_clients(&peers);
|
||||
let mut peers = client_pool
|
||||
.clients_with_more_cumulative_difficulty(cumulative_difficulty)
|
||||
.into_iter();
|
||||
|
||||
let mut futs = JoinSet::new();
|
||||
|
||||
|
|
|
@ -1,3 +1,11 @@
|
|||
use futures::{FutureExt, StreamExt};
|
||||
use indexmap::IndexMap;
|
||||
use monero_serai::{
|
||||
block::{Block, BlockHeader},
|
||||
transaction::{Input, Timelock, Transaction, TransactionPrefix},
|
||||
};
|
||||
use proptest::{collection::vec, prelude::*};
|
||||
use std::sync::Mutex;
|
||||
use std::{
|
||||
fmt::{Debug, Formatter},
|
||||
future::Future,
|
||||
|
@ -6,27 +14,19 @@ use std::{
|
|||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use indexmap::IndexMap;
|
||||
use monero_serai::{
|
||||
block::{Block, BlockHeader},
|
||||
transaction::{Input, Timelock, Transaction, TransactionPrefix},
|
||||
};
|
||||
use proptest::{collection::vec, prelude::*};
|
||||
use tokio::time::timeout;
|
||||
use tower::{service_fn, Service};
|
||||
|
||||
use cuprate_fixed_bytes::ByteArrayVec;
|
||||
use cuprate_p2p_core::{
|
||||
client::{mock_client, Client, InternalPeerID, PeerInformation},
|
||||
services::{PeerSyncRequest, PeerSyncResponse},
|
||||
ClearNet, ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
|
||||
ClearNet, ConnectionDirection, PeerRequest, PeerResponse, ProtocolRequest,
|
||||
ProtocolResponse,
|
||||
};
|
||||
use cuprate_pruning::PruningSeed;
|
||||
use cuprate_types::{BlockCompleteEntry, TransactionBlobs};
|
||||
use cuprate_wire::protocol::{ChainResponse, GetObjectsResponse};
|
||||
use cuprate_wire::CoreSyncData;
|
||||
|
||||
use crate::{
|
||||
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
|
||||
|
@ -64,7 +64,6 @@ proptest! {
|
|||
|
||||
let stream = download_blocks(
|
||||
client_pool,
|
||||
SyncStateSvc(peer_ids) ,
|
||||
OurChainSvc {
|
||||
genesis: *blockchain.blocks.first().unwrap().0
|
||||
},
|
||||
|
@ -255,31 +254,19 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
|
|||
handle: connection_handle,
|
||||
direction: ConnectionDirection::Inbound,
|
||||
pruning_seed: PruningSeed::NotPruned,
|
||||
core_sync_data: Arc::new(Mutex::new(CoreSyncData {
|
||||
cumulative_difficulty: u64::MAX,
|
||||
cumulative_difficulty_top64: u64::MAX,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
})),
|
||||
};
|
||||
|
||||
mock_client(info, connection_guard, request_handler)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SyncStateSvc<Z: NetworkZone>(Vec<InternalPeerID<Z::Addr>>);
|
||||
|
||||
impl Service<PeerSyncRequest<ClearNet>> for SyncStateSvc<ClearNet> {
|
||||
type Response = PeerSyncResponse<ClearNet>;
|
||||
type Error = tower::BoxError;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: PeerSyncRequest<ClearNet>) -> Self::Future {
|
||||
let peers = self.0.clone();
|
||||
|
||||
async move { Ok(PeerSyncResponse::PeersToSyncFrom(peers)) }.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
struct OurChainSvc {
|
||||
genesis: [u8; 32],
|
||||
}
|
||||
|
|
|
@ -127,6 +127,27 @@ impl<N: NetworkZone> ClientPool<N> {
|
|||
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + sealed::Captures<(&'a (), &'b ())> {
|
||||
peers.iter().filter_map(|peer| self.borrow_client(peer))
|
||||
}
|
||||
|
||||
pub fn clients_with_more_cumulative_difficulty(
|
||||
self: &Arc<Self>,
|
||||
cumulative_difficulty: u128,
|
||||
) -> Vec<ClientPoolDropGuard<N>> {
|
||||
let peers = self
|
||||
.clients
|
||||
.iter()
|
||||
.filter_map(|element| {
|
||||
let peer_sync_info = element.value().info.core_sync_data.lock().unwrap();
|
||||
|
||||
if peer_sync_info.cumulative_difficulty() > cumulative_difficulty {
|
||||
Some(*element.key())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.borrow_clients(&peers).collect()
|
||||
}
|
||||
}
|
||||
|
||||
mod sealed {
|
||||
|
|
|
@ -15,6 +15,7 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
|
|||
/// The timeout for when we fail to find a peer to connect to.
|
||||
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// The durations of a short ban.
|
||||
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
|
||||
|
||||
|
|
|
@ -6,10 +6,9 @@ use std::sync::Arc;
|
|||
|
||||
use futures::FutureExt;
|
||||
use tokio::{
|
||||
sync::{mpsc, watch},
|
||||
sync::mpsc,
|
||||
task::JoinSet,
|
||||
};
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
|
||||
use tracing::{instrument, Instrument, Span};
|
||||
|
||||
|
@ -17,7 +16,7 @@ use cuprate_async_buffer::BufferStream;
|
|||
use cuprate_p2p_core::{
|
||||
client::Connector,
|
||||
client::InternalPeerID,
|
||||
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
|
||||
services::{AddressBookRequest, AddressBookResponse},
|
||||
CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
|
||||
};
|
||||
|
||||
|
@ -28,7 +27,6 @@ pub mod config;
|
|||
pub mod connection_maintainer;
|
||||
mod constants;
|
||||
mod inbound_server;
|
||||
mod sync_states;
|
||||
|
||||
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
|
||||
pub use broadcast::{BroadcastRequest, BroadcastSvc};
|
||||
|
@ -63,12 +61,6 @@ where
|
|||
config.max_inbound_connections + config.outbound_connections,
|
||||
);
|
||||
|
||||
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
|
||||
let sync_states_svc = Buffer::new(
|
||||
sync_states_svc,
|
||||
config.max_inbound_connections + config.outbound_connections,
|
||||
);
|
||||
|
||||
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
|
||||
// this.
|
||||
let (broadcast_svc, outbound_mkr, inbound_mkr) =
|
||||
|
@ -83,7 +75,6 @@ where
|
|||
let outbound_handshaker_builder =
|
||||
cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data)
|
||||
.with_address_book(address_book.clone())
|
||||
.with_peer_sync_svc(sync_states_svc.clone())
|
||||
.with_core_sync_svc(core_sync_svc)
|
||||
.with_protocol_request_handler(protocol_request_handler)
|
||||
.with_broadcast_stream_maker(outbound_mkr)
|
||||
|
@ -136,9 +127,7 @@ where
|
|||
Ok(NetworkInterface {
|
||||
pool: client_pool,
|
||||
broadcast_svc,
|
||||
top_block_watch,
|
||||
make_connection_tx,
|
||||
sync_states_svc,
|
||||
address_book: address_book.boxed_clone(),
|
||||
_background_tasks: Arc::new(background_tasks),
|
||||
})
|
||||
|
@ -151,16 +140,11 @@ pub struct NetworkInterface<N: NetworkZone> {
|
|||
pool: Arc<client_pool::ClientPool<N>>,
|
||||
/// A [`Service`] that allows broadcasting to all connected peers.
|
||||
broadcast_svc: BroadcastSvc<N>,
|
||||
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
|
||||
/// on that claimed chain.
|
||||
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
|
||||
/// A channel to request extra connections.
|
||||
#[expect(dead_code, reason = "will be used eventually")]
|
||||
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
|
||||
/// The address book service.
|
||||
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
|
||||
/// The peer's sync states service.
|
||||
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
|
||||
/// Background tasks that will be aborted when this interface is dropped.
|
||||
_background_tasks: Arc<JoinSet<()>>,
|
||||
}
|
||||
|
@ -183,17 +167,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
|
|||
+ 'static,
|
||||
C::Future: Send + 'static,
|
||||
{
|
||||
block_downloader::download_blocks(
|
||||
Arc::clone(&self.pool),
|
||||
self.sync_states_svc.clone(),
|
||||
our_chain_service,
|
||||
config,
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns a stream which yields the highest seen sync state from a connected peer.
|
||||
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
|
||||
WatchStream::from_changes(self.top_block_watch.clone())
|
||||
block_downloader::download_blocks(Arc::clone(&self.pool), our_chain_service, config)
|
||||
}
|
||||
|
||||
/// Returns the address book service.
|
||||
|
|
|
@ -1,419 +0,0 @@
|
|||
//! # Sync States
|
||||
//!
|
||||
//! This module contains a [`PeerSyncSvc`], which keeps track of the claimed chain states of connected peers.
|
||||
//! This allows checking if we are behind and getting a list of peers who claim they are ahead.
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
future::{ready, Ready},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use tokio::sync::watch;
|
||||
use tower::Service;
|
||||
|
||||
use cuprate_p2p_core::{
|
||||
client::InternalPeerID,
|
||||
handles::ConnectionHandle,
|
||||
NetworkZone,
|
||||
};
|
||||
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
|
||||
use cuprate_wire::CoreSyncData;
|
||||
|
||||
use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
|
||||
|
||||
/// The highest claimed sync info from our connected peers.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct NewSyncInfo {
|
||||
/// The peers chain height.
|
||||
pub chain_height: u64,
|
||||
/// The peers top block's hash.
|
||||
pub top_hash: [u8; 32],
|
||||
/// The peers cumulative difficulty.
|
||||
pub cumulative_difficulty: u128,
|
||||
}
|
||||
|
||||
/// A service that keeps track of our peers blockchains.
|
||||
///
|
||||
/// This is the service that handles:
|
||||
/// 1. Finding out if we need to sync
|
||||
/// 1. Giving the peers that should be synced _from_, to the requester
|
||||
pub(crate) struct PeerSyncSvc<N: NetworkZone> {
|
||||
/// A map of cumulative difficulties to peers.
|
||||
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
|
||||
/// A map of peers to cumulative difficulties.
|
||||
peers: HashMap<InternalPeerID<N::Addr>, (u128, PruningSeed)>,
|
||||
/// A watch channel for *a* top synced peer info.
|
||||
new_height_watcher: watch::Sender<NewSyncInfo>,
|
||||
/// The handle to the peer that has data in `new_height_watcher`.
|
||||
last_peer_in_watcher_handle: Option<ConnectionHandle>,
|
||||
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
|
||||
closed_connections: FuturesUnordered<PeerDisconnectFut<N>>,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
|
||||
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
|
||||
pub(crate) fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
|
||||
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
|
||||
chain_height: 0,
|
||||
top_hash: [0; 32],
|
||||
cumulative_difficulty: 0,
|
||||
});
|
||||
|
||||
watch_rx.mark_unchanged();
|
||||
|
||||
(
|
||||
Self {
|
||||
cumulative_difficulties: BTreeMap::new(),
|
||||
peers: HashMap::new(),
|
||||
new_height_watcher: watch_tx,
|
||||
last_peer_in_watcher_handle: None,
|
||||
closed_connections: FuturesUnordered::new(),
|
||||
},
|
||||
watch_rx,
|
||||
)
|
||||
}
|
||||
|
||||
/// This function checks if any peers have disconnected, removing them if they have.
|
||||
fn poll_disconnected(&mut self, cx: &mut Context<'_>) {
|
||||
while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) {
|
||||
tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service.");
|
||||
let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap();
|
||||
|
||||
let cum_diff_peers = self
|
||||
.cumulative_difficulties
|
||||
.get_mut(&peer_cum_diff)
|
||||
.unwrap();
|
||||
cum_diff_peers.remove(&peer_id);
|
||||
if cum_diff_peers.is_empty() {
|
||||
// If this was the last peer remove the whole entry for this cumulative difficulty.
|
||||
self.cumulative_difficulties.remove(&peer_cum_diff);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_diff`.
|
||||
fn peers_to_sync_from(
|
||||
&self,
|
||||
current_cum_diff: u128,
|
||||
block_needed: Option<usize>,
|
||||
) -> Vec<InternalPeerID<N::Addr>> {
|
||||
self.cumulative_difficulties
|
||||
.range((current_cum_diff + 1)..)
|
||||
.flat_map(|(_, peers)| peers)
|
||||
.filter(|peer| {
|
||||
if let Some(block_needed) = block_needed {
|
||||
// we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means
|
||||
// we don't take into account the tip blocks which are not pruned.
|
||||
self.peers[peer]
|
||||
.1
|
||||
.has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.copied()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Updates a peers sync state.
|
||||
fn update_peer_sync_info(
|
||||
&mut self,
|
||||
peer_id: InternalPeerID<N::Addr>,
|
||||
handle: ConnectionHandle,
|
||||
core_sync_data: &CoreSyncData,
|
||||
) -> Result<(), tower::BoxError> {
|
||||
tracing::trace!(
|
||||
"Received new core sync data from peer, top hash: {}",
|
||||
hex::encode(core_sync_data.top_id)
|
||||
);
|
||||
|
||||
let new_cumulative_difficulty = core_sync_data.cumulative_difficulty();
|
||||
|
||||
if let Some((old_cum_diff, _)) = self.peers.get_mut(&peer_id) {
|
||||
match (*old_cum_diff).cmp(&new_cumulative_difficulty) {
|
||||
Ordering::Equal => {
|
||||
// If the cumulative difficulty of the peers chain hasn't changed then no need to update anything.
|
||||
return Ok(());
|
||||
}
|
||||
Ordering::Greater => {
|
||||
// This will only happen if a peer lowers its cumulative difficulty during the connection.
|
||||
// This won't happen if a peer re-syncs their blockchain as then the connection would have closed.
|
||||
tracing::debug!(
|
||||
"Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs()
|
||||
);
|
||||
handle.ban_peer(SHORT_BAN);
|
||||
return Err("Peers cumulative difficulty dropped".into());
|
||||
}
|
||||
Ordering::Less => (),
|
||||
}
|
||||
|
||||
// Remove the old cumulative difficulty entry for this peer
|
||||
let old_cum_diff_peers = self.cumulative_difficulties.get_mut(old_cum_diff).unwrap();
|
||||
old_cum_diff_peers.remove(&peer_id);
|
||||
if old_cum_diff_peers.is_empty() {
|
||||
// If this was the last peer remove the whole entry for this cumulative difficulty.
|
||||
self.cumulative_difficulties.remove(old_cum_diff);
|
||||
}
|
||||
// update the cumulative difficulty
|
||||
*old_cum_diff = new_cumulative_difficulty;
|
||||
} else {
|
||||
// The peer is new so add it the list of peers.
|
||||
self.peers.insert(
|
||||
peer_id,
|
||||
(
|
||||
new_cumulative_difficulty,
|
||||
PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?,
|
||||
),
|
||||
);
|
||||
|
||||
// add it to the list of peers to watch for disconnection.
|
||||
self.closed_connections.push(PeerDisconnectFut {
|
||||
closed_fut: handle.closed(),
|
||||
peer_id: Some(peer_id),
|
||||
});
|
||||
}
|
||||
|
||||
self.cumulative_difficulties
|
||||
.entry(new_cumulative_difficulty)
|
||||
.or_default()
|
||||
.insert(peer_id);
|
||||
|
||||
// If the claimed cumulative difficulty is higher than the current one in the watcher
|
||||
// or if the peer in the watch has disconnected, update it.
|
||||
if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty
|
||||
|| self
|
||||
.last_peer_in_watcher_handle
|
||||
.as_ref()
|
||||
.is_some_and(ConnectionHandle::is_closed)
|
||||
{
|
||||
tracing::debug!(
|
||||
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
|
||||
);
|
||||
#[expect(
|
||||
clippy::let_underscore_must_use,
|
||||
reason = "dropped receivers can be ignored"
|
||||
)]
|
||||
let _ = self.new_height_watcher.send(NewSyncInfo {
|
||||
top_hash: core_sync_data.top_id,
|
||||
chain_height: core_sync_data.current_height,
|
||||
cumulative_difficulty: new_cumulative_difficulty,
|
||||
});
|
||||
self.last_peer_in_watcher_handle.replace(handle);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
|
||||
type Response = PeerSyncResponse<N>;
|
||||
type Error = tower::BoxError;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.poll_disconnected(cx);
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
|
||||
let res = match req {
|
||||
PeerSyncRequest::PeersToSyncFrom {
|
||||
current_cumulative_difficulty,
|
||||
block_needed,
|
||||
} => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from(
|
||||
current_cumulative_difficulty,
|
||||
block_needed,
|
||||
))),
|
||||
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
|
||||
.update_peer_sync_info(peer_id, handle, &sync_data)
|
||||
.map(|()| PeerSyncResponse::Ok),
|
||||
};
|
||||
|
||||
ready(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_p2p_core::{
|
||||
client::InternalPeerID, handles::HandleBuilder, services::PeerSyncRequest,
|
||||
};
|
||||
use cuprate_wire::CoreSyncData;
|
||||
|
||||
use cuprate_p2p_core::services::PeerSyncResponse;
|
||||
use cuprate_test_utils::test_netzone::TestNetZone;
|
||||
|
||||
use super::PeerSyncSvc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn top_sync_channel_updates() {
|
||||
let (_g, handle) = HandleBuilder::new().build();
|
||||
|
||||
let (mut svc, mut watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();
|
||||
|
||||
assert!(!watch.has_changed().unwrap());
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(0),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 1_000,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(watch.has_changed().unwrap());
|
||||
|
||||
assert_eq!(watch.borrow().top_hash, [0; 32]);
|
||||
assert_eq!(watch.borrow().cumulative_difficulty, 1000);
|
||||
assert_eq!(watch.borrow_and_update().chain_height, 0);
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(1),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 1_000,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!watch.has_changed().unwrap());
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(2),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 1_001,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [1; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(watch.has_changed().unwrap());
|
||||
|
||||
assert_eq!(watch.borrow().top_hash, [1; 32]);
|
||||
assert_eq!(watch.borrow().cumulative_difficulty, 1001);
|
||||
assert_eq!(watch.borrow_and_update().chain_height, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn peer_sync_info_updates() {
|
||||
let (_g, handle) = HandleBuilder::new().build();
|
||||
|
||||
let (mut svc, _watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(0),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 1_000,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(svc.peers.len(), 1);
|
||||
assert_eq!(svc.cumulative_difficulties.len(), 1);
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(0),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 1_001,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(svc.peers.len(), 1);
|
||||
assert_eq!(svc.cumulative_difficulties.len(), 1);
|
||||
|
||||
svc.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||
InternalPeerID::Unknown(1),
|
||||
handle.clone(),
|
||||
CoreSyncData {
|
||||
cumulative_difficulty: 10,
|
||||
cumulative_difficulty_top64: 0,
|
||||
current_height: 0,
|
||||
pruning_seed: 0,
|
||||
top_id: [0; 32],
|
||||
top_version: 0,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(svc.peers.len(), 2);
|
||||
assert_eq!(svc.cumulative_difficulties.len(), 2);
|
||||
|
||||
let PeerSyncResponse::PeersToSyncFrom(peers) = svc
|
||||
.ready()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(PeerSyncRequest::PeersToSyncFrom {
|
||||
block_needed: None,
|
||||
current_cumulative_difficulty: 0,
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
else {
|
||||
panic!("Wrong response for request.")
|
||||
};
|
||||
|
||||
assert!(
|
||||
peers.contains(&InternalPeerID::Unknown(0))
|
||||
&& peers.contains(&InternalPeerID::Unknown(1))
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue