veecle_os_runtime/
heapfree_executor.rs

1//! A multi-task executor that uses statically allocated state for tracking task status.
2
3use core::convert::Infallible;
4use core::fmt::Debug;
5use core::future::Future;
6use core::ops::{Add, Div};
7use core::pin::Pin;
8use core::sync::atomic::{AtomicUsize, Ordering};
9use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
10
11use futures::task::AtomicWaker;
12use generic_array::{ArrayLength, GenericArray};
13use typenum::operator_aliases::{Quot, Sum};
14use typenum::{Const, ToUInt, U};
15
16use crate::datastore::generational;
17
/// `usize::BITS` as a `typenum` type-level unsigned integer: the number of flags stored in each `AtomicUsize` word.
type UsizeBits = U<{ usize::BITS as usize }>;
/// `usize::BITS - 1` as a `typenum` type-level unsigned integer, added before dividing by `UsizeBits` to round up.
type UsizeBitsMinusOne = typenum::operator_aliases::Sub1<UsizeBits>;
20
/// Helper to force associated type normalization.
///
/// Computes `LEN + usize::BITS - 1` at the type level for `Const<LEN>`; this is the first step of
/// `DivCeilUsizeBits`.
trait AddUsizeBitsMinusOne {
    /// See the sole implementation docs for the value.
    type Output;
}
26
/// Helper to force associated type normalization.
///
/// Computes `LEN.div_ceil(usize::BITS)` at the type level for `Const<LEN>`, building on `AddUsizeBitsMinusOne`.
trait DivCeilUsizeBits {
    /// See the sole implementation docs for the value.
    type Output;
}
32
/// Private API to simplify bounds that users don't need to care about.
///
/// The name is like this because it appears in the public API docs as a bound, but doesn't get linked to.
trait Internal {
    /// The number of `usize` words needed to hold one flag bit per sub-future; see the sole implementation docs for
    /// the value.
    ///
    /// The bounds here are what are required by [`WakerShared::new`].
    type LengthInWords: ArrayLength;
}
42
impl<const LEN: usize> AddUsizeBitsMinusOne for Const<LEN>
where
    // `LEN` must map to a `typenum` unsigned integer that supports adding `usize::BITS - 1`.
    Const<LEN>: ToUInt<Output: Add<UsizeBitsMinusOne>>,
{
    /// Calculates `LEN + usize::BITS - 1`.
    type Output = Sum<U<LEN>, UsizeBitsMinusOne>;
}
50
impl<const LEN: usize> DivCeilUsizeBits for Const<LEN>
where
    // The sum produced by `AddUsizeBitsMinusOne` must support division by `usize::BITS`.
    Const<LEN>: AddUsizeBitsMinusOne<Output: Div<UsizeBits>>,
{
    /// Calculates `(LEN + usize::BITS - 1) / usize::BITS`, which is the same as `LEN.div_ceil(usize::BITS)`.
    type Output = Quot<<Const<LEN> as AddUsizeBitsMinusOne>::Output, UsizeBits>;
}
58
impl<const LEN: usize> Internal for Const<LEN>
where
    // The rounded-up word count must be usable as a `GenericArray` length.
    Const<LEN>: DivCeilUsizeBits<Output: ArrayLength>,
{
    /// The length in "words" (`usize`s) required to store at least `LEN` bits.
    ///
    /// This is essentially `LEN.div_ceil(usize::BITS)`, but implemented as `(LEN + usize::BITS - 1) / usize::BITS`
    /// because that's what operations `typenum` provides.
    ///
    /// The extra intermediate traits are required to force normalization of the associated types.
    type LengthInWords = <Const<LEN> as DivCeilUsizeBits>::Output;
}
68    /// The extra intermediate traits are required to force normalization of the associated types.
69    type LengthInWords = <Const<LEN> as DivCeilUsizeBits>::Output;
70}
71
72/// Helper to simplify getting the `LengthInWords` associated type.
73type LengthInWords<const LEN: usize> = <Const<LEN> as Internal>::LengthInWords;
74
75/// Data shared between multiple [`BitWaker`]s associated with a single [`Executor`].
76#[derive(Debug)]
77struct WakerShared<const LEN: usize>
78where
79    Const<LEN>: Internal,
80{
81    /// The outer [`Waker`] that this [`Executor`] is currently associated with.
82    waker: AtomicWaker,
83
84    /// Each bit stores the flag for a sub-future of an [`Executor`], to determine which need to run on each poll.
85    active: GenericArray<AtomicUsize, LengthInWords<LEN>>,
86}
87
88/// Gets the index of the word that the `index` flag is stored in, and the mask to identify the flag within the
89/// word.
90fn get_active_index_and_mask(index: usize) -> (usize, usize) {
91    let word_index = index / usize::BITS as usize;
92    let bit_index = index % usize::BITS as usize;
93    (word_index, 1 << bit_index)
94}
95
96impl<const LEN: usize> WakerShared<LEN>
97where
98    Const<LEN>: Internal,
99{
100    /// Creates a new `WakerShared` that can store the state of at least `LEN` sub-futures.
101    const fn new() -> Self {
102        let active = {
103            // Using `const_default` would be nicer, but that can't be used with atomics on 32-bit.
104            let mut active = GenericArray::uninit();
105
106            // Set all bits to 1 so that every future is considered woken and will be polled in the first loop.
107            let mut index = 0;
108            let slice = active.as_mut_slice();
109            while index < slice.len() {
110                slice[index].write(AtomicUsize::new(usize::MAX));
111                index += 1;
112            }
113
114            // SAFETY: We initialized every element of the array above.
115            unsafe { GenericArray::assume_init(active) }
116        };
117
118        Self {
119            waker: AtomicWaker::new(),
120            active,
121        }
122    }
123
124    /// Clears the flag for the `index` sub-future, and returns the previous value.
125    fn reset(&self, index: usize) -> bool {
126        let (active_word, mask) = self.get_active_ref_and_mask(index);
127        let previous_value = active_word.fetch_and(!mask, Ordering::Relaxed);
128        // Was the bit set in the previous value?
129        (previous_value & mask) != 0
130    }
131
132    /// Clears all flags and returns the indexes of any that were set.
133    fn reset_all(&self) -> impl Iterator<Item = usize> + use<'_, LEN> {
134        (0..LEN).filter(|&index| self.reset(index))
135    }
136
137    /// Sets the flag for the `index` sub-future, and returns the previous value, waking any currently registered outer
138    /// waker in the process.
139    fn set(&self, index: usize) -> bool {
140        let (active_word, mask) = self.get_active_ref_and_mask(index);
141        let previous_value = active_word.fetch_or(mask, Ordering::Relaxed);
142
143        self.waker.wake();
144
145        // Was the bit set in the previous value?
146        (previous_value & mask) != 0
147    }
148
149    /// Registers the [`Waker`] of the current context as to-be-woken when any sub-future wakes.
150    async fn register_current(&self) {
151        core::future::poll_fn(|ctx| {
152            self.waker.register(ctx.waker());
153            Poll::Ready(())
154        })
155        .await;
156    }
157
158    /// Gets the word that the `index` flag is stored in, and the mask to identify the flag within the word.
159    fn get_active_ref_and_mask(&self, index: usize) -> (&AtomicUsize, usize) {
160        let (index, mask) = get_active_index_and_mask(index);
161        (&self.active[index], mask)
162    }
163}
164
165/// A [`Waker`] that can be used to wake a sub-future within an outer task while tracking which sub-future it is.
166#[derive(Debug)]
167struct BitWaker<const LEN: usize>
168where
169    Const<LEN>: Internal,
170{
171    /// Index for this sub-future within `self.shared`.
172    index: usize,
173
174    /// Shared state for all sub-futures running in the same [`Executor`].
175    shared: Option<&'static WakerShared<LEN>>,
176}
177
178impl<const LEN: usize> BitWaker<LEN>
179where
180    Const<LEN>: Internal,
181{
182    // For all following comments:
183    // We use this vtable with a data pointer converted from an `&'static Self` in `as_waker`.
184    const VTABLE: &RawWakerVTable = &RawWakerVTable::new(
185        // `&'static Self: Copy` so we can trivially return a new `RawWaker`.
186        |ptr| RawWaker::new(ptr, Self::VTABLE),
187        // SAFETY: We can convert back to an `&'static Self` then call its methods.
188        |ptr| unsafe { &*ptr.cast::<Self>() }.wake_by_ref(),
189        // SAFETY: We can convert back to an `&'static Self` then call its methods.
190        |ptr| unsafe { &*ptr.cast::<Self>() }.wake_by_ref(),
191        // `&'static Self` has a no-op `drop_in_place` so we don't need to do anything.
192        |_| {},
193    );
194
195    /// Creates a new [`BitWaker`] that will panic if used, for const-initialization purposes.
196    const fn invalid() -> Self {
197        Self {
198            index: usize::MAX,
199            shared: None,
200        }
201    }
202
203    /// Creates a new [`BitWaker`] for the future at the given `index` in the [`WakerShared`].
204    const fn new(index: usize, shared: &'static WakerShared<LEN>) -> Self {
205        assert!(index < LEN, "Future index out of bounds.");
206        Self {
207            index,
208            shared: Some(shared),
209        }
210    }
211
212    /// Set the bit for this waker's `index` in its [`WakerShared`].
213    fn wake_by_ref(&self) {
214        self.shared.unwrap().set(self.index);
215    }
216
217    /// Create a [`Waker`] instance that will wake this [`BitWaker`].
218    fn as_waker(&'static self) -> Waker {
219        let pointer = (&raw const *self).cast();
220        // SAFETY: The vtable functions expect to be called with a data pointer converted from an `&'static Self`.
221        unsafe { Waker::new(pointer, Self::VTABLE) }
222    }
223}
224
225/// Permanent shared state required for the [`Executor`].
226#[derive(Debug)]
227#[expect(private_bounds)]
228pub struct ExecutorShared<const LEN: usize>
229where
230    Const<LEN>: Internal,
231{
232    shared: WakerShared<LEN>,
233    bit_wakers: [BitWaker<LEN>; LEN],
234}
235
236#[expect(private_bounds)]
237impl<const LEN: usize> ExecutorShared<LEN>
238where
239    Const<LEN>: Internal,
240{
241    /// Create a new instance of the shared state. This is a self-referential data structure so must be initialized in a
242    /// `static` like:
243    ///
244    /// ```rust
245    /// use veecle_os_runtime::__exports::ExecutorShared;
246    ///
247    /// static SHARED: ExecutorShared<5> = ExecutorShared::new(&SHARED);
248    /// ```
249    pub const fn new(&'static self) -> Self {
250        let mut bit_wakers = [const { BitWaker::invalid() }; LEN];
251        let mut index = 0;
252        while index < LEN {
253            bit_wakers[index] = BitWaker::new(index, &self.shared);
254            index += 1;
255        }
256        Self {
257            shared: WakerShared::new(),
258            bit_wakers,
259        }
260    }
261}
262
263/// Async sub-executor.
264///
265/// This sub-executor does not handle the main loop of waiting till a waker is woken and then polling the futures, it
266/// expects to be run as a task within an executor that does that (e.g. `futures::executor::block_on`,
267/// `wasm_bindgen_futures::spawn_local`, `veecle_freertos_integration::task::block_on_future`). Within that outer executor loop this
268/// sub-executor tracks which of its futures were the cause of a wake and polls only them.
269///
270/// Being designed like this allows this sub-executor to be agnostic to how a platform actually runs futures—including
271/// esoteric platforms like `wasm32-web` which cannot have a thread-park based executor—while still giving the required
272/// guarantees about when and how the sub-futures are polled.
273///
274/// # Polling strategy
275///
276/// The executor polls all woken futures in a fixed order.
277/// The order corresponds to the order of the elements provided to [`Executor::new()`].
278/// Initially, every future will be polled once.
279///
280/// ## Background
281///
282/// Any write to a slot should be visible for every reader.
283/// To ensure this, every future waiting for a reader must be polled before the writer is polled again.
284/// With in-order polling, this is straight-forward to guarantee, as every future that is woken by a write will be
285/// polled before the writer is polled again.
286// Developer notes:
287//
288// - [`generational::Source::increment_generation`] is used between each poll iteration in order to track that each
289// writer may only be written once per-iteration, if the writing actor has multiple data values to yield.
290//
291// - An alternative that was considered is a FIFO queue, but this approach requires additional complexity to fulfill the
292// read-write requirement.
293// Additionally, implementing in-order polling without locks is simpler compared to FIFO-approaches.
294// See <https://github.com/veecle/veecle-os/issues/167> for more information.
295#[expect(private_bounds)]
296pub struct Executor<'a, const LEN: usize>
297where
298    Const<LEN>: Internal,
299{
300    /// A generational source provided by the datastore.
301    source: Pin<&'a generational::Source>,
302    shared: &'static ExecutorShared<LEN>,
303    futures: [Pin<&'a mut (dyn Future<Output = Infallible> + 'a)>; LEN],
304}
305
306impl<const LEN: usize> core::fmt::Debug for Executor<'_, LEN>
307where
308    Const<LEN>: Internal,
309{
310    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
311        f.debug_struct("Executor")
312            .field("source", &self.source)
313            .field("shared", &self.shared)
314            .field("futures", &"<opaque>")
315            .finish()
316    }
317}
318
319#[expect(private_bounds)]
320impl<'a, const LEN: usize> Executor<'a, LEN>
321where
322    Const<LEN>: Internal,
323{
324    /// Creates a new [`Executor`] from the provided futures.
325    pub fn new(
326        shared: &'static ExecutorShared<LEN>,
327        source: Pin<&'a generational::Source>,
328        futures: [Pin<&'a mut (dyn Future<Output = Infallible> + 'a)>; LEN],
329    ) -> Self {
330        Self {
331            source,
332            shared,
333            futures,
334        }
335    }
336
337    /// Polls all woken futures once, returns `true` if at least one future was woken.
338    pub(crate) fn run_once(&mut self) -> bool {
339        let mut polled = false;
340
341        for index in self.shared.shared.reset_all() {
342            let future = &mut self.futures[index];
343            let waker = self.shared.bit_wakers[index].as_waker();
344            let mut context = Context::from_waker(&waker);
345            match future.as_mut().poll(&mut context) {
346                Poll::Pending => {}
347            }
348            polled = true;
349        }
350
351        self.source.increment_generation();
352
353        polled
354    }
355
356    /// Runs all futures in an endless loop.
357    pub async fn run(mut self) -> ! {
358        loop {
359            self.shared.shared.register_current().await;
360
361            // Only run through the list of futures once, relying on the outer executor to re-poll if any self-woke or
362            // woke a prior sub-future.
363            self.run_once();
364
365            // The sub-futures are responsible for waking if needed, yield here to the executor then continue to poll
366            // the sub-futures straight away.
367            let mut yielded = false;
368            core::future::poll_fn(|_| {
369                if yielded {
370                    Poll::Ready(())
371                } else {
372                    yielded = true;
373                    Poll::Pending
374                }
375            })
376            .await;
377        }
378    }
379}
380
381#[cfg(test)]
382#[cfg_attr(coverage_nightly, coverage(off))]
383mod tests {
384    use core::pin::pin;
385    use core::task::Poll;
386    use std::vec::Vec;
387
388    use super::{BitWaker, Executor, ExecutorShared, WakerShared, get_active_index_and_mask};
389    use crate::datastore::generational;
390
391    const TWO_WORDS: usize = usize::BITS as usize * 2;
392
393    #[test]
394    fn calculate_indices() {
395        // First bit in first element.
396        assert_eq!(get_active_index_and_mask(0), (0, 1 << 0));
397
398        // Second bit in first element.
399        assert_eq!(get_active_index_and_mask(1), (0, 1 << 1));
400
401        // Last bit in first element.
402        assert_eq!(
403            get_active_index_and_mask(usize::BITS as usize - 1),
404            (0, 1 << (usize::BITS as usize - 1))
405        );
406
407        // First bit in second element.
408        assert_eq!(get_active_index_and_mask(usize::BITS as usize), (1, 1 << 0));
409
410        // Second bit in second element.
411        assert_eq!(
412            get_active_index_and_mask(usize::BITS as usize + 1),
413            (1, 1 << 1)
414        );
415    }
416
417    #[test]
418    fn waker_shared_initializes_as_all_awake() {
419        assert_eq!(
420            Vec::from_iter(WakerShared::<0>::new().reset_all()),
421            // Annotation required because `impl PartialEq<serde_json::Value> for usize` _might_ be seen by `rustc` and
422            // make this ambiguous.
423            Vec::<usize>::new()
424        );
425        assert_eq!(
426            Vec::from_iter(WakerShared::<1>::new().reset_all()),
427            Vec::from_iter(0..1)
428        );
429        assert_eq!(
430            Vec::from_iter(WakerShared::<{ usize::BITS as usize - 1 }>::new().reset_all()),
431            Vec::from_iter(0..usize::BITS as usize - 1)
432        );
433        assert_eq!(
434            Vec::from_iter(WakerShared::<{ usize::BITS as usize }>::new().reset_all()),
435            Vec::from_iter(0..usize::BITS as usize)
436        );
437        assert_eq!(
438            Vec::from_iter(WakerShared::<{ usize::BITS as usize + 1 }>::new().reset_all()),
439            Vec::from_iter(0..usize::BITS as usize + 1)
440        );
441    }
442
443    #[test]
444    fn bitwaker_valid_indexes() {
445        static SHARED: WakerShared<TWO_WORDS> = WakerShared::new();
446        let mut i = 0;
447        while i < TWO_WORDS {
448            BitWaker::new(i, &SHARED).wake_by_ref();
449            i += 1;
450        }
451        assert!(std::panic::catch_unwind(|| BitWaker::new(i, &SHARED)).is_err());
452    }
453
454    #[test]
455    fn extra_code_coverage() {
456        static SHARED: ExecutorShared<1> = ExecutorShared::new(&SHARED);
457
458        // Not the expected API usage, but should work to get code-coverage of some methods that are normally only
459        // called in `const`-context.
460        let _ = ExecutorShared::new(&SHARED);
461
462        let source = pin!(generational::Source::new());
463        let futures = [pin!(async move { core::future::pending().await }) as _];
464        let executor = Executor::new(&SHARED, source.as_ref(), futures);
465
466        let _ = std::format!("{executor:?}");
467
468        let _ = BitWaker::<1>::invalid();
469    }
470
471    #[cfg(not(miri))] // Miri leak-checker doesn't like the leftover thread
472    #[test]
473    fn executor() {
474        let (tx, rx) = std::sync::mpsc::channel();
475
476        std::thread::spawn({
477            move || {
478                let source = pin!(generational::Source::new());
479
480                static SHARED: ExecutorShared<1> = ExecutorShared::new(&SHARED);
481                let futures = [pin!(async move {
482                    let mut yielded = false;
483                    core::future::poll_fn(|cx| {
484                        if yielded {
485                            Poll::Ready(())
486                        } else {
487                            yielded = true;
488                            cx.waker().wake_by_ref();
489
490                            // for code-coverage
491                            #[expect(clippy::waker_clone_wake)]
492                            cx.waker().clone().wake();
493
494                            Poll::Pending
495                        }
496                    })
497                    .await;
498                    // `Executor::run` doesn't return, so we notify that we got here and leave this thread parked.
499                    let _ = tx.send(());
500                    std::future::pending().await
501                }) as _];
502
503                let executor = Executor::new(&SHARED, source.as_ref(), futures);
504
505                futures::executor::block_on(executor.run());
506            }
507        });
508
509        assert!(rx.recv_timeout(std::time::Duration::from_secs(1)).is_ok());
510    }
511}