blinkcast/
static_mem.rs

1//! A channel implemented with static memory.
2//!
3//! This module implements a broadcast channel with a fixed-size buffer, without allocation.
4//! The buffer (hosted by the [`Sender`]) is stored in static memory, it can be in the stack
5//! or in global static variables, and this can be done because [`Sender::new`] is a `const fn`.
6//!
//! When sending, we only need `&Sender`, so it can be done from multiple threads/cores at the same time.
8//!
9//! The channel overwrites the oldest message if the buffer is full, prioritizing the latest data.
10//!
11//! **Key Features:**
12//!
13//! * **Broadcast:** Multiple senders can send messages to multiple receivers simultaneously.
14//! * **Fixed-size Buffer:** Provides bounded memory usage with predictable performance.
15//! * **Overwriting Behavior:** Prioritizes the latest data in scenarios where the buffer becomes full.
//! * **Cloneable:** `Receiver`s are cloneable, enabling flexible message distribution patterns;
//!    the `Sender` is not cloneable, but if set up in a global static variable, it can be used from multiple locations.
18//!
19//! **Usage Considerations:**
20//! * Well-suited for scenarios where multiple components need to broadcast messages and the latest data takes priority.
//! * Ideal when heap allocation is unavailable or undesirable.
22//! * Receivers must be fast enough to keep up with the senders and avoid losing messages due to overwriting.
23
24use core::mem::ManuallyDrop;
25
26use crate::{
27    core_impl, unpack_data_index, AtomicUsize, MaybeUninit, Node, Ordering, ReaderData, MAX_LEN,
28};
29
30struct InnerChannel<T, const N: usize> {
31    buffer: [Node<T>; N],
32    head: AtomicUsize,
33}
34
35impl<T: Clone + Sized, const N: usize> InnerChannel<T, N> {
36    const fn new() -> Self {
37        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
38        // safe because the type we are claiming to have initialized here is a
39        // bunch of `MaybeUninit`s, which do not require initialization
40        let mut uninit_buffer: [MaybeUninit<Node<T>>; N] =
41            unsafe { MaybeUninit::uninit().assume_init() };
42
43        let mut i = 0;
44        while i < N {
45            uninit_buffer[i] = MaybeUninit::new(Node::<T>::empty());
46            i += 1;
47        }
48
49        // Safety: we have initialized all the elements
50        // This transmute_copy will copy again, not sure if it can be optimized by the compiler
51        // but this is still an open issue (transmute doesn't work): https://github.com/rust-lang/rust/issues/61956
52        // or use `MaybeUninit::array_assume_init` when it is stabilized
53        #[repr(C)]
54        union InitializedData<T, const N: usize> {
55            uninit: ManuallyDrop<[MaybeUninit<Node<T>>; N]>,
56            init: ManuallyDrop<[Node<T>; N]>,
57        }
58        let buffer = ManuallyDrop::into_inner(unsafe {
59            InitializedData {
60                uninit: ManuallyDrop::new(uninit_buffer),
61            }
62            .init
63        });
64
65        Self {
66            buffer,
67            head: AtomicUsize::new(0),
68        }
69    }
70
71    fn push(&self, value: T) {
72        core_impl::push(&self.buffer, &self.head, value);
73    }
74
75    fn pop(&self, reader: &mut ReaderData) -> Option<T> {
76        core_impl::pop(&self.buffer, &self.head, reader)
77    }
78}
79
80/// The sender of the channel.
81///
82/// This is a the main channel component, as this is stored in static memory,
83/// The `Sender` is the owner of the memory.
84/// You can use it from multiple locations by storing it in a `static` variable.
85///
86/// `static_mem` doesn't have something like [`channel`](crate::alloc::channel) function,
87/// Because, we don't have heap to store the Sender and give you an [`Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html)
88/// to clone it. So, the user has to create the `receiver` from the `sender` manually.
89///
90/// Use [`new_receiver`](Sender::new_receiver) to create a receiver.
91/// It will start from the same point as the sender.
92///
93/// Broadcast messages sent by using the [`send`](Sender::send) method.
94///
95/// # Examples
96/// ```
97/// # #[cfg(not(loom))]
98/// # {
99/// use blinkcast::static_mem::Sender;
100///
101/// let sender = Sender::<i32, 4>::new();
102/// let mut receiver = sender.new_receiver();
103///
104/// sender.send(1);
105/// sender.send(2);
106///
107/// assert_eq!(receiver.recv(), Some(1));
108/// assert_eq!(receiver.recv(), Some(2));
109/// assert_eq!(receiver.recv(), None);
110/// # }
111/// ```
112pub struct Sender<T, const N: usize> {
113    queue: InnerChannel<T, N>,
114}
115
116unsafe impl<T: Clone + Send, const N: usize> Send for Sender<T, N> {}
117unsafe impl<T: Clone + Send, const N: usize> Sync for Sender<T, N> {}
118
119impl<T: Clone, const N: usize> Sender<T, N> {
120    /// Sends a message to the channel.
121    /// If the channel is full, the oldest message will be overwritten.
122    /// So the receiver must be quick or it will lose the old data.
123    pub fn send(&self, value: T) {
124        self.queue.push(value);
125    }
126
127    /// Creates a new channel with a buffer of size `N`.
128    pub const fn new() -> Self {
129        // TODO: use const_assert to check if N is a power of 2
130        assert!(N <= MAX_LEN, "Exceeded the maximum length");
131
132        Self {
133            queue: InnerChannel::<T, N>::new(),
134        }
135    }
136
137    /// Creates a new receiver that starts from the same point as the sender.
138    ///
139    /// # Examples
140    /// ```
141    /// # #[cfg(not(loom))]
142    /// # {
143    /// use blinkcast::static_mem::Sender;
144    ///
145    /// let sender = Sender::<i32, 4>::new();
146    ///
147    /// sender.send(1);
148    ///
149    /// let mut receiver = sender.new_receiver();
150    /// assert_eq!(receiver.recv(), None);
151    ///
152    /// sender.send(2);
153    /// assert_eq!(receiver.recv(), Some(2));
154    /// assert_eq!(receiver.recv(), None);
155    /// # }
156    /// ```
157    pub fn new_receiver(&self) -> Receiver<'_, T, N> {
158        let head = self.queue.head.load(Ordering::Relaxed);
159        let (lap, index) = unpack_data_index(head);
160
161        Receiver {
162            queue: &self.queue,
163            reader: ReaderData { index, lap },
164        }
165    }
166}
167
168impl<T: Clone, const N: usize> Default for Sender<T, N> {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174/// The receiver of the channel.
175///
176/// Can be created with the [`new_receiver`](Sender::new_receiver) method of the [`Sender`].
177///
178/// This is a cloneable receiver, so you can have multiple receivers that start from the same
179/// point.
180///
181/// Broadcast messages sent by the channel are received by the [`recv`](Receiver::recv) method.
182///
183/// # Examples
184/// ```
185/// # #[cfg(not(loom))]
186/// # {
187/// use blinkcast::static_mem::Sender;
188///
189/// let sender = Sender::<i32, 4>::new();
190/// let mut receiver = sender.new_receiver();
191///
192/// sender.send(1);
193/// assert_eq!(receiver.recv(), Some(1));
194///
195/// sender.send(2);
196/// sender.send(3);
197///
198/// assert_eq!(receiver.recv(), Some(2));
199///
200/// // clone the receiver
201/// let mut receiver2 = receiver.clone();
202/// assert_eq!(receiver.recv(), Some(3));
203/// assert_eq!(receiver2.recv(), Some(3));
204/// assert_eq!(receiver.recv(), None);
205/// assert_eq!(receiver2.recv(), None);
206/// # }
207/// ```
208pub struct Receiver<'a, T, const N: usize> {
209    queue: &'a InnerChannel<T, N>,
210    reader: ReaderData,
211}
212
213unsafe impl<T: Clone + Send, const N: usize> Send for Receiver<'_, T, N> {}
214unsafe impl<T: Clone + Send, const N: usize> Sync for Receiver<'_, T, N> {}
215
216impl<T: Clone, const N: usize> Receiver<'_, T, N> {
217    /// Receives a message from the channel.
218    ///
219    /// If there is no message available, this method will return `None`.
220    pub fn recv(&mut self) -> Option<T> {
221        self.queue.pop(&mut self.reader)
222    }
223}
224
225impl<T: Clone, const N: usize> Clone for Receiver<'_, T, N> {
226    fn clone(&self) -> Self {
227        Self {
228            queue: self.queue,
229            reader: self.reader.clone(),
230        }
231    }
232}