Skip to content

Commit 90c0992

Browse files
Add span decoder for bytes slices
1 parent 787d15e commit 90c0992

File tree

7 files changed

+343
-30
lines changed

7 files changed

+343
-30
lines changed

tinybytes/src/bytes_string.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,21 @@ impl BytesString {
8282
}
8383
}
8484

85+
/// Creates a `BytesString` from a string slice within the given buffer.
86+
///
87+
/// # Arguments
88+
///
89+
/// * `bytes` - The `tinybytes::Bytes` buffer that contains `slice` and is sub-sliced to build the `BytesString`.
90+
/// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`.
91+
pub fn try_from_bytes_slice(bytes: &Bytes, slice: &str) -> Option<Self> {
92+
// SAFETY: This is safe as a str slice is definitely a valid UTF-8 slice.
93+
unsafe {
94+
Some(Self::from_bytes_unchecked(
95+
bytes.slice_ref(slice.as_bytes())?,
96+
))
97+
}
98+
}
99+
85100
/// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes.
86101
///
87102
/// This function does not perform any validation on the provided bytes, and assumes that the

trace-utils/src/msgpack_decoder/v04/decoder/mod.rs

Lines changed: 125 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ mod span_link;
66

77
use self::span::decode_span;
88
use super::error::DecodeError;
9-
use super::number::read_number_bytes;
10-
use crate::span_v04::Span;
9+
use super::number::{read_nullable_number_ref, read_number_bytes, read_number_ref};
10+
use crate::span_v04::{Span, SpanSlice};
1111
use rmp::decode::DecodeStringError;
1212
use rmp::{decode, decode::RmpRead, Marker};
13+
use span::decode_span_ref;
1314
use std::{collections::HashMap, f64};
1415
use tinybytes::{Bytes, BytesString};
1516

@@ -58,11 +59,26 @@ const NULL_MARKER: &u8 = &0xc0;
5859
/// let decoded_span = &decoded_traces[0][0];
5960
/// assert_eq!("test-span", decoded_span.name.as_str());
6061
/// ```
61-
pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec<Vec<Span>>, usize), DecodeError> {
62-
let trace_count =
63-
rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| {
64-
DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned())
65-
})?;
62+
pub fn from_slice(data: tinybytes::Bytes) -> Result<(Vec<Vec<Span>>, usize), DecodeError> {
63+
let mut parsed_data = data.clone();
64+
let (traces_ref, size) = from_slice_ref(unsafe { parsed_data.as_mut_slice() })?;
65+
let traces_owned = traces_ref
66+
.iter()
67+
.map(|trace| {
68+
trace
69+
.iter()
70+
// Safe to unwrap since the spans use subslices of the `data` buffer
71+
.map(|span| span.try_to_bytes(&data).unwrap())
72+
.collect()
73+
})
74+
.collect();
75+
Ok((traces_owned, size))
76+
}
77+
78+
pub fn from_slice_ref(mut data: &[u8]) -> Result<(Vec<Vec<SpanSlice>>, usize), DecodeError> {
79+
let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| {
80+
DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned())
81+
})?;
6682

6783
let start_len = data.len();
6884

@@ -74,12 +90,9 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec<Vec<Span>>, usize),
7490
.expect("Unable to cast trace_count to usize"),
7591
),
7692
|mut traces, _| {
77-
let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() })
78-
.map_err(|_| {
79-
DecodeError::InvalidFormat(
80-
"Unable to read array len for span count".to_owned(),
81-
)
82-
})?;
93+
let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| {
94+
DecodeError::InvalidFormat("Unable to read array len for span count".to_owned())
95+
})?;
8396

8497
let trace = (0..span_count).try_fold(
8598
Vec::with_capacity(
@@ -88,7 +101,7 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec<Vec<Span>>, usize),
88101
.expect("Unable to cast span_count to usize"),
89102
),
90103
|mut trace, _| {
91-
let span = decode_span(&mut data)?;
104+
let span = decode_span_ref(&mut data)?;
92105
trace.push(span);
93106
Ok(trace)
94107
},
@@ -143,6 +156,15 @@ fn read_nullable_string_bytes(buf: &mut Bytes) -> Result<BytesString, DecodeErro
143156
}
144157
}
145158

159+
#[inline]
160+
fn read_nullable_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> {
161+
if is_null_marker(buf) {
162+
Ok("")
163+
} else {
164+
read_string_ref(buf)
165+
}
166+
}
167+
146168
#[inline]
147169
// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the
148170
// BytesStrings.
@@ -172,6 +194,35 @@ fn read_nullable_str_map_to_bytes_strings(
172194
read_str_map_to_bytes_strings(buf)
173195
}
174196

197+
#[inline]
198+
// read_string_ref checks utf8 validity, so the keys and values are returned as `&str` borrowed
199+
// from the input buffer; no `BytesString` is created here.
200+
fn read_str_map_to_str_ref<'a>(
201+
buf: &mut &'a [u8],
202+
) -> Result<HashMap<&'a str, &'a str>, DecodeError> {
203+
let len = decode::read_map_len(buf)
204+
.map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;
205+
206+
let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
207+
for _ in 0..len {
208+
let key = read_string_ref(buf)?;
209+
let value = read_string_ref(buf)?;
210+
map.insert(key, value);
211+
}
212+
Ok(map)
213+
}
214+
215+
#[inline]
216+
fn read_nullable_str_map_to_str_ref<'a>(
217+
buf: &mut &'a [u8],
218+
) -> Result<HashMap<&'a str, &'a str>, DecodeError> {
219+
if is_null_marker(buf) {
220+
return Ok(HashMap::default());
221+
}
222+
223+
read_str_map_to_str_ref(buf)
224+
}
225+
175226
#[inline]
176227
fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> {
177228
let key = read_string_bytes(buf)?;
@@ -215,6 +266,52 @@ fn read_meta_struct(buf: &mut Bytes) -> Result<HashMap<BytesString, Vec<u8>>, De
215266
read_map(len, buf, read_meta_struct_pair)
216267
}
217268

269+
#[inline]
270+
fn read_metric_pair_ref<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> {
271+
let key = read_string_ref(buf)?;
272+
let v = read_number_ref(buf)?;
273+
274+
Ok((key, v))
275+
}
276+
277+
#[inline]
278+
fn read_metrics_ref<'a>(buf: &mut &'a [u8]) -> Result<HashMap<&'a str, f64>, DecodeError> {
279+
if is_null_marker(buf) {
280+
return Ok(HashMap::default());
281+
}
282+
283+
let len = read_map_len(buf)?;
284+
285+
read_map(len, buf, read_metric_pair_ref)
286+
}
287+
288+
#[inline]
289+
fn read_meta_struct_ref<'a>(buf: &mut &'a [u8]) -> Result<HashMap<&'a str, Vec<u8>>, DecodeError> {
290+
if is_null_marker(buf) {
291+
return Ok(HashMap::default());
292+
}
293+
294+
fn read_meta_struct_pair_ref<'a>(
295+
buf: &mut &'a [u8],
296+
) -> Result<(&'a str, Vec<u8>), DecodeError> {
297+
let key = read_string_ref(buf)?;
298+
let array_len = decode::read_array_len(buf).map_err(|_| {
299+
DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned())
300+
})?;
301+
302+
let mut v = Vec::with_capacity(array_len as usize);
303+
304+
for _ in 0..array_len {
305+
let value = read_number_ref(buf)?;
306+
v.push(value);
307+
}
308+
Ok((key, v))
309+
}
310+
311+
let len = read_map_len(buf)?;
312+
read_map(len, buf, read_meta_struct_pair_ref)
313+
}
314+
218315
/// Reads a map from the buffer and returns it as a `HashMap`.
219316
///
220317
/// This function is generic over the key and value types of the map, and it uses a provided
@@ -243,14 +340,10 @@ fn read_meta_struct(buf: &mut Bytes) -> Result<HashMap<BytesString, Vec<u8>>, De
243340
/// * `V` - The type of the values in the map.
244341
/// * `F` - The type of the function used to read key-value pairs from the buffer.
245342
#[inline]
246-
fn read_map<K, V, F>(
247-
len: usize,
248-
buf: &mut Bytes,
249-
read_pair: F,
250-
) -> Result<HashMap<K, V>, DecodeError>
343+
fn read_map<K, B, V, F>(len: usize, buf: &mut B, read_pair: F) -> Result<HashMap<K, V>, DecodeError>
251344
where
252345
K: std::hash::Hash + Eq,
253-
F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>,
346+
F: Fn(&mut B) -> Result<(K, V), DecodeError>,
254347
{
255348
let mut map = HashMap::with_capacity(len);
256349
for _ in 0..len {
@@ -297,6 +390,18 @@ where
297390
}
298391
}
299392

393+
/// Returns `true` and advances the buffer by one byte if the next value is a null marker, so the
394+
/// caller can use the default value. Otherwise returns `false` and leaves the buffer untouched.
395+
#[inline]
396+
fn is_null_marker(buf: &mut &[u8]) -> bool {
397+
if buf.first() == Some(NULL_MARKER) {
398+
*buf = &buf[1..];
399+
true
400+
} else {
401+
false
402+
}
403+
}
404+
300405
#[cfg(test)]
301406
mod tests {
302407
use super::*;

trace-utils/src/msgpack_decoder/v04/decoder/span.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::{
5-
read_meta_struct, read_metrics, read_nullable_str_map_to_bytes_strings,
6-
read_nullable_string_bytes, read_string_ref, span_link::read_span_links,
5+
read_meta_struct, read_meta_struct_ref, read_metric_pair_ref, read_metrics, read_metrics_ref,
6+
read_nullable_number_ref, read_nullable_str_map_to_bytes_strings,
7+
read_nullable_str_map_to_str_ref, read_nullable_string_bytes, read_nullable_string_ref,
8+
read_number_ref, read_str_map_to_str_ref, read_string_ref,
9+
span_link::{read_span_links, read_span_links_ref},
710
};
811
use crate::msgpack_decoder::v04::error::DecodeError;
912
use crate::msgpack_decoder::v04::number::read_nullable_number_bytes;
10-
use crate::span_v04::{Span, SpanKey};
13+
use crate::span_v04::{Span, SpanKey, SpanSlice};
1114
use tinybytes::Bytes;
1215

1316
/// Decodes a slice of bytes into a `Span` object.
@@ -27,14 +30,24 @@ use tinybytes::Bytes;
2730
/// - The map length cannot be read.
2831
/// - Any key or value cannot be decoded.
2932
pub fn decode_span(buffer: &mut Bytes) -> Result<Span, DecodeError> {
30-
let mut span = Span::default();
33+
let span_ref = decode_span_ref(unsafe { buffer.as_mut_slice() });
34+
println!("Span ref {:?}", span_ref);
3135

32-
let span_size = rmp::decode::read_map_len(unsafe { buffer.as_mut_slice() }).map_err(|_| {
36+
span_ref
37+
.unwrap()
38+
.try_to_bytes(buffer)
39+
.ok_or(DecodeError::IOError)
40+
}
41+
42+
pub fn decode_span_ref<'a>(buffer: &mut &'a [u8]) -> Result<SpanSlice<'a>, DecodeError> {
43+
let mut span = SpanSlice::default();
44+
45+
let span_size = rmp::decode::read_map_len(buffer).map_err(|_| {
3346
DecodeError::InvalidFormat("Unable to get map len for span size".to_owned())
3447
})?;
3548

3649
for _ in 0..span_size {
37-
fill_span(&mut span, buffer)?;
50+
fill_span_ref(&mut span, buffer)?;
3851
}
3952

4053
Ok(span)
@@ -66,6 +79,32 @@ fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> {
6679
Ok(())
6780
}
6881

82+
// Reads one key and its value from `buf` and stores the value in the matching field of `span`.
83+
// read_string_ref checks utf8 validity, so decoded strings are kept as borrowed `&str`.
84+
fn fill_span_ref<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> {
85+
let key = read_string_ref(buf)?
86+
.parse::<SpanKey>()
87+
.map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?;
88+
89+
match key {
90+
SpanKey::Service => span.service = read_nullable_string_ref(buf)?,
91+
SpanKey::Name => span.name = read_nullable_string_ref(buf)?,
92+
SpanKey::Resource => span.resource = read_nullable_string_ref(buf)?,
93+
SpanKey::TraceId => span.trace_id = read_nullable_number_ref(buf)?,
94+
SpanKey::SpanId => span.span_id = read_nullable_number_ref(buf)?,
95+
SpanKey::ParentId => span.parent_id = read_nullable_number_ref(buf)?,
96+
SpanKey::Start => span.start = read_nullable_number_ref(buf)?,
97+
SpanKey::Duration => span.duration = read_nullable_number_ref(buf)?,
98+
SpanKey::Error => span.error = read_nullable_number_ref(buf)?,
99+
SpanKey::Type => span.r#type = read_nullable_string_ref(buf)?,
100+
SpanKey::Meta => span.meta = read_nullable_str_map_to_str_ref(buf)?,
101+
SpanKey::Metrics => span.metrics = read_metrics_ref(buf)?,
102+
SpanKey::MetaStruct => span.meta_struct = read_meta_struct_ref(buf)?,
103+
SpanKey::SpanLinks => span.span_links = read_span_links_ref(buf)?,
104+
}
105+
Ok(())
106+
}
107+
69108
#[cfg(test)]
70109
mod tests {
71110
use super::SpanKey;

trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ use crate::msgpack_decoder::v04::decoder::{
55
handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref,
66
};
77
use crate::msgpack_decoder::v04::error::DecodeError;
8-
use crate::msgpack_decoder::v04::number::read_number_bytes;
9-
use crate::span_v04::SpanLink;
8+
use crate::msgpack_decoder::v04::number::{read_number_bytes, read_number_ref};
9+
use crate::span_v04::{SpanLink, SpanLinkSlice};
1010
use rmp::Marker;
1111
use std::str::FromStr;
1212
use tinybytes::Bytes;
1313

14+
use super::{is_null_marker, read_str_map_to_str_ref};
15+
1416
/// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects.
1517
///
1618
/// # Arguments
@@ -48,6 +50,30 @@ pub(crate) fn read_span_links(buf: &mut Bytes) -> Result<Vec<SpanLink>, DecodeEr
4850
)),
4951
}
5052
}
53+
54+
pub(crate) fn read_span_links_ref<'a>(
55+
buf: &mut &'a [u8],
56+
) -> Result<Vec<SpanLinkSlice<'a>>, DecodeError> {
57+
if is_null_marker(buf) {
58+
return Ok(Vec::default());
59+
}
60+
61+
match rmp::decode::read_marker(buf).map_err(|_| {
62+
DecodeError::InvalidFormat("Unable to read marker for span links".to_owned())
63+
})? {
64+
Marker::FixArray(len) => {
65+
let mut vec: Vec<SpanLinkSlice<'a>> = Vec::with_capacity(len.into());
66+
for _ in 0..len {
67+
vec.push(decode_span_link_ref(buf)?);
68+
}
69+
Ok(vec)
70+
}
71+
_ => Err(DecodeError::InvalidType(
72+
"Unable to read span link from buffer".to_owned(),
73+
)),
74+
}
75+
}
76+
5177
#[derive(Debug, PartialEq)]
5278
enum SpanLinkKey {
5379
TraceId,
@@ -95,6 +121,25 @@ fn decode_span_link(buf: &mut Bytes) -> Result<SpanLink, DecodeError> {
95121
Ok(span)
96122
}
97123

124+
fn decode_span_link_ref<'a>(buf: &mut &'a [u8]) -> Result<SpanLinkSlice<'a>, DecodeError> {
125+
let mut span = SpanLinkSlice::default();
126+
let span_size = rmp::decode::read_map_len(buf)
127+
.map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?;
128+
129+
for _ in 0..span_size {
130+
match read_string_ref(buf)?.parse::<SpanLinkKey>()? {
131+
SpanLinkKey::TraceId => span.trace_id = read_number_ref(buf)?,
132+
SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_ref(buf)?,
133+
SpanLinkKey::SpanId => span.span_id = read_number_ref(buf)?,
134+
SpanLinkKey::Attributes => span.attributes = read_str_map_to_str_ref(buf)?,
135+
SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?,
136+
SpanLinkKey::Flags => span.flags = read_number_ref(buf)?,
137+
}
138+
}
139+
140+
Ok(span)
141+
}
142+
98143
#[cfg(test)]
99144
mod tests {
100145
use super::SpanLinkKey;

0 commit comments

Comments
 (0)