veecle_telemetry/
protocol.rs

1//! Telemetry protocol types and message definitions.
2//!
3//! This module defines the core data structures used for telemetry message exchange.
4//! It includes message types for logging, tracing, and time synchronization, as well
5//! as supporting types for execution tracking and attribute management.
6//!
7//! # Message Types
8//!
9//! The protocol supports several categories of telemetry messages:
10//!
11//! - **Log Messages** - Structured logging with severity levels and attributes
12//! - **Tracing Messages** - Distributed tracing with spans, events, and links
13//! - **Time Sync Messages** - Time synchronization between systems
14//!
15//! # Thread Tracking
16//!
//! Each message is associated with a [`ThreadId`] that uniquely identifies
18//! the thread it came from (globally unique across all processes).
19//! This allows telemetry data from multiple threads to be correlated and analyzed separately.
20//!
21//! # Serialization
22//!
23//! All protocol types implement [`serde::Serialize`] and optionally [`serde::Deserialize`]
24//! (when the `alloc` feature is enabled) for easy serialization to various formats.
25
26#[cfg(feature = "alloc")]
27use alloc::vec::Vec;
28use core::fmt;
29use core::num::NonZeroU64;
30use core::str::FromStr;
31
32use serde::{Deserialize, Serialize};
33
34use crate::SpanContext;
35pub use crate::id::{ProcessId, SpanId};
36#[cfg(feature = "alloc")]
37use crate::to_static::ToStatic;
38use crate::types::{ListType, StringType, list_from_slice};
39use crate::value::KeyValue;
40
41/// A specialised form of [`list_from_slice`] for attributes.
42pub fn attribute_list_from_slice<'a>(slice: &'a [KeyValue<'a>]) -> AttributeListType<'a> {
43    list_from_slice::<KeyValue<'a>>(slice)
44}
45
/// Type alias for a list of attributes.
///
/// Every element is a [`KeyValue`]; the container itself is a [`ListType`] sharing the same
/// lifetime as its elements.
pub type AttributeListType<'a> = ListType<'a, KeyValue<'a>>;
48
#[cfg(feature = "alloc")]
impl ToStatic for AttributeListType<'_> {
    type Static = AttributeListType<'static>;

    /// Produces a `'static` list by converting every attribute with its own `to_static` and
    /// collecting the results into a [`Vec`] before turning that into the list type.
    fn to_static(&self) -> Self::Static {
        let owned_attributes: Vec<_> = self
            .iter()
            .map(|attribute| attribute.to_static())
            .collect();
        owned_attributes.into()
    }
}
60
/// A globally-unique id identifying a thread within a specific process.
///
/// The primary purpose of this id is to allow the consumer of telemetry messages to associate
/// spans with the callstack they came from to reconstruct parent-child relationships. On a normal
/// operating system this is the thread, on other systems it should be whatever is the closest
/// equivalent, e.g. for FreeRTOS it would be a task. On a single threaded bare-metal system it
/// would be a constant as there is only the one callstack.
///
/// The id is made of two parts: the id of the owning process and a non-zero value that is unique
/// within that process.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ThreadId {
    /// The globally-unique id for the process this thread is within.
    pub process: ProcessId,

    /// The process-unique id for this thread within the process.
    ///
    /// This field is private; values are supplied through [`ThreadId::from_raw`].
    raw: NonZeroU64,
}
76
impl ThreadId {
    /// Creates a [`ThreadId`] from a raw value.
    ///
    /// Extra care needs to be taken that this is not a constant value or re-used within this
    /// process in any way.
    ///
    /// No validation is performed here; the two parts are stored as given.
    pub const fn from_raw(process: ProcessId, raw: NonZeroU64) -> Self {
        Self { process, raw }
    }

    /// Creates a [`ThreadId`] for the current thread, using OS specific means to acquire it.
    ///
    /// Only compiled when the `enable` feature is active.
    ///
    /// # Panics
    ///
    /// Panics with "not yet supported" when the `std` feature is disabled.
    #[cfg(feature = "enable")]
    pub(crate) fn current(process: ProcessId) -> Self {
        // Without `std` the block below always panics, which makes the surrounding `from_raw`
        // call unreachable; the lint is therefore expected in that configuration only.
        #[cfg_attr(not(feature = "std"), expect(unreachable_code))]
        Self::from_raw(process, {
            // With `std`, the thread id is obtained from the `veecle_osal_std` thread abstraction.
            #[cfg(feature = "std")]
            {
                use veecle_osal_std::thread::{Thread, ThreadAbstraction};
                Thread::current_thread_id()
            }

            // No source for a thread id exists yet for non-`std` builds.
            #[cfg(not(feature = "std"))]
            {
                panic!("not yet supported")
            }
        })
    }
}
104
105impl fmt::Display for ThreadId {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        let Self { process, raw } = self;
108        write!(f, "{process}:{raw:032x}")
109    }
110}
111
/// Errors that can occur while parsing [`ThreadId`] from a string.
///
/// The expected input has the shape `<process>:<thread>`; each variant corresponds to one way
/// that shape can be violated.
#[derive(Clone, Debug)]
pub enum ParseThreadIdError {
    /// The string is missing a `:` separator.
    MissingSeparator,

    /// The embedded [`ProcessId`] failed to parse.
    ///
    /// Carries the underlying integer parsing error.
    InvalidProcessId(core::num::ParseIntError),

    /// The thread part (the text after the separator) failed to parse as a hexadecimal integer.
    ///
    /// Carries the underlying integer parsing error.
    InvalidThreadId(core::num::ParseIntError),

    /// The thread part parsed successfully but had a zero value, which is not allowed.
    ZeroThreadId,
}
127
128impl fmt::Display for ParseThreadIdError {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        match self {
131            Self::MissingSeparator => f.write_str("missing ':' separator"),
132            Self::InvalidProcessId(_) => f.write_str("failed to parse process id"),
133            Self::InvalidThreadId(_) => f.write_str("failed to parse thread id"),
134            Self::ZeroThreadId => f.write_str("zero thread id"),
135        }
136    }
137}
138
139impl core::error::Error for ParseThreadIdError {
140    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
141        match self {
142            Self::MissingSeparator => None,
143            Self::InvalidProcessId(error) => Some(error),
144            Self::InvalidThreadId(error) => Some(error),
145            Self::ZeroThreadId => None,
146        }
147    }
148}
149
150impl FromStr for ThreadId {
151    type Err = ParseThreadIdError;
152
153    fn from_str(s: &str) -> Result<Self, Self::Err> {
154        let Some((process, thread)) = s.split_once(":") else {
155            return Err(ParseThreadIdError::MissingSeparator);
156        };
157        let process = ProcessId::from_str(process).map_err(ParseThreadIdError::InvalidProcessId)?;
158        let thread = NonZeroU64::new(
159            u64::from_str_radix(thread, 16).map_err(ParseThreadIdError::InvalidThreadId)?,
160        )
161        .ok_or(ParseThreadIdError::ZeroThreadId)?;
162        Ok(Self::from_raw(process, thread))
163    }
164}
165
166impl serde::Serialize for ThreadId {
167    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
168    where
169        S: serde::Serializer,
170    {
171        let mut bytes = [0u8; 49];
172
173        hex::encode_to_slice(self.process.to_raw().to_le_bytes(), &mut bytes[..32]).unwrap();
174        bytes[32] = b':';
175        hex::encode_to_slice(self.raw.get().to_le_bytes(), &mut bytes[33..]).unwrap();
176
177        serializer.serialize_str(str::from_utf8(&bytes).unwrap())
178    }
179}
180
181impl<'de> serde::Deserialize<'de> for ThreadId {
182    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183    where
184        D: serde::Deserializer<'de>,
185    {
186        use serde::de::Error;
187
188        let string = <&str>::deserialize(deserializer)?;
189
190        if string.len() != 49 {
191            return Err(D::Error::invalid_length(
192                string.len(),
193                &"expected 49 byte string",
194            ));
195        }
196
197        let bytes = string.as_bytes();
198
199        if bytes[32] != b':' {
200            return Err(D::Error::invalid_value(
201                serde::de::Unexpected::Str(string),
202                &"expected : separator at byte 32",
203            ));
204        }
205
206        let mut process = [0; 16];
207        hex::decode_to_slice(&bytes[..32], &mut process).map_err(D::Error::custom)?;
208        let process = ProcessId::from_raw(u128::from_le_bytes(process));
209
210        let mut thread = [0; 8];
211        hex::decode_to_slice(&bytes[33..], &mut thread).map_err(D::Error::custom)?;
212        let thread = NonZeroU64::new(u64::from_le_bytes(thread))
213            .ok_or_else(|| D::Error::custom("zero thread id"))?;
214
215        Ok(Self::from_raw(process, thread))
216    }
217}
218
/// A telemetry message associated with a specific execution thread.
///
/// This structure wraps a telemetry message with its execution context,
/// allowing messages from different executions to be properly correlated.
///
/// `Deserialize` is only derived when the `alloc` feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub struct InstanceMessage<'a> {
    /// The thread this message belongs to
    pub thread: ThreadId,

    /// The telemetry message content
    ///
    /// Marked `#[serde(borrow)]` so deserialized data may borrow from the input.
    #[serde(borrow)]
    pub message: TelemetryMessage<'a>,
}
233
#[cfg(feature = "alloc")]
impl ToStatic for InstanceMessage<'_> {
    type Static = InstanceMessage<'static>;

    /// Copies the thread id and converts the wrapped message to its `'static` form.
    fn to_static(&self) -> Self::Static {
        let Self { thread, message } = self;
        InstanceMessage {
            thread: *thread,
            message: message.to_static(),
        }
    }
}
245
/// An enumeration of all possible telemetry message types.
///
/// This enum represents the different categories of telemetry data that can be
/// collected and exported by the system.
///
/// `Deserialize` is only derived when the `alloc` feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub enum TelemetryMessage<'a> {
    /// A structured log message with severity and attributes
    Log(#[serde(borrow)] LogMessage<'a>),
    /// A time synchronization message for clock coordination
    ///
    /// This variant holds no borrowed data.
    TimeSync(TimeSyncMessage),
    /// A distributed tracing message (spans, events, links)
    Tracing(#[serde(borrow)] TracingMessage<'a>),
}
260
#[cfg(feature = "alloc")]
impl ToStatic for TelemetryMessage<'_> {
    type Static = TelemetryMessage<'static>;

    /// Rebuilds the same variant with a `'static` payload; the time sync payload holds no
    /// borrowed data and is simply cloned.
    fn to_static(&self) -> Self::Static {
        match self {
            Self::Log(log) => TelemetryMessage::Log(log.to_static()),
            Self::TimeSync(time_sync) => TelemetryMessage::TimeSync(time_sync.clone()),
            Self::Tracing(tracing) => TelemetryMessage::Tracing(tracing.to_static()),
        }
    }
}
273
/// Log message severity levels.
///
/// These levels follow standard logging conventions, ordered from most verbose
/// to most critical.
///
/// The ordering refers to the declaration order of the variants; this type does not derive
/// `PartialOrd` or `Ord`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Severity {
    /// The "trace" level.
    ///
    /// Designates very low priority, often extremely verbose, information.
    Trace,
    /// The "debug" level.
    ///
    /// Designates lower priority information.
    Debug,
    /// The "info" level.
    ///
    /// Designates useful information.
    Info,
    /// The "warn" level.
    ///
    /// Designates hazardous situations.
    Warn,
    /// The "error" level.
    ///
    /// Designates very serious errors.
    Error,
    /// The "fatal" level.
    ///
    /// Designates critical failures that might crash the program.
    Fatal,
}
305
/// A structured log message with severity, timestamp, and attributes.
///
/// Log messages can be optionally correlated with traces by including a span id when available.
///
/// `Deserialize` is only derived when the `alloc` feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub struct LogMessage<'a> {
    /// Timestamp in nanoseconds since Unix epoch (or system start)
    pub time_unix_nano: u64,
    /// The severity level of this log message
    pub severity: Severity,

    /// The message body
    #[serde(borrow)]
    pub body: StringType<'a>,

    /// Key-value attributes providing additional context
    #[serde(borrow)]
    pub attributes: AttributeListType<'a>,
    /// Optional span id for correlation with traces
    ///
    /// `None` when the message is not associated with a span.
    pub span_id: Option<SpanId>,
}
327
#[cfg(feature = "alloc")]
impl ToStatic for LogMessage<'_> {
    type Static = LogMessage<'static>;

    /// Copies the plain fields and converts the body and attributes to their `'static` forms.
    fn to_static(&self) -> Self::Static {
        let Self {
            time_unix_nano,
            severity,
            body,
            attributes,
            span_id,
        } = self;

        LogMessage {
            time_unix_nano: *time_unix_nano,
            severity: *severity,
            body: body.to_static(),
            attributes: attributes.to_static(),
            span_id: *span_id,
        }
    }
}
342
/// A time synchronization message for coordinating clocks between systems.
///
/// This message contains both local time and absolute time information,
/// allowing downstream systems to correlate local timestamps with real-world time.
///
/// The type holds no borrowed data, so `Deserialize` is derived unconditionally.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimeSyncMessage {
    /// Local timestamp in system-specific units
    pub local_timestamp: u64,
    /// Time since Unix epoch in nanoseconds
    pub since_epoch: u64,
}
354
/// Messages related to distributed tracing operations.
///
/// This enum encompasses all the different types of tracing messages that can be
/// generated during span lifecycle management and tracing operations.
///
/// Variants marked `#[serde(borrow)]` carry data with the `'a` lifetime; the others wrap `Copy`
/// message types. `Deserialize` is only derived when the `alloc` feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub enum TracingMessage<'a> {
    /// A new span has been created
    CreateSpan(#[serde(borrow)] SpanCreateMessage<'a>),
    /// A span has been entered (made current)
    EnterSpan(SpanEnterMessage),
    /// A span has been exited (no longer current)
    ExitSpan(SpanExitMessage),
    /// A span has been closed (completed)
    CloseSpan(SpanCloseMessage),
    /// An event has been added to a span
    AddEvent(#[serde(borrow)] SpanAddEventMessage<'a>),
    /// A link has been added to a span
    AddLink(SpanAddLinkMessage),
    /// An attribute has been set on a span
    SetAttribute(#[serde(borrow)] SpanSetAttributeMessage<'a>),
}
377
#[cfg(feature = "alloc")]
impl ToStatic for TracingMessage<'_> {
    type Static = TracingMessage<'static>;

    /// Rebuilds the same variant with a `'static` payload.
    ///
    /// Payloads that carry borrowed data are converted with their own `to_static`; the `Copy`
    /// payloads are copied as they are.
    fn to_static(&self) -> Self::Static {
        match self {
            Self::CreateSpan(create) => TracingMessage::CreateSpan(create.to_static()),
            Self::EnterSpan(enter) => TracingMessage::EnterSpan(*enter),
            Self::ExitSpan(exit) => TracingMessage::ExitSpan(*exit),
            Self::CloseSpan(close) => TracingMessage::CloseSpan(*close),
            Self::AddEvent(event) => TracingMessage::AddEvent(event.to_static()),
            Self::AddLink(link) => TracingMessage::AddLink(*link),
            Self::SetAttribute(set) => TracingMessage::SetAttribute(set.to_static()),
        }
    }
}
394
/// Message indicating the creation of a new span.
///
/// This message provides all the information needed to create a new span
/// in the trace, including its identity, timing, and initial attributes.
///
/// `Deserialize` is only derived when the `alloc` feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub struct SpanCreateMessage<'a> {
    /// The unique identifier (within the associated process) for this span.
    pub span_id: SpanId,
    /// The parent span id, if this is a child span
    ///
    /// `None` when the span has no parent.
    pub parent_span_id: Option<SpanId>,

    /// The name of the span
    #[serde(borrow)]
    pub name: StringType<'a>,

    /// Timestamp when the span was started
    // NOTE(review): presumably nanoseconds since the Unix epoch, going by the field name.
    pub start_time_unix_nano: u64,

    /// Initial attributes attached to the span
    #[serde(borrow)]
    pub attributes: AttributeListType<'a>,
}
418
#[cfg(feature = "alloc")]
impl ToStatic for SpanCreateMessage<'_> {
    type Static = SpanCreateMessage<'static>;

    /// Copies the ids and start time and converts the name and attributes to their `'static`
    /// forms.
    fn to_static(&self) -> Self::Static {
        let Self {
            span_id,
            parent_span_id,
            name,
            start_time_unix_nano,
            attributes,
        } = self;

        SpanCreateMessage {
            span_id: *span_id,
            parent_span_id: *parent_span_id,
            name: name.to_static(),
            start_time_unix_nano: *start_time_unix_nano,
            attributes: attributes.to_static(),
        }
    }
}
433
/// Message indicating a span has been entered.
///
/// Carried by [`TracingMessage::EnterSpan`].
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SpanEnterMessage {
    /// The span being entered
    pub span_id: SpanId,

    /// Timestamp when the span was entered
    // NOTE(review): presumably nanoseconds since the Unix epoch, going by the field name.
    pub time_unix_nano: u64,
}
443
/// Message indicating a span has been exited.
///
/// Carried by [`TracingMessage::ExitSpan`].
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SpanExitMessage {
    /// The span being exited
    pub span_id: SpanId,

    /// Timestamp when the span was exited
    // NOTE(review): presumably nanoseconds since the Unix epoch, going by the field name.
    pub time_unix_nano: u64,
}
453
/// Message indicating a span has been closed (completed).
///
/// Carried by [`TracingMessage::CloseSpan`].
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SpanCloseMessage {
    /// The span being closed
    pub span_id: SpanId,

    /// Timestamp when the span was closed
    // NOTE(review): presumably nanoseconds since the Unix epoch, going by the field name.
    pub end_time_unix_nano: u64,
}
463
/// Message indicating an attribute has been set on a span.
///
/// Carried by [`TracingMessage::SetAttribute`]. Unlike the other borrowing message types in this
/// module, `Deserialize` is derived here regardless of the `alloc` feature.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SpanSetAttributeMessage<'a> {
    /// The span the attribute is being set on
    pub span_id: SpanId,

    /// The attribute being set
    #[serde(borrow)]
    pub attribute: KeyValue<'a>,
}
474
#[cfg(feature = "alloc")]
impl ToStatic for SpanSetAttributeMessage<'_> {
    type Static = SpanSetAttributeMessage<'static>;

    /// Copies the span id and converts the attribute to its `'static` form.
    fn to_static(&self) -> Self::Static {
        let Self { span_id, attribute } = self;
        SpanSetAttributeMessage {
            span_id: *span_id,
            attribute: attribute.to_static(),
        }
    }
}
486
/// Message indicating an event has been added to a span.
///
/// Carried by [`TracingMessage::AddEvent`]. `Deserialize` is only derived when the `alloc`
/// feature is enabled.
#[derive(Clone, Debug, Serialize)]
#[cfg_attr(feature = "alloc", derive(Deserialize))]
pub struct SpanAddEventMessage<'a> {
    /// The span the event is being added to
    pub span_id: SpanId,

    /// The name of the event
    #[serde(borrow)]
    pub name: StringType<'a>,
    /// Timestamp when the event occurred
    // NOTE(review): presumably nanoseconds since the Unix epoch, going by the field name.
    pub time_unix_nano: u64,

    /// Attributes providing additional context for the event
    #[serde(borrow)]
    pub attributes: AttributeListType<'a>,
}
504
#[cfg(feature = "alloc")]
impl ToStatic for SpanAddEventMessage<'_> {
    type Static = SpanAddEventMessage<'static>;

    /// Copies the span id and timestamp and converts the name and attributes to their `'static`
    /// forms.
    fn to_static(&self) -> Self::Static {
        let Self {
            span_id,
            name,
            time_unix_nano,
            attributes,
        } = self;

        SpanAddEventMessage {
            span_id: *span_id,
            name: name.to_static(),
            time_unix_nano: *time_unix_nano,
            attributes: attributes.to_static(),
        }
    }
}
518
/// Message indicating a link has been added to a span.
///
/// Links connect spans across different traces, representing relationships
/// that are not parent-child hierarchies.
///
/// Carried by [`TracingMessage::AddLink`].
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SpanAddLinkMessage {
    /// The span the link is being added to
    pub span_id: SpanId,

    /// The span context being linked to
    pub link: SpanContext,
}
531
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use alloc::format;
    #[cfg(feature = "alloc")]
    use alloc::string::String;

    use super::*;

    /// Ids shared by the round-trip tests: smallest values, mixed bit patterns, and maximums.
    fn thread_id_test_cases() -> [ThreadId; 4] {
        [
            ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
            ThreadId::from_raw(
                ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
                NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
            ),
            ThreadId::from_raw(
                ProcessId::from_raw(u128::MAX),
                NonZeroU64::new(u64::MAX).unwrap(),
            ),
            ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
        ]
    }

    /// Formatting with `Display` and parsing with `FromStr` must yield the original id.
    #[test]
    fn thread_id_format_from_str_roundtrip() {
        for thread_id in thread_id_test_cases() {
            let formatted = format!("{thread_id}");
            let parsed = formatted.parse::<ThreadId>().unwrap();
            assert_eq!(
                thread_id,
                parsed,
                "Failed roundtrip for {:#x}:{:#x}",
                thread_id.process.to_raw(),
                thread_id.raw,
            );
        }
    }

    /// Serializing to JSON and deserializing again must yield the original id.
    #[test]
    fn thread_id_serde_roundtrip() {
        for original in thread_id_test_cases() {
            let json = serde_json::to_string(&original).unwrap();
            let deserialized: ThreadId = serde_json::from_str(&json).unwrap();
            assert_eq!(original, deserialized);
        }
    }

    /// Both `'static` and shorter-lived string slices convert into a message name.
    #[test]
    fn string_type_conversions() {
        let static_str: StringType<'static> = "static".into();

        let _event = SpanAddEventMessage {
            span_id: SpanId(0),
            name: static_str,
            time_unix_nano: 0,
            attributes: attribute_list_from_slice(&[]),
        };

        let borrowed_str: StringType = "borrowed".into();

        let _event = SpanAddEventMessage {
            span_id: SpanId(0),
            name: borrowed_str,
            time_unix_nano: 0,
            attributes: attribute_list_from_slice(&[]),
        };
    }

    /// An owned `String` converts into a `'static` message name.
    #[cfg(any(feature = "std", feature = "alloc"))]
    #[test]
    fn string_type_with_owned_strings() {
        let string = String::from("owned");
        let owned: StringType<'static> = StringType::from(string);

        let _event = SpanAddEventMessage {
            span_id: SpanId(0),
            name: owned,
            time_unix_nano: 0,
            attributes: attribute_list_from_slice(&[]),
        };
    }

    /// A message built from borrowed data converts to a `'static` message with the same content.
    #[cfg(feature = "alloc")]
    #[test]
    fn to_static_conversion() {
        use alloc::string::String;

        use crate::value::Value;

        // Create some data with non-static lifetime
        let borrowed_name_str = "test_span";
        let borrowed_name: StringType = borrowed_name_str.into();

        let owned_key = String::from("test_key");
        let owned_value = String::from("test_value");
        let attribute = KeyValue {
            key: owned_key.as_str().into(),
            value: Value::String(owned_value.as_str().into()),
        };

        let attributes = [attribute];
        let span_event = SpanAddEventMessage {
            span_id: SpanId(0),
            name: borrowed_name,
            time_unix_nano: 0,
            attributes: attribute_list_from_slice(&attributes),
        };

        let tracing_message = TracingMessage::AddEvent(span_event);
        let telemetry_message = TelemetryMessage::Tracing(tracing_message);
        let instance_message = InstanceMessage {
            thread: ThreadId::from_raw(ProcessId::from_raw(999), NonZeroU64::new(111).unwrap()),
            message: telemetry_message,
        };

        let static_message: InstanceMessage<'static> = instance_message.to_static();

        // Verify the conversion worked - the static message should have the same data
        let TelemetryMessage::Tracing(TracingMessage::AddEvent(span_event)) =
            &static_message.message
        else {
            // The message was built as `AddEvent` above, so name that variant on failure.
            panic!("Expected AddEvent message");
        };
        assert_eq!(span_event.name.as_ref(), "test_span");
    }
}