veecle_os_runtime/datastore/slot/
waiter.rs

1use super::super::generational;
2use super::Slot;
3use crate::Storable;
4use core::pin::Pin;
5
6pub(crate) struct Waiter<'a, T>
7where
8    T: Storable + 'static,
9{
10    slot: Pin<&'a Slot<T>>,
11    waiter: generational::Waiter<'a>,
12}
13
14impl<T> core::fmt::Debug for Waiter<'_, T>
15where
16    T: Storable + 'static,
17{
18    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
19        f.debug_struct("Waiter")
20            .field("slot", &self.slot)
21            .field("waiter", &self.waiter)
22            .finish()
23    }
24}
25
26impl<'a, T> Waiter<'a, T>
27where
28    T: Storable + 'static,
29{
30    pub(super) fn new(slot: Pin<&'a Slot<T>>, waiter: generational::Waiter<'a>) -> Self {
31        Self { slot, waiter }
32    }
33
34    /// Updates the last seen generation of this waiter so that we will wait for a newer value.
35    pub(crate) fn update_generation(&mut self) {
36        self.waiter.update_generation();
37    }
38
39    pub(crate) fn borrow(&self) -> core::cell::Ref<'_, Option<T::DataType>> {
40        self.slot.borrow()
41    }
42
43    pub(crate) fn read<U>(&self, f: impl FnOnce(&Option<T::DataType>) -> U) -> U {
44        self.slot.read(f)
45    }
46
47    pub(crate) fn inner_type_name(&self) -> &'static str {
48        self.slot.inner_type_name()
49    }
50
51    pub(crate) async fn wait(&self) {
52        if let Err(generational::MissedUpdate { current, expected }) = self.waiter.wait().await {
53            // While we are unsure about timing and such, I would at least keep a warning
54            // if we miss value. We can decide later on how we handle this case more
55            // properly.
56            let type_name = self.slot.inner_type_name();
57
58            veecle_telemetry::warn!(
59                "Missed update for type",
60                type_name = type_name,
61                current = current as i64,
62                expected = expected as i64
63            );
64        }
65    }
66}
67
68impl<'a, T> Waiter<'a, T>
69where
70    T: Storable + 'static,
71{
72    /// Takes the current value of the slot, leaving behind `None`.
73    ///
74    /// Stores the provided `span_context` to connect this write to the next read operation.
75    pub(crate) fn take(
76        &mut self,
77        span_context: Option<veecle_telemetry::SpanContext>,
78    ) -> Option<T::DataType> {
79        self.slot.take(span_context)
80    }
81}