blinkcast/
lib.rs

1//! Fast, Bounded, Lossy broadcast channel with `no_std` support.
//! This is implemented with a ring buffer and atomic operations. It may be considered lock-free,
//! as we don't use a `Lock` primitive, but the implementation may spin waiting for a contending
//! writer/reader to finish accessing a specific node. It's very rare, but
//! maybe I won't call it `lock-free` in the strict sense.
6//!
7//! The API of the `blinkcast` is similar to that of the [`std::sync::mpsc`](https://doc.rust-lang.org/std/sync/mpsc/) channels.
8//! However, there are some differences:
9//!
//! - It allows for multiple consumers (receivers) and multiple producers (senders).
11//! - The channel broadcasts every send to every consumer.
//! - Lossy, the sender will overwrite old data, so receivers must be quick or they will lose the old data
//!   (don't blink).
14//! - Implemented for `no_std` environments.
15//!
//! The data sent must implement `Clone`, because it will be kept in the buffer, and readers can read it multiple times.
17//!
//! The original object will remain in the buffer until it's overwritten, at that point it will be dropped.
19//! Thus be careful if the value is a large allocation for example big `Arc`. One of the clones (original) will
20//! be kept by the buffer and will result in a delayed deallocation if that was not expected by the user.
21//! See [issue #1](https://github.com/Amjad50/blinkcast/issues/1)
22//!
23//! # Example
24//! Single sender multiple receivers
25//! ```
26//! # #[cfg(not(loom))]
27//! # {
28//! use blinkcast::alloc::channel;
29//!
30//! let (sender, mut receiver1) = channel::<i32>(4);
31//! sender.send(1);
32//! sender.send(2);
33//!
34//! let mut receiver2 = receiver1.clone();
35//!
36//! assert_eq!(receiver1.recv(), Some(1));
37//! assert_eq!(receiver1.recv(), Some(2));
38//! assert_eq!(receiver1.recv(), None);
39//!
40//! assert_eq!(receiver2.recv(), Some(1));
41//! assert_eq!(receiver2.recv(), Some(2));
42//! assert_eq!(receiver2.recv(), None);
43//! # }
44//! ```
45//! Multiple senders multiple receivers
46//! ```
47//! # #[cfg(not(loom))]
48//! # {
49//! use blinkcast::alloc::channel;
50//! use std::thread;
51//! let (sender1, mut receiver1) = channel::<i32>(100);
52//! let sender2 = sender1.clone();
53//!
54//! let t1 = thread::spawn(move || {
55//!     for i in 0..50 {
56//!         sender1.send(i);
57//!     }
58//! });
59//! let t2 = thread::spawn(move || {
60//!     for i in 0..50 {
61//!         sender2.send(i);
62//!     }
63//! });
64//!
65//! t1.join().unwrap();
66//! t2.join().unwrap();
67//!
68//! let mut receiver2 = receiver1.clone();
69//!
70//! let mut sum1 = 0;
71//! let mut sum2 = 0;
72//! for i in 0..100 {
73//!    let v1 = receiver1.recv().unwrap();
74//!    let v2 = receiver2.recv().unwrap();
75//!    sum1 += v1;
76//!    sum2 += v2;
77//!    assert_eq!(v1, v2);
78//! }
79//! assert_eq!(sum1, 49 * 50);
80//! assert_eq!(sum2, 49 * 50);
81//! # }
82//! ```
83//! Another example using the [`static_mem`] module (no allocation)
84//! ```
85//! # #[cfg(not(loom))]
86//! # {
87//! use blinkcast::static_mem::Sender;
88//!
89//! let sender = Sender::<i32, 4>::new();
90//! let mut receiver1 = sender.new_receiver();
91//! sender.send(1);
92//! sender.send(2);
93//!
94//! let mut receiver2 = receiver1.clone();
95//!
96//! assert_eq!(receiver1.recv(), Some(1));
97//! assert_eq!(receiver1.recv(), Some(2));
98//! assert_eq!(receiver1.recv(), None);
99//!
100//! assert_eq!(receiver2.recv(), Some(1));
101//! assert_eq!(receiver2.recv(), Some(2));
102//! assert_eq!(receiver2.recv(), None);
//! # }
//! ```
104
105#![cfg_attr(not(test), no_std)]
106#![cfg_attr(all(test, feature = "unstable"), feature(test))]
107
108use core::{
109    cell::{Cell, UnsafeCell},
110    mem::MaybeUninit,
111};
112
113#[cfg(not(loom))]
114use core::sync::atomic::{AtomicUsize, Ordering};
115#[cfg(loom)]
116use loom::sync::atomic::{AtomicUsize, Ordering};
117
118#[cfg(test)]
119mod tests;
120
121#[cfg(feature = "alloc")]
122pub mod alloc;
123mod core_impl;
124#[cfg(not(loom))]
// Atomics in `loom` don't support `const fn`, so `static_mem` does not compile under `loom`.
// Since we don't need `static_mem` to be tested with `loom`, we simply
// exclude it when building for `loom`.
128pub mod static_mem;
129
// A lap counter and a buffer index are packed into a single `usize`: the upper
// half of the bits holds the lap, the lower half holds the index. The shift is
// therefore half of the pointer width (32 on 64-bit targets, 16 on 32-bit
// targets), derived from `usize::BITS` so every pointer width gets a value.
// `usize::BITS / 2` always fits in a `u8`, so the cast cannot truncate.
const LAP_SHIFT: u8 = (usize::BITS / 2) as u8;

// Mask selecting the index (lower) half of a packed value.
const INDEX_MASK: usize = (1 << LAP_SHIFT) - 1;
/// The maximum length of the buffer allowed on this platform.
///
/// It is `2^(usize::BITS / 2) - 1`, i.e. `2^16 - 1` on 32bit platforms
/// and `2^32 - 1` on 64bit platforms.
pub const MAX_LEN: usize = INDEX_MASK;
140
// Values stored in a node's state word.
const STATE_EMPTY: usize = 0;
const STATE_AVAILABLE: usize = 1;
const STATE_WRITING: usize = 2;
// Several readers may read a node at the same time, so the reader marker is a
// power of two: it is added to the state once per reader, and a non-zero
// value in those bits means there is a reader.
const STATE_READING: usize = 1 << 2;
// Every bit from `STATE_READING` upwards (all bits except the low two).
const READING_MASK: usize = !(STATE_READING - 1);
149
150// extracts the lap and index
151// top 32bits are the lap
152// bottom 32bits are the index
153const fn unpack_data_index(index: usize) -> (usize, usize) {
154    let lap = index >> LAP_SHIFT;
155    let index = index & INDEX_MASK;
156    (lap, index)
157}
158
159const fn pack_data_index(lap: usize, index: usize) -> usize {
160    debug_assert!(lap < (1 << LAP_SHIFT));
161    debug_assert!(index < (1 << LAP_SHIFT));
162    (lap << LAP_SHIFT) | (index & INDEX_MASK)
163}
164
165#[inline]
166fn is_reading(state: usize) -> bool {
167    state & READING_MASK != 0
168}
169
170#[inline]
171fn is_readable(state: usize) -> bool {
172    state == STATE_AVAILABLE || is_reading(state)
173}
174
// One node (slot) of the ring buffer described in the crate docs.
struct Node<T> {
    // Value storage. Constructed uninitialized; `UnsafeCell` permits writing
    // through a shared reference.
    data: UnsafeCell<MaybeUninit<T>>,
    // Atomic state word, initialized to `STATE_EMPTY`.
    // NOTE(review): presumably driven through the `STATE_*` constants by
    // `core_impl` — confirm there.
    state: AtomicUsize,
    // Lap number recorded for this node, initialized to 0.
    // NOTE(review): `Cell` is not `Sync`, so sharing nodes across threads
    // presumably relies on an `unsafe impl` elsewhere — confirm.
    lap: Cell<usize>,
}
180
181impl<T> Node<T> {
182    #[cfg(not(loom))]
183    // Atomics in `loom` don't support `const fn`, so it crashes when compiling
184    // since, we don't need `static_mem` to be tested with `loom`, we can just
185    // exclude it if we are building for `loom`
186    pub const fn empty() -> Self {
187        Self {
188            data: UnsafeCell::new(MaybeUninit::uninit()),
189            state: AtomicUsize::new(STATE_EMPTY),
190            lap: Cell::new(0),
191        }
192    }
193}
194
195impl<T> Default for Node<T> {
196    fn default() -> Self {
197        Self {
198            data: UnsafeCell::new(MaybeUninit::uninit()),
199            state: AtomicUsize::new(STATE_EMPTY),
200            lap: Cell::new(0),
201        }
202    }
203}
204
// Cloneable pair of an index and a lap.
// NOTE(review): presumably a receiver's position in the ring buffer (the
// slot to read and the lap it expects there) — confirm against its users.
#[derive(Clone)]
struct ReaderData {
    // Buffer index component.
    index: usize,
    // Lap component.
    lap: usize,
}