review comments

This commit is contained in:
Boog900 2024-05-22 22:22:59 +01:00
parent 73041b9fe1
commit e14ca2b846
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 11 additions and 14 deletions

10
Cargo.lock generated
View file

@ -98,16 +98,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "async-buffer"
version = "0.1.0"
dependencies = [
"futures",
"pin-project",
"thiserror",
"tokio",
]
[[package]]
name = "async-lock"
version = "3.3.0"

View file

@ -1,6 +1,9 @@
//! Async Buffer
//!
//! A bounded SPSC, FIFO, async buffer that supports arbitrary weights for values.
//!
//! Weight is used to bound the channel, on creation you specify a max weight and for each value you
//! specify a weight.
use std::{
cmp::min,
future::Future,
@ -35,10 +38,10 @@ pub enum BufferError {
/// It should be noted that if there are no items in the buffer then a single item of any capacity is accepted.
/// i.e. if the capacity is 5 and there are no items in the buffer then any item even if it's weight is >5 will be
/// accepted.
pub fn new_buffer<T>(capacity: usize) -> (BufferAppender<T>, BufferStream<T>) {
pub fn new_buffer<T>(max_item_weight: usize) -> (BufferAppender<T>, BufferStream<T>) {
let (tx, rx) = unbounded();
let sink_waker = Arc::new(AtomicWaker::new());
let capacity_atomic = Arc::new(AtomicUsize::new(capacity));
let capacity_atomic = Arc::new(AtomicUsize::new(max_item_weight));
(
BufferAppender {
@ -110,8 +113,10 @@ impl<T> BufferAppender<T> {
}
}
/// Attempts to add an item to the buffer returning an error if there is not enough capacity or
/// the [`BufferStream`] was dropped.
/// Attempts to add an item to the buffer.
///
/// # Errors
/// Returns an error if there is not enough capacity or the [`BufferStream`] was dropped.
pub fn try_send(&mut self, item: T, size_needed: usize) -> Result<(), BufferError> {
let size_needed = min(self.max_item_weight, size_needed);
@ -147,6 +152,8 @@ pub struct BufferSinkSend<'a, T> {
#[pin]
ready: BufferSinkReady<'a, T>,
/// The item to send.
///
/// This is [`take`](Option::take)n and added to the buffer when there is enough capacity.
item: Option<T>,
}