Skip to content

Commit fd64970

Browse files
committed
cache strings and use 0.5 endpoint
1 parent 83deae0 commit fd64970

File tree

7 files changed

+295
-113
lines changed

7 files changed

+295
-113
lines changed

benchmark/sirun/plugin-koa/internal-tracer/encoder.js

Lines changed: 113 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ const noop = () => {}
1515
const eventTypes = {
1616
KOA_REQUEST_START: 1,
1717
ERROR: 2,
18-
KOA_REQUEST_FINISH: 3
18+
KOA_REQUEST_FINISH: 3,
19+
START_SPAN: 4,
20+
FINISH_SPAN: 5,
21+
ADD_TAGS: 6
1922
}
2023

2124
const float64Array = new Float64Array(1)
@@ -41,6 +44,33 @@ class Encoder {
4144
return this._eventCount
4245
}
4346

47+
// encodeKoaRequestStart (req) {
48+
// const bytes = this._eventBytes
49+
// const store = storage.getStore()
50+
51+
// if (!store || !store.traceContext) return
52+
53+
// this._encodeFixArray(bytes, 2)
54+
// this._encodeShort(bytes, eventTypes.START_SPAN)
55+
// this._encodeFixArray(bytes, 10)
56+
// this._encodeLong(bytes, now())
57+
// this._encodeId(bytes, store.traceContext.traceId)
58+
// this._encodeId(bytes, store.traceContext.spanId)
59+
// this._encodeId(bytes, store.traceContext.parentId)
60+
// this._encodeString(bytes, service)
61+
// this._encodeString(bytes, 'koa.request')
62+
// this._encodeString(bytes, `${req.method} ${req.url}`)
63+
// this._encodeFixMap(bytes, 2)
64+
// this._encodeString(bytes, 'http.method')
65+
// this._encodeString(bytes, req.method)
66+
// this._encodeString(bytes, 'http.url')
67+
// this._encodeString(bytes, req.url)
68+
// this._encodeFixMap(bytes, 0)
69+
// this._encodeString(bytes, 'web')
70+
71+
// this._afterEncode()
72+
// }
73+
4474
encodeKoaRequestStart (req) {
4575
const bytes = this._eventBytes
4676
const store = storage.getStore()
@@ -53,7 +83,7 @@ class Encoder {
5383
// error will be its own event
5484

5585
this._encodeFixArray(bytes, 2)
56-
this._encodeShort(bytes, eventTypes.KOA_REQUEST_START) // implied: name
86+
this._encodeUnsigned(bytes, eventTypes.KOA_REQUEST_START) // implied: name
5787
this._encodeFixArray(bytes, 6)
5888
this._encodeLong(bytes, now())
5989
this._encodeId(bytes, store.traceContext.traceId)
@@ -65,19 +95,39 @@ class Encoder {
6595
this._afterEncode()
6696
}
6797

98+
// encodeKoaRequestFinish (res) {
99+
// const bytes = this._eventBytes
100+
// const store = storage.getStore()
101+
102+
// if (!store || !store.traceContext) return
103+
104+
// this._encodeFixArray(bytes, 2)
105+
// this._encodeShort(bytes, eventTypes.FINISH_SPAN)
106+
// this._encodeFixArray(bytes, 5)
107+
// this._encodeLong(bytes, now())
108+
// this._encodeId(bytes, store.traceContext.traceId)
109+
// this._encodeId(bytes, store.traceContext.spanId)
110+
// this._encodeFixMap(bytes, 1)
111+
// this._encodeString(bytes, 'http.status_code')
112+
// this._encodeString(bytes, String(res.statusCode || 0))
113+
// this._encodeFixMap(bytes, 0)
114+
115+
// this._afterEncode()
116+
// }
117+
68118
encodeKoaRequestFinish (res) {
69119
const bytes = this._eventBytes
70120
const store = storage.getStore()
71121

72122
if (!store || !store.traceContext) return
73123

74124
this._encodeFixArray(bytes, 2)
75-
this._encodeShort(bytes, eventTypes.KOA_REQUEST_FINISH) // implied: name
125+
this._encodeUnsigned(bytes, eventTypes.KOA_REQUEST_FINISH) // implied: name
76126
this._encodeFixArray(bytes, 4)
77127
this._encodeLong(bytes, now())
78128
this._encodeId(bytes, store.traceContext.traceId)
79129
this._encodeId(bytes, store.traceContext.spanId)
80-
this._encodeShort(bytes, res.statusCode)
130+
this._encodeUnsigned(bytes, res.statusCode)
81131

82132
this._afterEncode()
83133
}
@@ -181,6 +231,15 @@ class Encoder {
181231
bytes.buffer[offset + 4] = length
182232
}
183233

234+
_encodeFixMap (bytes, size = 0) {
235+
const offset = bytes.length
236+
237+
bytes.reserve(1)
238+
bytes.length += 1
239+
240+
bytes.buffer[offset] = 0x80 + size
241+
}
242+
184243
_encodeMapPrefix (bytes, keysLength) {
185244
const offset = bytes.length
186245

@@ -259,6 +318,50 @@ class Encoder {
259318
bytes.buffer[offset + 8] = lo
260319
}
261320

321+
_encodeUnsigned (bytes, value) {
322+
const offset = bytes.length
323+
324+
if (value <= 0xff) {
325+
bytes.reserve(2)
326+
bytes.length += 2
327+
328+
bytes.buffer[offset] = 0xcc
329+
bytes.buffer[offset + 1] = value
330+
} else if (value <= 0xffff) {
331+
bytes.reserve(3)
332+
bytes.length += 3
333+
334+
bytes.buffer[offset] = 0xcd
335+
bytes.buffer[offset + 1] = value >> 8
336+
bytes.buffer[offset + 2] = value
337+
} else if (value <= 0xffffffff) {
338+
bytes.reserve(5)
339+
bytes.length += 5
340+
341+
bytes.buffer[offset] = 0xce
342+
bytes.buffer[offset + 1] = value >> 24
343+
bytes.buffer[offset + 2] = value >> 16
344+
bytes.buffer[offset + 3] = value >> 8
345+
bytes.buffer[offset + 4] = value
346+
} else {
347+
const hi = (value / Math.pow(2, 32)) >> 0
348+
const lo = value >>> 0
349+
350+
bytes.reserve(9)
351+
bytes.length += 9
352+
353+
bytes.buffer[offset] = 0xcf
354+
bytes.buffer[offset + 1] = hi >> 24
355+
bytes.buffer[offset + 2] = hi >> 16
356+
bytes.buffer[offset + 3] = hi >> 8
357+
bytes.buffer[offset + 4] = hi
358+
bytes.buffer[offset + 5] = lo >> 24
359+
bytes.buffer[offset + 6] = lo >> 16
360+
bytes.buffer[offset + 7] = lo >> 8
361+
bytes.buffer[offset + 8] = lo
362+
}
363+
}
364+
262365
_encodeMap (bytes, value) {
263366
const keys = Object.keys(value)
264367
const validKeys = keys.filter(key => typeof value[key] === 'string' || typeof value[key] === 'number')
@@ -284,9 +387,14 @@ class Encoder {
284387
}
285388
}
286389

390+
_encodeFixString (bytes, value = '') {
391+
this._cacheString(value)
392+
this._encodeUnsigned(bytes, this._stringMap[value])
393+
}
394+
287395
_encodeString (bytes, value = '') {
288396
this._cacheString(value)
289-
this._encodeInteger(bytes, this._stringMap[value])
397+
this._encodeUnsigned(bytes, this._stringMap[value])
290398
}
291399

292400
_encodeFloat (bytes, value) {

benchmark/sirun/plugin-koa/server.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ const requests = parseInt(REQUESTS)
2727
let readyServer
2828
let total = 0
2929

30-
app.use(ctx => {
30+
app.use(async ctx => {
3131
ctx.body = 'OK'
3232

3333
if (++total === requests) {
3434
server.close()
3535
readyServer.close()
3636
}
37+
38+
await new Promise((resolve) => {
39+
setTimeout(resolve(), 500)
40+
})
3741
})
3842

3943
const server = http.createServer(app.callback())

collector/common/src/exporting/agent.rs

Lines changed: 83 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::client::Client;
2-
use crate::tracing::{Trace, Traces};
2+
use crate::tracing::{Trace, Traces, Span, Meta, Metrics};
33
use super::Exporter;
4+
use hashbrown::HashMap;
45
use rmp::encode;
56
use rmp::encode::ByteBuf;
6-
use hashbrown::HashMap;
7+
use std::rc::Rc;
78

89
pub struct AgentExporter {
910
client: Box<dyn Client + Send + Sync>
@@ -35,69 +36,104 @@ impl AgentExporter {
3536
}
3637
}
3738

39+
fn cache_strings(&self, strings: &mut Vec<Rc<str>>, positions: &mut HashMap<Rc<str>, u32>, trace: &Trace) {
40+
for span in trace.spans.values() {
41+
self.cache_string(strings, positions, &span.service);
42+
self.cache_string(strings, positions, &span.name);
43+
self.cache_string(strings, positions, &span.resource);
44+
self.cache_string(strings, positions, &span.span_type);
45+
46+
for (k, v) in &span.meta {
47+
self.cache_string(strings, positions, &k);
48+
self.cache_string(strings, positions, &v);
49+
}
50+
51+
for (k, _) in &span.metrics {
52+
self.cache_string(strings, positions, &k);
53+
}
54+
}
55+
}
56+
57+
fn cache_string(&self, strings: &mut Vec<Rc<str>>, positions: &mut HashMap<Rc<str>, u32>, s: &Rc<str>) {
58+
if !positions.contains_key(s) {
59+
let len = strings.len() as u32;
60+
61+
positions.insert(s.clone(), len);
62+
strings.push(s.clone());
63+
}
64+
}
65+
66+
fn encode_strings(&self, wr: &mut ByteBuf, strings: &mut Vec<Rc<str>>) {
67+
encode::write_array_len(wr, strings.len() as u32).unwrap();
68+
69+
for s in strings {
70+
encode::write_str(wr, s).unwrap();
71+
}
72+
}
73+
3874
fn encode_traces(&self, wr: &mut ByteBuf, traces: Traces) {
75+
encode::write_array_len(wr, 2).unwrap();
76+
77+
let empty_string: Rc<str> = Rc::from("");
78+
let mut strings = Vec::new();
79+
let mut positions = HashMap::new();
80+
81+
strings.push(empty_string.clone());
82+
positions.insert(empty_string.clone(), 0u32);
83+
84+
// TODO: Avoid looping twice over traces/strings.
85+
for trace in traces.values() {
86+
self.cache_strings(&mut strings, &mut positions, trace);
87+
}
88+
89+
self.encode_strings(wr, &mut strings);
90+
3991
encode::write_array_len(wr, traces.len() as u32).unwrap();
4092

4193
for trace in traces.values() {
42-
self.encode_trace(wr, trace);
94+
self.encode_trace(wr, trace, &positions);
4395
}
4496
}
4597

46-
fn encode_trace(&self, wr: &mut ByteBuf, trace: &Trace) {
98+
fn encode_trace(&self, wr: &mut ByteBuf, trace: &Trace, positions: &HashMap<Rc<str>, u32>) {
4799
encode::write_array_len(wr, trace.spans.len() as u32).unwrap();
48100

49101
for span in trace.spans.values() {
50-
match &span.span_type {
51-
Some(span_type) => {
52-
encode::write_map_len(wr, 12).unwrap();
53-
encode::write_str(wr, "type").unwrap();
54-
encode::write_str(wr, span_type.as_str()).unwrap();
55-
},
56-
None => {
57-
encode::write_map_len(wr, 11).unwrap();
58-
}
59-
}
60-
61-
encode::write_str(wr, "trace_id").unwrap();
62-
encode::write_uint(wr, span.trace_id).unwrap();
63-
encode::write_str(wr, "span_id").unwrap();
64-
encode::write_uint(wr, span.span_id).unwrap();
65-
encode::write_str(wr, "parent_id").unwrap();
66-
encode::write_uint(wr, span.parent_id).unwrap();
67-
encode::write_str(wr, "name").unwrap();
68-
encode::write_str(wr, span.name.as_str()).unwrap();
69-
encode::write_str(wr, "resource").unwrap();
70-
encode::write_str(wr, span.resource.as_str()).unwrap();
71-
encode::write_str(wr, "service").unwrap();
72-
encode::write_str(wr, span.service.as_str()).unwrap();
73-
encode::write_str(wr, "error").unwrap();
74-
encode::write_uint(wr, span.error).unwrap();
75-
encode::write_str(wr, "start").unwrap();
76-
encode::write_uint(wr, span.start).unwrap();
77-
encode::write_str(wr, "duration").unwrap();
78-
encode::write_uint(wr, span.duration + 1).unwrap();
79-
80-
self.encode_meta(wr, &span.meta);
81-
self.encode_metrics(wr, &span.metrics);
102+
self.encode_span(wr, span, positions);
82103
}
83104
}
84105

85-
fn encode_meta(&self, wr: &mut ByteBuf, map: &HashMap<String, String>) {
86-
encode::write_str(wr, "meta").unwrap();
87-
encode::write_map_len(wr, map.len() as u32).unwrap();
106+
fn encode_span(&self, wr: &mut ByteBuf, span: &Span, positions: &HashMap<Rc<str>, u32>) {
107+
encode::write_array_len(wr, 12).unwrap();
108+
109+
encode::write_uint(wr, positions[&span.service] as u64).unwrap();
110+
encode::write_uint(wr, positions[&span.name] as u64).unwrap();
111+
encode::write_uint(wr, positions[&span.resource] as u64).unwrap();
112+
encode::write_uint(wr, span.trace_id).unwrap();
113+
encode::write_uint(wr, span.span_id).unwrap();
114+
encode::write_uint(wr, span.parent_id).unwrap();
115+
encode::write_uint(wr, span.start).unwrap();
116+
encode::write_uint(wr, span.duration + 1).unwrap();
117+
encode::write_uint(wr, span.error).unwrap();
118+
self.encode_meta(wr, &span.meta, positions);
119+
self.encode_metrics(wr, &span.metrics, positions);
120+
encode::write_uint(wr, positions[&span.span_type] as u64).unwrap();
121+
}
122+
123+
fn encode_meta(&self, wr: &mut ByteBuf, meta: &Meta, positions: &HashMap<Rc<str>, u32>) {
124+
encode::write_map_len(wr, meta.len() as u32).unwrap();
88125

89-
for (k, v) in map {
90-
encode::write_str(wr, k.as_str()).unwrap();
91-
encode::write_str(wr, v.as_str()).unwrap();
126+
for (k, v) in meta {
127+
encode::write_uint(wr, positions[k] as u64).unwrap();
128+
encode::write_uint(wr, positions[v] as u64).unwrap();
92129
}
93130
}
94131

95-
fn encode_metrics(&self, wr: &mut ByteBuf, map: &HashMap<String, f64>) {
96-
encode::write_str(wr, "metrics").unwrap();
97-
encode::write_map_len(wr, map.len() as u32).unwrap();
132+
fn encode_metrics(&self, wr: &mut ByteBuf, metrics: &Metrics, positions: &HashMap<Rc<str>, u32>) {
133+
encode::write_map_len(wr, metrics.len() as u32).unwrap();
98134

99-
for (k, v) in map {
100-
encode::write_str(wr, k.as_str()).unwrap();
135+
for (k, v) in metrics {
136+
encode::write_uint(wr, positions[k] as u64).unwrap();
101137
encode::write_f64(wr, *v).unwrap();
102138
}
103139
}

collector/common/src/msgpack.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ pub fn read_u64<R: Read>(mut rd: R) -> Result<u64, NumValueReadError> {
1414
rmp::decode::read_int(&mut rd)
1515
}
1616

17+
pub fn read_usize<R: Read>(mut rd: R) -> Result<usize, NumValueReadError> {
18+
rmp::decode::read_int(&mut rd)
19+
}
20+
1721
pub fn read_str<R: Read>(mut rd: R) -> String {
1822
let limit = read_str_len(&mut rd).unwrap() as u64;
1923
let mut str = String::new();

0 commit comments

Comments
 (0)