mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-12-23 03:59:31 +00:00
P2P: async buffer (#109)
* impl async buffer * clippy * review comments
This commit is contained in:
parent
0158c6671a
commit
e8b20e6115
3 changed files with 256 additions and 0 deletions
14
p2p/async-buffer/Cargo.toml
Normal file
14
p2p/async-buffer/Cargo.toml
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
[package]
|
||||||
|
name = "async-buffer"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
license = "MIT"
|
||||||
|
authors = ["Boog900"]
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
thiserror = { workspace = true }
|
||||||
|
futures = { workspace = true, features = ["std"] }
|
||||||
|
pin-project = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
205
p2p/async-buffer/src/lib.rs
Normal file
205
p2p/async-buffer/src/lib.rs
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
//! Async Buffer
|
||||||
|
//!
|
||||||
|
//! A bounded SPSC, FIFO, async buffer that supports arbitrary weights for values.
|
||||||
|
//!
|
||||||
|
//! Weight is used to bound the channel, on creation you specify a max weight and for each value you
|
||||||
|
//! specify a weight.
|
||||||
|
use std::{
|
||||||
|
cmp::min,
|
||||||
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{
|
||||||
|
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
|
||||||
|
ready,
|
||||||
|
task::AtomicWaker,
|
||||||
|
Stream, StreamExt,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
|
||||||
|
pub enum BufferError {
|
||||||
|
#[error("The buffer did not have enough capacity.")]
|
||||||
|
NotEnoughCapacity,
|
||||||
|
#[error("The other end of the buffer disconnected.")]
|
||||||
|
Disconnected,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initializes a new buffer with the provided capacity.
|
||||||
|
///
|
||||||
|
/// The capacity inputted is not the max number of items, it is the max combined weight of all items
|
||||||
|
/// in the buffer.
|
||||||
|
///
|
||||||
|
/// It should be noted that if there are no items in the buffer then a single item of any capacity is accepted.
|
||||||
|
/// i.e. if the capacity is 5 and there are no items in the buffer then any item even if it's weight is >5 will be
|
||||||
|
/// accepted.
|
||||||
|
pub fn new_buffer<T>(max_item_weight: usize) -> (BufferAppender<T>, BufferStream<T>) {
|
||||||
|
let (tx, rx) = unbounded();
|
||||||
|
let sink_waker = Arc::new(AtomicWaker::new());
|
||||||
|
let capacity_atomic = Arc::new(AtomicUsize::new(max_item_weight));
|
||||||
|
|
||||||
|
(
|
||||||
|
BufferAppender {
|
||||||
|
queue: tx,
|
||||||
|
sink_waker: sink_waker.clone(),
|
||||||
|
capacity: capacity_atomic.clone(),
|
||||||
|
max_item_weight: capacity,
|
||||||
|
},
|
||||||
|
BufferStream {
|
||||||
|
queue: rx,
|
||||||
|
sink_waker,
|
||||||
|
capacity: capacity_atomic,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The stream side of the buffer.
|
||||||
|
pub struct BufferStream<T> {
|
||||||
|
/// The internal queue of items.
|
||||||
|
queue: UnboundedReceiver<(T, usize)>,
|
||||||
|
/// The waker for the [`BufferAppender`]
|
||||||
|
sink_waker: Arc<AtomicWaker>,
|
||||||
|
/// The current capacity of the buffer.
|
||||||
|
capacity: Arc<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Stream for BufferStream<T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let Some((item, size)) = ready!(self.queue.poll_next_unpin(cx)) else {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
// add the capacity back to the buffer.
|
||||||
|
self.capacity.fetch_add(size, Ordering::AcqRel);
|
||||||
|
// wake the sink.
|
||||||
|
self.sink_waker.wake();
|
||||||
|
|
||||||
|
Poll::Ready(Some(item))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The appender/sink side of the buffer.
|
||||||
|
pub struct BufferAppender<T> {
|
||||||
|
/// The internal queue of items.
|
||||||
|
queue: UnboundedSender<(T, usize)>,
|
||||||
|
/// Our waker.
|
||||||
|
sink_waker: Arc<AtomicWaker>,
|
||||||
|
/// The current capacity of the buffer.
|
||||||
|
capacity: Arc<AtomicUsize>,
|
||||||
|
/// The max weight of an item, equal to the total allowed weight of the buffer.
|
||||||
|
max_item_weight: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> BufferAppender<T> {
|
||||||
|
/// Returns a future that resolves when the channel has enough capacity for
|
||||||
|
/// a single message of `size_needed`.
|
||||||
|
///
|
||||||
|
/// It should be noted that if there are no items in the buffer then a single item of any capacity is accepted.
|
||||||
|
/// i.e. if the capacity is 5 and there are no items in the buffer then any item even if it's weight is >5 will be
|
||||||
|
/// accepted.
|
||||||
|
pub fn ready(&mut self, size_needed: usize) -> BufferSinkReady<'_, T> {
|
||||||
|
let size_needed = min(self.max_item_weight, size_needed);
|
||||||
|
|
||||||
|
BufferSinkReady {
|
||||||
|
sink: self,
|
||||||
|
size_needed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to add an item to the buffer.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns an error if there is not enough capacity or the [`BufferStream`] was dropped.
|
||||||
|
pub fn try_send(&mut self, item: T, size_needed: usize) -> Result<(), BufferError> {
|
||||||
|
let size_needed = min(self.max_item_weight, size_needed);
|
||||||
|
|
||||||
|
if self.capacity.load(Ordering::Acquire) < size_needed {
|
||||||
|
return Err(BufferError::NotEnoughCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
let prev_size = self.capacity.fetch_sub(size_needed, Ordering::AcqRel);
|
||||||
|
|
||||||
|
// make sure we haven't wrapped the capacity around.
|
||||||
|
assert!(prev_size >= size_needed);
|
||||||
|
|
||||||
|
self.queue
|
||||||
|
.unbounded_send((item, size_needed))
|
||||||
|
.map_err(|_| BufferError::Disconnected)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Waits for capacity in the buffer and then sends the item.
|
||||||
|
pub fn send(&mut self, item: T, size_needed: usize) -> BufferSinkSend<'_, T> {
|
||||||
|
BufferSinkSend {
|
||||||
|
ready: self.ready(size_needed),
|
||||||
|
item: Some(item),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A [`Future`] for adding an item to the buffer.
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct BufferSinkSend<'a, T> {
|
||||||
|
/// A future that resolves when the channel has capacity.
|
||||||
|
#[pin]
|
||||||
|
ready: BufferSinkReady<'a, T>,
|
||||||
|
/// The item to send.
|
||||||
|
///
|
||||||
|
/// This is [`take`](Option::take)n and added to the buffer when there is enough capacity.
|
||||||
|
item: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Future for BufferSinkSend<'a, T> {
|
||||||
|
type Output = Result<(), BufferError>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
let size_needed = this.ready.size_needed;
|
||||||
|
|
||||||
|
this.ready.as_mut().poll(cx).map(|_| {
|
||||||
|
this.ready
|
||||||
|
.sink
|
||||||
|
.try_send(this.item.take().unwrap(), size_needed)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A [`Future`] for waiting for capacity in the buffer.
|
||||||
|
pub struct BufferSinkReady<'a, T> {
|
||||||
|
/// The sink side of the buffer.
|
||||||
|
sink: &'a mut BufferAppender<T>,
|
||||||
|
/// The capacity needed.
|
||||||
|
///
|
||||||
|
/// This future will wait forever if this is higher than the total availability of the buffer.
|
||||||
|
size_needed: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Future for BufferSinkReady<'a, T> {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
// Check before setting the waker just in case it has capacity now,
|
||||||
|
if self.sink.capacity.load(Ordering::Acquire) >= self.size_needed {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the waker
|
||||||
|
self.sink.sink_waker.register(cx.waker());
|
||||||
|
|
||||||
|
// check the capacity again to avoid a race condition that would result in lost notifications.
|
||||||
|
if self.sink.capacity.load(Ordering::Acquire) >= self.size_needed {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
37
p2p/async-buffer/tests/basic.rs
Normal file
37
p2p/async-buffer/tests/basic.rs
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
use futures::{FutureExt, StreamExt};
|
||||||
|
|
||||||
|
use async_buffer::new_buffer;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn async_buffer_send_rec() {
|
||||||
|
let (mut tx, mut rx) = new_buffer(1000);
|
||||||
|
|
||||||
|
tx.send(4, 5).await.unwrap();
|
||||||
|
tx.send(8, 5).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(rx.next().await.unwrap(), 4);
|
||||||
|
assert_eq!(rx.next().await.unwrap(), 8);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn capacity_reached() {
|
||||||
|
let (mut tx, mut rx) = new_buffer(1000);
|
||||||
|
|
||||||
|
tx.send(4, 1000).await.unwrap();
|
||||||
|
|
||||||
|
assert!(tx.ready(1).now_or_never().is_none());
|
||||||
|
|
||||||
|
let fut = tx.ready(1);
|
||||||
|
|
||||||
|
rx.next().await;
|
||||||
|
|
||||||
|
assert!(fut.now_or_never().is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn single_item_over_capacity() {
|
||||||
|
let (mut tx, mut rx) = new_buffer(1000);
|
||||||
|
tx.send(4, 1_000_000).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(rx.next().await.unwrap(), 4);
|
||||||
|
}
|
Loading…
Reference in a new issue