P2P: fix deadlock ()

* fix deadlock

* fix ci

* clean up + docs

* fmt

* remove extra tick

* fix typos
This commit is contained in:
Boog900 2025-03-04 21:36:27 +00:00 committed by GitHub
parent 4f3d11be1d
commit 1c04634396
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 244 additions and 118 deletions
binaries/cuprated/src/txpool/dandelion
books/architecture/src/resources/cap
cryptonight/src
net
levin/src
wire/src
p2p
storage
blockchain/src/ops
database/src

View file

@ -12,7 +12,7 @@ use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
BroadcastMessage, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
};
use cuprate_wire::protocol::NewTransactions;
@ -91,17 +91,16 @@ impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Future = <Client<N> as Service<PeerRequest>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
self.0.broadcast_client().poll_ready(cx)
}
fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
self.0
.call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
NewTransactions {
txs: vec![req.0 .0],
dandelionpp_fluff: false,
padding: Bytes::new(),
},
)))
.broadcast_client()
.call(BroadcastMessage::NewTransactions(NewTransactions {
txs: vec![req.0 .0],
dandelionpp_fluff: false,
padding: Bytes::new(),
}))
}
}

View file

@ -8,7 +8,7 @@ There are "synchronization primitives" that help with this, common ones being:
- [Channels](https://en.wikipedia.org/wiki/Channel_(programming))
- [Atomics](https://en.wikipedia.org/wiki/Linearizability#Primitive_atomic_instructions)
These tools are relatively easy to use in isolation, but trickier to do so when considering the entire system. It is not uncommon for _the_ bottleneck to be the [poor orchastration](https://en.wikipedia.org/wiki/Starvation_(computer_science)) of these primitives.
These tools are relatively easy to use in isolation, but trickier to do so when considering the entire system. It is not uncommon for _the_ bottleneck to be the [poor orchestration](https://en.wikipedia.org/wiki/Starvation_(computer_science)) of these primitives.
## Analogy
A common analogy for a parallel system is an intersection.

View file

@ -224,8 +224,10 @@ pub(crate) fn key_extend(key_bytes: &[u8; CN_AES_KEY_SIZE]) -> [u128; NUM_AES_RO
let w2 = w1 ^ ((pprev_key >> 64) as u32);
let w3 = w2 ^ ((pprev_key >> 96) as u32);
expanded_key[i] =
u128::from(w0) | u128::from(w1) << 32 | u128::from(w2) << 64 | u128::from(w3) << 96;
expanded_key[i] = u128::from(w0)
| (u128::from(w1) << 32)
| (u128::from(w2) << 64)
| (u128::from(w3) << 96);
w0_prev = w3;
}
@ -256,7 +258,7 @@ pub(crate) fn round_fwd(state: u128, key: u128) -> u128 {
r4 ^= CRYPTONIGHT_SBOX[768 + usize::from((state >> 88) as u8)];
let mut new_state =
u128::from(r4) << 96 | u128::from(r3) << 64 | u128::from(r2) << 32 | u128::from(r1);
(u128::from(r4) << 96) | (u128::from(r3) << 64) | (u128::from(r2) << 32) | u128::from(r1);
new_state ^= key;
new_state
}

View file

@ -35,18 +35,18 @@ pub(crate) fn variant2_shuffle_add(
let chunk1 = &mut long_state[chunk1_start];
let sum1 = chunk3_old.wrapping_add(b1) & U64_MASK;
let sum2 = (chunk3_old >> 64).wrapping_add(b1 >> 64) & U64_MASK;
*chunk1 = sum2 << 64 | sum1; // TODO remove some shifting above
*chunk1 = (sum2 << 64) | sum1; // TODO remove some shifting above
let chunk3 = &mut long_state[chunk3_start];
let sum1 = chunk2_old.wrapping_add(a) & U64_MASK;
let sum2 = (chunk2_old >> 64).wrapping_add(a >> 64) & U64_MASK;
*chunk3 = sum2 << 64 | sum1;
*chunk3 = (sum2 << 64) | sum1;
let b0 = b[0];
let chunk2 = &mut long_state[chunk2_start];
let sum1 = chunk1_old.wrapping_add(b0) & U64_MASK;
let sum2 = (chunk1_old >> 64).wrapping_add(b0 >> 64) & U64_MASK;
*chunk2 = sum2 << 64 | sum1;
*chunk2 = (sum2 << 64) | sum1;
if variant == Variant::R {
*c1 ^= chunk1_old ^ chunk2_old ^ chunk3_old;

View file

@ -401,8 +401,10 @@ pub(crate) fn variant4_random_math(
v4_random_math(code, r);
*a1 ^=
u128::from(r[2]) | u128::from(r[3]) << 32 | u128::from(r[0]) << 64 | u128::from(r[1]) << 96;
*a1 ^= u128::from(r[2])
| (u128::from(r[3]) << 32)
| (u128::from(r[0]) << 64)
| (u128::from(r[1]) << 96);
}
#[cfg(test)]

View file

@ -138,7 +138,7 @@ fn mul(a: u64, b: u64) -> u128 {
let lo = product as u64;
// swap hi and low, so this isn't just a multiply
u128::from(lo) << 64 | u128::from(hi)
(u128::from(lo) << 64) | u128::from(hi)
}
/// Original C code:
@ -153,7 +153,7 @@ fn sum_half_blocks(a: u128, b: u128) -> u128 {
let b_high = (b >> 64) as u64;
let sum_high = a_high.wrapping_add(b_high);
u128::from(sum_high) << 64 | u128::from(sum_low)
(u128::from(sum_high) << 64) | u128::from(sum_low)
}
/// Original C code:

View file

@ -243,7 +243,7 @@ pub trait LevinBody: Sized {
/// Decodes the message from the data in the header
fn decode_message<B: Buf>(
body: &mut B,
typ: MessageType,
ty: MessageType,
command: Self::Command,
) -> Result<Self, BucketError>;

View file

@ -395,10 +395,10 @@ impl LevinBody for Message {
fn decode_message<B: Buf>(
body: &mut B,
typ: MessageType,
ty: MessageType,
command: LevinCommand,
) -> Result<Self, BucketError> {
Ok(match typ {
Ok(match ty {
MessageType::Request => Self::Request(AdminRequestMessage::decode(body, command)?),
MessageType::Response => Self::Response(AdminResponseMessage::decode(body, command)?),
MessageType::Notification => Self::Protocol(ProtocolMessage::decode(body, command)?),

View file

@ -129,7 +129,7 @@ impl ChainResponse {
#[inline]
pub const fn cumulative_difficulty(&self) -> u128 {
let cumulative_difficulty = self.cumulative_difficulty_top64 as u128;
cumulative_difficulty << 64 | self.cumulative_difficulty_low64 as u128
(cumulative_difficulty << 64) | self.cumulative_difficulty_low64 as u128
}
}

View file

@ -91,7 +91,7 @@ impl DandelionConfig {
}
/// Returns the expected length of a stem.
pub fn expected_stem_length(&self) -> f64 {
pub const fn expected_stem_length(&self) -> f64 {
self.fluff_probability.recip()
}
}

View file

@ -9,7 +9,7 @@ use tokio::{
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
task::JoinHandle,
};
use tokio_util::sync::PollSemaphore;
use tokio_util::sync::{PollSemaphore, PollSender};
use tower::{Service, ServiceExt};
use tracing::Instrument;
@ -31,7 +31,7 @@ mod weak;
pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
pub use weak::WeakClient;
pub use weak::{WeakBroadcastClient, WeakClient};
/// An internal identifier for a given peer, will be their address if known
/// or a random u128 if not.
@ -85,7 +85,7 @@ pub struct Client<Z: NetworkZone> {
pub info: PeerInformation<Z::Addr>,
/// The channel to the [`Connection`](connection::Connection) task.
connection_tx: mpsc::Sender<connection::ConnectionTaskRequest>,
connection_tx: PollSender<connection::ConnectionTaskRequest>,
/// The [`JoinHandle`] of the spawned connection task.
connection_handle: JoinHandle<()>,
/// The [`JoinHandle`] of the spawned timeout monitor task.
@ -100,6 +100,12 @@ pub struct Client<Z: NetworkZone> {
error: SharedError<PeerError>,
}
impl<Z: NetworkZone> Drop for Client<Z> {
fn drop(&mut self) {
self.info.handle.send_close_signal();
}
}
impl<Z: NetworkZone> Client<Z> {
/// Creates a new [`Client`].
pub(crate) fn new(
@ -112,7 +118,7 @@ impl<Z: NetworkZone> Client<Z> {
) -> Self {
Self {
info,
connection_tx,
connection_tx: PollSender::new(connection_tx),
timeout_handle,
semaphore: PollSemaphore::new(semaphore),
permit: None,
@ -135,7 +141,7 @@ impl<Z: NetworkZone> Client<Z> {
pub fn downgrade(&self) -> WeakClient<Z> {
WeakClient {
info: self.info.clone(),
connection_tx: self.connection_tx.downgrade(),
connection_tx: self.connection_tx.clone(),
semaphore: self.semaphore.clone(),
permit: None,
error: self.error.clone(),
@ -158,14 +164,17 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
return Poll::Ready(Err(err));
}
if self.permit.is_some() {
return Poll::Ready(Ok(()));
if self.permit.is_none() {
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
if ready!(self.connection_tx.poll_reserve(cx)).is_err() {
let err = self.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
Poll::Ready(Ok(()))
}
@ -183,19 +192,13 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
permit: Some(permit),
};
if let Err(e) = self.connection_tx.try_send(req) {
if let Err(req) = self.connection_tx.send_item(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
use mpsc::error::TrySendError;
self.set_err(PeerError::ClientChannelClosed);
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
}
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.into_inner().unwrap().response_channel.send(resp));
}
rx.into()

View file

@ -17,9 +17,9 @@ use tokio_stream::wrappers::ReceiverStream;
use cuprate_wire::{LevinCommand, Message, ProtocolMessage};
use crate::client::request_handler::PeerRequestHandler;
use crate::{
constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT},
client::request_handler::PeerRequestHandler,
constants::{REQUEST_HANDLER_TIMEOUT, REQUEST_TIMEOUT, SENDING_TIMEOUT},
handles::ConnectionGuard,
AddressBook, BroadcastMessage, CoreSyncSvc, MessageID, NetworkZone, PeerError, PeerRequest,
PeerResponse, ProtocolRequestHandler, ProtocolResponse, SharedError,
@ -46,7 +46,7 @@ pub(crate) enum State {
/// The channel to send the response down.
tx: oneshot::Sender<Result<PeerResponse, tower::BoxError>>,
/// A permit for this request.
_req_permit: Option<OwnedSemaphorePermit>,
_req_permit: OwnedSemaphorePermit,
},
}
@ -141,7 +141,7 @@ where
self.send_message_to_peer(Message::Protocol(ProtocolMessage::NewFluffyBlock(block)))
.await
}
BroadcastMessage::NewTransaction(txs) => {
BroadcastMessage::NewTransactions(txs) => {
self.send_message_to_peer(Message::Protocol(ProtocolMessage::NewTransactions(txs)))
.await
}
@ -153,10 +153,17 @@ where
tracing::debug!("handling client request, id: {:?}", req.request.id());
if req.request.needs_response() {
assert!(
!matches!(self.state, State::WaitingForResponse { .. }),
"cannot handle more than 1 request at the same time"
);
self.state = State::WaitingForResponse {
request_id: req.request.id(),
tx: req.response_channel,
_req_permit: req.permit,
_req_permit: req
.permit
.expect("Client request should have a permit if a response is needed"),
};
self.send_message_to_peer(req.request.into()).await?;
@ -165,7 +172,7 @@ where
return Ok(());
}
// INVARIANT: This function cannot exit early without sending a response back down the
// INVARIANT: From now this function cannot exit early without sending a response back down the
// response channel.
let res = self.send_message_to_peer(req.request.into()).await;
@ -188,7 +195,15 @@ where
async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> {
tracing::debug!("Received peer request: {:?}", req.id());
let res = self.peer_request_handler.handle_peer_request(req).await?;
let res = timeout(
REQUEST_HANDLER_TIMEOUT,
self.peer_request_handler.handle_peer_request(req),
)
.await
.map_err(|_| {
tracing::warn!("Timed-out handling peer request, closing connection.");
PeerError::TimedOut
})??;
// This will be an error if a response does not need to be sent
if let Ok(res) = res.try_into() {
@ -249,6 +264,10 @@ where
tokio::select! {
biased;
() = self.connection_guard.should_shutdown() => {
tracing::debug!("connection guard has shutdown, shutting down connection.");
Err(PeerError::ConnectionClosed)
}
broadcast_req = self.broadcast_stream.next() => {
if let Some(broadcast_req) = broadcast_req {
self.handle_client_broadcast(broadcast_req).await
@ -282,6 +301,10 @@ where
tokio::select! {
biased;
() = self.connection_guard.should_shutdown() => {
tracing::debug!("connection guard has shutdown, shutting down connection.");
Err(PeerError::ConnectionClosed)
}
() = self.request_timeout.as_mut().expect("Request timeout was not set!") => {
Err(PeerError::ClientChannelClosed)
}
@ -292,11 +315,19 @@ where
Err(PeerError::ClientChannelClosed)
}
}
// We don't wait for client requests as we are already handling one.
client_req = self.client_rx.next() => {
// Although we can only handle 1 request from the client at a time, this channel is also used
// for specific broadcasts to this peer so we need to handle those here as well.
if let Some(client_req) = client_req {
self.handle_client_request(client_req).await
} else {
Err(PeerError::ClientChannelClosed)
}
},
peer_message = stream.next() => {
if let Some(peer_message) = peer_message {
self.handle_potential_response(peer_message?).await
}else {
} else {
Err(PeerError::ClientChannelClosed)
}
},
@ -331,11 +362,6 @@ where
}
loop {
if self.connection_guard.should_shutdown() {
tracing::debug!("connection guard has shutdown, shutting down connection.");
return self.shutdown(PeerError::ConnectionClosed);
}
let res = match self.state {
State::WaitingForRequest => self.state_waiting_for_request(&mut stream).await,
State::WaitingForResponse { .. } => {

View file

@ -36,8 +36,8 @@ use crate::{
timeout_monitor::connection_timeout_monitor_task, Client, InternalPeerID, PeerInformation,
},
constants::{
HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE,
PING_TIMEOUT,
CLIENT_QUEUE_SIZE, HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES,
MAX_PEERS_IN_PEER_LIST_MESSAGE, PING_TIMEOUT,
},
handles::HandleBuilder,
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
@ -448,7 +448,7 @@ where
// Set up the connection data.
let error_slot = SharedError::new();
let (connection_tx, client_rx) = mpsc::channel(1);
let (connection_tx, client_rx) = mpsc::channel(CLIENT_QUEUE_SIZE);
let info = PeerInformation {
id: addr,

View file

@ -54,7 +54,13 @@ where
interval.tick().await;
loop {
interval.tick().await;
tokio::select! {
() = peer_information.handle.closed() => {
tracing::debug!("Closing timeout monitor, connection disconnected.");
return Ok(());
}
_ = interval.tick() => ()
}
tracing::trace!("timeout monitor tick.");

View file

@ -1,15 +1,15 @@
use std::task::{ready, Context, Poll};
use futures::channel::oneshot;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tokio_util::sync::PollSemaphore;
use tokio::sync::OwnedSemaphorePermit;
use tokio_util::sync::{PollSemaphore, PollSender};
use tower::Service;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{
client::{connection, PeerInformation},
NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
BroadcastMessage, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
/// A weak handle to a [`Client`](super::Client).
@ -20,7 +20,7 @@ pub struct WeakClient<N: NetworkZone> {
pub info: PeerInformation<N::Addr>,
/// The channel to the [`Connection`](connection::Connection) task.
pub(super) connection_tx: mpsc::WeakSender<connection::ConnectionTaskRequest>,
pub(super) connection_tx: PollSender<connection::ConnectionTaskRequest>,
/// The semaphore that limits the requests sent to the peer.
pub(super) semaphore: PollSemaphore,
@ -41,6 +41,13 @@ impl<N: NetworkZone> WeakClient<N> {
}
.into()
}
/// Create a [`WeakBroadcastClient`] from this [`WeakClient`].
///
/// See the docs for [`WeakBroadcastClient`] for what this type can do.
pub fn broadcast_client(&mut self) -> WeakBroadcastClient<'_, N> {
WeakBroadcastClient(self)
}
}
impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
@ -53,24 +60,21 @@ impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
return Poll::Ready(Err(err.to_string().into()));
}
if self.connection_tx.strong_count() == 0 {
if self.permit.is_none() {
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
}
if ready!(self.connection_tx.poll_reserve(cx)).is_err() {
let err = self.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
if self.permit.is_some() {
return Poll::Ready(Ok(()));
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
Poll::Ready(Ok(()))
}
#[expect(clippy::significant_drop_tightening)]
fn call(&mut self, request: PeerRequest) -> Self::Future {
let permit = self
.permit
@ -84,29 +88,76 @@ impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
permit: Some(permit),
};
match self.connection_tx.upgrade() {
None => {
self.set_err(PeerError::ClientChannelClosed);
if let Err(req) = self.connection_tx.send_item(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
Some(sender) => {
if let Err(e) = sender.try_send(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
use mpsc::error::TrySendError;
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
}
}
}
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.into_inner().unwrap().response_channel.send(resp));
}
rx.into()
}
}
/// A client used to send [`BroadcastMessage`]s directly to a single peer, although these messages
/// can be sent using a [`WeakClient`] or [`Client`](super::Client), using this client type allows
/// bypassing the single request being handled at a time.
///
/// This means that if another [`WeakClient`] has a request in progress [`WeakBroadcastClient`] can
/// still send messages and does not need to wait for the other [`WeakClient`] to finish.
///
/// A thing to note is that a call to [`WeakBroadcastClient::poll_ready`] will still reserve a slot
/// in the queue, this should be kept in mind as many [`WeakBroadcastClient`]s calling [`WeakBroadcastClient::poll_ready`]
/// without [`WeakBroadcastClient::call`] will stop other [`WeakBroadcastClient`]s & the other types
/// of clients.
///
/// This type is kept in state with the [`WeakClient`] it was produced from, allowing you to do this:
///
/// ```rust, ignore
/// weak_client.broadcast_client().poll_ready(cx)
///
/// weak_client.broadcast_client().call(req)
/// ```
pub struct WeakBroadcastClient<'a, N: NetworkZone>(&'a mut WeakClient<N>);
impl<N: NetworkZone> Service<BroadcastMessage> for WeakBroadcastClient<'_, N> {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.permit.take();
if let Some(err) = self.0.error.try_get_err() {
return Poll::Ready(Err(err.to_string().into()));
}
if ready!(self.0.connection_tx.poll_reserve(cx)).is_err() {
let err = self.0.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
Poll::Ready(Ok(()))
}
fn call(&mut self, request: BroadcastMessage) -> Self::Future {
let (tx, rx) = oneshot::channel();
let req = connection::ConnectionTaskRequest {
response_channel: tx,
request: request.into(),
// We don't need a permit as we only accept `BroadcastMessage`, which does not require a response.
permit: None,
};
if let Err(req) = self.0.connection_tx.send_item(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
self.0.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.into_inner().unwrap().response_channel.send(resp));
}
rx.into()

View file

@ -5,6 +5,10 @@ use std::time::Duration;
/// The request timeout - the time we give a peer to respond to a request.
pub(crate) const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// The timeout put on the given peer request handler to prevent the connection task from getting stuck
/// if handling a request takes too long.
pub(crate) const REQUEST_HANDLER_TIMEOUT: Duration = Duration::from_secs(10);
/// The timeout used when sending messages to a peer.
///
/// TODO: Make this configurable?
@ -28,6 +32,9 @@ pub(crate) const TIMEOUT_INTERVAL: Duration = Duration::from_secs(61);
/// it is not safe to keep too many of these messages around for long.
pub(crate) const MAX_EAGER_PROTOCOL_MESSAGES: usize = 1;
/// The maximum amount of requests allowed in the queue to the connection task.
pub(crate) const CLIENT_QUEUE_SIZE: usize = 5;
/// A timeout put on pings during handshakes.
///
/// When we receive an inbound connection we open an outbound connection to the node and send a ping message

View file

@ -60,8 +60,8 @@ pub struct ConnectionGuard {
impl ConnectionGuard {
/// Checks if we should close the connection.
pub fn should_shutdown(&self) -> bool {
self.token.is_cancelled()
pub fn should_shutdown(&self) -> WaitForCancellationFutureOwned {
self.token.clone().cancelled_owned()
}
/// Tell the corresponding [`ConnectionHandle`]s that this connection is closed.
///

View file

@ -57,7 +57,7 @@ pub enum MessageID {
pub enum BroadcastMessage {
NewFluffyBlock(NewFluffyBlock),
NewTransaction(NewTransactions),
NewTransactions(NewTransactions),
}
#[derive(Debug, Clone)]

View file

@ -3,7 +3,7 @@
use cuprate_wire::{Message, ProtocolMessage};
use crate::{PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse};
use crate::{BroadcastMessage, PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse};
#[derive(Debug)]
pub struct MessageConversionError;
@ -119,3 +119,32 @@ impl TryFrom<PeerResponse> for Message {
})
}
}
impl TryFrom<PeerRequest> for BroadcastMessage {
type Error = MessageConversionError;
fn try_from(value: PeerRequest) -> Result<Self, Self::Error> {
match value {
PeerRequest::Protocol(ProtocolRequest::NewTransactions(txs)) => {
Ok(Self::NewTransactions(txs))
}
PeerRequest::Protocol(ProtocolRequest::NewFluffyBlock(block)) => {
Ok(Self::NewFluffyBlock(block))
}
_ => Err(MessageConversionError),
}
}
}
impl From<BroadcastMessage> for PeerRequest {
fn from(value: BroadcastMessage) -> Self {
match value {
BroadcastMessage::NewTransactions(txs) => {
Self::Protocol(ProtocolRequest::NewTransactions(txs))
}
BroadcastMessage::NewFluffyBlock(block) => {
Self::Protocol(ProtocolRequest::NewFluffyBlock(block))
}
}
}
}

View file

@ -2,6 +2,7 @@
use std::{sync::Arc, time::Duration};
use futures::FutureExt;
use tokio::sync::Semaphore;
use cuprate_p2p_core::handles::HandleBuilder;
@ -19,7 +20,7 @@ fn send_ban_signal() {
assert_eq!(ban_time.0, Duration::from_secs(300));
connection_handle.send_close_signal();
assert!(guard.should_shutdown());
assert!(guard.should_shutdown().now_or_never().is_some());
guard.connection_closed();
assert!(connection_handle.is_closed());
@ -41,7 +42,7 @@ fn multiple_ban_signals() {
assert_eq!(ban_time.0, Duration::from_secs(300));
connection_handle.send_close_signal();
assert!(guard.should_shutdown());
assert!(guard.should_shutdown().now_or_never().is_some());
guard.connection_closed();
assert!(connection_handle.is_closed());

View file

@ -334,7 +334,7 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
txs.txs.len()
);
// no need to poll next_flush as we are ready now.
Poll::Ready(Some(BroadcastMessage::NewTransaction(txs)))
Poll::Ready(Some(BroadcastMessage::NewTransactions(txs)))
} else {
tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
// poll next_flush now to register the waker with it.
@ -459,7 +459,7 @@ mod tests {
.unwrap();
let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
BroadcastMessage::NewTransactions(tx) => assert_eq!(tx.txs.as_slice(), txs),
BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
};
@ -521,7 +521,7 @@ mod tests {
.unwrap();
let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
BroadcastMessage::NewTransactions(tx) => assert_eq!(tx.txs.as_slice(), txs),
BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
};

View file

@ -7,8 +7,8 @@
/// Generate documentation for the required `# Error` section.
macro_rules! doc_error {
() => {
r#"# Errors
This function returns [`cuprate_database::RuntimeError::KeyNotFound`] if the input (if applicable) doesn't exist or other `RuntimeError`'s on database errors."#
r"# Errors
This function returns [`cuprate_database::RuntimeError::KeyNotFound`] if the input (if applicable) doesn't exist or other `RuntimeError`'s on database errors."
};
}
pub(super) use doc_error;
@ -17,7 +17,7 @@ pub(super) use doc_error;
/// that should be called directly with caution.
macro_rules! doc_add_block_inner_invariant {
() => {
r#"# ⚠️ Invariant ⚠️
r"# ⚠️ Invariant ⚠️
This function mainly exists to be used internally by the parent function [`crate::ops::block::add_block`].
`add_block()` makes sure all data related to the input is mutated, while
@ -27,7 +27,7 @@ This is usually undesired - although this function is still available to call di
When calling this function, ensure that either:
1. This effect (incomplete database mutation) is what is desired, or that...
2. ...the other tables will also be mutated to a correct state"#
2. ...the other tables will also be mutated to a correct state"
};
}
pub(super) use doc_add_block_inner_invariant;
@ -39,7 +39,7 @@ pub(super) use doc_add_block_inner_invariant;
/// it's not worth the effort to reduce the duplication.
macro_rules! doc_add_alt_block_inner_invariant {
() => {
r#"# ⚠️ Invariant ⚠️
r"# ⚠️ Invariant ⚠️
This function mainly exists to be used internally by the parent function [`crate::ops::alt_block::add_alt_block`].
`add_alt_block()` makes sure all data related to the input is mutated, while
@ -49,7 +49,7 @@ This is usually undesired - although this function is still available to call di
When calling this function, ensure that either:
1. This effect (incomplete database mutation) is what is desired, or that...
2. ...the other tables will also be mutated to a correct state"#
2. ...the other tables will also be mutated to a correct state"
};
}
pub(super) use doc_add_alt_block_inner_invariant;

View file

@ -179,11 +179,11 @@ pub trait Env: Sized {
/// opening/creating tables.
macro_rules! doc_heed_create_db_invariant {
() => {
r#"The first time you open/create tables, you _must_ use [`EnvInner::create_db`]
r"The first time you open/create tables, you _must_ use [`EnvInner::create_db`]
to set the proper flags / [`Key`](crate::Key) comparison for the `heed` backend.
Subsequent table opens will follow the flags/ordering, but only if
[`EnvInner::create_db`] was the _first_ function to open/create it."#
[`EnvInner::create_db`] was the _first_ function to open/create it."
};
}