Skip to content

Commit 646d8c2

Browse files
authored
feat(llmobs): evaluation metrics can be submitted by agent proxy (#5466)
* rough implementation * pass in agentlessEnabled for eval metrics writer * all tests except module tests fixed * default agentlessEnabled to undefined * fix tests * try check against agent * enable tests for checking agent running * fix llmobs plugins * fix tests * remove uncaught exception in favor of better solution later * remove .only * fix tests * fix tests * fix tests and error content * fix test * add additional agent-proxy endpoint parsing and error parsing * simplify * mostly fix tests * refactor _url * remove cb references for now * reset test * more readable * more readable * maybe fix tests * update constants * clean up merge * only compute url on agentless change * remove utility function * fmt * add agentlessEnabled back for plugins * fix vertexai * error logic * pass processedEventSizeBytes to super.append * remove api key enforcement in submitEvaluation
1 parent 48d4a4a commit 646d8c2

File tree

26 files changed

+573
-367
lines changed

26 files changed

+573
-367
lines changed

docs/test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ const enabled = llmobs.enabled
574574
// manually enable
575575
llmobs.enable({
576576
mlApp: 'mlApp',
577-
agentlessEnabled: true
577+
agentlessEnabled: false
578578
})
579579

580580
// manually disable
@@ -594,7 +594,7 @@ llmobs.wrap({ kind: 'llm' }, function myLLM () {})()
594594
llmobs.wrap({ kind: 'llm', name: 'myLLM', modelName: 'myModel', modelProvider: 'myProvider' }, function myFunction () {})()
595595

596596
// export a span
597-
llmobs.enable({ mlApp: 'myApp' })
597+
llmobs.enable({ mlApp: 'myApp', agentlessEnabled: false })
598598
llmobs.trace({ kind: 'llm', name: 'myLLM' }, (span) => {
599599
const llmobsSpanCtx = llmobs.exportSpan(span)
600600
llmobsSpanCtx.traceId;

packages/dd-trace/src/config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ class Config {
530530
this._setValue(defaults, 'isManualApiEnabled', false)
531531
this._setValue(defaults, 'langchain.spanCharLimit', 128)
532532
this._setValue(defaults, 'langchain.spanPromptCompletionSampleRate', 1.0)
533-
this._setValue(defaults, 'llmobs.agentlessEnabled', false)
533+
this._setValue(defaults, 'llmobs.agentlessEnabled', undefined)
534534
this._setValue(defaults, 'llmobs.enabled', false)
535535
this._setValue(defaults, 'llmobs.mlApp', undefined)
536536
this._setValue(defaults, 'ciVisibilityTestSessionName', '')

packages/dd-trace/src/llmobs/constants/writers.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
'use strict'
22

33
module.exports = {
4-
EVP_PROXY_AGENT_BASE_PATH: 'evp_proxy/v2',
5-
EVP_PROXY_AGENT_ENDPOINT: 'evp_proxy/v2/api/v2/llmobs',
4+
EVP_PROXY_AGENT_BASE_PATH: '/evp_proxy/v2/',
65
EVP_SUBDOMAIN_HEADER_NAME: 'X-Datadog-EVP-Subdomain',
7-
EVP_SUBDOMAIN_HEADER_VALUE: 'llmobs-intake',
8-
AGENTLESS_SPANS_ENDPOINT: '/api/v2/llmobs',
9-
AGENTLESS_EVALULATIONS_ENDPOINT: '/api/intake/llm-obs/v1/eval-metric',
6+
7+
SPANS_EVENT_TYPE: 'span',
8+
SPANS_INTAKE: 'llmobs-intake',
9+
SPANS_ENDPOINT: '/api/v2/llmobs',
10+
11+
EVALUATIONS_INTAKE: 'api',
12+
EVALUATIONS_EVENT_TYPE: 'evaluation_metric',
13+
EVALUATIONS_ENDPOINT: '/api/intake/llm-obs/v1/eval-metric',
1014

1115
EVP_PAYLOAD_SIZE_LIMIT: 5 << 20, // 5MB (actual limit is 5.1MB)
1216
EVP_EVENT_SIZE_LIMIT: (1 << 20) - 1024 // 999KB (actual limit is 1MB)

packages/dd-trace/src/llmobs/index.js

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ const evalMetricAppendCh = channel('llmobs:eval-metric:append')
1313
const flushCh = channel('llmobs:writers:flush')
1414
const injectCh = channel('dd-trace:span:inject')
1515

16-
const LLMObsAgentlessSpanWriter = require('./writers/spans/agentless')
17-
const LLMObsAgentProxySpanWriter = require('./writers/spans/agentProxy')
1816
const LLMObsEvalMetricsWriter = require('./writers/evaluations')
17+
const LLMObsSpanWriter = require('./writers/spans')
18+
const { setAgentStrategy } = require('./writers/util')
1919

2020
/**
2121
* Setting writers and processor globally when LLMObs is enabled
@@ -25,16 +25,22 @@ const LLMObsEvalMetricsWriter = require('./writers/evaluations')
2525
* if the tracer is `init`ed. But, in those cases, we don't want to start writers or subscribe
2626
* to channels.
2727
*/
28+
29+
/** @type {LLMObsSpanProcessor | null} */
2830
let spanProcessor
31+
32+
/** @type {LLMObsSpanWriter | null} */
2933
let spanWriter
34+
35+
/** @type {LLMObsEvalMetricsWriter | null} */
3036
let evalWriter
3137

3238
function enable (config) {
3339
const startTime = performance.now()
3440
// create writers and eval writer append and flush channels
3541
// span writer append is handled by the span processor
3642
evalWriter = new LLMObsEvalMetricsWriter(config)
37-
spanWriter = createSpanWriter(config)
43+
spanWriter = new LLMObsSpanWriter(config)
3844

3945
evalMetricAppendCh.subscribe(handleEvalMetricAppend)
4046
flushCh.subscribe(handleFlush)
@@ -46,7 +52,20 @@ function enable (config) {
4652

4753
// distributed tracing for llmobs
4854
injectCh.subscribe(handleLLMObsParentIdInjection)
49-
telemetry.recordLLMObsEnabled(startTime, config)
55+
56+
setAgentStrategy(config, useAgentless => {
57+
if (useAgentless && !(config.apiKey && config.site)) {
58+
throw new Error(
59+
'Cannot send LLM Observability data without a running agent or without both a Datadog API key and site.\n' +
60+
'Ensure these configurations are set before running your application.'
61+
)
62+
}
63+
64+
evalWriter?.setAgentless(useAgentless)
65+
spanWriter?.setAgentless(useAgentless)
66+
67+
telemetry.recordLLMObsEnabled(startTime, config)
68+
})
5069
}
5170

5271
function disable () {
@@ -74,11 +93,6 @@ function handleLLMObsParentIdInjection ({ carrier }) {
7493
carrier['x-datadog-tags'] += `,${PROPAGATED_PARENT_ID_KEY}=${parentId}`
7594
}
7695

77-
function createSpanWriter (config) {
78-
const SpanWriter = config.llmobs.agentlessEnabled ? LLMObsAgentlessSpanWriter : LLMObsAgentProxySpanWriter
79-
return new SpanWriter(config)
80-
}
81-
8296
function handleFlush () {
8397
try {
8498
spanWriter.flush()

packages/dd-trace/src/llmobs/sdk.js

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,13 +287,6 @@ class LLMObs extends NoopLLMObs {
287287
submitEvaluation (llmobsSpanContext, options = {}) {
288288
if (!this.enabled) return
289289

290-
if (!this._config.apiKey) {
291-
throw new Error(
292-
'DD_API_KEY is required for sending evaluation metrics. Evaluation metric data will not be sent.\n' +
293-
'Ensure this configuration is set before running your application.'
294-
)
295-
}
296-
297290
const { traceId, spanId } = llmobsSpanContext
298291
if (!traceId || !spanId) {
299292
throw new Error(

packages/dd-trace/src/llmobs/writers/base.js

Lines changed: 66 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
'use strict'
22

33
const request = require('../../exporters/common/request')
4-
const { URL, format } = require('url')
4+
const { URL, format } = require('node:url')
5+
const path = require('node:path')
56

67
const logger = require('../../log')
78

89
const { encodeUnicode } = require('../util')
910
const telemetry = require('../telemetry')
1011
const log = require('../../log')
12+
const {
13+
EVP_SUBDOMAIN_HEADER_NAME,
14+
EVP_PROXY_AGENT_BASE_PATH
15+
} = require('../constants/writers')
16+
const { parseResponseAndLog } = require('./util')
1117

1218
class BaseLLMObsWriter {
13-
constructor ({ interval, timeout, endpoint, intake, eventType, protocol, port }) {
19+
constructor ({ interval, timeout, eventType, config, endpoint, intake }) {
1420
this._interval = interval || 1000 // 1s
1521
this._timeout = timeout || 5000 // 5s
1622
this._eventType = eventType
@@ -19,28 +25,20 @@ class BaseLLMObsWriter {
1925
this._bufferLimit = 1000
2026
this._bufferSize = 0
2127

22-
this._url = new URL(format({
23-
protocol: protocol || 'https:',
24-
hostname: intake,
25-
port: port || 443,
26-
pathname: endpoint
27-
}))
28-
29-
this._headers = {
30-
'Content-Type': 'application/json'
31-
}
28+
this._config = config
29+
this._endpoint = endpoint
30+
this._intake = intake
3231

3332
this._periodic = setInterval(() => {
3433
this.flush()
3534
}, this._interval).unref()
3635

37-
process.once('beforeExit', () => {
36+
this._beforeExitHandler = () => {
3837
this.destroy()
39-
})
38+
}
39+
process.once('beforeExit', this._beforeExitHandler)
4040

4141
this._destroyed = false
42-
43-
logger.debug(`Started ${this.constructor.name} to ${this._url}`)
4442
}
4543

4644
append (event, byteLength) {
@@ -55,7 +53,9 @@ class BaseLLMObsWriter {
5553
}
5654

5755
flush () {
58-
if (this._buffer.length === 0) {
56+
const noAgentStrategy = this._agentless == null
57+
58+
if (this._buffer.length === 0 || noAgentStrategy) {
5959
return
6060
}
6161

@@ -64,29 +64,12 @@ class BaseLLMObsWriter {
6464
this._bufferSize = 0
6565
const payload = this._encode(this.makePayload(events))
6666

67-
const options = {
68-
headers: this._headers,
69-
method: 'POST',
70-
url: this._url,
71-
timeout: this._timeout
72-
}
73-
7467
log.debug(`Encoded LLMObs payload: ${payload}`)
7568

69+
const options = this._getOptions()
70+
7671
request(payload, options, (err, resp, code) => {
77-
if (err) {
78-
logger.error(
79-
'Error sending %d LLMObs %s events to %s: %s', events.length, this._eventType, this._url, err.message, err
80-
)
81-
telemetry.recordDroppedPayload(events.length, this._eventType, 'request_error')
82-
} else if (code >= 300) {
83-
logger.error(
84-
'Error sending %d LLMObs %s events to %s: %s', events.length, this._eventType, this._url, code
85-
)
86-
telemetry.recordDroppedPayload(events.length, this._eventType, 'http_error')
87-
} else {
88-
logger.debug(`Sent ${events.length} LLMObs ${this._eventType} events to ${this._url}`)
89-
}
72+
parseResponseAndLog(err, code, events.length, options.url.href, this._eventType)
9073
})
9174
}
9275

@@ -96,12 +79,57 @@ class BaseLLMObsWriter {
9679
if (!this._destroyed) {
9780
logger.debug(`Stopping ${this.constructor.name}`)
9881
clearInterval(this._periodic)
99-
process.removeListener('beforeExit', this.destroy)
82+
process.removeListener('beforeExit', this._beforeExitHandler)
10083
this.flush()
10184
this._destroyed = true
10285
}
10386
}
10487

88+
setAgentless (agentless) {
89+
this._agentless = agentless
90+
this._url = this._getUrl()
91+
logger.debug(`Configuring ${this.constructor.name} to ${this._url.href}`)
92+
}
93+
94+
_getUrl () {
95+
if (this._agentless) {
96+
return new URL(format({
97+
protocol: 'https:',
98+
hostname: `${this._intake}.${this._config.site}`,
99+
pathname: this._endpoint
100+
}))
101+
}
102+
103+
const { hostname, port } = this._config
104+
const base = this._config.url || new URL(format({
105+
protocol: 'http:',
106+
hostname,
107+
port
108+
}))
109+
110+
const proxyPath = path.join(EVP_PROXY_AGENT_BASE_PATH, this._endpoint)
111+
return new URL(proxyPath, base)
112+
}
113+
114+
_getOptions () {
115+
const options = {
116+
headers: {
117+
'Content-Type': 'application/json'
118+
},
119+
method: 'POST',
120+
timeout: this._timeout,
121+
url: this._url
122+
}
123+
124+
if (this._agentless) {
125+
options.headers['DD-API-KEY'] = this._config.apiKey || ''
126+
} else {
127+
options.headers[EVP_SUBDOMAIN_HEADER_NAME] = this._intake
128+
}
129+
130+
return options
131+
}
132+
105133
_encode (payload) {
106134
return JSON.stringify(payload, (key, value) => {
107135
if (typeof value === 'string') {

packages/dd-trace/src/llmobs/writers/evaluations.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
'use strict'
22

3-
const { AGENTLESS_EVALULATIONS_ENDPOINT } = require('../constants/writers')
3+
const {
4+
EVALUATIONS_ENDPOINT,
5+
EVALUATIONS_EVENT_TYPE,
6+
EVALUATIONS_INTAKE
7+
} = require('../constants/writers')
48
const BaseWriter = require('./base')
59

610
class LLMObsEvalMetricsWriter extends BaseWriter {
711
constructor (config) {
812
super({
9-
endpoint: AGENTLESS_EVALULATIONS_ENDPOINT,
10-
intake: `api.${config.site}`,
11-
eventType: 'evaluation_metric'
13+
config,
14+
intake: EVALUATIONS_INTAKE,
15+
eventType: EVALUATIONS_EVENT_TYPE,
16+
endpoint: EVALUATIONS_ENDPOINT
1217
})
13-
14-
this._headers['DD-API-KEY'] = config.apiKey
1518
}
1619

1720
makePayload (events) {

packages/dd-trace/src/llmobs/writers/spans/base.js renamed to packages/dd-trace/src/llmobs/writers/spans.js

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
'use strict'
22

3-
const { EVP_EVENT_SIZE_LIMIT, EVP_PAYLOAD_SIZE_LIMIT } = require('../../constants/writers')
4-
const { DROPPED_VALUE_TEXT } = require('../../constants/text')
5-
const { DROPPED_IO_COLLECTION_ERROR } = require('../../constants/tags')
6-
const BaseWriter = require('../base')
7-
const telemetry = require('../../telemetry')
8-
const logger = require('../../../log')
3+
const {
4+
EVP_EVENT_SIZE_LIMIT,
5+
EVP_PAYLOAD_SIZE_LIMIT,
6+
SPANS_ENDPOINT,
7+
SPANS_EVENT_TYPE,
8+
SPANS_INTAKE
9+
} = require('../constants/writers')
10+
const { DROPPED_VALUE_TEXT } = require('../constants/text')
11+
const { DROPPED_IO_COLLECTION_ERROR } = require('../constants/tags')
12+
const BaseWriter = require('./base')
13+
const telemetry = require('../telemetry')
14+
const logger = require('../../log')
915

10-
const tracerVersion = require('../../../../../../package.json').version
16+
const tracerVersion = require('../../../../../package.json').version
1117

1218
class LLMObsSpanWriter extends BaseWriter {
13-
constructor (options) {
19+
constructor (config) {
1420
super({
15-
...options,
16-
eventType: 'span'
21+
config,
22+
eventType: SPANS_EVENT_TYPE,
23+
intake: SPANS_INTAKE,
24+
endpoint: SPANS_ENDPOINT
1725
})
1826
}
1927

@@ -33,11 +41,11 @@ class LLMObsSpanWriter extends BaseWriter {
3341
telemetry.recordLLMObsSpanSize(event, processedEventSizeBytes, shouldTruncate)
3442

3543
if (this._bufferSize + eventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) {
36-
logger.debug('Flusing queue because queing next event will exceed EvP payload limit')
44+
logger.debug('Flushing queue because queuing next event will exceed EvP payload limit')
3745
this.flush()
3846
}
3947

40-
super.append(event, eventSizeBytes)
48+
super.append(event, processedEventSizeBytes)
4149
}
4250

4351
makePayload (events) {

packages/dd-trace/src/llmobs/writers/spans/agentProxy.js

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)