veecle_os_runtime/memory_pool.rs
//! An interrupt- and thread-safe memory pool.
//!
//! The memory pool allows using static, stack, or heap memory to store `SIZE` instances of `T`.
//! [`MemoryPool::chunk`] provides [`Chunk`]s to interact with instances of `T`.
//! [`Chunk`] is a pointer type, which means it is cheap to move.
//! This makes the memory pool well suited for moving data between actors without copying.
//! It is especially useful for large chunks of data or data that is expensive to move.
//!
//! [`Chunk`]s are automatically made available for re-use on drop.
//!
//! [`Chunk`]s can be created by:
//! - [`MemoryPool::reserve`] followed by [`MemoryPoolToken::init`], which initializes the chunk with the provided
//!   value of `T`. [`MemoryPool::chunk`] combines both into a single method call.
//! - [`MemoryPool::reserve`] followed by [`MemoryPoolToken::init_in_place`], which initializes `T` in place.
//!
//! # Example
//!
//! ```
//! use veecle_os_runtime::{ExclusiveReader, Writer};
//! use veecle_os_runtime::memory_pool::{Chunk, MemoryPool};
//! use core::convert::Infallible;
//! use veecle_os_runtime::Storable;
//!
//! #[derive(Debug, Storable)]
//! #[storable(data_type = "Chunk<'static, u8>")]
//! pub struct Data;
//!
//! #[veecle_os_runtime::actor]
//! async fn exclusive_read_actor(mut reader: ExclusiveReader<'_, Data>) -> Infallible {
//!     loop {
//!         if let Some(chunk) = reader.take() {
//!             println!("Chunk received: {:?}", chunk);
//!             println!("Chunk content: {:?}", *chunk);
//!         } else {
//!             reader.wait_for_update().await;
//!         }
//!     }
//! }
//!
//! #[veecle_os_runtime::actor]
//! async fn write_actor(
//!     mut writer: Writer<'_, Data>,
//!     #[init_context] pool: &'static MemoryPool<u8, 5>,
//! ) -> Infallible {
//!     for index in 0..10 {
//!         writer.write(pool.chunk(index).unwrap()).await;
//!     }
//! # // Exit the application to allow doc-tests to complete.
//! # std::process::exit(0);
//! }
//!
//! static POOL: MemoryPool<u8, 5> = MemoryPool::new();
//!
//! # futures::executor::block_on(
//! #
//! veecle_os_runtime::execute! {
//!     store: [Data],
//!     actors: [
//!         ExclusiveReadActor,
//!         WriteActor: &POOL,
//!     ]
//! }
//! # );
//! ```
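//!
//! A minimal sketch of the two-step path via [`MemoryPool::reserve`]:
//!
//! ```
//! use veecle_os_runtime::memory_pool::MemoryPool;
//!
//! let pool = MemoryPool::<u32, 1>::new();
//!
//! // Reserve a slot first, then initialize it with a value.
//! let token = pool.reserve().expect("one chunk is available");
//! let chunk = token.init(7);
//! assert_eq!(*chunk, 7);
//! ```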

use core::cell::UnsafeCell;
use core::fmt;
use core::fmt::{Debug, Formatter};
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicBool, Ordering};

/// Interrupt- and thread-safe memory pool.
///
/// See [module-level documentation][self] for more information.
#[derive(Debug)]
pub struct MemoryPool<T, const SIZE: usize> {
    chunks: [MemoryPoolInner<T>; SIZE],
}

impl<T, const SIZE: usize> Default for MemoryPool<T, SIZE> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T, const SIZE: usize> MemoryPool<T, SIZE> {
    /// Creates a new [`MemoryPool`].
    ///
    /// `SIZE` is required to be larger than 0; this is enforced at compile time.
    pub const fn new() -> Self {
        const {
            assert!(SIZE > 0, "empty MemoryPool");
        }

        Self {
            chunks: [const { MemoryPoolInner::new() }; SIZE],
        }
    }

    /// Reserves an element in the [`MemoryPool`].
    ///
    /// Returns `None` if no element is available.
    ///
    /// The returned token has to be initialized via [`MemoryPoolToken::init`] before use.
    /// See [`MemoryPool::chunk`] for a convenience wrapper combining reserving and initializing a [`Chunk`].
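    ///
    /// # Example
    ///
    /// Reserving the only slot of a single-element pool:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 1>::new();
    /// let token = pool.reserve().unwrap();
    /// // The single slot is reserved, so a second reservation fails.
    /// assert!(pool.reserve().is_none());
    /// // Dropping the token makes the slot available again.
    /// drop(token);
    /// assert!(pool.reserve().is_some());
    /// ```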
    pub fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        self.chunks.iter().find_map(|chunk| chunk.reserve())
    }

    /// Retrieves a [`Chunk`] from the [`MemoryPool`] and initializes it with `init_value`.
    ///
    /// Returns `Err(init_value)` if no more [`Chunk`]s are available.
    ///
    /// Convenience wrapper combining [`MemoryPool::reserve`] and [`MemoryPoolToken::init`].
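    ///
    /// # Example
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 1>::new();
    /// let chunk = pool.chunk(1).unwrap();
    /// // The pool is exhausted, so the value is handed back.
    /// assert_eq!(pool.chunk(2).unwrap_err(), 2);
    /// // Dropping the chunk frees its slot for re-use.
    /// drop(chunk);
    /// assert!(pool.chunk(3).is_ok());
    /// ```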
    pub fn chunk(&self, init_value: T) -> Result<Chunk<'_, T>, T> {
        // We need to split reserving and initializing the `Chunk` because `init_value` cannot be copied into every
        // `reserve` call.
        let token = self.reserve();

        if let Some(token) = token {
            Ok(token.init(init_value))
        } else {
            Err(init_value)
        }
    }

    /// Returns the number of chunks currently available.
    ///
    /// Due to concurrent accesses from interrupts and/or other threads, the returned value may already be stale.
    /// Only intended for metrics.
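    ///
    /// # Example
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 2>::new();
    /// assert_eq!(pool.chunks_available(), 2);
    ///
    /// let chunk = pool.chunk(1).unwrap();
    /// assert_eq!(pool.chunks_available(), 1);
    ///
    /// drop(chunk);
    /// assert_eq!(pool.chunks_available(), 2);
    /// ```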
    pub fn chunks_available(&self) -> usize {
        self.chunks
            .iter()
            .map(|chunk| usize::from(chunk.is_available()))
            .sum()
    }
}

// SAFETY: All accesses to the `MemoryPool`'s elements go through reservation (`MemoryPool::reserve` and
// `MemoryPool::chunk`), which is synchronized by the `available` atomics.
unsafe impl<T, const SIZE: usize> Sync for MemoryPool<T, SIZE> {}

/// Container for the `T` instance and the synchronization atomic of the [`MemoryPool`].
#[derive(Debug)]
struct MemoryPoolInner<T> {
    data: UnsafeCell<MaybeUninit<T>>,
    available: AtomicBool,
}

impl<T> MemoryPoolInner<T> {
    /// Creates a new `MemoryPoolInner`.
    ///
    /// Marked available and uninitialized.
    const fn new() -> Self {
        Self {
            data: UnsafeCell::new(MaybeUninit::uninit()),
            available: AtomicBool::new(true),
        }
    }

    /// Reserves this [`MemoryPoolInner`].
    fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        if self.available.swap(false, Ordering::AcqRel) {
            Some(MemoryPoolToken { inner: Some(self) })
        } else {
            None
        }
    }

    /// Returns `true` if the [`MemoryPoolInner`] is currently available.
    fn is_available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }
}

/// A token reserving an element in a [`MemoryPool`] which can be initialized to create a [`Chunk`].
#[derive(Debug)]
pub struct MemoryPoolToken<'a, T> {
    inner: Option<&'a MemoryPoolInner<T>>,
}

impl<'a, T> MemoryPoolToken<'a, T> {
    /// Consumes the [`MemoryPoolToken.inner`][field@MemoryPoolToken::inner] to prevent [`MemoryPoolToken`]'s drop
    /// implementation from making the element available.
    fn consume(&mut self) -> (&'a mut MaybeUninit<T>, &'a AtomicBool) {
        let Some(inner) = self.inner.take() else {
            unreachable!("`MemoryPoolToken` should only be consumed once");
        };

        let inner_data = {
            let inner_data_ptr = inner.data.get();
            // SAFETY:
            // - `UnsafeCell` has the same layout as its content, thus `inner_data_ptr` points to an aligned and
            //   valid value of `MaybeUninit<T>`.
            // - The reservation via the `available` atomic ensures that only this single mutable reference to the
            //   content of the `UnsafeCell` exists.
            unsafe { inner_data_ptr.as_mut() }
                .expect("pointer to the contents of an `UnsafeCell` should not be null")
        };

        (inner_data, &inner.available)
    }

    /// Consumes the [`MemoryPoolToken`] and turns it into an initialized [`Chunk`].
    pub fn init(mut self, init_value: T) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        inner_data.write(init_value);

        // SAFETY:
        // `inner_data` has been initialized by writing `init_value`.
        unsafe { Chunk::new(inner_data, available) }
    }

    /// Initializes a [`Chunk`] in place via `init_function`.
    ///
    /// # Safety
    ///
    /// `init_function` must initialize the passed parameter to a valid `T` before the function returns.
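    ///
    /// # Example
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 1>::new();
    /// let token = pool.reserve().unwrap();
    /// // SAFETY: The closure fully initializes the value before returning.
    /// let chunk = unsafe {
    ///     token.init_in_place(|uninit| {
    ///         uninit.write(7);
    ///     })
    /// };
    /// assert_eq!(*chunk, 7);
    /// ```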
    pub unsafe fn init_in_place(
        mut self,
        init_function: impl FnOnce(&mut MaybeUninit<T>),
    ) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        init_function(inner_data);

        // SAFETY:
        // `inner_data` has been initialized by `init_function`.
        unsafe { Chunk::new(inner_data, available) }
    }
}

impl<T> Drop for MemoryPoolToken<'_, T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.available.store(true, Ordering::Release);
        }
    }
}

/// A pointer type pointing to an instance of `T` in a [`MemoryPool`].
///
/// See [module-level documentation][self] for more information.
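///
/// # Example
///
/// ```
/// use veecle_os_runtime::memory_pool::MemoryPool;
///
/// let pool = MemoryPool::<[u8; 4], 1>::new();
/// let mut chunk = pool.chunk([0; 4]).unwrap();
/// // `Chunk` dereferences to `T`, so the data can be read and modified in place.
/// chunk[0] += 1;
/// assert_eq!(chunk[0], 1);
/// ```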
pub struct Chunk<'a, T> {
    // We use `&mut MaybeUninit<T>` instead of `&mut T` so that `T` can be dropped without going through a pointer:
    // dropping the contents behind a plain `&mut T` would leave a dangling reference in the `Drop` implementation.
    inner: &'a mut MaybeUninit<T>,
    // Only held to ensure the chunk is made available on drop.
    token: &'a AtomicBool,
}

impl<T> Debug for Chunk<'_, T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'a, T> Chunk<'a, T> {
    /// Creates a new [`Chunk`].
    ///
    /// # Safety
    ///
    /// The `chunk` must be initialized.
    unsafe fn new(chunk: &'a mut MaybeUninit<T>, token: &'a AtomicBool) -> Self {
        Self {
            inner: chunk,
            token,
        }
    }
}

impl<T> Deref for Chunk<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it
        // is initialized here.
        unsafe { self.inner.assume_init_ref() }
    }
}

impl<T> DerefMut for Chunk<'_, T> {
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it
        // is initialized here.
        unsafe { self.inner.assume_init_mut() }
    }
}

impl<T> Drop for Chunk<'_, T> {
    fn drop(&mut self) {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped here, exactly once, and cannot be un-initialized by any `Chunk` method, thus it is
        // still initialized at this point.
        unsafe { self.inner.assume_init_drop() };
        // The swap must happen outside `debug_assert!` so the chunk is also made available in release builds, where
        // `debug_assert!` compiles to nothing.
        let was_available = self.token.swap(true, Ordering::AcqRel);
        debug_assert!(!was_available, "chunk was made available a second time");
    }
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use std::format;
    use std::sync::atomic::AtomicUsize;

    use super::*;

    #[test]
    fn pool() {
        static POOL: MemoryPool<[u8; 10], 2> = MemoryPool::new();

        let mut chunk = POOL.chunk([0; 10]).unwrap();
        let chunk1 = POOL.chunk([0; 10]).unwrap();
        assert!(POOL.chunk([0; 10]).is_err());
        assert_eq!(chunk[0], 0);
        chunk[0] += 1;
        assert_eq!(chunk[0], 1);
        assert_eq!(chunk1[0], 0);
    }

    #[test]
    fn drop_test() {
        #[derive(Debug)]
        pub struct Dropper {}
        impl Drop for Dropper {
            fn drop(&mut self) {
                COUNTER.fetch_add(1, Ordering::Relaxed);
            }
        }

        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        {
            let pool: MemoryPool<Dropper, 2> = MemoryPool::new();

            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 1);

            {
                let _dropper1 = pool.chunk(Dropper {}).unwrap();
                let _dropper2 = pool.chunk(Dropper {}).unwrap();
                assert!(pool.chunk(Dropper {}).is_err());
            }
            assert_eq!(COUNTER.load(Ordering::Relaxed), 4);
            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
        }

        // After dropping `pool`, there were no additional drops of the contained type.
        assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn drop_memory_pool_token() {
        let pool = MemoryPool::<usize, 1>::new();
        assert_eq!(pool.chunks_available(), 1);
        {
            let _token = pool.reserve().unwrap();
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 1);
    }

    #[test]
    fn chunks_available() {
        let pool = MemoryPool::<usize, 2>::new();
        assert_eq!(pool.chunks_available(), 2);
        {
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 1);
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 2);
    }

    #[test]
    fn reserve_init() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        let chunk = token.init(2);
        assert_eq!(*chunk, 2);
    }

    #[test]
    fn reserve_init_in_place() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        // SAFETY: The passed closure initializes the chunk correctly.
        let chunk = unsafe {
            token.init_in_place(|m| {
                m.write(2);
            })
        };
        assert_eq!(*chunk, 2);
    }

    #[test]
    #[should_panic(expected = "`MemoryPoolToken` should only be consumed once")]
    fn consume_none() {
        let pool = MemoryPool::<usize, 2>::new();
        let mut token = pool.reserve().unwrap();
        let _ = token.consume();
        let _ = token.consume();
    }

    /// Ensures the `MemoryPool` and `Chunk` don't lose their `Send` & `Sync` auto trait implementations when
    /// refactoring.
    #[test]
    fn send_sync() {
        fn send<T>()
        where
            T: Send,
        {
        }
        fn sync<T>()
        where
            T: Sync,
        {
        }
        send::<MemoryPool<[u8; 10], 2>>();
        sync::<MemoryPool<[u8; 10], 2>>();

        send::<Chunk<[u8; 10]>>();
        sync::<Chunk<[u8; 10]>>();
    }

    #[test]
    fn debug_chunk() {
        let pool = MemoryPool::<usize, 2>::new();
        let chunk = pool.chunk(0).unwrap();
        assert_eq!(format!("{chunk:?}"), "0");
    }

    #[test]
    fn default_memory_pool() {
        let pool: MemoryPool<usize, 2> = MemoryPool::default();
        assert_eq!(pool.chunks_available(), 2);
    }
}
451}