veecle_os_runtime/datastore/
combined_readers.rs

1use core::future::{Future, poll_fn};
2use core::pin::pin;
3use core::task::Poll;
4
/// Allows combining (nearly) arbitrary amounts of [`Reader`]s, [`ExclusiveReader`]s or [`InitializedReader`]s.
///
/// In this module the trait is implemented for tuples of two up to seven `&mut` references to
/// [`CombinableReader`] types.
///
/// [`ExclusiveReader`]: super::ExclusiveReader
/// [`InitializedReader`]: super::InitializedReader
/// [`Reader`]: super::Reader
pub trait CombineReaders {
    /// The (tuple) value that will be read from the combined readers.
    type ToBeRead<'b>;

    /// Reads a tuple of values from all combined readers in the provided function.
    ///
    /// The value returned by `f` is passed through to the caller.
    fn read<U>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> U) -> U;

    /// Observes the combined readers for updates.
    ///
    /// Will return if **any** of the readers is updated.
    ///
    /// This returns `&mut Self` to allow chaining a call to [`read`][Self::read].
    #[allow(async_fn_in_trait)]
    async fn wait_for_update(&mut self) -> &mut Self;
}
25
/// Private supertrait of [`CombinableReader`].
///
/// This trait is only visible within the parent module, so code outside of it cannot name it and therefore
/// cannot implement [`CombinableReader`] for its own types.
pub(super) trait Sealed {}
27
/// A marker trait for types that can be used with [`CombineReaders`], see that for more details.
///
/// The `Sealed` supertrait is not nameable outside of the parent module, which prevents implementations of this
/// trait from being added elsewhere.
// `Sealed` is deliberately less visible than this trait; that is exactly what `private_bounds` warns about.
#[allow(private_bounds)]
pub trait CombinableReader: Sealed {
    /// The (owned) type that this type reads, will be exposed as a reference in the [`CombineReaders::read`] callback.
    type ToBeRead: 'static;

    /// Internal implementation details.
    ///
    /// Borrows the value of the reader from the slot's internal [`RefCell`][core::cell::RefCell].
    #[doc(hidden)]
    fn borrow(&self) -> core::cell::Ref<'_, Self::ToBeRead>;

    /// Internal implementation details.
    ///
    /// See [`Reader::wait_for_update`] for more.
    ///
    /// [`Reader::wait_for_update`]: super::Reader::wait_for_update
    #[doc(hidden)]
    #[allow(async_fn_in_trait)]
    async fn wait_for_update(&mut self);
}
49
/// Generates [`CombineReaders`] implementations for tuples of `&mut` references to [`CombinableReader`]s.
///
/// Each parenthesised identifier list inside `tuples` produces one implementation. Every identifier is used both
/// as a generic type parameter of the `impl` and as the local binding for the matching tuple element inside the
/// generated method bodies (hence the `non_snake_case` allowances).
macro_rules! impl_combined_reader_helper {
    (
        tuples: [
            $(($($reader:ident)*),)*
        ],
    ) => {
        $(
            impl<$($reader,)*> CombineReaders for ( $( &mut $reader, )* )
            where
                $($reader: CombinableReader,)*
            {
                type ToBeRead<'x> = (
                    $(&'x <$reader as CombinableReader>::ToBeRead,)*
                );

                #[allow(non_snake_case)]
                #[veecle_telemetry::instrument]
                fn read<A>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> A) -> A {
                    let ($($reader,)*) = self;
                    // Shadow each reader with its `Ref` guard; the guards stay alive until `f` has returned.
                    $(
                        let $reader = $reader.borrow();
                    )*
                    f(($(&*$reader,)*))
                }

                #[allow(non_snake_case)]
                #[veecle_telemetry::instrument]
                async fn wait_for_update(&mut self) -> &mut Self {
                    // Inner scope: the pinned futures borrow the readers and must be gone before `self` is returned.
                    {
                        let ($($reader,)*) = self;
                        // Shadow each reader with its pinned `wait_for_update` future.
                        $(
                            let mut $reader = pin!($reader.wait_for_update());
                        )*
                        poll_fn(move |cx| {
                            // Every future is polled on each pass (no short-circuiting), so that the generation
                            // is incremented for every reader.
                            let mut any_ready = false;
                            $(
                                any_ready |= $reader.as_mut().poll(cx).is_ready();
                            )*
                            if any_ready { Poll::Ready(()) } else { Poll::Pending }
                        })
                        .await;
                    }
                    self
                }
            }
        )*
    };
}
103
// Implements `CombineReaders` for tuples of two up to seven readers.
//
// The identifiers double as generic parameter names of the generated `impl`s, so they must not collide with the
// `A` type parameter of the generated `read` method.
impl_combined_reader_helper!(
    tuples: [
        // We don't implement this for a tuple with only one type, as that is just a reader.
        (T U),
        (T U V),
        (T U V W),
        (T U V W X),
        (T U V W X Y),
        (T U V W X Y Z),
    ],
);
115
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::pin::pin;
    use futures::FutureExt;

    use crate::datastore::{
        CombineReaders, ExclusiveReader, Reader, Slot, Storable, Writer, generational,
    };

    /// Payload type stored in the first slot of every test.
    #[derive(Eq, PartialEq, Debug, Clone, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);

    /// Payload type stored in the second slot of every test.
    #[derive(Eq, PartialEq, Debug, Clone, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    /// Polls the combined `wait_for_update` future exactly once and reports whether it completed.
    fn update_ready(readers: &mut impl CombineReaders) -> bool {
        readers.wait_for_update().now_or_never().is_some()
    }

    #[test]
    fn read_exclusive_reader() {
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        // Nothing has been written; both readers must agree on whether a value is present.
        let combined = (&mut reader0, &mut reader1);
        combined.read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));
    }

    #[test]
    fn wait_for_update_exclusive_reader() {
        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        // No write has happened yet.
        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));

        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // The writes are observed once, afterwards the readers are pending again.
        assert!(update_ready(&mut (&mut reader0, &mut reader1)));
        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));
    }

    #[test]
    fn read() {
        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = Reader::from_slot(slot1.as_ref());

        (&mut reader0, &mut reader1)
            .read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // Both slots hold a value now, so the readers can be converted through `wait_init`.
        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
        let mut reader1 = reader1.wait_init().now_or_never().unwrap();

        (&mut reader0, &mut reader1).read(|(first, second)| assert_eq!(first.0, second.0));
    }

    #[test]
    fn wait_for_update() {
        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = Reader::from_slot(slot1.as_ref());

        // No write has happened yet.
        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));

        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // The writes are observed once, afterwards the readers are pending again.
        assert!(update_ready(&mut (&mut reader0, &mut reader1)));
        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));

        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
        let mut reader1 = reader1.wait_init().now_or_never().unwrap();

        // The readers obtained from `wait_init` must not report the already observed writes again.
        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));

        source.as_ref().increment_generation();
        writer0.write(Sensor0(3)).now_or_never().unwrap();
        writer1.write(Sensor1(3)).now_or_never().unwrap();

        // Chain `read` onto the `&mut Self` handed back by `wait_for_update`.
        let mut combined = (&mut reader0, &mut reader1);
        let updated = combined.wait_for_update().now_or_never().unwrap();
        updated.read(|(first, second)| assert_eq!(first.0, second.0));

        assert!(!update_ready(&mut (&mut reader0, &mut reader1)));
    }

    #[test]
    fn read_mixed() {
        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        (&mut reader0, &mut reader1)
            .read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // Only the first reader is converted through `wait_init`; the exclusive reader keeps yielding an `Option`.
        let mut reader0 = reader0.wait_init().now_or_never().unwrap();

        (&mut reader0, &mut reader1).read(|(first, second): (&Sensor0, &Option<Sensor1>)| {
            assert_eq!(first.0, second.as_ref().unwrap().0)
        });
    }
}