diff --git a/.editorconfig b/.editorconfig index 7fdb734b2ee..2749b2502c2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,5 +10,8 @@ charset = utf-8 trim_trailing_whitespace = true insert_final_newline = true +[*.rs] +indent_size = 4 + [*.md] trim_trailing_whitespace = false diff --git a/benchmark/sirun/plugin-koa/README.md b/benchmark/sirun/plugin-koa/README.md new file mode 100644 index 00000000000..9bba28b0d38 --- /dev/null +++ b/benchmark/sirun/plugin-koa/README.md @@ -0,0 +1,3 @@ +This creates 100,000 HTTP requests to a Koa server. + +The variants are with the tracer, without it, and with a new internal tracer. diff --git a/benchmark/sirun/plugin-koa/agent.js b/benchmark/sirun/plugin-koa/agent.js new file mode 100644 index 00000000000..2d146a75b77 --- /dev/null +++ b/benchmark/sirun/plugin-koa/agent.js @@ -0,0 +1,25 @@ +'use strict' + +const express = require('express') +const bodyParser = require('body-parser') + +const port = process.env.PORT || 8126 +const app = express() + +let requests = 0 +let bytes = 0 + +app.use(bodyParser.raw({ limit: '50mb', type: () => true })) +app.use('*', (req, res) => { + requests++ + bytes += req.body.length + + // console.log(require('msgpack-lite').decode(req.body)) + + console.log(`Requests: ${requests}`) // eslint-disable-line no-console + console.log(`Bytes: ${bytes}`) // eslint-disable-line no-console + + res.status(200).send() +}) + +app.listen(port) diff --git a/benchmark/sirun/plugin-koa/internal-tracer/client.js b/benchmark/sirun/plugin-koa/internal-tracer/client.js new file mode 100644 index 00000000000..f35b055cfcb --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/client.js @@ -0,0 +1,92 @@ +'use strict' + +const http = require('http') +const https = require('https') +const { dockerId, storage } = require('../../../../packages/datadog-core') +const tracerVersion = require('../../../../package.json').version + +const httpAgent = new http.Agent({ keepAlive: true, maxSockets: 1 }) +const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 1 }) + +const DD_TRACE_AGENT_URL = process.env.DD_TRACE_AGENT_URL || process.env.DD_TRACE_URL + +class Client { + request (options, done) { + if (options.count === 0) return + + const port = options.port || 8127 + const url = new URL(DD_TRACE_AGENT_URL || `http://127.0.0.1:${port}`) + const isSecure = url.protocol === 'https:' + const isUnix = url.protocol === 'unix:' + const client = isSecure ? https : http + const agent = isSecure ? 
httpsAgent : httpAgent + const data = options.data + const timeout = 2000 + const httpOptions = { + agent, + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + socketPath: isUnix && url.pathname, + path: options.path, + method: 'PUT', + headers: { + 'Content-Length': String(data.length), + 'Content-Type': 'application/msgpack', + 'Datadog-Container-ID': dockerId || '', + 'Datadog-Meta-Lang': 'nodejs', + 'Datadog-Meta-Lang-Version': process.version, + 'Datadog-Meta-Lang-Interpreter': process.jsEngine || 'v8', + 'Datadog-Meta-Tracer-Version': tracerVersion + }, + timeout + } + + const onResponse = res => { + let data = '' + + res.setTimeout(timeout) + res.on('data', chunk => { + data += chunk + }) + res.on('end', () => { + if (res.statusCode >= 200 && res.statusCode <= 299) { + try { + const response = data + done(null, response) + } catch (e) { + done(e) + } + } else { + const statusCode = res.statusCode + const statusText = http.STATUS_CODES[res.statusCode] + const error = new Error(`Error from the agent: ${statusCode} ${statusText}`) + + error.status = statusCode + + done(error, null) + } + }) + } + + const makeRequest = onError => { + const store = storage.getStore() + + storage.enterWith({ noop: true }) + + const req = client.request(httpOptions, onResponse) + + req.on('error', onError) + + req.setTimeout(timeout, req.abort) + req.write(data) + req.end() + + storage.enterWith(store) + } + + makeRequest(() => makeRequest(done)) // retry once on error + } +} + +module.exports = { Client } diff --git a/benchmark/sirun/plugin-koa/internal-tracer/context.js b/benchmark/sirun/plugin-koa/internal-tracer/context.js new file mode 100644 index 00000000000..46d193b864a --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/context.js @@ -0,0 +1,19 @@ +'use strict' + +const { id, zeroId } = require('./id') + +class TraceContext { + constructor (childOf) { + if (childOf) { + this.traceId = childOf.traceId + this.spanId = id() + this.parentId = childOf.spanId + } else { + this.traceId = id() + this.spanId = this.traceId + this.parentId = zeroId + } + } +} + +module.exports = { TraceContext } diff --git a/benchmark/sirun/plugin-koa/internal-tracer/encoder.js b/benchmark/sirun/plugin-koa/internal-tracer/encoder.js new file mode 100644 index 00000000000..acca446bebe --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/encoder.js @@ -0,0 +1,495 @@ +'use strict' + +const Chunk = require('../../../../packages/dd-trace/src/encode/chunk') +const { storage } = require('../../../../packages/datadog-core') +const { Client } = require('./client') +const { zeroId } = require('./id') +const { now } = require('./now') + +// const service = process.env.DD_SERVICE || 'unnamed-node-app' +const ARRAY_OF_TWO = 0x92 +const SOFT_LIMIT = 8 * 1024 * 1024 // 8MB +const flushInterval = 2000 +const noop = () => {} +const eventTypes = { + WEB_REQUEST_START: 1, + ERROR: 2, + WEB_REQUEST_FINISH: 3, + START_SPAN: 4, + FINISH_SPAN: 5, + ADD_TAGS: 6, + MYSQL_START_SPAN: 8 +} + +const float64Array = new Float64Array(1) +const uInt8Float64Array = new Uint8Array(float64Array.buffer) + +float64Array[0] = -1 + +const bigEndian = uInt8Float64Array[7] === 0 + +class Encoder { + constructor (limit = SOFT_LIMIT) { + this._limit = limit + this._metadataBytes = new Chunk(1024) + this._eventBytes = new Chunk() + this._stringBytes = new Chunk() + this._client = new Client() + this._reset() + + process.once('beforeExit', () => this.flush()) + } + + count () { + return this._eventCount + } + + 
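+  // Each encode* method appends one event to the event buffer as a msgpack
+  // fixarray of [event type, fixarray of fields]. Strings are interned through
+  // _cacheString() and referenced by their index in the string table, and ids
+  // are written as uint64 values (a single zero byte for the zero id).
+  // makePayload() then wraps everything into a two-element array of
+  // [string table, events] which flush() hands to the collector over HTTP
+  // (/v0.1/events), FFI or WASM.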
encodeWebRequestStart (req, component) { + const bytes = this._eventBytes + const store = storage.getStore() + + if (!store || !store.traceContext) return + + this._encodeFixArray(bytes, 2) + this._encodeByte(bytes, eventTypes.WEB_REQUEST_START) + this._encodeFixArray(bytes, 8) + this._encodeLong(bytes, now()) + this._encodeId(bytes, store.traceContext.traceId) + this._encodeId(bytes, store.traceContext.spanId) + this._encodeId(bytes, store.traceContext.parentId) + this._encodeString(bytes, component) + this._encodeString(bytes, req.method) + this._encodeString(bytes, req.url) + this._encodeString(bytes, req.url) // route + + this._afterEncode() + } + + encodeWebRequestFinish (res) { + const bytes = this._eventBytes + const store = storage.getStore() + + if (!store || !store.traceContext) return + + this._encodeFixArray(bytes, 2) + this._encodeByte(bytes, eventTypes.WEB_REQUEST_FINISH) + this._encodeFixArray(bytes, 3) + this._encodeLong(bytes, now()) + this._encodeId(bytes, store.traceContext.traceId) + this._encodeId(bytes, store.traceContext.spanId) + this._encodeShort(bytes, res.statusCode) + + this._afterEncode() + } + + encodeMysqlQueryStart (query) { + const bytes = this._eventBytes + const store = storage.getStore() + + if (!store || !store.traceContext) return + + this._encodeFixArray(bytes, 2) + this._encodeByte(bytes, eventTypes.MYSQL_START_SPAN) + this._encodeFixArray(bytes, 9) + this._encodeLong(bytes, now()) + this._encodeId(bytes, store.traceContext.traceId) + this._encodeId(bytes, store.traceContext.spanId) + this._encodeId(bytes, store.traceContext.parentId) + this._encodeString(bytes, query.sql) + this._encodeString(bytes, query.conf.database) + this._encodeString(bytes, query.conf.user) + this._encodeString(bytes, query.conf.host) + this._encodeString(bytes, query.conf.port) + + this._afterEncode() + } + + encodeFinish () { + const bytes = this._eventBytes + const store = storage.getStore() + + if (!store || !store.traceContext) return + + this._encodeFixArray(bytes, 2) + this._encodeByte(bytes, eventTypes.FINISH_SPAN) + this._encodeFixArray(bytes, 5) + this._encodeLong(bytes, now()) + this._encodeId(bytes, store.traceContext.traceId) + this._encodeId(bytes, store.traceContext.spanId) + this._encodeFixMap(bytes, 0) + this._encodeFixMap(bytes, 0) + + this._afterEncode() + } + + encodeError (error) { + const bytes = this._eventBytes + const store = storage.getStore() + + if (!store || !store.traceContext) return // TODO: support errors without tracing + + this._encodeFixArray(bytes, 2) + this._encodeByte(bytes, eventTypes.ERROR) // implied: name + this._encodeFixArray(bytes, error ? 
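+    // 6 fields when an error object is present (timestamp, trace id, span id,
+    // name, message and stack), 3 fields otherwise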
6 : 3) + this._encodeLong(bytes, now()) + this._encodeId(bytes, store.traceContext.traceId) + this._encodeId(bytes, store.traceContext.spanId) + + if (error) { + this._encodeString(bytes, error.name) + this._encodeString(bytes, error.message) + this._encodeString(bytes, error.stack) + } + + this._afterEncode() + } + + makePayload () { + const prefixSize = 1 + const stringSize = this._stringBytes.length + 5 + const eventSize = this._eventBytes.length + 5 + const buffer = Buffer.allocUnsafe(prefixSize + stringSize + eventSize) + + let offset = 0 + + buffer[offset++] = ARRAY_OF_TWO + + offset = this._writeStrings(buffer, offset) + // TODO: add metadata + offset = this._writeEvents(buffer, offset) + + this._reset() + + return buffer + } + + flush (done = noop) { + const count = this.count() + + if (count === 0) return + + const data = this.makePayload() + + this._timer = clearTimeout(this._timer) + + if (process.env.WITH_NATIVE_COLLECTOR) { + this.flushFfi(data, done) + } else if (process.env.WITH_WASM_COLLECTOR) { + this.flushWasm(data, done) + } else { + const path = `/v0.1/events` + this._client.request({ data, path, count }, done) + } + } + + // TODO: Use node:ffi when it lands. + // https://github.com/nodejs/node/pull/46905 + flushFfi (data, done) { + try { + const path = require('path') + const { getNativeFunction, getBufferPointer } = require('sbffi') + const libPath = path.normalize( + path.join(__dirname, '../../../../collector/target/release/libffi.dylib') + ) + const submit = getNativeFunction(libPath, 'submit', 'uint32_t', ['uint32_t', 'uint8_t *']) + const ptr = getBufferPointer(data) + + submit(data.length, ptr) + done() + } catch (e) { + done(e) + } + } + + // In collector folder: + // cargo build -r -p wasm --target wasm32-unknown-unknown + // wasm-bindgen + // --target nodejs ./target/wasm32-unknown-unknown/release/wasm.wasm + // --out-dir=./target/wasm32-unknown-unknown/release/ + flushWasm (payload, done) { + const libPath = '../../../../collector/target/wasm32-unknown-unknown/release/wasm.js' + const { collect } = require(libPath) + + const data = collect(payload) + const path = `/v0.5/traces` + + this._client.request({ data, path, port: 8126 }, done) + } + + reset () { + this._reset() + } + + _afterEncode () { + this._eventCount++ + + // we can go over the soft limit since the agent has a 50MB hard limit + if (this._eventBytes.length > this._limit || this._stringBytes.length > this._limit) { + this.flush() + } else if (!this._timer) { + this._timer = setTimeout(() => this.flush(), flushInterval).unref() + } + } + + _reset () { + this._metadataBytes.length = 0 + this._eventCount = 0 + this._eventBytes.length = 0 + this._stringCount = 0 + this._stringBytes.length = 0 + this._stringMap = {} + + this._cacheString('') + } + + _encodeFixArray (bytes, size = 0) { + const offset = bytes.length + + bytes.reserve(1) + bytes.length += 1 + + bytes.buffer[offset] = 0x90 + size + } + + _encodeArrayPrefix (bytes, value) { + const length = value.length + const offset = bytes.length + + bytes.reserve(5) + bytes.length += 5 + + bytes.buffer[offset] = 0xdd + bytes.buffer[offset + 1] = length >> 24 + bytes.buffer[offset + 2] = length >> 16 + bytes.buffer[offset + 3] = length >> 8 + bytes.buffer[offset + 4] = length + } + + _encodeFixMap (bytes, size = 0) { + const offset = bytes.length + + bytes.reserve(1) + bytes.length += 1 + + bytes.buffer[offset] = 0x80 + size + } + + _encodeMapPrefix (bytes, keysLength) { + const offset = bytes.length + + bytes.reserve(5) + bytes.length += 5 + 
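+    // msgpack map32: a 0xdf marker followed by the key count as a big-endian uint32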
bytes.buffer[offset] = 0xdf + bytes.buffer[offset + 1] = keysLength >> 24 + bytes.buffer[offset + 2] = keysLength >> 16 + bytes.buffer[offset + 3] = keysLength >> 8 + bytes.buffer[offset + 4] = keysLength + } + + _encodeByte (bytes, value) { + bytes.reserve(1) + + bytes.buffer[bytes.length++] = value + } + + _encodeId (bytes, id) { + const offset = bytes.length + + if (id === zeroId) { + bytes.reserve(1) + bytes.length += 1 + + bytes.buffer[offset] = 0x00 + } else { + bytes.reserve(9) + bytes.length += 9 + + bytes.buffer[offset] = 0xcf + bytes.buffer[offset + 1] = id[0] + bytes.buffer[offset + 2] = id[1] + bytes.buffer[offset + 3] = id[2] + bytes.buffer[offset + 4] = id[3] + bytes.buffer[offset + 5] = id[4] + bytes.buffer[offset + 6] = id[5] + bytes.buffer[offset + 7] = id[6] + bytes.buffer[offset + 8] = id[7] + } + } + + _encodeInteger (bytes, value) { + const offset = bytes.length + + bytes.reserve(5) + bytes.length += 5 + + bytes.buffer[offset] = 0xce + bytes.buffer[offset + 1] = value >> 24 + bytes.buffer[offset + 2] = value >> 16 + bytes.buffer[offset + 3] = value >> 8 + bytes.buffer[offset + 4] = value + } + + _encodeShort (bytes, value) { + const offset = bytes.length + + bytes.reserve(3) + bytes.length += 3 + + bytes.buffer[offset] = 0xcd + bytes.buffer[offset + 1] = value >> 8 + bytes.buffer[offset + 2] = value + } + + _encodeLong (bytes, value) { + const offset = bytes.length + const hi = (value / Math.pow(2, 32)) >> 0 + const lo = value >>> 0 + + bytes.reserve(9) + bytes.length += 9 + + bytes.buffer[offset] = 0xcf + bytes.buffer[offset + 1] = hi >> 24 + bytes.buffer[offset + 2] = hi >> 16 + bytes.buffer[offset + 3] = hi >> 8 + bytes.buffer[offset + 4] = hi + bytes.buffer[offset + 5] = lo >> 24 + bytes.buffer[offset + 6] = lo >> 16 + bytes.buffer[offset + 7] = lo >> 8 + bytes.buffer[offset + 8] = lo + } + + _encodeUnsigned (bytes, value) { + const offset = bytes.length + + if (value <= 0x7f) { + bytes.reserve(1) + bytes.length += 1 + + bytes.buffer[offset] = value + } else if (value <= 0xff) { + bytes.reserve(2) + bytes.length += 2 + + bytes.buffer[offset] = 0xcc + bytes.buffer[offset + 1] = value + } else if (value <= 0xffff) { + bytes.reserve(3) + bytes.length += 3 + + bytes.buffer[offset] = 0xcd + bytes.buffer[offset + 1] = value >> 8 + bytes.buffer[offset + 2] = value + } else if (value <= 0xffffffff) { + bytes.reserve(5) + bytes.length += 5 + + bytes.buffer[offset] = 0xce + bytes.buffer[offset + 1] = value >> 24 + bytes.buffer[offset + 2] = value >> 16 + bytes.buffer[offset + 3] = value >> 8 + bytes.buffer[offset + 4] = value + } else { + const hi = (value / Math.pow(2, 32)) >> 0 + const lo = value >>> 0 + + bytes.reserve(9) + bytes.length += 9 + + bytes.buffer[offset] = 0xcf + bytes.buffer[offset + 1] = hi >> 24 + bytes.buffer[offset + 2] = hi >> 16 + bytes.buffer[offset + 3] = hi >> 8 + bytes.buffer[offset + 4] = hi + bytes.buffer[offset + 5] = lo >> 24 + bytes.buffer[offset + 6] = lo >> 16 + bytes.buffer[offset + 7] = lo >> 8 + bytes.buffer[offset + 8] = lo + } + } + + _encodeMap (bytes, value) { + const keys = Object.keys(value) + const validKeys = keys.filter(key => typeof value[key] === 'string' || typeof value[key] === 'number') + + this._encodeMapPrefix(bytes, validKeys.length) + + for (const key of validKeys) { + this._encodeString(bytes, key) + this._encodeValue(bytes, value[key]) + } + } + + _encodeValue (bytes, value) { + switch (typeof value) { + case 'string': + this._encodeString(bytes, value) + break + case 'number': + this._encodeFloat(bytes, value) + break 
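+      // _encodeMap() only passes strings and numbers through, so other types
+      // are not expected to reach this switch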
+ default: + // should not happen + } + } + + _encodeFixString (bytes, value = '') { + this._cacheString(value) + this._encodeUnsigned(bytes, this._stringMap[value]) + } + + _encodeString (bytes, value = '') { + this._cacheString(value) + this._encodeUnsigned(bytes, this._stringMap[value]) + } + + _encodeFloat (bytes, value) { + float64Array[0] = value + + const offset = bytes.length + bytes.reserve(9) + bytes.length += 9 + + bytes.buffer[offset] = 0xcb + + if (bigEndian) { + for (let i = 0; i <= 7; i++) { + bytes.buffer[offset + i + 1] = uInt8Float64Array[i] + } + } else { + for (let i = 7; i >= 0; i--) { + bytes.buffer[bytes.length - i - 1] = uInt8Float64Array[i] + } + } + } + + _cacheString (value) { + if (!(value in this._stringMap)) { + this._stringMap[value] = this._stringCount++ + this._stringBytes.write(value) + } + } + + _writeArrayPrefix (buffer, offset, count) { + buffer[offset++] = 0xdd + buffer.writeUInt32BE(count, offset) + + return offset + 4 + } + + _writeStrings (buffer, offset) { + offset = this._writeArrayPrefix(buffer, offset, this._stringCount) + offset += this._stringBytes.buffer.copy(buffer, offset, 0, this._stringBytes.length) + + return offset + } + + _writeEvents (buffer, offset = 0) { + offset = this._writeArrayPrefix(buffer, offset, this._eventCount) + offset += this._eventBytes.buffer.copy(buffer, offset, 0, this._eventBytes.length) + + return offset + } +} + +module.exports = { Encoder, encoder: new Encoder() } diff --git a/benchmark/sirun/plugin-koa/internal-tracer/id.js b/benchmark/sirun/plugin-koa/internal-tracer/id.js new file mode 100644 index 00000000000..0822c40f91f --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/id.js @@ -0,0 +1,117 @@ +'use strict' + +const { randomFillSync } = require('crypto') + +const UINT_MAX = 4294967296 + +const data = new Uint8Array(8 * 8192) +const zeroId = new Uint8Array(8) + +let batch = 0 + +function id (value, raddix) { + const buffer = value + ? 
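+    // an explicit id is parsed from its string representation, otherwise 8
+    // bytes are taken from the pre-generated random pool (refilled every 8192 ids)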
fromNumberString(value, raddix) + : pseudoRandom() + + return buffer +} + +function pseudoRandom () { + if (batch === 0) { + randomFillSync(data) + } + + batch = (batch + 1) % 8192 + + const offset = batch * 8 + + return [ + data[offset] & 0x7F, // only positive int64 + data[offset + 1], + data[offset + 2], + data[offset + 3], + data[offset + 4], + data[offset + 5], + data[offset + 6], + data[offset + 7] + ] +} + +function fromNumberString (str, raddix = 10) { + const len = str.length + + let pos = 0 + let high = 0 + let low = 0 + + if (str[0] === '-') pos++ + + const sign = pos + + while (pos < len) { + const chr = parseInt(str[pos++], raddix) + + if (!(chr >= 0)) break // NaN + + low = low * raddix + chr + high = high * raddix + Math.floor(low / UINT_MAX) + low %= UINT_MAX + } + + if (sign) { + high = ~high + + if (low) { + low = UINT_MAX - low + } else { + high++ + } + } + + if (high === 0 && low === 0) return zeroId + + const buffer = new Array(8) // TODO: use existing buffer + + writeUInt32BE(buffer, high, 0) + writeUInt32BE(buffer, low, 4) + + return buffer +} + +function toNumberString (buffer, radix = 10) { + let high = readInt32(buffer, 0) + let low = readInt32(buffer, 4) + let str = '' + + while (1) { + const mod = (high % radix) * UINT_MAX + low + + high = Math.floor(high / radix) + low = Math.floor(mod / radix) + str = (mod % radix).toString(radix) + str + + if (!high && !low) break + } + + return str +} + +function readInt32 (buffer, offset) { + return (buffer[offset + 0] * 16777216) + + (buffer[offset + 1] << 16) + + (buffer[offset + 2] << 8) + + buffer[offset + 3] +} + +function writeUInt32BE (buffer, value, offset) { + buffer[3 + offset] = value & 255 + value = value >> 8 + buffer[2 + offset] = value & 255 + value = value >> 8 + buffer[1 + offset] = value & 255 + value = value >> 8 + buffer[0 + offset] = value & 255 +} + +module.exports = { id, zeroId, toNumberString } diff --git a/benchmark/sirun/plugin-koa/internal-tracer/index.js b/benchmark/sirun/plugin-koa/internal-tracer/index.js new file mode 100644 index 00000000000..aac26892504 --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/index.js @@ -0,0 +1,5 @@ +'use strict' + +require('./patch') +require('./storage') +require('./trace') diff --git a/benchmark/sirun/plugin-koa/internal-tracer/now.js b/benchmark/sirun/plugin-koa/internal-tracer/now.js new file mode 100644 index 00000000000..bdccbfd777f --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/now.js @@ -0,0 +1,10 @@ +'use strict' + +const processStartTime = BigInt(Date.now() * 1e6) +const processStartTicks = process.hrtime.bigint() + +function now () { + return Number(processStartTime + process.hrtime.bigint() - processStartTicks) +} + +module.exports = { now } diff --git a/benchmark/sirun/plugin-koa/internal-tracer/patch.js b/benchmark/sirun/plugin-koa/internal-tracer/patch.js new file mode 100644 index 00000000000..bb9ff542ca4 --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/patch.js @@ -0,0 +1,32 @@ +'use strict' + +const { channel } = require('diagnostics_channel') +const Hook = require('../../../../packages/dd-trace/src/ritm') + +const startChannel = channel('apm:koa:request:start') +const endChannel = channel('apm:koa:request:end') +const errorChannel = channel('apm:koa:request:error') +const asyncEndChannel = channel('apm:koa:request:async-end') + +Hook(['koa'], function (Koa, name, basedir) { + if (name !== 'koa/lib/application.js') return Koa + + const { handleRequest } = Koa.prototype + + Koa.prototype.handleRequest =
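+  // The wrapper publishes `start` before delegating to Koa, `end` synchronously
+  // after the middleware chain has been dispatched, and `error`/`async-end` once
+  // the returned promise settles, so subscribers can scope the storage and
+  // encode the request lifecycle.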
function (ctx, fnMiddleware) { + startChannel.publish(ctx) + + const promise = handleRequest.apply(this, arguments) + .then(() => asyncEndChannel.publish(ctx), error => { + errorChannel.publish(error) + asyncEndChannel.publish(ctx) + throw error + }) + + endChannel.publish(ctx) + + return promise + } + + return Koa +}) diff --git a/benchmark/sirun/plugin-koa/internal-tracer/storage.js b/benchmark/sirun/plugin-koa/internal-tracer/storage.js new file mode 100644 index 00000000000..5416804df2c --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/storage.js @@ -0,0 +1,20 @@ +'use strict' + +const { channel } = require('diagnostics_channel') +const { storage } = require('../../../../packages/datadog-core') + +const startChannel = channel('apm:koa:request:start') +const endChannel = channel('apm:koa:request:end') + +const stores = [] + +startChannel.subscribe(() => { + const store = storage.getStore() + + stores.push(store) + storage.enterWith({ ...store }) +}) + +endChannel.subscribe(() => { + storage.enterWith(stores.pop()) +}) diff --git a/benchmark/sirun/plugin-koa/internal-tracer/trace.js b/benchmark/sirun/plugin-koa/internal-tracer/trace.js new file mode 100644 index 00000000000..932fec1ba31 --- /dev/null +++ b/benchmark/sirun/plugin-koa/internal-tracer/trace.js @@ -0,0 +1,29 @@ +'use strict' + +const { channel } = require('diagnostics_channel') +const { encoder } = require('./encoder') +const { TraceContext } = require('./context') +const { storage } = require('../../../../packages/datadog-core') + +const startChannel = channel('apm:koa:request:start') +const errorChannel = channel('apm:koa:request:error') +const asyncEndChannel = channel('apm:koa:request:async-end') + +startChannel.subscribe(({ req }) => { + const store = storage.getStore() + const traceContext = new TraceContext(store.traceContext) + + store.traceContext = traceContext + + encoder.encodeWebRequestStart(req, 'koa') +}) + +errorChannel.subscribe(error => { + encoder.encodeError(error) +}) + +asyncEndChannel.subscribe(({ res }) => { + encoder.encodeWebRequestFinish(res, 'koa') + + // TODO: restore parent context +}) diff --git a/benchmark/sirun/plugin-koa/meta.json b/benchmark/sirun/plugin-koa/meta.json new file mode 100644 index 00000000000..f69dc680fff --- /dev/null +++ b/benchmark/sirun/plugin-koa/meta.json @@ -0,0 +1,28 @@ +{ + "name": "koa", + "setup": "node setup", + "run": "node server", + "env": { + "PORT": "3030", + "REQUESTS": "100000" + }, + "iterations": 10, + "cachegrind": false, + "variants": { + "baseline": {}, + "with-internal-tracer": { + "env": { + "WITH_INTERNAL_TRACER": "true", + "WITH_FAKE_DB": "true", + "WITH_NATIVE_COLLECTOR": "true" + } + }, + "with-tracer": { + "env": { + "WITH_TRACER": "true", + "WITH_FAKE_DB": "true", + "DD_TRACE_AGENT_PROTOCOL_VERSION": "0.5" + } + } + } +} diff --git a/benchmark/sirun/plugin-koa/server.js b/benchmark/sirun/plugin-koa/server.js new file mode 100644 index 00000000000..c784753d1e9 --- /dev/null +++ b/benchmark/sirun/plugin-koa/server.js @@ -0,0 +1,113 @@ +'use strict' + +const { PORT, REQUESTS, WITH_FAKE_DB, WITH_INTERNAL_TRACER, WITH_TRACER } = process.env + +let tracer +let encoder +let storage +let TraceContext + +if (WITH_TRACER === 'true') { + tracer = require('../../..') + tracer.init({ + startupLogs: false, + plugins: false + }) + tracer.use('http', { enabled: true }) + tracer.use('koa', { enabled: true, middleware: true }) +} + +if (WITH_INTERNAL_TRACER === 'true') { + require('./internal-tracer') + encoder = 
require('./internal-tracer/encoder').encoder + storage = require('../../../packages/datadog-core').storage + TraceContext = require('./internal-tracer/context').TraceContext +} + +const http = require('http') +const Koa = require('../../../versions/koa/node_modules/koa') +const net = require('net') +const app = new Koa() + +const port = parseInt(PORT) +const requests = parseInt(REQUESTS) + +let readyServer +let total = 0 + +app.use(ctx => { + ctx.body = 'OK' + + if (++total === requests) { + server.close() + readyServer.close() + } + + if (WITH_FAKE_DB === 'true') { + for (let i = 0; i < 25; i++) { + const query = startQuery() + + if (WITH_TRACER) { + const span = traceQuery(query) + runQuery(query) + span.finish() + } else if (WITH_INTERNAL_TRACER) { + const store = storage.getStore() + const parent = store.traceContext + store.traceContext = new TraceContext(parent) + encoder.encodeMysqlQueryStart(query) + runQuery(query) + encoder.encodeFinish() + store.traceContext = parent + } else { + runQuery(query) + } + } + } +}) + +const server = http.createServer(app.callback()) + +server.listen(port, () => { + readyServer = net.createServer(() => {}) + readyServer.listen(port + 1) +}) + +function startQuery () { + return { + sql: 'SELECT * FROM mytable WHERE 1 = 1;', + conf: { + user: 'myuser', + database: 'mydatabase', + host: '127.0.0.1', + port: '3306' + } + } +} + +function runQuery () { + return new Promise(resolve => { + setTimeout(resolve, Math.random() * 5) + }) +} + +function traceQuery (query) { + const childOf = tracer.scope().active() + const span = tracer.startSpan('mysql.query', { + childOf, + tags: { + 'span.kind': 'client', + 'span.type': 'sql', + 'resource.name': query.sql, + 'db.type': 'mysql', + 'db.user': query.conf.user, + 'db.name': query.conf.database, + 'out.host': query.conf.host, + 'out.port': query.conf.port + } + }) + + span.finish() + + return span +} diff --git a/benchmark/sirun/plugin-koa/setup.js b/benchmark/sirun/plugin-koa/setup.js new file mode 100644 index 00000000000..cd09f1d170c --- /dev/null +++ b/benchmark/sirun/plugin-koa/setup.js @@ -0,0 +1,13 @@ +'use strict' + +const { spawn } = require('child_process') + +const port = parseInt(process.env.PORT) +const requests = parseInt(process.env.REQUESTS) +const options = { + detached: true, + stdio: 'ignore', + shell: true +} + +spawn(`npx wait-on tcp:${port + 1} && npx autocannon -a ${requests} http://localhost:${port}/hello`, options).unref() diff --git a/collector/.gitignore b/collector/.gitignore new file mode 100644 index 00000000000..eb5a316cbd1 --- /dev/null +++ b/collector/.gitignore @@ -0,0 +1 @@ +target diff --git a/collector/Cargo.lock b/collector/Cargo.lock new file mode 100644 index 00000000000..fc5dc590727 --- /dev/null +++ b/collector/Cargo.lock @@ -0,0 +1,558 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 3 + +[[package]] +name = "ahash" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bumpalo" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "common" +version = "0.1.0" +dependencies = [ + "hashbrown", + "rmp", +] + +[[package]] +name = "ffi" +version = "0.1.0" +dependencies = [ + "common", + "hyper-client", + "libc", + "tokio", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures-channel" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" + +[[package]] +name = "futures-task" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" + +[[package]] +name = "futures-util" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash", +] + +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + 
"http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-client" +version = "0.1.0" +dependencies = [ + "common", + "hyper", + "tokio", +] + +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + +[[package]] +name = "libc" +version = "0.2.139" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "mio" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "num-traits" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" + +[[package]] +name = "paste" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rmp" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "server" +version = "0.1.0" +dependencies = [ + "common", + "hyper", + "hyper-client", + "tokio", +] + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" +dependencies = [ + "autocfg", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + +[[package]] +name = "unicode-ident" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775c11906edafc97bc378816b94585fbd9a054eabaf86fdd0ced94af449efab7" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasm" +version = "0.1.0" +dependencies = [ + "common", + "wasm-bindgen", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.84" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.84" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.84" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.84" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.84" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" + +[[package]] +name = "windows_i686_gnu" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" + +[[package]] +name = "windows_i686_msvc" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" + +[[package]] 
+name = "windows_x86_64_gnu" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" diff --git a/collector/Cargo.toml b/collector/Cargo.toml new file mode 100644 index 00000000000..4ebff0acb1c --- /dev/null +++ b/collector/Cargo.toml @@ -0,0 +1,17 @@ +[workspace] +members = [ + "common", + "ffi", + "hyper-client", + "server", + "wasm" +] + +[profile.release] +codegen-units = 1 +lto = true +opt-level = 3 +panic = "abort" +strip = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/collector/bench/single.js b/collector/bench/single.js new file mode 100644 index 00000000000..58216fc865d --- /dev/null +++ b/collector/bench/single.js @@ -0,0 +1,37 @@ +const http = require('http') +const id = require('../../packages/dd-trace/src/id') +const msgpack = require('msgpack-lite') +const codec = msgpack.createCodec({ int64: true }) + +const traceId = id() +const spanId = id() +const startTime = id((Date.now() * 1e6).toString(), 10) +const finishTime = id(((Date.now() + 100) * 1e6).toString(), 10) + +const payload = msgpack.encode({ + 'events': [ + [1, startTime, traceId, spanId, 0, 'GET', '/some/path'], + [3, finishTime, traceId, spanId, 200] + ] +}, { codec }) + +const req = http.request({ + method: 'put', + port: 8127, + path: '/v0.1/events' +}, res => { + let data = '' + + res.on('data', chunk => { + data += chunk + }) + + res.on('end', () => { + console.log(`Response: ${data}`) // eslint-disable-line no-console + }) +}) + +req.setHeader('Content-Type', 'application/msgpack') +req.write(payload) + +req.end() diff --git a/collector/common/Cargo.toml b/collector/common/Cargo.toml new file mode 100644 index 00000000000..ebd18d00f58 --- /dev/null +++ b/collector/common/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "common" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +hashbrown = "0.13.2" +rmp = "0.8.11" diff --git a/collector/common/src/client.rs b/collector/common/src/client.rs new file mode 100644 index 00000000000..d0e7d9cea87 --- /dev/null +++ b/collector/common/src/client.rs @@ -0,0 +1,4 @@ +// TODO: Support streaming with a writer instead of slice. 
+pub trait Client { + fn request(&self, data: Vec); +} diff --git a/collector/common/src/exporting.rs b/collector/common/src/exporting.rs new file mode 100644 index 00000000000..c8a2c9ee3c7 --- /dev/null +++ b/collector/common/src/exporting.rs @@ -0,0 +1,7 @@ +use crate::tracing::Traces; + +pub mod agent; + +pub trait Exporter { + fn export(&self, traces: Traces); +} diff --git a/collector/common/src/exporting/agent.rs b/collector/common/src/exporting/agent.rs new file mode 100644 index 00000000000..5a12cdcfcd7 --- /dev/null +++ b/collector/common/src/exporting/agent.rs @@ -0,0 +1,140 @@ +use crate::client::Client; +use crate::tracing::{Trace, Traces, Span, Meta, Metrics}; +use super::Exporter; +use hashbrown::HashMap; +use rmp::encode; +use rmp::encode::ByteBuf; +use std::rc::Rc; + +pub struct AgentExporter { + client: Box +} + +impl Exporter for AgentExporter { + fn export(&self, traces: Traces) { + let mut wr = ByteBuf::new(); + let trace_count = traces.len(); + + if trace_count > 0 { + // println!("{:#?}", traces); + + self.encode_traces(&mut wr, traces); + + let data: Vec = wr.as_vec().to_vec(); + + // TODO: Get the response somehow (with a channel?) + // TODO: Make client reusable between requests (with a channel?) + self.client.request(data); + } + } +} + +impl AgentExporter { + pub fn new(client: Box) -> Self { + Self { + client + } + } + + fn cache_strings(&self, strings: &mut Vec>, positions: &mut HashMap, u32>, trace: &Trace) { + for span in trace.spans.values() { + self.cache_string(strings, positions, &span.service); + self.cache_string(strings, positions, &span.name); + self.cache_string(strings, positions, &span.resource); + self.cache_string(strings, positions, &span.span_type); + + for (k, v) in &span.meta { + self.cache_string(strings, positions, &k); + self.cache_string(strings, positions, &v); + } + + for (k, _) in &span.metrics { + self.cache_string(strings, positions, &k); + } + } + } + + fn cache_string(&self, strings: &mut Vec>, positions: &mut HashMap, u32>, s: &Rc) { + if !positions.contains_key(s) { + let len = strings.len() as u32; + + positions.insert(s.clone(), len); + strings.push(s.clone()); + } + } + + fn encode_strings(&self, wr: &mut ByteBuf, strings: &mut Vec>) { + encode::write_array_len(wr, strings.len() as u32).unwrap(); + + for s in strings { + encode::write_str(wr, s).unwrap(); + } + } + + fn encode_traces(&self, wr: &mut ByteBuf, traces: Traces) { + encode::write_array_len(wr, 2).unwrap(); + + let empty_string: Rc = Rc::from(""); + let mut strings = Vec::new(); + let mut positions = HashMap::new(); + + strings.push(empty_string.clone()); + positions.insert(empty_string.clone(), 0u32); + + // TODO: Avoid looping twice over traces/strings. 
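+        // The payload follows the agent's v0.5 trace format: a two-element
+        // msgpack array of [string table, traces], where every span is encoded
+        // as a 12-element array referencing strings by their table index.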
+ for trace in traces.values() { + self.cache_strings(&mut strings, &mut positions, trace); + } + + self.encode_strings(wr, &mut strings); + + encode::write_array_len(wr, traces.len() as u32).unwrap(); + + for trace in traces.values() { + self.encode_trace(wr, trace, &positions); + } + } + + fn encode_trace(&self, wr: &mut ByteBuf, trace: &Trace, positions: &HashMap, u32>) { + encode::write_array_len(wr, trace.spans.len() as u32).unwrap(); + + for span in trace.spans.values() { + self.encode_span(wr, span, positions); + } + } + + fn encode_span(&self, wr: &mut ByteBuf, span: &Span, positions: &HashMap, u32>) { + encode::write_array_len(wr, 12).unwrap(); + + encode::write_uint(wr, positions[&span.service] as u64).unwrap(); + encode::write_uint(wr, positions[&span.name] as u64).unwrap(); + encode::write_uint(wr, positions[&span.resource] as u64).unwrap(); + encode::write_uint(wr, span.trace_id).unwrap(); + encode::write_uint(wr, span.span_id).unwrap(); + encode::write_uint(wr, span.parent_id).unwrap(); + encode::write_uint(wr, span.start).unwrap(); + encode::write_uint(wr, span.duration + 1).unwrap(); + encode::write_uint(wr, span.error).unwrap(); + self.encode_meta(wr, &span.meta, positions); + self.encode_metrics(wr, &span.metrics, positions); + encode::write_uint(wr, positions[&span.span_type] as u64).unwrap(); + } + + fn encode_meta(&self, wr: &mut ByteBuf, meta: &Meta, positions: &HashMap, u32>) { + encode::write_map_len(wr, meta.len() as u32).unwrap(); + + for (k, v) in meta { + encode::write_uint(wr, positions[k] as u64).unwrap(); + encode::write_uint(wr, positions[v] as u64).unwrap(); + } + } + + fn encode_metrics(&self, wr: &mut ByteBuf, metrics: &Metrics, positions: &HashMap, u32>) { + encode::write_map_len(wr, metrics.len() as u32).unwrap(); + + for (k, v) in metrics { + encode::write_uint(wr, positions[k] as u64).unwrap(); + encode::write_f64(wr, *v).unwrap(); + } + } +} diff --git a/collector/common/src/lib.rs b/collector/common/src/lib.rs new file mode 100644 index 00000000000..72b23c147f8 --- /dev/null +++ b/collector/common/src/lib.rs @@ -0,0 +1,5 @@ +pub mod client; +pub mod exporting; +pub mod msgpack; +pub mod processing; +pub mod tracing; diff --git a/collector/common/src/msgpack.rs b/collector/common/src/msgpack.rs new file mode 100644 index 00000000000..2bf87a91ae0 --- /dev/null +++ b/collector/common/src/msgpack.rs @@ -0,0 +1,28 @@ +use std::io::Read; + +pub use rmp::decode::{NumValueReadError, read_array_len, read_f64, read_map_len, read_str_len}; + +pub fn read_u16(mut rd: R) -> Result { + rmp::decode::read_int(&mut rd) +} + +pub fn read_u32(mut rd: R) -> Result { + rmp::decode::read_int(&mut rd) +} + +pub fn read_u64(mut rd: R) -> Result { + rmp::decode::read_int(&mut rd) +} + +pub fn read_usize(mut rd: R) -> Result { + rmp::decode::read_int(&mut rd) +} + +pub fn read_str(mut rd: R) -> String { + let limit = read_str_len(&mut rd).unwrap() as u64; + let mut str = String::new(); + + rd.by_ref().take(limit).read_to_string(&mut str).unwrap(); + + str +} diff --git a/collector/common/src/processing.rs b/collector/common/src/processing.rs new file mode 100644 index 00000000000..173ff0cb33c --- /dev/null +++ b/collector/common/src/processing.rs @@ -0,0 +1,332 @@ +use crate::exporting::Exporter; +use crate::msgpack::{read_array_len, read_f64, read_map_len, read_str, read_u16, read_u64, read_usize}; +use crate::tracing::{Span, Trace, Traces, Meta, Metrics}; +use hashbrown::{HashMap, HashSet}; +use std::io::Read; +use std::rc::Rc; + +pub struct Processor { + exporter: Box, + 
traces: Traces, + strings: HashSet> +} + +// TODO: Decouple processing from exporting. +// TODO: Add support for more payload metadata (i.e. language). +// TODO: Custom more efficient events depending on span type. +// TODO: Store service metadata that can be used on every span like service name. +// TODO: Cache things like outgoing host/port or MySQL connection information. +// TODO: Event for adding trace tags. +// TODO: Event for adding baggage items. +// TODO: Add support for sampling. +// TODO: Support sending traces directly to Datadog. +// TODO: Optimize to minimize allocations and copies. + +impl Processor { + pub fn new(exporter: Box) -> Self { + Self { + exporter, + traces: Traces::new(), + // TODO: Figure out how to cache those properly. + strings: HashSet::from([Rc::from("")]) + } + } + + pub fn process(&mut self, mut rd: R) { + read_array_len(&mut rd).unwrap(); + + let string_count = read_array_len(&mut rd).unwrap(); + let mut strings: Vec> = Vec::with_capacity(string_count as usize); + + for _ in 0..string_count { + strings.push(Rc::from(read_str(&mut rd).as_str())); + } + + let event_count = read_array_len(&mut rd).unwrap(); + + for _ in 0..event_count { + self.process_event(&mut strings, &mut rd); + } + } + + pub fn flush(&mut self) { + let finished_traces: HashMap = self.traces + .drain_filter(|_, v| v.started == v.finished) + .collect(); + + self.exporter.export(finished_traces); + } + + fn process_event(&mut self, strings: &mut Vec>, mut rd: R) { + read_array_len(&mut rd).unwrap(); + + let event_type = read_u64(&mut rd).unwrap(); + + match event_type { + 1 => self.process_start_web_request(strings, rd), + 2 => self.process_add_error(strings, rd), + 3 => self.process_finish_web_request(strings, rd), + 4 => self.process_start_span(strings, rd), + 5 => self.process_finish_span(strings, rd), + 6 => self.process_add_tags(strings, rd), + 7 => self.process_strings(strings, rd), + 8 => self.process_start_mysql_query(strings, rd), + _ => () + } + } + + fn process_strings(&mut self, strings: &mut Vec>, mut rd: R) { + let size = read_array_len(&mut rd).unwrap(); + + strings.reserve(size as usize); + + for _ in 0..size { + strings.push(Rc::from(read_str(&mut rd).as_str())); + } + } + + // TODO: Store an error object instead of tags on the span. 
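+    // An error event carries [timestamp, trace id, span id] and, when an error
+    // object was captured, string-table indices for its name, message and stack.
+    // Those are copied onto the span as error.name/error.message/error.stack,
+    // and the span's error flag is set either way.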
+ fn process_add_error(&mut self, strings: &[Rc], mut rd: R) { + let size = read_array_len(&mut rd).unwrap(); + + read_u64(&mut rd).unwrap(); + + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + + if size < 4 { + if let Some(trace) = self.traces.get_mut(&trace_id) { + if let Some(mut span) = trace.spans.get_mut(&span_id) { + span.error = 1; + } + } + } else { + let name_key = self.from_str("error.name"); + let name = strings[read_usize(&mut rd).unwrap()].clone(); + let message_key = self.from_str("error.message"); + let message = strings[read_usize(&mut rd).unwrap()].clone(); + let stack_key = self.from_str("error.stack"); + let stack = strings[read_usize(&mut rd).unwrap()].clone(); + + if let Some(trace) = self.traces.get_mut(&trace_id) { + if let Some(mut span) = trace.spans.get_mut(&span_id) { + span.error = 1; + + span.meta.insert(name_key, name); + span.meta.insert(message_key, message); + span.meta.insert(stack_key, stack); + } + } + } + } + + fn process_add_tags(&mut self, strings: &[Rc], mut rd: R) { + read_array_len(&mut rd).unwrap(); + read_u64(&mut rd).unwrap(); + + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let (meta, metrics) = self.read_tags(&mut rd, strings); + + if let Some(trace) = self.traces.get_mut(&trace_id) { + if let Some(span) = trace.spans.get_mut(&span_id) { + span.meta.extend(meta); + span.metrics.extend(metrics); + } + } + } + + fn process_start_span(&mut self, strings: &[Rc], mut rd: R) { + read_array_len(&mut rd).unwrap(); + + let start = read_u64(&mut rd).unwrap(); + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let parent_id = read_u64(&mut rd).unwrap(); + let service = strings[read_usize(&mut rd).unwrap()].clone(); + let name = strings[read_usize(&mut rd).unwrap()].clone(); + let resource = strings[read_usize(&mut rd).unwrap()].clone(); + let (meta, metrics) = self.read_tags(&mut rd, strings); + let span_type = strings[read_usize(&mut rd).unwrap()].clone(); + + let span = Span { + start, + trace_id, + span_id, + parent_id, + span_type, + name, + resource, + service, + error: 0, + duration: 0, + meta, + metrics + }; + + self.start_span(span); + } + + fn process_finish_span(&mut self, strings: &[Rc], mut rd: R) { + read_array_len(&mut rd).unwrap(); + + let start = read_u64(&mut rd).unwrap(); + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let (meta, metrics) = self.read_tags(&mut rd, strings); + + if let Some(mut trace) = self.traces.get_mut(&trace_id) { + if let Some(mut span) = trace.spans.get_mut(&span_id) { + trace.finished += 1; + + span.duration = start - span.start; + + span.meta.extend(meta); + span.metrics.extend(metrics); + } + } + } + + fn process_start_web_request(&mut self, strings: &[Rc], mut rd: R) { + let mut meta = HashMap::new(); + let metrics = HashMap::new(); + + read_array_len(&mut rd).unwrap(); + + let start = read_u64(&mut rd).unwrap(); + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let parent_id = read_u64(&mut rd).unwrap(); + let component = strings[read_usize(&mut rd).unwrap()].clone(); + let method = strings[read_usize(&mut rd).unwrap()].clone(); + let url = strings[read_usize(&mut rd).unwrap()].clone(); + let route = strings[read_usize(&mut rd).unwrap()].clone(); + + // TODO: How to cache string concatenation? 
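+        // The operation name is derived from the instrumented component (e.g.
+        // "koa.request") and the resource from the HTTP method and route, while
+        // the method and URL are stored as meta tags.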
+ let name = Rc::from(format!("{component}.request")); + let resource = Rc::from(format!("{method} {route}")); + + meta.insert(self.from_str("http.method"), method); + meta.insert(self.from_str("http.url"), url); + + let span = Span { + start, + trace_id, + span_id, + parent_id, + span_type: self.from_str("web"), + name, + resource, + service: self.from_str("unnamed-app"), + error: 0, + duration: 0, + meta, + metrics + }; + + self.start_span(span); + } + + fn process_start_mysql_query(&mut self, strings: &[Rc], mut rd: R) { + let mut meta = HashMap::new(); + let mut metrics = HashMap::new(); + + read_array_len(&mut rd).unwrap(); + + let start = read_u64(&mut rd).unwrap(); + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let parent_id = read_u64(&mut rd).unwrap(); + let sql = strings[read_usize(&mut rd).unwrap()].clone(); + let database = strings[read_usize(&mut rd).unwrap()].clone(); + let user = strings[read_usize(&mut rd).unwrap()].clone(); + let host = strings[read_usize(&mut rd).unwrap()].clone(); + let port = read_u16(&mut rd).unwrap(); + + // TODO: How to cache string concatenation? + meta.insert(self.from_str("db.type"), self.from_str("mysql")); + meta.insert(self.from_str("db.user"), user); + meta.insert(self.from_str("db.name"), database); + meta.insert(self.from_str("out.host"), host); + metrics.insert(self.from_str("out.port"), port as f64); + + let span = Span { + start, + trace_id, + span_id, + parent_id, + span_type: self.from_str("sql"), + name: self.from_str("mysql.query"), + resource: sql, + service: self.from_str("unnamed-app-mysql"), + error: 0, + duration: 0, + meta, + metrics + }; + + self.start_span(span); + } + + fn process_finish_web_request(&mut self, _: &[Rc], mut rd: R) { + read_array_len(&mut rd).unwrap(); + + let start = read_u64(&mut rd).unwrap(); + let trace_id = read_u64(&mut rd).unwrap(); + let span_id = read_u64(&mut rd).unwrap(); + let status_code_key = self.from_str("http.status_code"); + let status_code = Rc::from(read_u16(&mut rd).unwrap().to_string()); + + if let Some(mut trace) = self.traces.get_mut(&trace_id) { + if let Some(mut span) = trace.spans.get_mut(&span_id) { + trace.finished += 1; + + span.duration = start - span.start; + span.meta.insert(status_code_key, status_code); + } + } + } + + fn read_tags(&self, mut rd: R, strings: &[Rc]) -> (Meta, Metrics){ + let mut meta = HashMap::new(); + let mut metrics = HashMap::new(); + + let meta_size = read_map_len(&mut rd).unwrap(); + + for _ in 0..meta_size { + meta.insert( + strings[read_usize(&mut rd).unwrap()].clone(), + strings[read_usize(&mut rd).unwrap()].clone() + ); + } + + let metrics_size = read_map_len(&mut rd).unwrap(); + + for _ in 0..metrics_size { + metrics.insert( + strings[read_usize(&mut rd).unwrap()].clone(), + read_f64(&mut rd).unwrap() + ); + } + + (meta, metrics) + } + + fn start_span(&mut self, span: Span) { + let trace = self.traces.entry(span.trace_id).or_default(); + + trace.started += 1; + trace.spans.insert(span.span_id, span); + } + + fn from_str(&mut self, s: &str) -> Rc { + match self.strings.get(s) { + Some(s) => s.clone(), + None => { + let s: Rc = Rc::from(s); + self.strings.insert(s.clone()); + s + } + } + } +} diff --git a/collector/common/src/tracing.rs b/collector/common/src/tracing.rs new file mode 100644 index 00000000000..726eeebe715 --- /dev/null +++ b/collector/common/src/tracing.rs @@ -0,0 +1,39 @@ +use hashbrown::HashMap; +use std::rc::Rc; + +pub type Meta = HashMap, Rc>; +pub type Metrics = HashMap, f64>; +pub type 
diff --git a/collector/common/src/tracing.rs b/collector/common/src/tracing.rs
new file mode 100644
index 00000000000..726eeebe715
--- /dev/null
+++ b/collector/common/src/tracing.rs
@@ -0,0 +1,39 @@
+use hashbrown::HashMap;
+use std::rc::Rc;
+
+pub type Meta = HashMap<Rc<str>, Rc<str>>;
+pub type Metrics = HashMap<Rc<str>, f64>;
+pub type Traces = HashMap<u64, Trace>;
+
+#[derive(Debug)]
+pub struct Span {
+    pub span_type: Rc<str>,
+    pub trace_id: u64,
+    pub span_id: u64,
+    pub parent_id: u64,
+    pub name: Rc<str>,
+    pub resource: Rc<str>,
+    pub service: Rc<str>,
+    pub error: u64,
+    pub start: u64,
+    pub duration: u64,
+    pub meta: Meta,
+    pub metrics: Metrics
+}
+
+#[derive(Debug)]
+pub struct Trace {
+    pub started: u64,
+    pub finished: u64,
+    pub spans: HashMap<u64, Span>
+}
+
+impl Default for Trace {
+    fn default() -> Self {
+        Self {
+            started: 0,
+            finished: 0,
+            spans: HashMap::new()
+        }
+    }
+}
diff --git a/collector/ffi/Cargo.toml b/collector/ffi/Cargo.toml
new file mode 100644
index 00000000000..f181873cb15
--- /dev/null
+++ b/collector/ffi/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "ffi"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+crate-type = ["cdylib"]
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common" }
+hyper-client = { path = "../hyper-client" }
+tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "sync"] }
+libc = "0.2.139"
diff --git a/collector/ffi/src/lib.rs b/collector/ffi/src/lib.rs
new file mode 100644
index 00000000000..810c2be152c
--- /dev/null
+++ b/collector/ffi/src/lib.rs
@@ -0,0 +1,33 @@
+use common::exporting::agent::AgentExporter;
+use common::processing::Processor;
+use hyper_client::HyperClient;
+use tokio::sync::mpsc::{self, Receiver, Sender};
+
+extern crate libc;
+
+#[no_mangle]
+pub extern "C" fn submit(size: usize, ptr: *const u8) -> u32 {
+    internal_submit(unsafe {
+        std::slice::from_raw_parts(ptr, size)
+    })
+}
+
+#[tokio::main]
+async fn internal_submit(payload: &[u8]) -> u32 {
+    let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
+
+    let mut client = Box::new(HyperClient::new());
+
+    client.on_response(tx);
+
+    let exporter = Box::new(AgentExporter::new(client));
+    let mut processor = Processor::new(exporter);
+    let mut rd = payload;
+
+    processor.process(&mut rd);
+    processor.flush();
+
+    rx.recv().await.unwrap();
+
+    0 // TODO: Return proper response buffer instead.
+}
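The ffi crate builds a cdylib exposing a single C symbol, `submit`, which hands the raw event buffer to the processor and blocks until the exporter's response arrives. The intended caller is the Node side via the `sbffi` dependency added to package.json below; purely as an illustration of the ABI surface (a sketch, not part of the patch), any C-ABI consumer would bind it roughly like this:

    extern "C" {
        fn submit(size: usize, ptr: *const u8) -> u32;
    }

    fn send(payload: &[u8]) -> u32 {
        // Safety: `submit` only reads `size` bytes starting at `ptr` for the duration of the call.
        unsafe { submit(payload.len(), payload.as_ptr()) }
    }
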
diff --git a/collector/hyper-client/Cargo.toml b/collector/hyper-client/Cargo.toml
new file mode 100644
index 00000000000..319f4e73db3
--- /dev/null
+++ b/collector/hyper-client/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "hyper-client"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common" }
+hyper = { version = "0.14.24", features = ["client", "http1", "runtime"] }
+tokio = { version = "1.25.0", features = ["rt-multi-thread", "sync"] }
diff --git a/collector/hyper-client/src/lib.rs b/collector/hyper-client/src/lib.rs
new file mode 100644
index 00000000000..31988c7fe6a
--- /dev/null
+++ b/collector/hyper-client/src/lib.rs
@@ -0,0 +1,55 @@
+use common::client::Client;
+use tokio::sync::mpsc::Sender;
+
+pub struct HyperClient {
+    tx: Option<Sender<()>>
+}
+
+impl HyperClient {
+    pub fn new() -> Self {
+        Self {
+            tx: None
+        }
+    }
+
+    // TODO: Require a sender in `new()` instead.
+    pub fn on_response(&mut self, tx: Sender<()>) {
+        self.tx = Some(tx);
+    }
+}
+
+impl Default for HyperClient {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Client for HyperClient {
+    fn request(&self, data: Vec<u8>) {
+        // TODO: configuration options
+        let req = hyper::Request::builder()
+            .method(hyper::Method::PUT)
+            .uri("http://localhost:8126/v0.5/traces")
+            .header("Content-Type", "application/msgpack")
+            // .header("X-Datadog-Trace-Count", trace_count.to_string())
+            // .header("Datadog-Meta-Tracer-Version", "")
+            // .header("Datadog-Meta-Lang", "")
+            // .header("Datadog-Meta-Lang-Version", "")
+            // .header("Datadog-Meta-Lang-Interpreter", "")
+            .body(hyper::Body::from(data))
+            .unwrap();
+
+        let tx = self.tx.clone();
+
+        tokio::spawn(async move {
+            let res = hyper::Client::new().request(req).await.unwrap();
+
+            // Discard the response for now.
+            hyper::body::to_bytes(res.into_body()).await.unwrap();
+
+            if let Some(tx) = tx {
+                tx.send(()).await.unwrap();
+            }
+        });
+    }
+}
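`common::client::Client` is referenced here and by the wasm `BufferClient`, but its definition is not part of this section. Judging from the two implementations, the trait presumably amounts to no more than the following (assumed shape, not taken from the patch):

    pub trait Client {
        fn request(&self, data: Vec<u8>);
    }

Keeping the exporter generic over this trait is what lets the same processing code ship data over HTTP via hyper, over FFI, or simply hand the encoded buffer back to the caller in the wasm build.
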
diff --git a/collector/server/Cargo.toml b/collector/server/Cargo.toml
new file mode 100644
index 00000000000..2fce1df1ce6
--- /dev/null
+++ b/collector/server/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "server"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common" }
+hyper-client = { path = "../hyper-client" }
+hyper = { version = "0.14.24", features = ["http1", "runtime", "server"] }
+tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "sync"] }
diff --git a/collector/server/src/main.rs b/collector/server/src/main.rs
new file mode 100644
index 00000000000..257c0864c25
--- /dev/null
+++ b/collector/server/src/main.rs
@@ -0,0 +1,75 @@
+use common::exporting::agent::AgentExporter;
+use common::processing::Processor;
+use hyper::body::{Buf, Bytes};
+use hyper::{Body, Method, StatusCode};
+use hyper::http::Response;
+use hyper::server::conn::Http;
+use hyper::service::service_fn;
+use hyper_client::HyperClient;
+use std::net::SocketAddr;
+use tokio::net::TcpListener;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::{Receiver, Sender};
+
+// TODO: Stream the data somehow.
+// TODO: Make sure that processor is cleaned up on connection close.
+// TODO: Add proper error handling.
+// TODO: Add tests.
+// TODO: Add benchmarks.
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let addr = SocketAddr::from(([127, 0, 0, 1], 8127));
+    let listener = TcpListener::bind(addr).await?;
+
+    loop {
+        let (stream, _) = listener.accept().await?;
+        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(100);
+
+        tokio::spawn(async move {
+            while let Some(payload) = rx.recv().await {
+                let client = Box::new(HyperClient::new());
+                let exporter = Box::new(AgentExporter::new(client));
+                let mut processor = Processor::new(exporter);
+                let mut rd = payload.reader();
+
+                processor.process(&mut rd);
+                processor.flush();
+            }
+        });
+
+        tokio::spawn(async move {
+            Http::new()
+                .http1_only(true)
+                .http1_keep_alive(true)
+                .serve_connection(stream, service_fn(move |mut req| {
+                    let tx = tx.clone();
+
+                    async move {
+                        let body;
+
+                        match (req.method(), req.uri().path()) {
+                            (&Method::PUT, "/v0.1/events") => {
+                                // TODO: use body::aggregate instead
+                                let bytes = hyper::body::to_bytes(req.body_mut()).await?;
+
+                                tx.send(bytes).await.unwrap();
+
+                                body = Response::new(Body::from(""));
+                            },
+                            _ => {
+                                body = Response::builder()
+                                    .status(StatusCode::NOT_FOUND)
+                                    .body(Body::from(""))
+                                    .unwrap()
+                            }
+                        }
+
+                        Ok::<_, hyper::Error>(body)
+                    }
+                }))
+                .await
+                .unwrap();
+        });
+    }
+}
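The server accepts encoded event payloads on PUT /v0.1/events (port 8127) and forwards the processed traces to the agent through HyperClient. A quick way to poke it once it is running locally, using the same hyper 0.14 API as the rest of the patch; the empty payload is only a placeholder, since real payloads come from the tracer's msgpack event encoder:

    use hyper::{Body, Client, Method, Request};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let payload: Vec<u8> = vec![]; // placeholder; normally msgpack-encoded events
        let req = Request::builder()
            .method(Method::PUT)
            .uri("http://127.0.0.1:8127/v0.1/events")
            .header("Content-Type", "application/msgpack")
            .body(Body::from(payload))?;

        let res = Client::new().request(req).await?;
        println!("collector responded with {}", res.status());
        Ok(())
    }
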
diff --git a/collector/wasm/Cargo.toml b/collector/wasm/Cargo.toml
new file mode 100644
index 00000000000..8b566e70e73
--- /dev/null
+++ b/collector/wasm/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "wasm"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+crate-type = ["cdylib"]
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common" }
+wasm-bindgen = "0.2.84"
diff --git a/collector/wasm/src/client.rs b/collector/wasm/src/client.rs
new file mode 100644
index 00000000000..00148c5a1b5
--- /dev/null
+++ b/collector/wasm/src/client.rs
@@ -0,0 +1,35 @@
+use common::client::Client;
+use std::sync::mpsc::SyncSender;
+
+pub struct BufferClient {
+    tx: Option<SyncSender<Vec<u8>>>
+}
+
+impl BufferClient {
+    pub fn new() -> Self {
+        Self {
+            tx: None
+        }
+    }
+
+    // TODO: Require a sender in `new()` instead.
+    pub fn on_response(&mut self, tx: SyncSender<Vec<u8>>) {
+        self.tx = Some(tx);
+    }
+}
+
+impl Default for BufferClient {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Client for BufferClient {
+    fn request(&self, data: Vec<u8>) {
+        let tx = self.tx.clone();
+
+        if let Some(tx) = tx {
+            tx.send(data).unwrap();
+        }
+    }
+}
diff --git a/collector/wasm/src/lib.rs b/collector/wasm/src/lib.rs
new file mode 100644
index 00000000000..d294672d24b
--- /dev/null
+++ b/collector/wasm/src/lib.rs
@@ -0,0 +1,28 @@
+
+use client::BufferClient;
+use common::exporting::agent::AgentExporter;
+use common::processing::Processor;
+use std::sync::mpsc::{self, Receiver, SyncSender};
+use wasm_bindgen::prelude::*;
+
+pub mod client;
+
+// TODO: Use WASI to submit from here instead of just returning the data.
+
+#[wasm_bindgen]
+pub fn collect(payload: &[u8]) -> Vec<u8> {
+    let (tx, rx): (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::sync_channel(1);
+
+    let mut client = Box::new(BufferClient::new());
+
+    client.on_response(tx);
+
+    let exporter = Box::new(AgentExporter::new(client));
+    let mut processor = Processor::new(exporter);
+    let mut rd = payload;
+
+    processor.process(&mut rd);
+    processor.flush();
+
+    rx.recv().unwrap()
+}
diff --git a/package.json b/package.json
index 97801cdf259..9bf4634c09c 100644
--- a/package.json
+++ b/package.json
@@ -86,6 +86,7 @@
     "path-to-regexp": "^0.1.2",
     "protobufjs": "^7.1.2",
     "retry": "^0.10.1",
+    "sbffi": "bengl/sbffi",
     "semver": "^5.5.0"
   },
   "devDependencies": {
diff --git a/yarn.lock b/yarn.lock
index b73febf9ef7..987073047a1 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2646,6 +2646,11 @@ node-abort-controller@^3.0.1:
   resolved "https://registry.yarnpkg.com/node-abort-controller/-/node-abort-controller-3.0.1.tgz#f91fa50b1dee3f909afabb7e261b1e1d6b0cb74e"
   integrity sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw==
 
+node-addon-api@^6.0.0:
+  version "6.0.0"
+  resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-6.0.0.tgz#cfb3574e6df708ff71a30db6c4762d9e06e11c27"
+  integrity sha512-GyHvgPvUXBvAkXa0YvYnhilSB1A+FRYMpIVggKzPZqdaZfevZOuzfWzyvgzOwRLHBeo/MMswmJFsrNF4Nw1pmA==
+
 node-gyp-build@^3.9.0:
   version "3.9.0"
   resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-3.9.0.tgz#53a350187dd4d5276750da21605d1cb681d09e25"
@@ -3190,6 +3195,12 @@ safe-regex-test@^1.0.0:
   resolved "https://registry.yarnpkg.com/safer-buffer/-/safer-buffer-2.1.2.tgz#44fa161b0187b9549dd84bb91802f9bd8385cd6a"
   integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==
 
+sbffi@bengl/sbffi:
+  version "1.0.4"
+  resolved "https://codeload.github.com/bengl/sbffi/tar.gz/35a2584cec76060b7a883f0ece5c956e92d01144"
+  dependencies:
+    node-addon-api "^6.0.0"
+
 semver@5.3.0:
   version "5.3.0"
   resolved "https://registry.yarnpkg.com/semver/-/semver-5.3.0.tgz#9b2ce5d3de02d17c6012ad326aa6b4d0cf54f94f"