blinkcast/
alloc.rs

//! A channel implemented with heap-allocated buffers (requires the `alloc` feature).
2//!
//! This module implements a broadcast channel with a fixed-size buffer allocated
//! on the heap. This means that multiple senders can be used without having to share the same reference.
5//!
6//! The channel overwrites the oldest message if the buffer is full, prioritizing the latest data.
7//!
8//! **Key Features:**
9//!
10//! * **Broadcast:** Multiple senders can send messages to multiple receivers simultaneously.
11//! * **Heap-allocated Buffers:** Ensures data storage flexibility when the application requires it.
12//! * **Fixed-size Buffer:** Provides bounded memory usage with predictable performance.
13//! * **Overwriting Behavior:** Prioritizes the latest data in scenarios where the buffer becomes full.
14//! * **Cloneable:** Both `Sender` and `Receiver` are cloneable, enabling flexible message distribution patterns.
15//!
16//! **Usage Considerations:**
17//! * Well-suited for scenarios where multiple components need to broadcast messages and the latest data takes priority.
18//! * Ideal when heap allocation is necessary or desirable.
19//! * Receivers must be fast enough to keep up with the senders and avoid losing messages due to overwriting.
20//!
21//! # Examples
22//! ```
23//! # #[cfg(not(loom))]
24//! # {
25//! use blinkcast::alloc::channel;
26//! let (sender, mut receiver) = channel::<i32>(4);
27//! sender.send(1);
28//! assert_eq!(receiver.recv(), Some(1));
29//!
30//! sender.send(2);
31//! sender.send(3);
32//!
33//! assert_eq!(receiver.recv(), Some(2));
34//!
35//! // clone the receiver
36//! let mut receiver2 = receiver.clone();
37//! assert_eq!(receiver.recv(), Some(3));
38//! assert_eq!(receiver2.recv(), Some(3));
39//! assert_eq!(receiver.recv(), None);
40//! assert_eq!(receiver2.recv(), None);
41//! # }
42//! ```
43
44extern crate alloc;
45use alloc::{boxed::Box, vec::Vec};
46
47use crate::{core_impl, unpack_data_index, AtomicUsize, Node, Ordering, ReaderData, MAX_LEN};
48
49#[cfg(not(loom))]
50use alloc::sync::Arc;
51#[cfg(loom)]
52use loom::sync::Arc;
53
54struct InnerChannel<T> {
55    buffer: Box<[Node<T>]>,
56    head: AtomicUsize,
57}
58
59impl<T: Clone> InnerChannel<T> {
60    fn new(size: usize) -> Self {
61        assert!(size <= MAX_LEN, "Exceeded the maximum length");
62
63        let mut buffer = Vec::with_capacity(size);
64        for _ in 0..size {
65            buffer.push(Default::default());
66        }
67        let buffer = buffer.into_boxed_slice();
68        Self {
69            buffer,
70            head: AtomicUsize::new(0),
71        }
72    }
73
74    fn push(&self, value: T) {
75        core_impl::push(&self.buffer, &self.head, value);
76    }
77
78    fn pop(&self, reader: &mut ReaderData) -> Option<T> {
79        core_impl::pop(&self.buffer, &self.head, reader)
80    }
81}
82
83/// The sender of the [`channel`].
84///
85/// This is a cloneable sender, so you can have multiple senders that will send to the same
86/// channel.
87///
88/// Broadcast messages sent by using the [`send`](Sender::send) method.
89///
90/// # Examples
91/// ```
92/// # #[cfg(not(loom))]
93/// # {
94/// use blinkcast::alloc::channel;
95///
96/// let (sender, mut receiver) = channel::<i32>(4);
97///
98/// sender.send(1);
99/// let sender2 = sender.clone();
100/// sender2.send(2);
101///
102/// assert_eq!(receiver.recv(), Some(1));
103/// assert_eq!(receiver.recv(), Some(2));
104/// assert_eq!(receiver.recv(), None);
105/// # }
106/// ```
107/// Or using the [`new`](Sender::new) method:
108/// ```
109/// # #[cfg(not(loom))]
110/// # {
111/// use blinkcast::alloc::Sender;
112///
113/// let sender = Sender::<i32>::new(4);
114///
115/// let mut receiver = sender.new_receiver();
116///
117/// sender.send(1);
118/// sender.send(2);
119/// assert_eq!(receiver.recv(), Some(1));
120/// assert_eq!(receiver.recv(), Some(2));
121/// assert_eq!(receiver.recv(), None);
122/// # }
123/// ```
124pub struct Sender<T> {
125    queue: Arc<InnerChannel<T>>,
126}
127
128unsafe impl<T: Clone + Send> Send for Sender<T> {}
129unsafe impl<T: Clone + Send> Sync for Sender<T> {}
130
131impl<T: Clone> Sender<T> {
132    /// Sends a message to the channel.
133    /// If the channel is full, the oldest message will be overwritten.
134    /// So the receiver must be quick or it will lose the old data.
135    pub fn send(&self, value: T) {
136        self.queue.push(value);
137    }
138
139    /// Creates a new channel with a buffer of size `N`.
140    #[allow(clippy::new_without_default)]
141    pub fn new(size: usize) -> Self {
142        Self {
143            queue: Arc::new(InnerChannel::<T>::new(size)),
144        }
145    }
146
147    /// Creates a new receiver that starts from the same point as the sender.
148    ///
149    /// # Examples
150    /// ```
151    /// # #[cfg(not(loom))]
152    /// # {
153    /// use blinkcast::alloc::Sender;
154    ///
155    /// let sender = Sender::<i32>::new(4);
156    ///
157    /// sender.send(1);
158    ///
159    /// let mut receiver = sender.new_receiver();
160    /// assert_eq!(receiver.recv(), None);
161    ///
162    /// sender.send(2);
163    /// assert_eq!(receiver.recv(), Some(2));
164    /// assert_eq!(receiver.recv(), None);
165    /// # }
166    /// ```
167    pub fn new_receiver(&self) -> Receiver<T> {
168        let head = self.queue.head.load(Ordering::Relaxed);
169        let (lap, index) = unpack_data_index(head);
170
171        Receiver {
172            queue: self.queue.clone(),
173            reader: ReaderData { index, lap },
174        }
175    }
176}
177
178impl<T> Clone for Sender<T> {
179    fn clone(&self) -> Self {
180        Self {
181            queue: self.queue.clone(),
182        }
183    }
184}
185
186/// The receiver of the [`channel`].
187///
188/// Can also be created with the [`new_receiver`](Sender::new_receiver) method of the [`Sender`].
189///
190/// This is a cloneable receiver, so you can have multiple receivers that start from the same
191/// point.
192///
193/// Broadcast messages sent by the channel are received by the [`recv`](Receiver::recv) method.
194///
195/// # Examples
196/// ```
197/// # #[cfg(not(loom))]
198/// # {
199/// use blinkcast::alloc::channel;
200/// let (sender, mut receiver) = channel::<i32>(4);
201/// sender.send(1);
202/// assert_eq!(receiver.recv(), Some(1));
203///
204/// sender.send(2);
205/// sender.send(3);
206///
207/// assert_eq!(receiver.recv(), Some(2));
208///
209/// // clone the receiver
210/// let mut receiver2 = receiver.clone();
211/// assert_eq!(receiver.recv(), Some(3));
212/// assert_eq!(receiver2.recv(), Some(3));
213/// assert_eq!(receiver.recv(), None);
214/// assert_eq!(receiver2.recv(), None);
215/// # }
216/// ```
217pub struct Receiver<T> {
218    queue: Arc<InnerChannel<T>>,
219    reader: ReaderData,
220}
221
222unsafe impl<T: Clone + Send> Send for Receiver<T> {}
223unsafe impl<T: Clone + Send> Sync for Receiver<T> {}
224
225impl<T: Clone> Receiver<T> {
226    /// Receives a message from the channel.
227    ///
228    /// If there is no message available, this method will return `None`.
229    pub fn recv(&mut self) -> Option<T> {
230        self.queue.pop(&mut self.reader)
231    }
232}
233
234impl<T: Clone> Clone for Receiver<T> {
235    fn clone(&self) -> Self {
236        Self {
237            queue: self.queue.clone(),
238            reader: self.reader.clone(),
239        }
240    }
241}
242
243/// Creates a new channel, returning the [`Sender`] and [`Receiver`] for it.
244///
245/// Both of the sender and receiver are cloneable, so you can have multiple senders and receivers.
246///
247/// Another method to create a channel is using the [`Sender::new`] and [`Sender::new_receiver`] methods.
248///
249/// # Examples
250/// ```
251/// # #[cfg(not(loom))]
252/// # {
253/// use blinkcast::alloc::channel;
254/// let (sender, mut receiver) = channel::<i32>(4);
255///
256/// sender.send(1);
257/// sender.send(2);
258///
259/// assert_eq!(receiver.recv(), Some(1));
260///
261/// let sender2 = sender.clone();
262/// sender2.send(3);
263///
264/// assert_eq!(receiver.recv(), Some(2));
265///
266/// let mut receiver2 = receiver.clone();
267/// assert_eq!(receiver.recv(), Some(3));
268/// assert_eq!(receiver2.recv(), Some(3));
269/// assert_eq!(receiver.recv(), None);
270/// assert_eq!(receiver2.recv(), None);
271/// # }
272/// ```
273pub fn channel<T: Clone>(size: usize) -> (Sender<T>, Receiver<T>) {
274    let sender = Sender::<T>::new(size);
275    let receiver = sender.new_receiver();
276    (sender, receiver)
277}