veecle_telemetry/collector/mod.rs
1//! Telemetry data collection and export infrastructure.
2//!
3//! This module provides the core infrastructure for collecting telemetry data and exporting it
4//! to various backends.
5//! It includes the global collector singleton, export trait, and various
6//! built-in exporters.
7//!
8//! # Global Collector
9//!
10//! The collector uses a global singleton pattern to ensure telemetry data is collected
11//! consistently across the entire application.
12//! The collector must be initialized once
13//! using [`set_exporter`] before any telemetry data can be collected.
14//!
15//! # Export Trait
16//!
17//! The [`Export`] trait defines the interface for exporting telemetry data.
18//! Custom exporters can be implemented by providing an implementation of this trait.
19//!
20//! # Built-in Exporters
21//!
22//! - [`ConsoleJsonExporter`] - Exports telemetry data as JSON to stdout
23//! - [`TestExporter`] - Collects telemetry data in memory for testing purposes
24
25#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod test_exporter;
29
30use core::fmt::Debug;
31#[cfg(feature = "enable")]
32use core::sync::atomic::{AtomicUsize, Ordering};
33use core::{error, fmt};
34
35#[cfg(feature = "std")]
36pub use json_exporter::ConsoleJsonExporter;
37#[cfg(feature = "std")]
38#[doc(hidden)]
39pub use test_exporter::TestExporter;
40
41use crate::protocol::InstanceMessage;
42#[cfg(feature = "enable")]
43pub use crate::protocol::ProcessId;
44#[cfg(feature = "enable")]
45use crate::protocol::{
46 LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
47 SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, ThreadId,
48 TracingMessage,
49};
50
51/// Trait for exporting telemetry data to external systems.
52///
53/// Implementors of this trait define how telemetry data should be exported,
54/// whether to files, network endpoints, or other destinations.
55///
56/// # Examples
57///
58/// ```rust
59/// use veecle_telemetry::collector::Export;
60/// use veecle_telemetry::protocol::InstanceMessage;
61///
62/// #[derive(Debug)]
63/// struct CustomExporter;
64///
65/// impl Export for CustomExporter {
66/// fn export(&self, message: InstanceMessage<'_>) {
67/// // Custom export logic here
68/// println!("Exporting: {:?}", message);
69/// }
70/// }
71/// ```
72pub trait Export: Debug {
73 /// Exports a telemetry message.
74 ///
75 /// This method is called for each telemetry message that needs to be exported.
76 /// The implementation should handle the message appropriately based on its type.
77 fn export(&self, message: InstanceMessage<'_>);
78}
79
80/// The global telemetry collector.
81///
82/// This structure manages the collection and export of telemetry data.
83/// It maintains a unique execution ID, handles trace ID generation, and coordinates with the
84/// configured exporter.
85///
86/// The collector is typically accessed through the [`get_collector`] function rather
87/// than being constructed directly.
88#[derive(Debug)]
89pub struct Collector {
90 #[cfg(feature = "enable")]
91 inner: CollectorInner,
92}
93
94#[cfg(feature = "enable")]
95#[derive(Debug)]
96struct CollectorInner {
97 process_id: ProcessId,
98
99 exporter: &'static (dyn Export + Sync),
100}
101
102#[cfg(feature = "enable")]
103#[derive(Debug)]
104struct NopExporter;
105
106#[cfg(feature = "enable")]
107impl Export for NopExporter {
108 fn export(&self, _: InstanceMessage) {}
109}
110
111// The GLOBAL_COLLECTOR static holds a pointer to the global exporter. It is protected by
112// the GLOBAL_INIT static which determines whether GLOBAL_EXPORTER has been initialized.
113#[cfg(feature = "enable")]
114static mut GLOBAL_COLLECTOR: Collector = Collector {
115 inner: CollectorInner {
116 process_id: ProcessId::from_raw(0),
117 exporter: &NO_EXPORTER,
118 },
119};
120static NO_COLLECTOR: Collector = Collector {
121 #[cfg(feature = "enable")]
122 inner: CollectorInner {
123 process_id: ProcessId::from_raw(0),
124 exporter: &NO_EXPORTER,
125 },
126};
127#[cfg(feature = "enable")]
128static NO_EXPORTER: NopExporter = NopExporter;
129
130#[cfg(feature = "enable")]
131static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(0);
132
133// There are three different states that we care about:
134// - the collector is uninitialized
135// - the collector is initializing (set_exporter has been called but GLOBAL_COLLECTOR hasn't been set yet)
136// - the collector is active
137#[cfg(feature = "enable")]
138const UNINITIALIZED: usize = 0;
139#[cfg(feature = "enable")]
140const INITIALIZING: usize = 1;
141#[cfg(feature = "enable")]
142const INITIALIZED: usize = 2;
143
144/// Initializes the collector with the given Exporter and [`ProcessId`].
145///
146/// A [`ProcessId`] should never be re-used as it's used to collect metadata about the execution and to generate
147/// [`SpanContext`]s which need to be globally unique.
148///
149/// [`SpanContext`]: crate::SpanContext
150#[cfg(feature = "enable")]
151pub fn set_exporter(
152 process_id: ProcessId,
153 exporter: &'static (dyn Export + Sync),
154) -> Result<(), SetExporterError> {
155 if GLOBAL_INIT
156 .compare_exchange(
157 UNINITIALIZED,
158 INITIALIZING,
159 Ordering::Acquire,
160 Ordering::Relaxed,
161 )
162 .is_ok()
163 {
164 // SAFETY: this is guarded by the atomic
165 unsafe { GLOBAL_COLLECTOR = Collector::new(process_id, exporter) }
166 GLOBAL_INIT.store(INITIALIZED, Ordering::Release);
167
168 Ok(())
169 } else {
170 Err(SetExporterError(()))
171 }
172}
173
174/// Returns a reference to the collector.
175///
176/// If an exporter has not been set, a no-op implementation is returned.
177pub fn get_collector() -> &'static Collector {
178 #[cfg(not(feature = "enable"))]
179 {
180 &NO_COLLECTOR
181 }
182
183 // Acquire memory ordering guarantees that current thread would see any
184 // memory writes that happened before store of the value
185 // into `GLOBAL_INIT` with memory ordering `Release` or stronger.
186 //
187 // Since the value `INITIALIZED` is written only after `GLOBAL_COLLECTOR` was
188 // initialized, observing it after `Acquire` load here makes both
189 // write to the `GLOBAL_COLLECTOR` static and initialization of the exporter
190 // internal state synchronized with current thread.
191 #[cfg(feature = "enable")]
192 if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
193 &NO_COLLECTOR
194 } else {
195 // SAFETY: this is guarded by the atomic
196 unsafe {
197 #[expect(clippy::deref_addrof, reason = "false positive")]
198 &*&raw const GLOBAL_COLLECTOR
199 }
200 }
201}
202
203/// The type returned by [`set_exporter`] if [`set_exporter`] has already been called.
204///
205/// [`set_exporter`]: fn.set_exporter.html
206#[derive(Debug)]
207pub struct SetExporterError(());
208
209impl SetExporterError {
210 const MESSAGE: &'static str = "a global exporter has already been set";
211}
212
213impl fmt::Display for SetExporterError {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 fmt.write_str(Self::MESSAGE)
216 }
217}
218
219impl error::Error for SetExporterError {}
220
221#[cfg(feature = "enable")]
222impl Collector {
223 fn new(process_id: ProcessId, exporter: &'static (dyn Export + Sync)) -> Self {
224 Self {
225 inner: CollectorInner {
226 process_id,
227 exporter,
228 },
229 }
230 }
231
232 #[inline]
233 pub(crate) fn process_id(&self) -> ProcessId {
234 self.inner.process_id
235 }
236
237 /// Collects and exports an external telemetry message.
238 ///
239 /// This method allows external systems to inject telemetry messages into the
240 /// collector pipeline.
241 /// The message will be exported using the configured exporter.
242 ///
243 /// # Examples
244 ///
245 /// ```rust
246 /// use core::num::NonZeroU64;
247 /// use veecle_telemetry::collector::get_collector;
248 /// use veecle_telemetry::protocol::{
249 /// ThreadId,
250 /// ProcessId,
251 /// InstanceMessage,
252 /// TelemetryMessage,
253 /// TimeSyncMessage,
254 /// };
255 ///
256 /// let collector = get_collector();
257 /// let message = InstanceMessage {
258 /// thread: ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
259 /// message: TelemetryMessage::TimeSync(TimeSyncMessage {
260 /// local_timestamp: 0,
261 /// since_epoch: 0,
262 /// }),
263 /// };
264 /// collector.collect_external(message);
265 /// ```
266 #[inline]
267 pub fn collect_external(&self, message: InstanceMessage<'_>) {
268 self.inner.exporter.export(message);
269 }
270
271 #[inline]
272 pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
273 self.tracing_message(TracingMessage::CreateSpan(span));
274 }
275
276 #[inline]
277 pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
278 self.tracing_message(TracingMessage::EnterSpan(enter));
279 }
280
281 #[inline]
282 pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
283 self.tracing_message(TracingMessage::ExitSpan(exit));
284 }
285
286 #[inline]
287 pub(crate) fn close_span(&self, span: SpanCloseMessage) {
288 self.tracing_message(TracingMessage::CloseSpan(span));
289 }
290
291 #[inline]
292 pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
293 self.tracing_message(TracingMessage::AddEvent(event));
294 }
295
296 #[inline]
297 pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
298 self.tracing_message(TracingMessage::AddLink(link));
299 }
300
301 #[inline]
302 pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
303 self.tracing_message(TracingMessage::SetAttribute(attribute));
304 }
305
306 #[inline]
307 pub(crate) fn log_message(&self, log: LogMessage<'_>) {
308 self.inner.exporter.export(InstanceMessage {
309 thread: ThreadId::current(self.inner.process_id),
310 message: TelemetryMessage::Log(log),
311 });
312 }
313
314 #[inline]
315 fn tracing_message(&self, message: TracingMessage<'_>) {
316 self.inner.exporter.export(InstanceMessage {
317 thread: ThreadId::current(self.inner.process_id),
318 message: TelemetryMessage::Tracing(message),
319 });
320 }
321}