veecle_os_runtime/datastore/
exclusive_reader.rs1use core::cell::Ref;
2use core::fmt::Debug;
3use core::marker::PhantomData;
4use core::pin::Pin;
5
6use crate::datastore::Storable;
7use crate::datastore::slot::{self, Slot};
8
9#[derive(Debug)]
59pub struct ExclusiveReader<'a, T>
60where
61 T: Storable + 'static,
62{
63 waiter: slot::Waiter<'a, T>,
64
65 marker: PhantomData<fn(T)>,
66}
67
68impl<T> ExclusiveReader<'_, T>
69where
70 T: Storable + 'static,
71{
72 #[veecle_telemetry::instrument]
78 pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
79 self.waiter.read(|value| {
80 let value = value.as_ref();
81
82 veecle_telemetry::trace!("Slot read", type_name = self.waiter.inner_type_name());
84 f(value)
85 })
86 }
87
88 pub fn take(&mut self) -> Option<T::DataType> {
90 let span = veecle_telemetry::span!("take");
91 let _guard = span.enter();
92
93 let value = self.waiter.take(span.context());
94
95 veecle_telemetry::trace!(
97 "Slot value taken.",
98 type_name = self.waiter.inner_type_name()
99 );
100
101 value
102 }
103
104 pub fn read_cloned(&self) -> Option<T::DataType>
109 where
110 T::DataType: Clone,
111 {
112 self.read(|t| t.cloned())
113 }
114
115 #[veecle_telemetry::instrument]
123 pub async fn wait_for_update(&mut self) -> &mut Self {
124 self.waiter.wait().await;
125 self.waiter.update_generation();
126 self
127 }
128}
129
130impl<'a, T> ExclusiveReader<'a, T>
131where
132 T: Storable + 'static,
133{
134 pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
136 ExclusiveReader {
137 waiter: slot.waiter(),
138 marker: PhantomData,
139 }
140 }
141}
142
143impl<T> super::combined_readers::Sealed for ExclusiveReader<'_, T> where T: Storable {}
144
145impl<T> super::combined_readers::CombinableReader for ExclusiveReader<'_, T>
146where
147 T: Storable,
148{
149 type ToBeRead = Option<T::DataType>;
150
151 fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
152 self.waiter.borrow()
153 }
154
155 async fn wait_for_update(&mut self) {
156 self.wait_for_update().await;
157 }
158}
159
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::pin::pin;
    use futures::FutureExt;

    use crate::datastore::{ExclusiveReader, Slot, Storable, Writer, generational};

    /// Minimal storable payload shared by every test in this module.
    #[derive(Eq, PartialEq, Debug, Clone, Storable)]
    #[storable(crate = crate)]
    struct Sensor(u8);

    /// `read` and `read_cloned` yield `None` before the first write and the written value after.
    #[test]
    fn read() {
        let generation = pin!(generational::Source::new());
        let storage = pin!(Slot::<Sensor>::new());

        let reader = ExclusiveReader::from_slot(storage.as_ref());
        let mut writer = Writer::new(generation.as_ref().waiter(), storage.as_ref());

        // Nothing has been written yet.
        assert_eq!(reader.read(|x| x.cloned()), None);
        assert_eq!(reader.read_cloned(), None);

        generation.as_ref().increment_generation();
        writer.write(Sensor(1)).now_or_never().unwrap();

        let expected = Some(Sensor(1));
        assert_eq!(reader.read(|x: Option<&Sensor>| x.cloned()), expected);
        assert_eq!(reader.read_cloned(), expected);
    }

    /// `take` hands out a written value exactly once.
    #[test]
    fn take() {
        let generation = pin!(generational::Source::new());
        let storage = pin!(Slot::<Sensor>::new());

        let mut reader = ExclusiveReader::from_slot(storage.as_ref());
        let mut writer = Writer::new(generation.as_ref().waiter(), storage.as_ref());

        // Empty slot: nothing to take.
        assert_eq!(reader.take(), None);

        generation.as_ref().increment_generation();
        writer.write(Sensor(10)).now_or_never().unwrap();

        // The first take yields the value, the second finds the slot empty again.
        assert_eq!(reader.take(), Some(Sensor(10)));
        assert_eq!(reader.take(), None);
    }

    /// `wait_for_update` stays pending until a value is written, then allows chaining `read`.
    #[test]
    fn wait_for_update() {
        let generation = pin!(generational::Source::new());
        let storage = pin!(Slot::<Sensor>::new());

        let mut reader = ExclusiveReader::from_slot(storage.as_ref());
        let mut writer = Writer::new(generation.as_ref().waiter(), storage.as_ref());

        // No write so far, so the future must not resolve.
        assert!(reader.wait_for_update().now_or_never().is_none());

        generation.as_ref().increment_generation();
        writer.write(Sensor(1)).now_or_never().unwrap();

        let updated = reader.wait_for_update().now_or_never().unwrap();
        updated.read(|x| assert_eq!(x, Some(&Sensor(1))));
    }
}