mirror of
https://github.com/hinto-janai/cuprate.git
synced 2024-11-16 15:58:14 +00:00
Merge branch 'main' into constants
This commit is contained in:
commit
01f34e5107
35 changed files with 284 additions and 233 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -793,6 +793,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"borsh",
|
"borsh",
|
||||||
|
"cfg-if",
|
||||||
"cuprate-helper",
|
"cuprate-helper",
|
||||||
"cuprate-pruning",
|
"cuprate-pruning",
|
||||||
"cuprate-test-utils",
|
"cuprate-test-utils",
|
||||||
|
@ -807,7 +808,6 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tower",
|
"tower",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -14,13 +14,14 @@ cuprate-helper = { path = "../../helper", features = ["asynch"], default-feature
|
||||||
cuprate-wire = { path = "../../net/wire", features = ["tracing"] }
|
cuprate-wire = { path = "../../net/wire", features = ["tracing"] }
|
||||||
cuprate-pruning = { path = "../../pruning" }
|
cuprate-pruning = { path = "../../pruning" }
|
||||||
|
|
||||||
tokio = { workspace = true, features = ["net", "sync", "macros", "time"]}
|
tokio = { workspace = true, features = ["net", "sync", "macros", "time", "rt", "rt-multi-thread"]}
|
||||||
tokio-util = { workspace = true, features = ["codec"] }
|
tokio-util = { workspace = true, features = ["codec"] }
|
||||||
tokio-stream = { workspace = true, features = ["sync"]}
|
tokio-stream = { workspace = true, features = ["sync"]}
|
||||||
futures = { workspace = true, features = ["std"] }
|
futures = { workspace = true, features = ["std"] }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tower = { workspace = true, features = ["util", "tracing"] }
|
tower = { workspace = true, features = ["util", "tracing"] }
|
||||||
|
|
||||||
|
cfg-if = { workspace = true }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
tracing = { workspace = true, features = ["std", "attributes"] }
|
tracing = { workspace = true, features = ["std", "attributes"] }
|
||||||
hex-literal = { workspace = true }
|
hex-literal = { workspace = true }
|
||||||
|
@ -28,9 +29,10 @@ hex-literal = { workspace = true }
|
||||||
borsh = { workspace = true, features = ["derive", "std"], optional = true }
|
borsh = { workspace = true, features = ["derive", "std"], optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
cuprate-test-utils = {path = "../../test-utils"}
|
cuprate-test-utils = { path = "../../test-utils" }
|
||||||
|
|
||||||
hex = { workspace = true, features = ["std"] }
|
hex = { workspace = true, features = ["std"] }
|
||||||
tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]}
|
|
||||||
tokio-test = { workspace = true }
|
tokio-test = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true }
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
|
@ -43,8 +43,8 @@ pub enum InternalPeerID<A> {
|
||||||
impl<A: Display> Display for InternalPeerID<A> {
|
impl<A: Display> Display for InternalPeerID<A> {
|
||||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
InternalPeerID::KnownAddr(addr) => addr.fmt(f),
|
Self::KnownAddr(addr) => addr.fmt(f),
|
||||||
InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")),
|
Self::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ impl<Z: NetworkZone> Client<Z> {
|
||||||
fn set_err(&self, err: PeerError) -> tower::BoxError {
|
fn set_err(&self, err: PeerError) -> tower::BoxError {
|
||||||
let err_str = err.to_string();
|
let err_str = err.to_string();
|
||||||
match self.error.try_insert_err(err) {
|
match self.error.try_insert_err(err) {
|
||||||
Ok(_) => err_str,
|
Ok(()) => err_str,
|
||||||
Err(e) => e.to_string(),
|
Err(e) => e.to_string(),
|
||||||
}
|
}
|
||||||
.into()
|
.into()
|
||||||
|
@ -169,9 +169,8 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
|
||||||
TrySendError::Closed(req) | TrySendError::Full(req) => {
|
TrySendError::Closed(req) | TrySendError::Full(req) => {
|
||||||
self.set_err(PeerError::ClientChannelClosed);
|
self.set_err(PeerError::ClientChannelClosed);
|
||||||
|
|
||||||
let _ = req
|
let resp = Err(PeerError::ClientChannelClosed.into());
|
||||||
.response_channel
|
drop(req.response_channel.send(resp));
|
||||||
.send(Err(PeerError::ClientChannelClosed.into()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -216,7 +215,7 @@ where
|
||||||
|
|
||||||
tracing::debug!("Sending back response");
|
tracing::debug!("Sending back response");
|
||||||
|
|
||||||
let _ = req.response_channel.send(Ok(res));
|
drop(req.response_channel.send(Ok(res)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.instrument(task_span),
|
.instrument(task_span),
|
||||||
|
|
|
@ -26,7 +26,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A request to the connection task from a [`Client`](crate::client::Client).
|
/// A request to the connection task from a [`Client`](crate::client::Client).
|
||||||
pub struct ConnectionTaskRequest {
|
pub(crate) struct ConnectionTaskRequest {
|
||||||
/// The request.
|
/// The request.
|
||||||
pub request: PeerRequest,
|
pub request: PeerRequest,
|
||||||
/// The response channel.
|
/// The response channel.
|
||||||
|
@ -36,7 +36,7 @@ pub struct ConnectionTaskRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The connection state.
|
/// The connection state.
|
||||||
pub enum State {
|
pub(crate) enum State {
|
||||||
/// Waiting for a request from Cuprate or the connected peer.
|
/// Waiting for a request from Cuprate or the connected peer.
|
||||||
WaitingForRequest,
|
WaitingForRequest,
|
||||||
/// Waiting for a response from the peer.
|
/// Waiting for a response from the peer.
|
||||||
|
@ -53,7 +53,7 @@ pub enum State {
|
||||||
/// Returns if the [`LevinCommand`] is the correct response message for our request.
|
/// Returns if the [`LevinCommand`] is the correct response message for our request.
|
||||||
///
|
///
|
||||||
/// e.g. that we didn't get a block for a txs request.
|
/// e.g. that we didn't get a block for a txs request.
|
||||||
fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool {
|
const fn levin_command_response(message_id: MessageID, command: LevinCommand) -> bool {
|
||||||
matches!(
|
matches!(
|
||||||
(message_id, command),
|
(message_id, command),
|
||||||
(MessageID::Handshake, LevinCommand::Handshake)
|
(MessageID::Handshake, LevinCommand::Handshake)
|
||||||
|
@ -71,7 +71,7 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This represents a connection to a peer.
|
/// This represents a connection to a peer.
|
||||||
pub struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
|
pub(crate) struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
|
||||||
/// The peer sink - where we send messages to the peer.
|
/// The peer sink - where we send messages to the peer.
|
||||||
peer_sink: Z::Sink,
|
peer_sink: Z::Sink,
|
||||||
|
|
||||||
|
@ -104,15 +104,15 @@ where
|
||||||
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
||||||
{
|
{
|
||||||
/// Create a new connection struct.
|
/// Create a new connection struct.
|
||||||
pub fn new(
|
pub(crate) fn new(
|
||||||
peer_sink: Z::Sink,
|
peer_sink: Z::Sink,
|
||||||
client_rx: mpsc::Receiver<ConnectionTaskRequest>,
|
client_rx: mpsc::Receiver<ConnectionTaskRequest>,
|
||||||
broadcast_stream: BrdcstStrm,
|
broadcast_stream: BrdcstStrm,
|
||||||
peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
|
peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
|
||||||
connection_guard: ConnectionGuard,
|
connection_guard: ConnectionGuard,
|
||||||
error: SharedError<PeerError>,
|
error: SharedError<PeerError>,
|
||||||
) -> Connection<Z, A, CS, PS, PR, BrdcstStrm> {
|
) -> Self {
|
||||||
Connection {
|
Self {
|
||||||
peer_sink,
|
peer_sink,
|
||||||
state: State::WaitingForRequest,
|
state: State::WaitingForRequest,
|
||||||
request_timeout: None,
|
request_timeout: None,
|
||||||
|
@ -174,15 +174,14 @@ where
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
// can't clone the error so turn it to a string first, hacky but oh well.
|
// can't clone the error so turn it to a string first, hacky but oh well.
|
||||||
let err_str = e.to_string();
|
let err_str = e.to_string();
|
||||||
let _ = req.response_channel.send(Err(err_str.clone().into()));
|
drop(req.response_channel.send(Err(err_str.into())));
|
||||||
return Err(e);
|
return Err(e);
|
||||||
} else {
|
|
||||||
// We still need to respond even if the response is this.
|
|
||||||
let _ = req
|
|
||||||
.response_channel
|
|
||||||
.send(Ok(PeerResponse::Protocol(ProtocolResponse::NA)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We still need to respond even if the response is this.
|
||||||
|
let resp = Ok(PeerResponse::Protocol(ProtocolResponse::NA));
|
||||||
|
drop(req.response_channel.send(resp));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +214,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if the message is a response to our request.
|
// Check if the message is a response to our request.
|
||||||
if levin_command_response(request_id, mes.command()) {
|
if levin_command_response(*request_id, mes.command()) {
|
||||||
// TODO: Do more checks before returning response.
|
// TODO: Do more checks before returning response.
|
||||||
|
|
||||||
let State::WaitingForResponse { tx, .. } =
|
let State::WaitingForResponse { tx, .. } =
|
||||||
|
@ -224,9 +223,11 @@ where
|
||||||
panic!("Not in correct state, can't receive response!")
|
panic!("Not in correct state, can't receive response!")
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = tx.send(Ok(mes
|
let resp = Ok(mes
|
||||||
.try_into()
|
.try_into()
|
||||||
.map_err(|_| PeerError::PeerSentInvalidMessage)?));
|
.map_err(|_| PeerError::PeerSentInvalidMessage)?);
|
||||||
|
|
||||||
|
drop(tx.send(resp));
|
||||||
|
|
||||||
self.request_timeout = None;
|
self.request_timeout = None;
|
||||||
|
|
||||||
|
@ -282,7 +283,7 @@ where
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = self.request_timeout.as_mut().expect("Request timeout was not set!") => {
|
() = self.request_timeout.as_mut().expect("Request timeout was not set!") => {
|
||||||
Err(PeerError::ClientChannelClosed)
|
Err(PeerError::ClientChannelClosed)
|
||||||
}
|
}
|
||||||
broadcast_req = self.broadcast_stream.next() => {
|
broadcast_req = self.broadcast_stream.next() => {
|
||||||
|
@ -306,8 +307,11 @@ where
|
||||||
/// Runs the Connection handler logic, this should be put in a separate task.
|
/// Runs the Connection handler logic, this should be put in a separate task.
|
||||||
///
|
///
|
||||||
/// `eager_protocol_messages` are protocol messages that we received during a handshake.
|
/// `eager_protocol_messages` are protocol messages that we received during a handshake.
|
||||||
pub async fn run<Str>(mut self, mut stream: Str, eager_protocol_messages: Vec<ProtocolMessage>)
|
pub(crate) async fn run<Str>(
|
||||||
where
|
mut self,
|
||||||
|
mut stream: Str,
|
||||||
|
eager_protocol_messages: Vec<ProtocolMessage>,
|
||||||
|
) where
|
||||||
Str: FusedStream<Item = Result<Message, cuprate_wire::BucketError>> + Unpin,
|
Str: FusedStream<Item = Result<Message, cuprate_wire::BucketError>> + Unpin,
|
||||||
{
|
{
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
|
@ -348,6 +352,7 @@ where
|
||||||
|
|
||||||
/// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been
|
/// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been
|
||||||
/// set already.
|
/// set already.
|
||||||
|
#[expect(clippy::significant_drop_tightening)]
|
||||||
fn shutdown(mut self, err: PeerError) {
|
fn shutdown(mut self, err: PeerError) {
|
||||||
tracing::debug!("Connection task shutting down: {}", err);
|
tracing::debug!("Connection task shutting down: {}", err);
|
||||||
|
|
||||||
|
@ -362,11 +367,11 @@ where
|
||||||
if let State::WaitingForResponse { tx, .. } =
|
if let State::WaitingForResponse { tx, .. } =
|
||||||
std::mem::replace(&mut self.state, State::WaitingForRequest)
|
std::mem::replace(&mut self.state, State::WaitingForRequest)
|
||||||
{
|
{
|
||||||
let _ = tx.send(Err(err_str.clone().into()));
|
drop(tx.send(Err(err_str.clone().into())));
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Ok(req) = client_rx.try_recv() {
|
while let Ok(req) = client_rx.try_recv() {
|
||||||
let _ = req.response_channel.send(Err(err_str.clone().into()));
|
drop(req.response_channel.send(Err(err_str.clone().into())));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.connection_guard.connection_closed();
|
self.connection_guard.connection_closed();
|
||||||
|
|
|
@ -40,7 +40,9 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
{
|
{
|
||||||
/// Create a new connector from a handshaker.
|
/// Create a new connector from a handshaker.
|
||||||
pub fn new(handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>) -> Self {
|
pub const fn new(
|
||||||
|
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>,
|
||||||
|
) -> Self {
|
||||||
Self { handshaker }
|
Self { handshaker }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,7 +113,7 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
{
|
{
|
||||||
/// Creates a new handshaker.
|
/// Creates a new handshaker.
|
||||||
fn new(
|
const fn new(
|
||||||
address_book: AdrBook,
|
address_book: AdrBook,
|
||||||
peer_sync_svc: PSync,
|
peer_sync_svc: PSync,
|
||||||
core_sync_svc: CSync,
|
core_sync_svc: CSync,
|
||||||
|
@ -226,11 +226,12 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
|
||||||
Err(BucketError::IO(std::io::Error::new(
|
Err(BucketError::IO(std::io::Error::new(
|
||||||
std::io::ErrorKind::ConnectionAborted,
|
std::io::ErrorKind::ConnectionAborted,
|
||||||
"The peer stream returned None",
|
"The peer stream returned None",
|
||||||
)))?
|
))
|
||||||
|
.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function completes a handshake with the requested peer.
|
/// This function completes a handshake with the requested peer.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[expect(clippy::too_many_arguments)]
|
||||||
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>(
|
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>(
|
||||||
req: DoHandshakeRequest<Z>,
|
req: DoHandshakeRequest<Z>,
|
||||||
|
|
||||||
|
@ -403,7 +404,10 @@ where
|
||||||
break 'check_out_addr None;
|
break 'check_out_addr None;
|
||||||
};
|
};
|
||||||
|
|
||||||
// u32 does not make sense as a port so just truncate it.
|
#[expect(
|
||||||
|
clippy::cast_possible_truncation,
|
||||||
|
reason = "u32 does not make sense as a port so just truncate it."
|
||||||
|
)]
|
||||||
outbound_address.set_port(peer_node_data.my_port as u16);
|
outbound_address.set_port(peer_node_data.my_port as u16);
|
||||||
|
|
||||||
let Ok(Ok(ping_peer_id)) = timeout(
|
let Ok(Ok(ping_peer_id)) = timeout(
|
||||||
|
@ -508,7 +512,7 @@ where
|
||||||
info.id,
|
info.id,
|
||||||
info.handle.clone(),
|
info.handle.clone(),
|
||||||
connection_tx.clone(),
|
connection_tx.clone(),
|
||||||
semaphore.clone(),
|
Arc::clone(&semaphore),
|
||||||
address_book,
|
address_book,
|
||||||
core_sync_svc,
|
core_sync_svc,
|
||||||
peer_sync_svc,
|
peer_sync_svc,
|
||||||
|
@ -671,7 +675,7 @@ async fn wait_for_message<Z: NetworkZone>(
|
||||||
_ => {
|
_ => {
|
||||||
return Err(HandshakeError::PeerSentInvalidMessage(
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
||||||
"Peer sent an admin request before responding to the handshake",
|
"Peer sent an admin request before responding to the handshake",
|
||||||
))
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -686,16 +690,17 @@ async fn wait_for_message<Z: NetworkZone>(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => Err(HandshakeError::PeerSentInvalidMessage(
|
Message::Response(_) => Err(HandshakeError::PeerSentInvalidMessage(
|
||||||
"Peer sent an incorrect message",
|
"Peer sent an incorrect message",
|
||||||
)),
|
)),
|
||||||
}?
|
}?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(BucketError::IO(std::io::Error::new(
|
Err(BucketError::IO(std::io::Error::new(
|
||||||
std::io::ErrorKind::ConnectionAborted,
|
std::io::ErrorKind::ConnectionAborted,
|
||||||
"The peer stream returned None",
|
"The peer stream returned None",
|
||||||
)))?
|
))
|
||||||
|
.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink.
|
/// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink.
|
||||||
|
|
|
@ -87,14 +87,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
where
|
where
|
||||||
NAdrBook: AddressBook<N> + Clone,
|
NAdrBook: AddressBook<N> + Clone,
|
||||||
{
|
{
|
||||||
let HandshakerBuilder {
|
let Self {
|
||||||
core_sync_svc,
|
core_sync_svc,
|
||||||
peer_sync_svc,
|
peer_sync_svc,
|
||||||
protocol_request_svc,
|
protocol_request_svc,
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
|
@ -106,7 +105,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
_zone: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,14 +129,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
where
|
where
|
||||||
NCSync: CoreSyncSvc + Clone,
|
NCSync: CoreSyncSvc + Clone,
|
||||||
{
|
{
|
||||||
let HandshakerBuilder {
|
let Self {
|
||||||
address_book,
|
address_book,
|
||||||
peer_sync_svc,
|
peer_sync_svc,
|
||||||
protocol_request_svc,
|
protocol_request_svc,
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
|
@ -149,7 +147,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
_zone: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,14 +165,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
where
|
where
|
||||||
NPSync: PeerSyncSvc<N> + Clone,
|
NPSync: PeerSyncSvc<N> + Clone,
|
||||||
{
|
{
|
||||||
let HandshakerBuilder {
|
let Self {
|
||||||
address_book,
|
address_book,
|
||||||
core_sync_svc,
|
core_sync_svc,
|
||||||
protocol_request_svc,
|
protocol_request_svc,
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
|
@ -186,7 +183,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
_zone: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,14 +201,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
where
|
where
|
||||||
NProtoHdlr: ProtocolRequestHandler + Clone,
|
NProtoHdlr: ProtocolRequestHandler + Clone,
|
||||||
{
|
{
|
||||||
let HandshakerBuilder {
|
let Self {
|
||||||
address_book,
|
address_book,
|
||||||
core_sync_svc,
|
core_sync_svc,
|
||||||
peer_sync_svc,
|
peer_sync_svc,
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
|
@ -223,7 +219,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker,
|
broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
_zone: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,14 +238,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
||||||
NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static,
|
NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
let HandshakerBuilder {
|
let Self {
|
||||||
address_book,
|
address_book,
|
||||||
core_sync_svc,
|
core_sync_svc,
|
||||||
peer_sync_svc,
|
peer_sync_svc,
|
||||||
protocol_request_svc,
|
protocol_request_svc,
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
|
@ -261,7 +256,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
our_basic_node_data,
|
our_basic_node_data,
|
||||||
broadcast_stream_maker: new_broadcast_stream_maker,
|
broadcast_stream_maker: new_broadcast_stream_maker,
|
||||||
connection_parent_span,
|
connection_parent_span,
|
||||||
_zone,
|
_zone: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,6 +265,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
|
||||||
/// ## Default Connection Parent Span
|
/// ## Default Connection Parent Span
|
||||||
///
|
///
|
||||||
/// The default connection span will be [`Span::none`].
|
/// The default connection span will be [`Span::none`].
|
||||||
|
#[must_use]
|
||||||
pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self {
|
pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self {
|
||||||
Self {
|
Self {
|
||||||
connection_parent_span: Some(connection_parent_span),
|
connection_parent_span: Some(connection_parent_span),
|
||||||
|
|
|
@ -42,8 +42,8 @@ pub struct DummyCoreSyncSvc(CoreSyncData);
|
||||||
|
|
||||||
impl DummyCoreSyncSvc {
|
impl DummyCoreSyncSvc {
|
||||||
/// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`].
|
/// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`].
|
||||||
pub fn static_mainnet_genesis() -> DummyCoreSyncSvc {
|
pub const fn static_mainnet_genesis() -> Self {
|
||||||
DummyCoreSyncSvc(CoreSyncData {
|
Self(CoreSyncData {
|
||||||
cumulative_difficulty: 1,
|
cumulative_difficulty: 1,
|
||||||
cumulative_difficulty_top64: 0,
|
cumulative_difficulty_top64: 0,
|
||||||
current_height: 1,
|
current_height: 1,
|
||||||
|
@ -56,8 +56,8 @@ impl DummyCoreSyncSvc {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`].
|
/// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`].
|
||||||
pub fn static_testnet_genesis() -> DummyCoreSyncSvc {
|
pub const fn static_testnet_genesis() -> Self {
|
||||||
DummyCoreSyncSvc(CoreSyncData {
|
Self(CoreSyncData {
|
||||||
cumulative_difficulty: 1,
|
cumulative_difficulty: 1,
|
||||||
cumulative_difficulty_top64: 0,
|
cumulative_difficulty_top64: 0,
|
||||||
current_height: 1,
|
current_height: 1,
|
||||||
|
@ -70,8 +70,8 @@ impl DummyCoreSyncSvc {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`].
|
/// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`].
|
||||||
pub fn static_stagenet_genesis() -> DummyCoreSyncSvc {
|
pub const fn static_stagenet_genesis() -> Self {
|
||||||
DummyCoreSyncSvc(CoreSyncData {
|
Self(CoreSyncData {
|
||||||
cumulative_difficulty: 1,
|
cumulative_difficulty: 1,
|
||||||
cumulative_difficulty_top64: 0,
|
cumulative_difficulty_top64: 0,
|
||||||
current_height: 1,
|
current_height: 1,
|
||||||
|
@ -84,8 +84,8 @@ impl DummyCoreSyncSvc {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`].
|
/// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`].
|
||||||
pub fn static_custom(data: CoreSyncData) -> DummyCoreSyncSvc {
|
pub const fn static_custom(data: CoreSyncData) -> Self {
|
||||||
DummyCoreSyncSvc(data)
|
Self(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@ pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PS, PR> {
|
||||||
pub peer_info: PeerInformation<Z::Addr>,
|
pub peer_info: PeerInformation<Z::Addr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Z: NetworkZone, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR>
|
impl<Z, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR>
|
||||||
where
|
where
|
||||||
Z: NetworkZone,
|
Z: NetworkZone,
|
||||||
A: AddressBook<Z>,
|
A: AddressBook<Z>,
|
||||||
|
@ -55,7 +55,7 @@ where
|
||||||
PR: ProtocolRequestHandler,
|
PR: ProtocolRequestHandler,
|
||||||
{
|
{
|
||||||
/// Handles an incoming [`PeerRequest`] to our node.
|
/// Handles an incoming [`PeerRequest`] to our node.
|
||||||
pub async fn handle_peer_request(
|
pub(crate) async fn handle_peer_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
req: PeerRequest,
|
req: PeerRequest,
|
||||||
) -> Result<PeerResponse, tower::BoxError> {
|
) -> Result<PeerResponse, tower::BoxError> {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
//! Timeout Monitor
|
//! Timeout Monitor
|
||||||
//!
|
//!
|
||||||
//! This module holds the task that sends periodic [TimedSync](PeerRequest::TimedSync) requests to a peer to make
|
//! This module holds the task that sends periodic [`TimedSync`](PeerRequest::TimedSync) requests to a peer to make
|
||||||
//! sure the connection is still active.
|
//! sure the connection is still active.
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ where
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let Ok(permit) = semaphore.clone().try_acquire_owned() else {
|
let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else {
|
||||||
// If we can't get a permit the connection is currently waiting for a response, so no need to
|
// If we can't get a permit the connection is currently waiting for a response, so no need to
|
||||||
// do a timed sync.
|
// do a timed sync.
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -4,7 +4,7 @@ pub struct SharedError<T>(Arc<OnceLock<T>>);
|
||||||
|
|
||||||
impl<T> Clone for SharedError<T> {
|
impl<T> Clone for SharedError<T> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self(self.0.clone())
|
Self(Arc::clone(&self.0))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,11 +18,12 @@ pub struct HandleBuilder {
|
||||||
|
|
||||||
impl HandleBuilder {
|
impl HandleBuilder {
|
||||||
/// Create a new builder.
|
/// Create a new builder.
|
||||||
pub fn new() -> Self {
|
pub const fn new() -> Self {
|
||||||
Self { permit: None }
|
Self { permit: None }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the permit for this connection.
|
/// Sets the permit for this connection.
|
||||||
|
#[must_use]
|
||||||
pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
|
pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
|
||||||
self.permit = permit;
|
self.permit = permit;
|
||||||
self
|
self
|
||||||
|
@ -40,7 +41,7 @@ impl HandleBuilder {
|
||||||
_permit: self.permit,
|
_permit: self.permit,
|
||||||
},
|
},
|
||||||
ConnectionHandle {
|
ConnectionHandle {
|
||||||
token: token.clone(),
|
token,
|
||||||
ban: Arc::new(OnceLock::new()),
|
ban: Arc::new(OnceLock::new()),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -66,13 +67,13 @@ impl ConnectionGuard {
|
||||||
///
|
///
|
||||||
/// This will be called on [`Drop::drop`].
|
/// This will be called on [`Drop::drop`].
|
||||||
pub fn connection_closed(&self) {
|
pub fn connection_closed(&self) {
|
||||||
self.token.cancel()
|
self.token.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for ConnectionGuard {
|
impl Drop for ConnectionGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.token.cancel()
|
self.token.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +91,10 @@ impl ConnectionHandle {
|
||||||
}
|
}
|
||||||
/// Bans the peer for the given `duration`.
|
/// Bans the peer for the given `duration`.
|
||||||
pub fn ban_peer(&self, duration: Duration) {
|
pub fn ban_peer(&self, duration: Duration) {
|
||||||
|
#[expect(
|
||||||
|
clippy::let_underscore_must_use,
|
||||||
|
reason = "error means peer is already banned; fine to ignore"
|
||||||
|
)]
|
||||||
let _ = self.ban.set(BanPeer(duration));
|
let _ = self.ban.set(BanPeer(duration));
|
||||||
self.token.cancel();
|
self.token.cancel();
|
||||||
}
|
}
|
||||||
|
@ -103,6 +108,6 @@ impl ConnectionHandle {
|
||||||
}
|
}
|
||||||
/// Sends the signal to the connection task to disconnect.
|
/// Sends the signal to the connection task to disconnect.
|
||||||
pub fn send_close_signal(&self) {
|
pub fn send_close_signal(&self) {
|
||||||
self.token.cancel()
|
self.token.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
//!
|
//!
|
||||||
//! # Network Zones
|
//! # Network Zones
|
||||||
//!
|
//!
|
||||||
//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet].
|
//! This crate abstracts over network zones, Tor/I2p/clearnet with the [`NetworkZone`] trait. Currently only clearnet is implemented: [`ClearNet`].
|
||||||
//!
|
//!
|
||||||
//! # Usage
|
//! # Usage
|
||||||
//!
|
//!
|
||||||
|
@ -56,6 +56,16 @@
|
||||||
//! .unwrap();
|
//! .unwrap();
|
||||||
//! # });
|
//! # });
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
cfg_if::cfg_if! {
|
||||||
|
// Used in `tests/`
|
||||||
|
if #[cfg(test)] {
|
||||||
|
use cuprate_test_utils as _;
|
||||||
|
use tokio_test as _;
|
||||||
|
use hex as _;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
use std::{fmt::Debug, future::Future, hash::Hash};
|
use std::{fmt::Debug, future::Future, hash::Hash};
|
||||||
|
|
||||||
use futures::{Sink, Stream};
|
use futures::{Sink, Stream};
|
||||||
|
@ -102,7 +112,7 @@ pub trait NetZoneAddress:
|
||||||
+ Unpin
|
+ Unpin
|
||||||
+ 'static
|
+ 'static
|
||||||
{
|
{
|
||||||
/// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as
|
/// Cuprate needs to be able to ban peers by IP addresses and not just by `SocketAddr` as
|
||||||
/// that include the port, to be able to facilitate this network addresses must have a ban ID
|
/// that include the port, to be able to facilitate this network addresses must have a ban ID
|
||||||
/// which for hidden services could just be the address it self but for clear net addresses will
|
/// which for hidden services could just be the address it self but for clear net addresses will
|
||||||
/// be the IP address.
|
/// be the IP address.
|
||||||
|
|
|
@ -19,7 +19,7 @@ impl NetZoneAddress for SocketAddr {
|
||||||
type BanID = IpAddr;
|
type BanID = IpAddr;
|
||||||
|
|
||||||
fn set_port(&mut self, port: u16) {
|
fn set_port(&mut self, port: u16) {
|
||||||
SocketAddr::set_port(self, port)
|
Self::set_port(self, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ban_id(&self) -> Self::BanID {
|
fn ban_id(&self) -> Self::BanID {
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
//!
|
//!
|
||||||
//! Here is every P2P request/response.
|
//! Here is every P2P request/response.
|
||||||
//!
|
//!
|
||||||
//! *note admin messages are already request/response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse
|
//! *note admin messages are already request/response so "Handshake" is actually made of a `HandshakeRequest` & `HandshakeResponse`
|
||||||
//!
|
//!
|
||||||
//! ```md
|
//! ```md
|
||||||
//! Admin:
|
//! Admin:
|
||||||
|
@ -78,15 +78,15 @@ pub enum PeerRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerRequest {
|
impl PeerRequest {
|
||||||
pub fn id(&self) -> MessageID {
|
pub const fn id(&self) -> MessageID {
|
||||||
match self {
|
match self {
|
||||||
PeerRequest::Admin(admin_req) => match admin_req {
|
Self::Admin(admin_req) => match admin_req {
|
||||||
AdminRequestMessage::Handshake(_) => MessageID::Handshake,
|
AdminRequestMessage::Handshake(_) => MessageID::Handshake,
|
||||||
AdminRequestMessage::TimedSync(_) => MessageID::TimedSync,
|
AdminRequestMessage::TimedSync(_) => MessageID::TimedSync,
|
||||||
AdminRequestMessage::Ping => MessageID::Ping,
|
AdminRequestMessage::Ping => MessageID::Ping,
|
||||||
AdminRequestMessage::SupportFlags => MessageID::SupportFlags,
|
AdminRequestMessage::SupportFlags => MessageID::SupportFlags,
|
||||||
},
|
},
|
||||||
PeerRequest::Protocol(protocol_request) => match protocol_request {
|
Self::Protocol(protocol_request) => match protocol_request {
|
||||||
ProtocolRequest::GetObjects(_) => MessageID::GetObjects,
|
ProtocolRequest::GetObjects(_) => MessageID::GetObjects,
|
||||||
ProtocolRequest::GetChain(_) => MessageID::GetChain,
|
ProtocolRequest::GetChain(_) => MessageID::GetChain,
|
||||||
ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
|
ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
|
||||||
|
@ -98,10 +98,10 @@ impl PeerRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn needs_response(&self) -> bool {
|
pub const fn needs_response(&self) -> bool {
|
||||||
!matches!(
|
!matches!(
|
||||||
self,
|
self,
|
||||||
PeerRequest::Protocol(
|
Self::Protocol(
|
||||||
ProtocolRequest::NewBlock(_)
|
ProtocolRequest::NewBlock(_)
|
||||||
| ProtocolRequest::NewFluffyBlock(_)
|
| ProtocolRequest::NewFluffyBlock(_)
|
||||||
| ProtocolRequest::NewTransactions(_)
|
| ProtocolRequest::NewTransactions(_)
|
||||||
|
@ -126,15 +126,15 @@ pub enum PeerResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerResponse {
|
impl PeerResponse {
|
||||||
pub fn id(&self) -> Option<MessageID> {
|
pub const fn id(&self) -> Option<MessageID> {
|
||||||
Some(match self {
|
Some(match self {
|
||||||
PeerResponse::Admin(admin_res) => match admin_res {
|
Self::Admin(admin_res) => match admin_res {
|
||||||
AdminResponseMessage::Handshake(_) => MessageID::Handshake,
|
AdminResponseMessage::Handshake(_) => MessageID::Handshake,
|
||||||
AdminResponseMessage::TimedSync(_) => MessageID::TimedSync,
|
AdminResponseMessage::TimedSync(_) => MessageID::TimedSync,
|
||||||
AdminResponseMessage::Ping(_) => MessageID::Ping,
|
AdminResponseMessage::Ping(_) => MessageID::Ping,
|
||||||
AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags,
|
AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags,
|
||||||
},
|
},
|
||||||
PeerResponse::Protocol(protocol_res) => match protocol_res {
|
Self::Protocol(protocol_res) => match protocol_res {
|
||||||
ProtocolResponse::GetObjects(_) => MessageID::GetObjects,
|
ProtocolResponse::GetObjects(_) => MessageID::GetObjects,
|
||||||
ProtocolResponse::GetChain(_) => MessageID::GetChain,
|
ProtocolResponse::GetChain(_) => MessageID::GetChain,
|
||||||
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
|
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
|
||||||
|
|
|
@ -11,15 +11,13 @@ pub struct MessageConversionError;
|
||||||
impl From<ProtocolRequest> for ProtocolMessage {
|
impl From<ProtocolRequest> for ProtocolMessage {
|
||||||
fn from(value: ProtocolRequest) -> Self {
|
fn from(value: ProtocolRequest) -> Self {
|
||||||
match value {
|
match value {
|
||||||
ProtocolRequest::GetObjects(val) => ProtocolMessage::GetObjectsRequest(val),
|
ProtocolRequest::GetObjects(val) => Self::GetObjectsRequest(val),
|
||||||
ProtocolRequest::GetChain(val) => ProtocolMessage::ChainRequest(val),
|
ProtocolRequest::GetChain(val) => Self::ChainRequest(val),
|
||||||
ProtocolRequest::FluffyMissingTxs(val) => {
|
ProtocolRequest::FluffyMissingTxs(val) => Self::FluffyMissingTransactionsRequest(val),
|
||||||
ProtocolMessage::FluffyMissingTransactionsRequest(val)
|
ProtocolRequest::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val),
|
||||||
}
|
ProtocolRequest::NewBlock(val) => Self::NewBlock(val),
|
||||||
ProtocolRequest::GetTxPoolCompliment(val) => ProtocolMessage::GetTxPoolCompliment(val),
|
ProtocolRequest::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||||
ProtocolRequest::NewBlock(val) => ProtocolMessage::NewBlock(val),
|
ProtocolRequest::NewTransactions(val) => Self::NewTransactions(val),
|
||||||
ProtocolRequest::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
|
|
||||||
ProtocolRequest::NewTransactions(val) => ProtocolMessage::NewTransactions(val),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,15 +27,13 @@ impl TryFrom<ProtocolMessage> for ProtocolRequest {
|
||||||
|
|
||||||
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
|
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
|
||||||
Ok(match value {
|
Ok(match value {
|
||||||
ProtocolMessage::GetObjectsRequest(val) => ProtocolRequest::GetObjects(val),
|
ProtocolMessage::GetObjectsRequest(val) => Self::GetObjects(val),
|
||||||
ProtocolMessage::ChainRequest(val) => ProtocolRequest::GetChain(val),
|
ProtocolMessage::ChainRequest(val) => Self::GetChain(val),
|
||||||
ProtocolMessage::FluffyMissingTransactionsRequest(val) => {
|
ProtocolMessage::FluffyMissingTransactionsRequest(val) => Self::FluffyMissingTxs(val),
|
||||||
ProtocolRequest::FluffyMissingTxs(val)
|
ProtocolMessage::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val),
|
||||||
}
|
ProtocolMessage::NewBlock(val) => Self::NewBlock(val),
|
||||||
ProtocolMessage::GetTxPoolCompliment(val) => ProtocolRequest::GetTxPoolCompliment(val),
|
ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||||
ProtocolMessage::NewBlock(val) => ProtocolRequest::NewBlock(val),
|
ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val),
|
||||||
ProtocolMessage::NewFluffyBlock(val) => ProtocolRequest::NewFluffyBlock(val),
|
|
||||||
ProtocolMessage::NewTransactions(val) => ProtocolRequest::NewTransactions(val),
|
|
||||||
ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => {
|
ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => {
|
||||||
return Err(MessageConversionError)
|
return Err(MessageConversionError)
|
||||||
}
|
}
|
||||||
|
@ -48,8 +44,8 @@ impl TryFrom<ProtocolMessage> for ProtocolRequest {
|
||||||
impl From<PeerRequest> for Message {
|
impl From<PeerRequest> for Message {
|
||||||
fn from(value: PeerRequest) -> Self {
|
fn from(value: PeerRequest) -> Self {
|
||||||
match value {
|
match value {
|
||||||
PeerRequest::Admin(val) => Message::Request(val),
|
PeerRequest::Admin(val) => Self::Request(val),
|
||||||
PeerRequest::Protocol(val) => Message::Protocol(val.into()),
|
PeerRequest::Protocol(val) => Self::Protocol(val.into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,8 +55,8 @@ impl TryFrom<Message> for PeerRequest {
|
||||||
|
|
||||||
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
||||||
match value {
|
match value {
|
||||||
Message::Request(req) => Ok(PeerRequest::Admin(req)),
|
Message::Request(req) => Ok(Self::Admin(req)),
|
||||||
Message::Protocol(pro) => Ok(PeerRequest::Protocol(pro.try_into()?)),
|
Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)),
|
||||||
Message::Response(_) => Err(MessageConversionError),
|
Message::Response(_) => Err(MessageConversionError),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,10 +67,10 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
|
||||||
|
|
||||||
fn try_from(value: ProtocolResponse) -> Result<Self, Self::Error> {
|
fn try_from(value: ProtocolResponse) -> Result<Self, Self::Error> {
|
||||||
Ok(match value {
|
Ok(match value {
|
||||||
ProtocolResponse::NewTransactions(val) => ProtocolMessage::NewTransactions(val),
|
ProtocolResponse::NewTransactions(val) => Self::NewTransactions(val),
|
||||||
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
|
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||||
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val),
|
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
|
||||||
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val),
|
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
|
||||||
ProtocolResponse::NA => return Err(MessageConversionError),
|
ProtocolResponse::NA => return Err(MessageConversionError),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -85,10 +81,10 @@ impl TryFrom<ProtocolMessage> for ProtocolResponse {
|
||||||
|
|
||||||
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
|
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
|
||||||
Ok(match value {
|
Ok(match value {
|
||||||
ProtocolMessage::NewTransactions(val) => ProtocolResponse::NewTransactions(val),
|
ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val),
|
||||||
ProtocolMessage::NewFluffyBlock(val) => ProtocolResponse::NewFluffyBlock(val),
|
ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||||
ProtocolMessage::ChainEntryResponse(val) => ProtocolResponse::GetChain(val),
|
ProtocolMessage::ChainEntryResponse(val) => Self::GetChain(val),
|
||||||
ProtocolMessage::GetObjectsResponse(val) => ProtocolResponse::GetObjects(val),
|
ProtocolMessage::GetObjectsResponse(val) => Self::GetObjects(val),
|
||||||
ProtocolMessage::ChainRequest(_)
|
ProtocolMessage::ChainRequest(_)
|
||||||
| ProtocolMessage::FluffyMissingTransactionsRequest(_)
|
| ProtocolMessage::FluffyMissingTransactionsRequest(_)
|
||||||
| ProtocolMessage::GetObjectsRequest(_)
|
| ProtocolMessage::GetObjectsRequest(_)
|
||||||
|
@ -103,8 +99,8 @@ impl TryFrom<Message> for PeerResponse {
|
||||||
|
|
||||||
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
||||||
match value {
|
match value {
|
||||||
Message::Response(res) => Ok(PeerResponse::Admin(res)),
|
Message::Response(res) => Ok(Self::Admin(res)),
|
||||||
Message::Protocol(pro) => Ok(PeerResponse::Protocol(pro.try_into()?)),
|
Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)),
|
||||||
Message::Request(_) => Err(MessageConversionError),
|
Message::Request(_) => Err(MessageConversionError),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,8 +111,8 @@ impl TryFrom<PeerResponse> for Message {
|
||||||
|
|
||||||
fn try_from(value: PeerResponse) -> Result<Self, Self::Error> {
|
fn try_from(value: PeerResponse) -> Result<Self, Self::Error> {
|
||||||
Ok(match value {
|
Ok(match value {
|
||||||
PeerResponse::Admin(val) => Message::Response(val),
|
PeerResponse::Admin(val) => Self::Response(val),
|
||||||
PeerResponse::Protocol(val) => Message::Protocol(val.try_into()?),
|
PeerResponse::Protocol(val) => Self::Protocol(val.try_into()?),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ pub struct ZoneSpecificPeerListEntryBase<A: NetZoneAddress> {
|
||||||
pub rpc_credits_per_hash: u32,
|
pub rpc_credits_per_hash: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for cuprate_wire::PeerListEntryBase {
|
impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for PeerListEntryBase {
|
||||||
fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self {
|
fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
adr: value.adr.into(),
|
adr: value.adr.into(),
|
||||||
|
@ -74,9 +74,7 @@ pub enum PeerListConversionError {
|
||||||
PruningSeed(#[from] PruningError),
|
PruningSeed(#[from] PruningError),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A: NetZoneAddress> TryFrom<cuprate_wire::PeerListEntryBase>
|
impl<A: NetZoneAddress> TryFrom<PeerListEntryBase> for ZoneSpecificPeerListEntryBase<A> {
|
||||||
for ZoneSpecificPeerListEntryBase<A>
|
|
||||||
{
|
|
||||||
type Error = PeerListConversionError;
|
type Error = PeerListConversionError;
|
||||||
|
|
||||||
fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> {
|
fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> {
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
//! This file contains a test for a handshake with monerod but uses fragmented messages.
|
//! This file contains a test for a handshake with monerod but uses fragmented messages.
|
||||||
|
|
||||||
|
#![expect(unused_crate_dependencies, reason = "external test module")]
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
@ -21,6 +24,13 @@ use tokio_util::{
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use cuprate_helper::network::Network;
|
use cuprate_helper::network::Network;
|
||||||
|
use cuprate_test_utils::monerod::monerod;
|
||||||
|
use cuprate_wire::{
|
||||||
|
common::PeerSupportFlags,
|
||||||
|
levin::{message::make_fragmented_messages, LevinMessage, Protocol},
|
||||||
|
BasicNodeData, Message, MoneroWireCodec,
|
||||||
|
};
|
||||||
|
|
||||||
use cuprate_p2p_core::{
|
use cuprate_p2p_core::{
|
||||||
client::{
|
client::{
|
||||||
handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
|
handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
|
||||||
|
@ -28,13 +38,6 @@ use cuprate_p2p_core::{
|
||||||
},
|
},
|
||||||
ClearNetServerCfg, ConnectionDirection, NetworkZone,
|
ClearNetServerCfg, ConnectionDirection, NetworkZone,
|
||||||
};
|
};
|
||||||
use cuprate_wire::{
|
|
||||||
common::PeerSupportFlags,
|
|
||||||
levin::{message::make_fragmented_messages, LevinMessage, Protocol},
|
|
||||||
BasicNodeData, Message, MoneroWireCodec,
|
|
||||||
};
|
|
||||||
|
|
||||||
use cuprate_test_utils::monerod::monerod;
|
|
||||||
|
|
||||||
/// A network zone equal to clear net where every message sent is turned into a fragmented message.
|
/// A network zone equal to clear net where every message sent is turned into a fragmented message.
|
||||||
/// Does not support sending fragmented or dummy messages manually.
|
/// Does not support sending fragmented or dummy messages manually.
|
||||||
|
@ -184,7 +187,7 @@ async fn fragmented_handshake_monerod_to_cuprate() {
|
||||||
let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
|
let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
|
||||||
|
|
||||||
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
|
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
|
||||||
let _ = handshaker
|
handshaker
|
||||||
.ready()
|
.ready()
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
#![expect(unused_crate_dependencies, reason = "external test module")]
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
#![expect(unused_crate_dependencies, reason = "external test module")]
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
@ -9,6 +11,10 @@ use tokio_util::codec::{FramedRead, FramedWrite};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use cuprate_helper::network::Network;
|
use cuprate_helper::network::Network;
|
||||||
|
use cuprate_test_utils::{
|
||||||
|
monerod::monerod,
|
||||||
|
test_netzone::{TestNetZone, TestNetZoneAddr},
|
||||||
|
};
|
||||||
use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec};
|
use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec};
|
||||||
|
|
||||||
use cuprate_p2p_core::{
|
use cuprate_p2p_core::{
|
||||||
|
@ -19,12 +25,8 @@ use cuprate_p2p_core::{
|
||||||
ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone,
|
ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone,
|
||||||
};
|
};
|
||||||
|
|
||||||
use cuprate_test_utils::{
|
|
||||||
monerod::monerod,
|
|
||||||
test_netzone::{TestNetZone, TestNetZoneAddr},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[expect(clippy::significant_drop_tightening)]
|
||||||
async fn handshake_cuprate_to_cuprate() {
|
async fn handshake_cuprate_to_cuprate() {
|
||||||
// Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to
|
// Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to
|
||||||
// each other.
|
// each other.
|
||||||
|
@ -147,7 +149,7 @@ async fn handshake_monerod_to_cuprate() {
|
||||||
let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
|
let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
|
||||||
|
|
||||||
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
|
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
|
||||||
let _ = handshaker
|
handshaker
|
||||||
.ready()
|
.ready()
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
|
#![expect(unused_crate_dependencies, reason = "external test module")]
|
||||||
|
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use cuprate_helper::network::Network;
|
use cuprate_helper::network::Network;
|
||||||
|
use cuprate_test_utils::monerod::monerod;
|
||||||
use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData};
|
use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData};
|
||||||
|
|
||||||
use cuprate_p2p_core::{
|
use cuprate_p2p_core::{
|
||||||
|
@ -9,8 +12,6 @@ use cuprate_p2p_core::{
|
||||||
ClearNet, ProtocolRequest, ProtocolResponse,
|
ClearNet, ProtocolRequest, ProtocolResponse,
|
||||||
};
|
};
|
||||||
|
|
||||||
use cuprate_test_utils::monerod::monerod;
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn get_single_block_from_monerod() {
|
async fn get_single_block_from_monerod() {
|
||||||
let monerod = monerod(["--out-peers=0"]).await;
|
let monerod = monerod(["--out-peers=0"]).await;
|
||||||
|
|
|
@ -40,3 +40,6 @@ cuprate-test-utils = { path = "../../test-utils" }
|
||||||
indexmap = { workspace = true }
|
indexmap = { workspace = true }
|
||||||
proptest = { workspace = true }
|
proptest = { workspace = true }
|
||||||
tokio-test = { workspace = true }
|
tokio-test = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
|
@ -79,7 +79,7 @@ pub struct BlockDownloaderConfig {
|
||||||
|
|
||||||
/// An error that occurred in the [`BlockDownloader`].
|
/// An error that occurred in the [`BlockDownloader`].
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum BlockDownloadError {
|
pub(crate) enum BlockDownloadError {
|
||||||
#[error("A request to a peer timed out.")]
|
#[error("A request to a peer timed out.")]
|
||||||
TimedOut,
|
TimedOut,
|
||||||
#[error("The block buffer was closed.")]
|
#[error("The block buffer was closed.")]
|
||||||
|
@ -220,7 +220,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
|
||||||
/// The running chain entry tasks.
|
/// The running chain entry tasks.
|
||||||
///
|
///
|
||||||
/// Returns a result of the chain entry or an error.
|
/// Returns a result of the chain entry or an error.
|
||||||
#[allow(clippy::type_complexity)]
|
#[expect(clippy::type_complexity)]
|
||||||
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
|
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
|
||||||
|
|
||||||
/// The current inflight requests.
|
/// The current inflight requests.
|
||||||
|
@ -274,7 +274,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if we can make use of any peers that are currently pending requests.
|
/// Checks if we can make use of any peers that are currently pending requests.
|
||||||
async fn check_pending_peers(
|
fn check_pending_peers(
|
||||||
&mut self,
|
&mut self,
|
||||||
chain_tracker: &mut ChainTracker<N>,
|
chain_tracker: &mut ChainTracker<N>,
|
||||||
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
|
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
|
||||||
|
@ -288,7 +288,8 @@ where
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
|
let client = self.try_handle_free_client(chain_tracker, peer);
|
||||||
|
if let Some(peer) = client {
|
||||||
// This peer is ok however it does not have the data we currently need, this will only happen
|
// This peer is ok however it does not have the data we currently need, this will only happen
|
||||||
// because of its pruning seed so just skip over all peers with this pruning seed.
|
// because of its pruning seed so just skip over all peers with this pruning seed.
|
||||||
peers.push(peer);
|
peers.push(peer);
|
||||||
|
@ -304,7 +305,7 @@ where
|
||||||
/// for them.
|
/// for them.
|
||||||
///
|
///
|
||||||
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
|
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
|
||||||
async fn request_inflight_batch_again(
|
fn request_inflight_batch_again(
|
||||||
&mut self,
|
&mut self,
|
||||||
client: ClientPoolDropGuard<N>,
|
client: ClientPoolDropGuard<N>,
|
||||||
) -> Option<ClientPoolDropGuard<N>> {
|
) -> Option<ClientPoolDropGuard<N>> {
|
||||||
|
@ -355,7 +356,7 @@ where
|
||||||
///
|
///
|
||||||
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
|
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
|
||||||
/// to its pruning seed.
|
/// to its pruning seed.
|
||||||
async fn request_block_batch(
|
fn request_block_batch(
|
||||||
&mut self,
|
&mut self,
|
||||||
chain_tracker: &mut ChainTracker<N>,
|
chain_tracker: &mut ChainTracker<N>,
|
||||||
client: ClientPoolDropGuard<N>,
|
client: ClientPoolDropGuard<N>,
|
||||||
|
@ -400,7 +401,7 @@ where
|
||||||
|
|
||||||
// If our ready queue is too large send duplicate requests for the blocks we are waiting on.
|
// If our ready queue is too large send duplicate requests for the blocks we are waiting on.
|
||||||
if self.block_queue.size() >= self.config.in_progress_queue_size {
|
if self.block_queue.size() >= self.config.in_progress_queue_size {
|
||||||
return self.request_inflight_batch_again(client).await;
|
return self.request_inflight_batch_again(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
// No failed requests that we can handle, request some new blocks.
|
// No failed requests that we can handle, request some new blocks.
|
||||||
|
@ -435,7 +436,7 @@ where
|
||||||
///
|
///
|
||||||
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
|
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
|
||||||
/// to its pruning seed.
|
/// to its pruning seed.
|
||||||
async fn try_handle_free_client(
|
fn try_handle_free_client(
|
||||||
&mut self,
|
&mut self,
|
||||||
chain_tracker: &mut ChainTracker<N>,
|
chain_tracker: &mut ChainTracker<N>,
|
||||||
client: ClientPoolDropGuard<N>,
|
client: ClientPoolDropGuard<N>,
|
||||||
|
@ -473,7 +474,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request a batch of blocks instead.
|
// Request a batch of blocks instead.
|
||||||
self.request_block_batch(chain_tracker, client).await
|
self.request_block_batch(chain_tracker, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks the [`ClientPool`] for free peers.
|
/// Checks the [`ClientPool`] for free peers.
|
||||||
|
@ -517,7 +518,7 @@ where
|
||||||
.push(client);
|
.push(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.check_pending_peers(chain_tracker, pending_peers).await;
|
self.check_pending_peers(chain_tracker, pending_peers);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -575,7 +576,7 @@ where
|
||||||
.or_default()
|
.or_default()
|
||||||
.push(client);
|
.push(client);
|
||||||
|
|
||||||
self.check_pending_peers(chain_tracker, pending_peers).await;
|
self.check_pending_peers(chain_tracker, pending_peers);
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
@ -612,7 +613,7 @@ where
|
||||||
.or_default()
|
.or_default()
|
||||||
.push(client);
|
.push(client);
|
||||||
|
|
||||||
self.check_pending_peers(chain_tracker, pending_peers).await;
|
self.check_pending_peers(chain_tracker, pending_peers);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -680,7 +681,7 @@ where
|
||||||
.or_default()
|
.or_default()
|
||||||
.push(client);
|
.push(client);
|
||||||
|
|
||||||
self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await;
|
self.check_pending_peers(&mut chain_tracker, &mut pending_peers);
|
||||||
}
|
}
|
||||||
Err(_) => self.amount_of_empty_chain_entries += 1
|
Err(_) => self.amount_of_empty_chain_entries += 1
|
||||||
}
|
}
|
||||||
|
@ -699,7 +700,7 @@ struct BlockDownloadTaskResponse<N: NetworkZone> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
|
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
|
||||||
fn client_has_block_in_range(
|
const fn client_has_block_in_range(
|
||||||
pruning_seed: &PruningSeed,
|
pruning_seed: &PruningSeed,
|
||||||
start_height: usize,
|
start_height: usize,
|
||||||
length: usize,
|
length: usize,
|
||||||
|
|
|
@ -13,7 +13,7 @@ use super::{BlockBatch, BlockDownloadError};
|
||||||
///
|
///
|
||||||
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
|
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ReadyQueueBatch {
|
pub(crate) struct ReadyQueueBatch {
|
||||||
/// The start height of the batch.
|
/// The start height of the batch.
|
||||||
pub start_height: usize,
|
pub start_height: usize,
|
||||||
/// The batch of blocks.
|
/// The batch of blocks.
|
||||||
|
@ -43,7 +43,7 @@ impl Ord for ReadyQueueBatch {
|
||||||
|
|
||||||
/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
|
/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
|
||||||
/// oldest batch has been downloaded.
|
/// oldest batch has been downloaded.
|
||||||
pub struct BlockQueue {
|
pub(crate) struct BlockQueue {
|
||||||
/// A queue of ready batches.
|
/// A queue of ready batches.
|
||||||
ready_batches: BinaryHeap<ReadyQueueBatch>,
|
ready_batches: BinaryHeap<ReadyQueueBatch>,
|
||||||
/// The size, in bytes, of all the batches in [`Self::ready_batches`].
|
/// The size, in bytes, of all the batches in [`Self::ready_batches`].
|
||||||
|
@ -55,8 +55,8 @@ pub struct BlockQueue {
|
||||||
|
|
||||||
impl BlockQueue {
|
impl BlockQueue {
|
||||||
/// Creates a new [`BlockQueue`].
|
/// Creates a new [`BlockQueue`].
|
||||||
pub fn new(buffer_appender: BufferAppender<BlockBatch>) -> BlockQueue {
|
pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
|
||||||
BlockQueue {
|
Self {
|
||||||
ready_batches: BinaryHeap::new(),
|
ready_batches: BinaryHeap::new(),
|
||||||
ready_batches_size: 0,
|
ready_batches_size: 0,
|
||||||
buffer_appender,
|
buffer_appender,
|
||||||
|
@ -64,12 +64,12 @@ impl BlockQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
|
/// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
|
||||||
pub fn oldest_ready_batch(&self) -> Option<usize> {
|
pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
|
||||||
self.ready_batches.peek().map(|batch| batch.start_height)
|
self.ready_batches.peek().map(|batch| batch.start_height)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
|
/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
|
||||||
pub fn size(&self) -> usize {
|
pub(crate) const fn size(&self) -> usize {
|
||||||
self.ready_batches_size
|
self.ready_batches_size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ impl BlockQueue {
|
||||||
///
|
///
|
||||||
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
|
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
|
||||||
/// there are no batches inflight then this should be [`None`].
|
/// there are no batches inflight then this should be [`None`].
|
||||||
pub async fn add_incoming_batch(
|
pub(crate) async fn add_incoming_batch(
|
||||||
&mut self,
|
&mut self,
|
||||||
new_batch: ReadyQueueBatch,
|
new_batch: ReadyQueueBatch,
|
||||||
oldest_in_flight_start_height: Option<usize>,
|
oldest_in_flight_start_height: Option<usize>,
|
||||||
|
|
|
@ -21,7 +21,7 @@ pub(crate) struct ChainEntry<N: NetworkZone> {
|
||||||
|
|
||||||
/// A batch of blocks to retrieve.
|
/// A batch of blocks to retrieve.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BlocksToRetrieve<N: NetworkZone> {
|
pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
|
||||||
/// The block IDs to get.
|
/// The block IDs to get.
|
||||||
pub ids: ByteArrayVec<32>,
|
pub ids: ByteArrayVec<32>,
|
||||||
/// The hash of the last block before this batch.
|
/// The hash of the last block before this batch.
|
||||||
|
@ -40,7 +40,7 @@ pub struct BlocksToRetrieve<N: NetworkZone> {
|
||||||
|
|
||||||
/// An error returned from the [`ChainTracker`].
|
/// An error returned from the [`ChainTracker`].
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum ChainTrackerError {
|
pub(crate) enum ChainTrackerError {
|
||||||
/// The new chain entry is invalid.
|
/// The new chain entry is invalid.
|
||||||
NewEntryIsInvalid,
|
NewEntryIsInvalid,
|
||||||
/// The new chain entry does not follow from the top of our chain tracker.
|
/// The new chain entry does not follow from the top of our chain tracker.
|
||||||
|
@ -51,7 +51,7 @@ pub enum ChainTrackerError {
|
||||||
///
|
///
|
||||||
/// This struct allows following a single chain. It takes in [`ChainEntry`]s and
|
/// This struct allows following a single chain. It takes in [`ChainEntry`]s and
|
||||||
/// allows getting [`BlocksToRetrieve`].
|
/// allows getting [`BlocksToRetrieve`].
|
||||||
pub struct ChainTracker<N: NetworkZone> {
|
pub(crate) struct ChainTracker<N: NetworkZone> {
|
||||||
/// A list of [`ChainEntry`]s, in order.
|
/// A list of [`ChainEntry`]s, in order.
|
||||||
entries: VecDeque<ChainEntry<N>>,
|
entries: VecDeque<ChainEntry<N>>,
|
||||||
/// The height of the first block, in the first entry in [`Self::entries`].
|
/// The height of the first block, in the first entry in [`Self::entries`].
|
||||||
|
@ -66,7 +66,7 @@ pub struct ChainTracker<N: NetworkZone> {
|
||||||
|
|
||||||
impl<N: NetworkZone> ChainTracker<N> {
|
impl<N: NetworkZone> ChainTracker<N> {
|
||||||
/// Creates a new chain tracker.
|
/// Creates a new chain tracker.
|
||||||
pub fn new(
|
pub(crate) fn new(
|
||||||
new_entry: ChainEntry<N>,
|
new_entry: ChainEntry<N>,
|
||||||
first_height: usize,
|
first_height: usize,
|
||||||
our_genesis: [u8; 32],
|
our_genesis: [u8; 32],
|
||||||
|
@ -77,9 +77,9 @@ impl<N: NetworkZone> ChainTracker<N> {
|
||||||
entries.push_back(new_entry);
|
entries.push_back(new_entry);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
top_seen_hash,
|
|
||||||
entries,
|
entries,
|
||||||
first_height,
|
first_height,
|
||||||
|
top_seen_hash,
|
||||||
previous_hash,
|
previous_hash,
|
||||||
our_genesis,
|
our_genesis,
|
||||||
}
|
}
|
||||||
|
@ -87,17 +87,17 @@ impl<N: NetworkZone> ChainTracker<N> {
|
||||||
|
|
||||||
/// Returns `true` if the peer is expected to have the next block after our highest seen block
|
/// Returns `true` if the peer is expected to have the next block after our highest seen block
|
||||||
/// according to their pruning seed.
|
/// according to their pruning seed.
|
||||||
pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
|
pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
|
||||||
seed.has_full_block(self.top_height(), MAX_BLOCK_HEIGHT_USIZE)
|
seed.has_full_block(self.top_height(), MAX_BLOCK_HEIGHT_USIZE)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the simple history, the highest seen block and the genesis block.
|
/// Returns the simple history, the highest seen block and the genesis block.
|
||||||
pub fn get_simple_history(&self) -> [[u8; 32]; 2] {
|
pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] {
|
||||||
[self.top_seen_hash, self.our_genesis]
|
[self.top_seen_hash, self.our_genesis]
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the height of the highest block we are tracking.
|
/// Returns the height of the highest block we are tracking.
|
||||||
pub fn top_height(&self) -> usize {
|
pub(crate) fn top_height(&self) -> usize {
|
||||||
let top_block_idx = self
|
let top_block_idx = self
|
||||||
.entries
|
.entries
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -111,7 +111,7 @@ impl<N: NetworkZone> ChainTracker<N> {
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// This function panics if `batch_size` is `0`.
|
/// This function panics if `batch_size` is `0`.
|
||||||
pub fn block_requests_queued(&self, batch_size: usize) -> usize {
|
pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
|
||||||
self.entries
|
self.entries
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| entry.ids.len().div_ceil(batch_size))
|
.map(|entry| entry.ids.len().div_ceil(batch_size))
|
||||||
|
@ -119,7 +119,10 @@ impl<N: NetworkZone> ChainTracker<N> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
|
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
|
||||||
pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
|
pub(crate) fn add_entry(
|
||||||
|
&mut self,
|
||||||
|
mut chain_entry: ChainEntry<N>,
|
||||||
|
) -> Result<(), ChainTrackerError> {
|
||||||
if chain_entry.ids.is_empty() {
|
if chain_entry.ids.is_empty() {
|
||||||
// The peer must send at lest one overlapping block.
|
// The peer must send at lest one overlapping block.
|
||||||
chain_entry.handle.ban_peer(MEDIUM_BAN);
|
chain_entry.handle.ban_peer(MEDIUM_BAN);
|
||||||
|
@ -155,7 +158,7 @@ impl<N: NetworkZone> ChainTracker<N> {
|
||||||
/// Returns a batch of blocks to request.
|
/// Returns a batch of blocks to request.
|
||||||
///
|
///
|
||||||
/// The returned batches length will be less than or equal to `max_blocks`
|
/// The returned batches length will be less than or equal to `max_blocks`
|
||||||
pub fn blocks_to_get(
|
pub(crate) fn blocks_to_get(
|
||||||
&mut self,
|
&mut self,
|
||||||
pruning_seed: &PruningSeed,
|
pruning_seed: &PruningSeed,
|
||||||
max_blocks: usize,
|
max_blocks: usize,
|
||||||
|
|
|
@ -30,6 +30,7 @@ use crate::{
|
||||||
attempt = _attempt
|
attempt = _attempt
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
|
#[expect(clippy::used_underscore_binding)]
|
||||||
pub async fn download_batch_task<N: NetworkZone>(
|
pub async fn download_batch_task<N: NetworkZone>(
|
||||||
client: ClientPoolDropGuard<N>,
|
client: ClientPoolDropGuard<N>,
|
||||||
ids: ByteArrayVec<32>,
|
ids: ByteArrayVec<32>,
|
||||||
|
@ -103,6 +104,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
|
||||||
Ok((client, batch))
|
Ok((client, batch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[expect(clippy::needless_pass_by_value)]
|
||||||
fn deserialize_batch(
|
fn deserialize_batch(
|
||||||
blocks_response: GetObjectsResponse,
|
blocks_response: GetObjectsResponse,
|
||||||
expected_start_height: usize,
|
expected_start_height: usize,
|
||||||
|
|
|
@ -30,7 +30,7 @@ use crate::{
|
||||||
///
|
///
|
||||||
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of
|
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of
|
||||||
/// top block we have found and the genesis block, this is then called `short_history`.
|
/// top block we have found and the genesis block, this is then called `short_history`.
|
||||||
pub async fn request_chain_entry_from_peer<N: NetworkZone>(
|
pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
|
||||||
mut client: ClientPoolDropGuard<N>,
|
mut client: ClientPoolDropGuard<N>,
|
||||||
short_history: [[u8; 32]; 2],
|
short_history: [[u8; 32]; 2],
|
||||||
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
|
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
|
||||||
|
@ -179,7 +179,7 @@ where
|
||||||
Some(res) => {
|
Some(res) => {
|
||||||
// res has already been set, replace it if this peer claims higher cumulative difficulty
|
// res has already been set, replace it if this peer claims higher cumulative difficulty
|
||||||
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
|
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
|
||||||
let _ = mem::replace(res, task_res);
|
drop(mem::replace(res, task_res));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
|
|
@ -47,6 +47,7 @@ proptest! {
|
||||||
|
|
||||||
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
|
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
|
||||||
|
|
||||||
|
#[expect(clippy::significant_drop_tightening)]
|
||||||
tokio_pool.block_on(async move {
|
tokio_pool.block_on(async move {
|
||||||
timeout(Duration::from_secs(600), async move {
|
timeout(Duration::from_secs(600), async move {
|
||||||
let client_pool = ClientPool::new();
|
let client_pool = ClientPool::new();
|
||||||
|
@ -54,7 +55,7 @@ proptest! {
|
||||||
let mut peer_ids = Vec::with_capacity(peers);
|
let mut peer_ids = Vec::with_capacity(peers);
|
||||||
|
|
||||||
for _ in 0..peers {
|
for _ in 0..peers {
|
||||||
let client = mock_block_downloader_client(blockchain.clone());
|
let client = mock_block_downloader_client(Arc::clone(&blockchain));
|
||||||
|
|
||||||
peer_ids.push(client.info.id);
|
peer_ids.push(client.info.id);
|
||||||
|
|
||||||
|
@ -156,7 +157,7 @@ prop_compose! {
|
||||||
for (height, mut block) in blocks.into_iter().enumerate() {
|
for (height, mut block) in blocks.into_iter().enumerate() {
|
||||||
if let Some(last) = blockchain.last() {
|
if let Some(last) = blockchain.last() {
|
||||||
block.0.header.previous = *last.0;
|
block.0.header.previous = *last.0;
|
||||||
block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)]
|
block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)];
|
||||||
}
|
}
|
||||||
|
|
||||||
blockchain.insert(block.0.hash(), block);
|
blockchain.insert(block.0.hash(), block);
|
||||||
|
@ -173,7 +174,7 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
|
||||||
cuprate_p2p_core::handles::HandleBuilder::new().build();
|
cuprate_p2p_core::handles::HandleBuilder::new().build();
|
||||||
|
|
||||||
let request_handler = service_fn(move |req: PeerRequest| {
|
let request_handler = service_fn(move |req: PeerRequest| {
|
||||||
let bc = blockchain.clone();
|
let bc = Arc::clone(&blockchain);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
match req {
|
match req {
|
||||||
|
|
|
@ -35,7 +35,7 @@ use crate::constants::{
|
||||||
|
|
||||||
/// The configuration for the [`BroadcastSvc`].
|
/// The configuration for the [`BroadcastSvc`].
|
||||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
pub struct BroadcastConfig {
|
pub(crate) struct BroadcastConfig {
|
||||||
/// The average number of seconds between diffusion flushes for outbound connections.
|
/// The average number of seconds between diffusion flushes for outbound connections.
|
||||||
pub diffusion_flush_average_seconds_outbound: Duration,
|
pub diffusion_flush_average_seconds_outbound: Duration,
|
||||||
/// The average number of seconds between diffusion flushes for inbound connections.
|
/// The average number of seconds between diffusion flushes for inbound connections.
|
||||||
|
@ -57,7 +57,7 @@ impl Default for BroadcastConfig {
|
||||||
/// - The [`BroadcastSvc`]
|
/// - The [`BroadcastSvc`]
|
||||||
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers.
|
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers.
|
||||||
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers.
|
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers.
|
||||||
pub fn init_broadcast_channels<N: NetworkZone>(
|
pub(crate) fn init_broadcast_channels<N: NetworkZone>(
|
||||||
config: BroadcastConfig,
|
config: BroadcastConfig,
|
||||||
) -> (
|
) -> (
|
||||||
BroadcastSvc<N>,
|
BroadcastSvc<N>,
|
||||||
|
@ -193,7 +193,7 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
|
||||||
};
|
};
|
||||||
|
|
||||||
// An error here means _all_ receivers were dropped which we assume will never happen.
|
// An error here means _all_ receivers were dropped which we assume will never happen.
|
||||||
let _ = match direction {
|
drop(match direction {
|
||||||
Some(ConnectionDirection::Inbound) => {
|
Some(ConnectionDirection::Inbound) => {
|
||||||
self.tx_broadcast_channel_inbound.send(nex_tx_info)
|
self.tx_broadcast_channel_inbound.send(nex_tx_info)
|
||||||
}
|
}
|
||||||
|
@ -201,10 +201,10 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
|
||||||
self.tx_broadcast_channel_outbound.send(nex_tx_info)
|
self.tx_broadcast_channel_outbound.send(nex_tx_info)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let _ = self.tx_broadcast_channel_outbound.send(nex_tx_info.clone());
|
drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()));
|
||||||
self.tx_broadcast_channel_inbound.send(nex_tx_info)
|
self.tx_broadcast_channel_inbound.send(nex_tx_info)
|
||||||
}
|
}
|
||||||
};
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,7 +246,7 @@ struct BroadcastTxInfo<N: NetworkZone> {
|
||||||
///
|
///
|
||||||
/// This is given to the connection task to await on for broadcast messages.
|
/// This is given to the connection task to await on for broadcast messages.
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct BroadcastMessageStream<N: NetworkZone> {
|
pub(crate) struct BroadcastMessageStream<N: NetworkZone> {
|
||||||
/// The peer that is holding this stream.
|
/// The peer that is holding this stream.
|
||||||
addr: InternalPeerID<N::Addr>,
|
addr: InternalPeerID<N::Addr>,
|
||||||
|
|
||||||
|
@ -336,8 +336,9 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
|
||||||
Poll::Ready(Some(BroadcastMessage::NewTransaction(txs)))
|
Poll::Ready(Some(BroadcastMessage::NewTransaction(txs)))
|
||||||
} else {
|
} else {
|
||||||
tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
|
tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
|
||||||
// poll next_flush now to register the waker with it
|
// poll next_flush now to register the waker with it.
|
||||||
// the waker will already be registered with the block broadcast channel.
|
// the waker will already be registered with the block broadcast channel.
|
||||||
|
#[expect(clippy::let_underscore_must_use)]
|
||||||
let _ = this.next_flush.poll(cx);
|
let _ = this.next_flush.poll(cx);
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
@ -458,7 +459,7 @@ mod tests {
|
||||||
|
|
||||||
let match_tx = |mes, txs| match mes {
|
let match_tx = |mes, txs| match mes {
|
||||||
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
|
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
|
||||||
_ => panic!("Block broadcast?"),
|
BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let next = outbound_stream.next().await.unwrap();
|
let next = outbound_stream.next().await.unwrap();
|
||||||
|
@ -520,7 +521,7 @@ mod tests {
|
||||||
|
|
||||||
let match_tx = |mes, txs| match mes {
|
let match_tx = |mes, txs| match mes {
|
||||||
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
|
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
|
||||||
_ => panic!("Block broadcast?"),
|
BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let next = outbound_stream.next().await.unwrap();
|
let next = outbound_stream.next().await.unwrap();
|
||||||
|
@ -536,6 +537,6 @@ mod tests {
|
||||||
futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
|
futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.is_err())
|
.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
//! returns the peer to the pool when it is dropped.
|
//! returns the peer to the pool when it is dropped.
|
||||||
//!
|
//!
|
||||||
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
|
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
|
||||||
//! as internally this uses blocking RwLocks.
|
//! as internally this uses blocking `RwLock`s.
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
@ -24,7 +24,7 @@ use cuprate_p2p_core::{
|
||||||
pub(crate) mod disconnect_monitor;
|
pub(crate) mod disconnect_monitor;
|
||||||
mod drop_guard_client;
|
mod drop_guard_client;
|
||||||
|
|
||||||
pub use drop_guard_client::ClientPoolDropGuard;
|
pub(crate) use drop_guard_client::ClientPoolDropGuard;
|
||||||
|
|
||||||
/// The client pool, which holds currently connected free peers.
|
/// The client pool, which holds currently connected free peers.
|
||||||
///
|
///
|
||||||
|
@ -38,16 +38,17 @@ pub struct ClientPool<N: NetworkZone> {
|
||||||
|
|
||||||
impl<N: NetworkZone> ClientPool<N> {
|
impl<N: NetworkZone> ClientPool<N> {
|
||||||
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
|
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
|
||||||
pub fn new() -> Arc<ClientPool<N>> {
|
pub fn new() -> Arc<Self> {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let pool = Arc::new(ClientPool {
|
let pool = Arc::new(Self {
|
||||||
clients: DashMap::new(),
|
clients: DashMap::new(),
|
||||||
new_connection_tx: tx,
|
new_connection_tx: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()),
|
disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool))
|
||||||
|
.instrument(Span::current()),
|
||||||
);
|
);
|
||||||
|
|
||||||
pool
|
pool
|
||||||
|
@ -69,8 +70,7 @@ impl<N: NetworkZone> ClientPool<N> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let res = self.clients.insert(id, client);
|
assert!(self.clients.insert(id, client).is_none());
|
||||||
assert!(res.is_none());
|
|
||||||
|
|
||||||
// We have to check this again otherwise we could have a race condition where a
|
// We have to check this again otherwise we could have a race condition where a
|
||||||
// peer is disconnected after the first check, the disconnect monitor tries to remove it,
|
// peer is disconnected after the first check, the disconnect monitor tries to remove it,
|
||||||
|
@ -121,7 +121,6 @@ impl<N: NetworkZone> ClientPool<N> {
|
||||||
/// Note that the returned iterator is not guaranteed to contain every peer asked for.
|
/// Note that the returned iterator is not guaranteed to contain every peer asked for.
|
||||||
///
|
///
|
||||||
/// See [`Self::borrow_client`] for borrowing a single client.
|
/// See [`Self::borrow_client`] for borrowing a single client.
|
||||||
#[allow(private_interfaces)] // TODO: Remove me when 2024 Rust
|
|
||||||
pub fn borrow_clients<'a, 'b>(
|
pub fn borrow_clients<'a, 'b>(
|
||||||
self: &'a Arc<Self>,
|
self: &'a Arc<Self>,
|
||||||
peers: &'b [InternalPeerID<N::Addr>],
|
peers: &'b [InternalPeerID<N::Addr>],
|
||||||
|
@ -133,7 +132,7 @@ impl<N: NetworkZone> ClientPool<N> {
|
||||||
mod sealed {
|
mod sealed {
|
||||||
/// TODO: Remove me when 2024 Rust
|
/// TODO: Remove me when 2024 Rust
|
||||||
///
|
///
|
||||||
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
|
/// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
|
||||||
pub trait Captures<U> {}
|
pub trait Captures<U> {}
|
||||||
|
|
||||||
impl<T: ?Sized, U> Captures<U> for T {}
|
impl<T: ?Sized, U> Captures<U> for T {}
|
||||||
|
|
|
@ -78,6 +78,6 @@ impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
|
||||||
|
|
||||||
this.closed_fut
|
this.closed_fut
|
||||||
.poll(cx)
|
.poll(cx)
|
||||||
.map(|_| this.peer_id.take().unwrap())
|
.map(|()| this.peer_id.take().unwrap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,12 +99,17 @@ where
|
||||||
|
|
||||||
/// Connects to random seeds to get peers and immediately disconnects
|
/// Connects to random seeds to get peers and immediately disconnects
|
||||||
#[instrument(level = "info", skip(self))]
|
#[instrument(level = "info", skip(self))]
|
||||||
|
#[expect(
|
||||||
|
clippy::significant_drop_in_scrutinee,
|
||||||
|
clippy::significant_drop_tightening
|
||||||
|
)]
|
||||||
async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
|
async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
|
||||||
let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
|
let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
|
||||||
|
|
||||||
if seeds.len() == 0 {
|
assert!(
|
||||||
panic!("No seed nodes available to get peers from");
|
seeds.len() != 0,
|
||||||
}
|
"No seed nodes available to get peers from"
|
||||||
|
);
|
||||||
|
|
||||||
let mut allowed_errors = seeds.len();
|
let mut allowed_errors = seeds.len();
|
||||||
|
|
||||||
|
@ -129,7 +134,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(res) = handshake_futs.join_next().await {
|
while let Some(res) = handshake_futs.join_next().await {
|
||||||
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
|
if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
|
||||||
allowed_errors -= 1;
|
allowed_errors -= 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,7 +149,7 @@ where
|
||||||
/// Connects to a given outbound peer.
|
/// Connects to a given outbound peer.
|
||||||
#[instrument(level = "info", skip_all)]
|
#[instrument(level = "info", skip_all)]
|
||||||
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
|
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
|
||||||
let client_pool = self.client_pool.clone();
|
let client_pool = Arc::clone(&self.client_pool);
|
||||||
let connection_fut = self
|
let connection_fut = self
|
||||||
.connector_svc
|
.connector_svc
|
||||||
.ready()
|
.ready()
|
||||||
|
@ -157,6 +162,7 @@ where
|
||||||
|
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
|
#[expect(clippy::significant_drop_in_scrutinee)]
|
||||||
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
|
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
|
||||||
client_pool.add_new_client(peer);
|
client_pool.add_new_client(peer);
|
||||||
}
|
}
|
||||||
|
@ -166,14 +172,16 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles a request from the peer set for more peers.
|
/// Handles a request from the peer set for more peers.
|
||||||
|
#[expect(
|
||||||
|
clippy::significant_drop_tightening,
|
||||||
|
reason = "we need to hold onto a permit"
|
||||||
|
)]
|
||||||
async fn handle_peer_request(
|
async fn handle_peer_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
req: &MakeConnectionRequest,
|
req: &MakeConnectionRequest,
|
||||||
) -> Result<(), OutboundConnectorError> {
|
) -> Result<(), OutboundConnectorError> {
|
||||||
// try to get a permit.
|
// try to get a permit.
|
||||||
let permit = self
|
let permit = Arc::clone(&self.outbound_semaphore)
|
||||||
.outbound_semaphore
|
|
||||||
.clone()
|
|
||||||
.try_acquire_owned()
|
.try_acquire_owned()
|
||||||
.or_else(|_| {
|
.or_else(|_| {
|
||||||
// if we can't get a permit add one if we are below the max number of connections.
|
// if we can't get a permit add one if we are below the max number of connections.
|
||||||
|
@ -183,7 +191,9 @@ where
|
||||||
} else {
|
} else {
|
||||||
self.outbound_semaphore.add_permits(1);
|
self.outbound_semaphore.add_permits(1);
|
||||||
self.extra_peers += 1;
|
self.extra_peers += 1;
|
||||||
Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap())
|
Ok(Arc::clone(&self.outbound_semaphore)
|
||||||
|
.try_acquire_owned()
|
||||||
|
.unwrap())
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
@ -272,12 +282,12 @@ where
|
||||||
tracing::info!("Shutting down outbound connector, make connection channel closed.");
|
tracing::info!("Shutting down outbound connector, make connection channel closed.");
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
// We can't really do much about errors in this function.
|
#[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
|
||||||
let _ = self.handle_peer_request(&peer_req).await;
|
let _ = self.handle_peer_request(&peer_req).await;
|
||||||
},
|
},
|
||||||
// This future is not cancellation safe as you will lose your space in the queue but as we are the only place
|
// This future is not cancellation safe as you will lose your space in the queue but as we are the only place
|
||||||
// that actually requires permits that should be ok.
|
// that actually requires permits that should be ok.
|
||||||
Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => {
|
Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
|
||||||
if self.handle_free_permit(permit).await.is_err() {
|
if self.handle_free_permit(permit).await.is_err() {
|
||||||
// if we got an error then we still have a permit free so to prevent this from just looping
|
// if we got an error then we still have a permit free so to prevent this from just looping
|
||||||
// uncontrollably add a timeout.
|
// uncontrollably add a timeout.
|
||||||
|
|
|
@ -100,7 +100,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
// If we're still behind our maximum limit, Initiate handshake.
|
// If we're still behind our maximum limit, Initiate handshake.
|
||||||
if let Ok(permit) = semaphore.clone().try_acquire_owned() {
|
if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
|
||||||
tracing::debug!("Permit free for incoming connection, attempting handshake.");
|
tracing::debug!("Permit free for incoming connection, attempting handshake.");
|
||||||
|
|
||||||
let fut = handshaker.ready().await?.call(DoHandshakeRequest {
|
let fut = handshaker.ready().await?.call(DoHandshakeRequest {
|
||||||
|
@ -111,11 +111,12 @@ where
|
||||||
permit: Some(permit),
|
permit: Some(permit),
|
||||||
});
|
});
|
||||||
|
|
||||||
let cloned_pool = client_pool.clone();
|
let cloned_pool = Arc::clone(&client_pool);
|
||||||
|
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
|
let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
|
||||||
|
if let Ok(Ok(peer)) = client {
|
||||||
cloned_pool.add_new_client(peer);
|
cloned_pool.add_new_client(peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,8 +134,10 @@ where
|
||||||
let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
|
let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
|
||||||
|
|
||||||
// Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
|
// Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
|
||||||
if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await
|
if matches!(
|
||||||
{
|
fut.await,
|
||||||
|
Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
|
||||||
|
) {
|
||||||
let response = peer_sink
|
let response = peer_sink
|
||||||
.send(
|
.send(
|
||||||
Message::Response(AdminResponseMessage::Ping(PingResponse {
|
Message::Response(AdminResponseMessage::Ping(PingResponse {
|
||||||
|
@ -148,7 +151,7 @@ where
|
||||||
if let Err(err) = response {
|
if let Err(err) = response {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"Unable to respond to ping request from peer ({addr}): {err}"
|
"Unable to respond to ping request from peer ({addr}): {err}"
|
||||||
)
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ where
|
||||||
let outbound_connector = Connector::new(outbound_handshaker);
|
let outbound_connector = Connector::new(outbound_handshaker);
|
||||||
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
|
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
client_pool.clone(),
|
Arc::clone(&client_pool),
|
||||||
make_connection_rx,
|
make_connection_rx,
|
||||||
address_book.clone(),
|
address_book.clone(),
|
||||||
outbound_connector,
|
outbound_connector,
|
||||||
|
@ -118,17 +118,17 @@ where
|
||||||
);
|
);
|
||||||
background_tasks.spawn(
|
background_tasks.spawn(
|
||||||
inbound_server::inbound_server(
|
inbound_server::inbound_server(
|
||||||
client_pool.clone(),
|
Arc::clone(&client_pool),
|
||||||
inbound_handshaker,
|
inbound_handshaker,
|
||||||
address_book.clone(),
|
address_book.clone(),
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
.map(|res| {
|
.map(|res| {
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
tracing::error!("Error in inbound connection listener: {e}")
|
tracing::error!("Error in inbound connection listener: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!("Inbound connection listener shutdown")
|
tracing::info!("Inbound connection listener shutdown");
|
||||||
})
|
})
|
||||||
.instrument(Span::current()),
|
.instrument(Span::current()),
|
||||||
);
|
);
|
||||||
|
@ -155,7 +155,7 @@ pub struct NetworkInterface<N: NetworkZone> {
|
||||||
/// on that claimed chain.
|
/// on that claimed chain.
|
||||||
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
|
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
|
||||||
/// A channel to request extra connections.
|
/// A channel to request extra connections.
|
||||||
#[allow(dead_code)] // will be used eventually
|
#[expect(dead_code, reason = "will be used eventually")]
|
||||||
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
|
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
|
||||||
/// The address book service.
|
/// The address book service.
|
||||||
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
|
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
|
||||||
|
@ -184,7 +184,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
|
||||||
C::Future: Send + 'static,
|
C::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
block_downloader::download_blocks(
|
block_downloader::download_blocks(
|
||||||
self.pool.clone(),
|
Arc::clone(&self.pool),
|
||||||
self.sync_states_svc.clone(),
|
self.sync_states_svc.clone(),
|
||||||
our_chain_service,
|
our_chain_service,
|
||||||
config,
|
config,
|
||||||
|
|
|
@ -41,7 +41,7 @@ pub struct NewSyncInfo {
|
||||||
/// This is the service that handles:
|
/// This is the service that handles:
|
||||||
/// 1. Finding out if we need to sync
|
/// 1. Finding out if we need to sync
|
||||||
/// 1. Giving the peers that should be synced _from_, to the requester
|
/// 1. Giving the peers that should be synced _from_, to the requester
|
||||||
pub struct PeerSyncSvc<N: NetworkZone> {
|
pub(crate) struct PeerSyncSvc<N: NetworkZone> {
|
||||||
/// A map of cumulative difficulties to peers.
|
/// A map of cumulative difficulties to peers.
|
||||||
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
|
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
|
||||||
/// A map of peers to cumulative difficulties.
|
/// A map of peers to cumulative difficulties.
|
||||||
|
@ -57,7 +57,7 @@ pub struct PeerSyncSvc<N: NetworkZone> {
|
||||||
impl<N: NetworkZone> PeerSyncSvc<N> {
|
impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
|
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
|
||||||
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
|
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
|
||||||
pub fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
|
pub(crate) fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
|
||||||
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
|
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
|
||||||
chain_height: 0,
|
chain_height: 0,
|
||||||
top_hash: [0; 32],
|
top_hash: [0; 32],
|
||||||
|
@ -109,9 +109,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
if let Some(block_needed) = block_needed {
|
if let Some(block_needed) = block_needed {
|
||||||
// we just use `MAX_BLOCK_HEIGHT_USIZE` as the blockchain height, this only means
|
// we just use `MAX_BLOCK_HEIGHT_USIZE` as the blockchain height, this only means
|
||||||
// we don't take into account the tip blocks which are not pruned.
|
// we don't take into account the tip blocks which are not pruned.
|
||||||
self.peers
|
self.peers[peer]
|
||||||
.get(peer)
|
|
||||||
.unwrap()
|
|
||||||
.1
|
.1
|
||||||
.has_full_block(block_needed, MAX_BLOCK_HEIGHT_USIZE)
|
.has_full_block(block_needed, MAX_BLOCK_HEIGHT_USIZE)
|
||||||
} else {
|
} else {
|
||||||
|
@ -127,7 +125,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: InternalPeerID<N::Addr>,
|
peer_id: InternalPeerID<N::Addr>,
|
||||||
handle: ConnectionHandle,
|
handle: ConnectionHandle,
|
||||||
core_sync_data: CoreSyncData,
|
core_sync_data: &CoreSyncData,
|
||||||
) -> Result<(), tower::BoxError> {
|
) -> Result<(), tower::BoxError> {
|
||||||
tracing::trace!(
|
tracing::trace!(
|
||||||
"Received new core sync data from peer, top hash: {}",
|
"Received new core sync data from peer, top hash: {}",
|
||||||
|
@ -177,7 +175,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
self.closed_connections.push(PeerDisconnectFut {
|
self.closed_connections.push(PeerDisconnectFut {
|
||||||
closed_fut: handle.closed(),
|
closed_fut: handle.closed(),
|
||||||
peer_id: Some(peer_id),
|
peer_id: Some(peer_id),
|
||||||
})
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
self.cumulative_difficulties
|
self.cumulative_difficulties
|
||||||
|
@ -191,11 +189,15 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
|| self
|
|| self
|
||||||
.last_peer_in_watcher_handle
|
.last_peer_in_watcher_handle
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.is_some_and(|handle| handle.is_closed())
|
.is_some_and(ConnectionHandle::is_closed)
|
||||||
{
|
{
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
|
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
|
||||||
);
|
);
|
||||||
|
#[expect(
|
||||||
|
clippy::let_underscore_must_use,
|
||||||
|
reason = "dropped receivers can be ignored"
|
||||||
|
)]
|
||||||
let _ = self.new_height_watcher.send(NewSyncInfo {
|
let _ = self.new_height_watcher.send(NewSyncInfo {
|
||||||
top_hash: core_sync_data.top_id,
|
top_hash: core_sync_data.top_id,
|
||||||
chain_height: core_sync_data.current_height,
|
chain_height: core_sync_data.current_height,
|
||||||
|
@ -229,8 +231,8 @@ impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
|
||||||
block_needed,
|
block_needed,
|
||||||
))),
|
))),
|
||||||
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
|
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
|
||||||
.update_peer_sync_info(peer_id, handle, sync_data)
|
.update_peer_sync_info(peer_id, handle, &sync_data)
|
||||||
.map(|_| PeerSyncResponse::Ok),
|
.map(|()| PeerSyncResponse::Ok),
|
||||||
};
|
};
|
||||||
|
|
||||||
ready(res)
|
ready(res)
|
||||||
|
@ -414,6 +416,6 @@ mod tests {
|
||||||
assert!(
|
assert!(
|
||||||
peers.contains(&InternalPeerID::Unknown(0))
|
peers.contains(&InternalPeerID::Unknown(0))
|
||||||
&& peers.contains(&InternalPeerID::Unknown(1))
|
&& peers.contains(&InternalPeerID::Unknown(1))
|
||||||
)
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue