blinkcast/alloc.rs
1//! A channel implemented with heap allocated buffers (require `alloc` feature).
2//!
3//! This module implements a broadcast channel with a fixed-size buffer, allocated
4//! in the heap. This means that multiple senders can be used without the need to use the same reference.
5//!
6//! The channel overwrites the oldest message if the buffer is full, prioritizing the latest data.
7//!
8//! **Key Features:**
9//!
10//! * **Broadcast:** Multiple senders can send messages to multiple receivers simultaneously.
11//! * **Heap-allocated Buffers:** Ensures data storage flexibility when the application requires it.
12//! * **Fixed-size Buffer:** Provides bounded memory usage with predictable performance.
13//! * **Overwriting Behavior:** Prioritizes the latest data in scenarios where the buffer becomes full.
14//! * **Cloneable:** Both `Sender` and `Receiver` are cloneable, enabling flexible message distribution patterns.
15//!
16//! **Usage Considerations:**
17//! * Well-suited for scenarios where multiple components need to broadcast messages and the latest data takes priority.
18//! * Ideal when heap allocation is necessary or desirable.
19//! * Receivers must be fast enough to keep up with the senders and avoid losing messages due to overwriting.
20//!
21//! # Examples
22//! ```
23//! # #[cfg(not(loom))]
24//! # {
25//! use blinkcast::alloc::channel;
26//! let (sender, mut receiver) = channel::<i32>(4);
27//! sender.send(1);
28//! assert_eq!(receiver.recv(), Some(1));
29//!
30//! sender.send(2);
31//! sender.send(3);
32//!
33//! assert_eq!(receiver.recv(), Some(2));
34//!
35//! // clone the receiver
36//! let mut receiver2 = receiver.clone();
37//! assert_eq!(receiver.recv(), Some(3));
38//! assert_eq!(receiver2.recv(), Some(3));
39//! assert_eq!(receiver.recv(), None);
40//! assert_eq!(receiver2.recv(), None);
41//! # }
42//! ```
43
44extern crate alloc;
45use alloc::{boxed::Box, vec::Vec};
46
47use crate::{core_impl, unpack_data_index, AtomicUsize, Node, Ordering, ReaderData, MAX_LEN};
48
49#[cfg(not(loom))]
50use alloc::sync::Arc;
51#[cfg(loom)]
52use loom::sync::Arc;
53
54struct InnerChannel<T> {
55 buffer: Box<[Node<T>]>,
56 head: AtomicUsize,
57}
58
59impl<T: Clone> InnerChannel<T> {
60 fn new(size: usize) -> Self {
61 assert!(size <= MAX_LEN, "Exceeded the maximum length");
62
63 let mut buffer = Vec::with_capacity(size);
64 for _ in 0..size {
65 buffer.push(Default::default());
66 }
67 let buffer = buffer.into_boxed_slice();
68 Self {
69 buffer,
70 head: AtomicUsize::new(0),
71 }
72 }
73
74 fn push(&self, value: T) {
75 core_impl::push(&self.buffer, &self.head, value);
76 }
77
78 fn pop(&self, reader: &mut ReaderData) -> Option<T> {
79 core_impl::pop(&self.buffer, &self.head, reader)
80 }
81}
82
83/// The sender of the [`channel`].
84///
85/// This is a cloneable sender, so you can have multiple senders that will send to the same
86/// channel.
87///
88/// Broadcast messages sent by using the [`send`](Sender::send) method.
89///
90/// # Examples
91/// ```
92/// # #[cfg(not(loom))]
93/// # {
94/// use blinkcast::alloc::channel;
95///
96/// let (sender, mut receiver) = channel::<i32>(4);
97///
98/// sender.send(1);
99/// let sender2 = sender.clone();
100/// sender2.send(2);
101///
102/// assert_eq!(receiver.recv(), Some(1));
103/// assert_eq!(receiver.recv(), Some(2));
104/// assert_eq!(receiver.recv(), None);
105/// # }
106/// ```
107/// Or using the [`new`](Sender::new) method:
108/// ```
109/// # #[cfg(not(loom))]
110/// # {
111/// use blinkcast::alloc::Sender;
112///
113/// let sender = Sender::<i32>::new(4);
114///
115/// let mut receiver = sender.new_receiver();
116///
117/// sender.send(1);
118/// sender.send(2);
119/// assert_eq!(receiver.recv(), Some(1));
120/// assert_eq!(receiver.recv(), Some(2));
121/// assert_eq!(receiver.recv(), None);
122/// # }
123/// ```
124pub struct Sender<T> {
125 queue: Arc<InnerChannel<T>>,
126}
127
128unsafe impl<T: Clone + Send> Send for Sender<T> {}
129unsafe impl<T: Clone + Send> Sync for Sender<T> {}
130
131impl<T: Clone> Sender<T> {
132 /// Sends a message to the channel.
133 /// If the channel is full, the oldest message will be overwritten.
134 /// So the receiver must be quick or it will lose the old data.
135 pub fn send(&self, value: T) {
136 self.queue.push(value);
137 }
138
139 /// Creates a new channel with a buffer of size `N`.
140 #[allow(clippy::new_without_default)]
141 pub fn new(size: usize) -> Self {
142 Self {
143 queue: Arc::new(InnerChannel::<T>::new(size)),
144 }
145 }
146
147 /// Creates a new receiver that starts from the same point as the sender.
148 ///
149 /// # Examples
150 /// ```
151 /// # #[cfg(not(loom))]
152 /// # {
153 /// use blinkcast::alloc::Sender;
154 ///
155 /// let sender = Sender::<i32>::new(4);
156 ///
157 /// sender.send(1);
158 ///
159 /// let mut receiver = sender.new_receiver();
160 /// assert_eq!(receiver.recv(), None);
161 ///
162 /// sender.send(2);
163 /// assert_eq!(receiver.recv(), Some(2));
164 /// assert_eq!(receiver.recv(), None);
165 /// # }
166 /// ```
167 pub fn new_receiver(&self) -> Receiver<T> {
168 let head = self.queue.head.load(Ordering::Relaxed);
169 let (lap, index) = unpack_data_index(head);
170
171 Receiver {
172 queue: self.queue.clone(),
173 reader: ReaderData { index, lap },
174 }
175 }
176}
177
178impl<T> Clone for Sender<T> {
179 fn clone(&self) -> Self {
180 Self {
181 queue: self.queue.clone(),
182 }
183 }
184}
185
186/// The receiver of the [`channel`].
187///
188/// Can also be created with the [`new_receiver`](Sender::new_receiver) method of the [`Sender`].
189///
190/// This is a cloneable receiver, so you can have multiple receivers that start from the same
191/// point.
192///
193/// Broadcast messages sent by the channel are received by the [`recv`](Receiver::recv) method.
194///
195/// # Examples
196/// ```
197/// # #[cfg(not(loom))]
198/// # {
199/// use blinkcast::alloc::channel;
200/// let (sender, mut receiver) = channel::<i32>(4);
201/// sender.send(1);
202/// assert_eq!(receiver.recv(), Some(1));
203///
204/// sender.send(2);
205/// sender.send(3);
206///
207/// assert_eq!(receiver.recv(), Some(2));
208///
209/// // clone the receiver
210/// let mut receiver2 = receiver.clone();
211/// assert_eq!(receiver.recv(), Some(3));
212/// assert_eq!(receiver2.recv(), Some(3));
213/// assert_eq!(receiver.recv(), None);
214/// assert_eq!(receiver2.recv(), None);
215/// # }
216/// ```
217pub struct Receiver<T> {
218 queue: Arc<InnerChannel<T>>,
219 reader: ReaderData,
220}
221
222unsafe impl<T: Clone + Send> Send for Receiver<T> {}
223unsafe impl<T: Clone + Send> Sync for Receiver<T> {}
224
225impl<T: Clone> Receiver<T> {
226 /// Receives a message from the channel.
227 ///
228 /// If there is no message available, this method will return `None`.
229 pub fn recv(&mut self) -> Option<T> {
230 self.queue.pop(&mut self.reader)
231 }
232}
233
234impl<T: Clone> Clone for Receiver<T> {
235 fn clone(&self) -> Self {
236 Self {
237 queue: self.queue.clone(),
238 reader: self.reader.clone(),
239 }
240 }
241}
242
243/// Creates a new channel, returning the [`Sender`] and [`Receiver`] for it.
244///
245/// Both of the sender and receiver are cloneable, so you can have multiple senders and receivers.
246///
247/// Another method to create a channel is using the [`Sender::new`] and [`Sender::new_receiver`] methods.
248///
249/// # Examples
250/// ```
251/// # #[cfg(not(loom))]
252/// # {
253/// use blinkcast::alloc::channel;
254/// let (sender, mut receiver) = channel::<i32>(4);
255///
256/// sender.send(1);
257/// sender.send(2);
258///
259/// assert_eq!(receiver.recv(), Some(1));
260///
261/// let sender2 = sender.clone();
262/// sender2.send(3);
263///
264/// assert_eq!(receiver.recv(), Some(2));
265///
266/// let mut receiver2 = receiver.clone();
267/// assert_eq!(receiver.recv(), Some(3));
268/// assert_eq!(receiver2.recv(), Some(3));
269/// assert_eq!(receiver.recv(), None);
270/// assert_eq!(receiver2.recv(), None);
271/// # }
272/// ```
273pub fn channel<T: Clone>(size: usize) -> (Sender<T>, Receiver<T>) {
274 let sender = Sender::<T>::new(size);
275 let receiver = sender.new_receiver();
276 (sender, receiver)
277}