blinkcast/static_mem.rs
1//! A channel implemented with static memory.
2//!
3//! This module implements a broadcast channel with a fixed-size buffer, without allocation.
4//! The buffer (hosted by the [`Sender`]) is stored in static memory, it can be in the stack
5//! or in global static variables, and this can be done because [`Sender::new`] is a `const fn`.
6//!
7//! When sending, we only need `&Sender`, so it can be done from multiple threads/cores at the same time.
8//!
9//! The channel overwrites the oldest message if the buffer is full, prioritizing the latest data.
10//!
11//! **Key Features:**
12//!
13//! * **Broadcast:** Multiple senders can send messages to multiple receivers simultaneously.
14//! * **Fixed-size Buffer:** Provides bounded memory usage with predictable performance.
15//! * **Overwriting Behavior:** Prioritizes the latest data in scenarios where the buffer becomes full.
16//! * **Cloneable:** the `Receiver` is cloneable, enabling flexible message distribution patterns;
17//! the `Sender` is not cloneable, but if set up in a global static variable, it can be used from multiple locations.
18//!
19//! **Usage Considerations:**
20//! * Well-suited for scenarios where multiple components need to broadcast messages and the latest data takes priority.
21//! * Ideal when heap allocation is unavailable or undesirable.
22//! * Receivers must be fast enough to keep up with the senders and avoid losing messages due to overwriting.
23
24use core::mem::ManuallyDrop;
25
26use crate::{
27 core_impl, unpack_data_index, AtomicUsize, MaybeUninit, Node, Ordering, ReaderData, MAX_LEN,
28};
29
30struct InnerChannel<T, const N: usize> {
31 buffer: [Node<T>; N],
32 head: AtomicUsize,
33}
34
35impl<T: Clone + Sized, const N: usize> InnerChannel<T, N> {
36 const fn new() -> Self {
37 // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
38 // safe because the type we are claiming to have initialized here is a
39 // bunch of `MaybeUninit`s, which do not require initialization
40 let mut uninit_buffer: [MaybeUninit<Node<T>>; N] =
41 unsafe { MaybeUninit::uninit().assume_init() };
42
43 let mut i = 0;
44 while i < N {
45 uninit_buffer[i] = MaybeUninit::new(Node::<T>::empty());
46 i += 1;
47 }
48
49 // Safety: we have initialized all the elements
50 // This transmute_copy will copy again, not sure if it can be optimized by the compiler
51 // but this is still an open issue (transmute doesn't work): https://github.com/rust-lang/rust/issues/61956
52 // or use `MaybeUninit::array_assume_init` when it is stabilized
53 #[repr(C)]
54 union InitializedData<T, const N: usize> {
55 uninit: ManuallyDrop<[MaybeUninit<Node<T>>; N]>,
56 init: ManuallyDrop<[Node<T>; N]>,
57 }
58 let buffer = ManuallyDrop::into_inner(unsafe {
59 InitializedData {
60 uninit: ManuallyDrop::new(uninit_buffer),
61 }
62 .init
63 });
64
65 Self {
66 buffer,
67 head: AtomicUsize::new(0),
68 }
69 }
70
71 fn push(&self, value: T) {
72 core_impl::push(&self.buffer, &self.head, value);
73 }
74
75 fn pop(&self, reader: &mut ReaderData) -> Option<T> {
76 core_impl::pop(&self.buffer, &self.head, reader)
77 }
78}
79
80/// The sender of the channel.
81///
82/// This is a the main channel component, as this is stored in static memory,
83/// The `Sender` is the owner of the memory.
84/// You can use it from multiple locations by storing it in a `static` variable.
85///
86/// `static_mem` doesn't have something like [`channel`](crate::alloc::channel) function,
87/// Because, we don't have heap to store the Sender and give you an [`Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html)
88/// to clone it. So, the user has to create the `receiver` from the `sender` manually.
89///
90/// Use [`new_receiver`](Sender::new_receiver) to create a receiver.
91/// It will start from the same point as the sender.
92///
93/// Broadcast messages sent by using the [`send`](Sender::send) method.
94///
95/// # Examples
96/// ```
97/// # #[cfg(not(loom))]
98/// # {
99/// use blinkcast::static_mem::Sender;
100///
101/// let sender = Sender::<i32, 4>::new();
102/// let mut receiver = sender.new_receiver();
103///
104/// sender.send(1);
105/// sender.send(2);
106///
107/// assert_eq!(receiver.recv(), Some(1));
108/// assert_eq!(receiver.recv(), Some(2));
109/// assert_eq!(receiver.recv(), None);
110/// # }
111/// ```
112pub struct Sender<T, const N: usize> {
113 queue: InnerChannel<T, N>,
114}
115
116unsafe impl<T: Clone + Send, const N: usize> Send for Sender<T, N> {}
117unsafe impl<T: Clone + Send, const N: usize> Sync for Sender<T, N> {}
118
119impl<T: Clone, const N: usize> Sender<T, N> {
120 /// Sends a message to the channel.
121 /// If the channel is full, the oldest message will be overwritten.
122 /// So the receiver must be quick or it will lose the old data.
123 pub fn send(&self, value: T) {
124 self.queue.push(value);
125 }
126
127 /// Creates a new channel with a buffer of size `N`.
128 pub const fn new() -> Self {
129 // TODO: use const_assert to check if N is a power of 2
130 assert!(N <= MAX_LEN, "Exceeded the maximum length");
131
132 Self {
133 queue: InnerChannel::<T, N>::new(),
134 }
135 }
136
137 /// Creates a new receiver that starts from the same point as the sender.
138 ///
139 /// # Examples
140 /// ```
141 /// # #[cfg(not(loom))]
142 /// # {
143 /// use blinkcast::static_mem::Sender;
144 ///
145 /// let sender = Sender::<i32, 4>::new();
146 ///
147 /// sender.send(1);
148 ///
149 /// let mut receiver = sender.new_receiver();
150 /// assert_eq!(receiver.recv(), None);
151 ///
152 /// sender.send(2);
153 /// assert_eq!(receiver.recv(), Some(2));
154 /// assert_eq!(receiver.recv(), None);
155 /// # }
156 /// ```
157 pub fn new_receiver(&self) -> Receiver<'_, T, N> {
158 let head = self.queue.head.load(Ordering::Relaxed);
159 let (lap, index) = unpack_data_index(head);
160
161 Receiver {
162 queue: &self.queue,
163 reader: ReaderData { index, lap },
164 }
165 }
166}
167
168impl<T: Clone, const N: usize> Default for Sender<T, N> {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174/// The receiver of the channel.
175///
176/// Can be created with the [`new_receiver`](Sender::new_receiver) method of the [`Sender`].
177///
178/// This is a cloneable receiver, so you can have multiple receivers that start from the same
179/// point.
180///
181/// Broadcast messages sent by the channel are received by the [`recv`](Receiver::recv) method.
182///
183/// # Examples
184/// ```
185/// # #[cfg(not(loom))]
186/// # {
187/// use blinkcast::static_mem::Sender;
188///
189/// let sender = Sender::<i32, 4>::new();
190/// let mut receiver = sender.new_receiver();
191///
192/// sender.send(1);
193/// assert_eq!(receiver.recv(), Some(1));
194///
195/// sender.send(2);
196/// sender.send(3);
197///
198/// assert_eq!(receiver.recv(), Some(2));
199///
200/// // clone the receiver
201/// let mut receiver2 = receiver.clone();
202/// assert_eq!(receiver.recv(), Some(3));
203/// assert_eq!(receiver2.recv(), Some(3));
204/// assert_eq!(receiver.recv(), None);
205/// assert_eq!(receiver2.recv(), None);
206/// # }
207/// ```
208pub struct Receiver<'a, T, const N: usize> {
209 queue: &'a InnerChannel<T, N>,
210 reader: ReaderData,
211}
212
213unsafe impl<T: Clone + Send, const N: usize> Send for Receiver<'_, T, N> {}
214unsafe impl<T: Clone + Send, const N: usize> Sync for Receiver<'_, T, N> {}
215
216impl<T: Clone, const N: usize> Receiver<'_, T, N> {
217 /// Receives a message from the channel.
218 ///
219 /// If there is no message available, this method will return `None`.
220 pub fn recv(&mut self) -> Option<T> {
221 self.queue.pop(&mut self.reader)
222 }
223}
224
225impl<T: Clone, const N: usize> Clone for Receiver<'_, T, N> {
226 fn clone(&self) -> Self {
227 Self {
228 queue: self.queue,
229 reader: self.reader.clone(),
230 }
231 }
232}