
feat(instrumentation-aws-sdk): add gen ai conventions for converse stream span #2769


Merged · 11 commits · May 22, 2025
@@ -402,13 +402,17 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
request: normalizedRequest,
requestId: requestId,
};
self.servicesExtensions.responseHook(
const override = self.servicesExtensions.responseHook(
normalizedResponse,
span,
self.tracer,
self.getConfig(),
startTime
);
if (override) {
response.output = override;
normalizedResponse.data = override;
}
self._callUserResponseHook(span, normalizedResponse);
Contributor Author:
I considered whether this should be done for the user hook too, but didn't think there was enough of a use case for it. Currently the change is only internal since, AFAIK, users can't define service extensions.
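As a hedged sketch of the new contract (names here are illustrative, since service extensions can't be user-defined today): a responseHook that returns a value makes the middleware substitute it for the response output, while returning undefined leaves the response untouched.

// Illustrative sketch of the override contract; wrapStream is a hypothetical
// helper standing in for the Bedrock-specific wrapping later in this diff.
const exampleResponseHook: ServiceExtension['responseHook'] = (
  response,
  span,
  tracer,
  config,
  startTime
) => {
  if (!response.data?.stream) {
    return undefined; // no override: the middleware keeps the original response
  }
  // Any returned value replaces response.output and normalizedResponse.data.
  return { ...response.data, stream: wrapStream(response.data.stream, span) };
};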

return response;
})
@@ -442,7 +446,9 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
throw err;
})
.finally(() => {
span.end();
if (!requestMetadata.isStream) {
span.end();
}
});
promiseWithResponseLogic
.then(res => {
@@ -31,6 +31,10 @@ import {
export interface RequestMetadata {
// isIncoming - if true, then the operation callback / promise should be bound to the operation's span
isIncoming: boolean;
// isStream - if true, the response is a stream, so the span should not be ended by the middleware.
// The ServiceExtension must end the span itself, generally by wrapping the stream and ending it
// after the stream is consumed.
isStream?: boolean;
Contributor:
Do you know if the GenAI SIG discussed/documented wanting this behaviour of ending the span after the full stream is consumed? I've seen opinions vary when discussing HTTP streaming. See the guidance for HTTP client span duration here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-client-span-duration

Is there any equivalent in the Python GenAI-related instrumentations?

Contributor Author:

Thanks for the link, that's interesting indeed in terms of response streaming. There isn't any guideline on GenAI spans; however, I feel it's sort of implied by the conventions for token usage: there really isn't a way to populate them without keeping the span open until the end of the stream. While the duration could leave the streaming out, that would require overriding the span's end time with the earlier timestamp, which I don't think the JS SDK supports.
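A minimal illustration of the constraint (a sketch, not PR code, assuming a span already started for the request): the OTel JS SDK drops attributes set after end(), so usage that only arrives in the stream's final metadata event has to be recorded before the span ends.

// Ending at TTFB would close the span early...
span.end();
// ...so this later call is ignored (the SDK only logs a warning), and the
// token usage reported at the end of the stream is lost.
span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage?.outputTokens ?? 0);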

Contributor:

> While the duration could leave the streaming out, that would require overriding the span's end time with the earlier timestamp, which I don't think the JS SDK supports.

Correct, it does not support that. The (documented) behaviour then would be that the span duration covers just up to the first response from the server, effectively TTFB (time to first byte).

Contributor Author:

Ah, do you mean that we should use TTFB here? That would mean we couldn't record usage information, though.

BTW, I realized that this might be closer to RPC than HTTP, and RPC has some specification for streaming:

https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#message-event

> an event for each message sent/received on client and server spans SHOULD be created

I think this also implies the overall span is for the whole stream. FWIW, Python keeps the span for the entire stream too.
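For illustration, the RPC message-event convention applied to a wrapped stream would look roughly like this inside the wrapping async generator (a sketch, not part of this change):

// Sketch: RPC-style message events recorded per received chunk.
let received = 0;
for await (const item of stream) {
  span.addEvent('message', {
    'message.type': 'RECEIVED',
    'message.id': ++received, // starts at 1 per direction, per the RPC spec
  });
  yield item;
}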

Contributor:

> Ah, do you mean that we should use TTFB here?

No, I did not mean to imply what this "should" do. I don't have a strong opinion one way or the other.

The HTTP span guidance from https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-client-span-duration says:

> Because of the potential for confusion around this, HTTP client library instrumentations SHOULD document their behavior around ending HTTP client spans.

I only meant to say that if it is decided to handle this stream by having the span end just on the initial response, then that behaviour should be documented.

> FWIW, Python keeps the span for the entire stream too.

Sounds good to me to have the intention for the JS instrumentation to be the same.
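To make that concrete, here is a minimal sketch of what an extension that sets isStream takes on (assuming the response exposes an AsyncIterable):

// Sketch: an extension setting isStream owns the span lifetime, typically
// ending the span only after the consumer has drained the wrapped stream.
async function* endSpanWhenDone<T>(stream: AsyncIterable<T>, span: Span) {
  try {
    yield* stream;
  } finally {
    span.end(); // runs even if the consumer bails out or the stream throws
  }
}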

spanAttributes?: SpanAttributes;
spanKind?: SpanKind;
spanName?: string;
@@ -47,13 +51,14 @@ export interface ServiceExtension {
// called before the request is sent, and after the span is started
requestPostSpanHook?: (request: NormalizedRequest) => void;

// called after the response is received. If a value is returned, it replaces the response output.
responseHook?: (
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig,
startTime: HrTime
) => void;
) => any | undefined;

updateMetricInstruments?: (meter: Meter) => void;
}
@@ -68,7 +68,14 @@ export class ServicesExtensions implements ServiceExtension {
startTime: HrTime
) {
const serviceExtension = this.services.get(response.request.serviceName);
serviceExtension?.responseHook?.(response, span, tracer, config, startTime);

return serviceExtension?.responseHook?.(
response,
span,
tracer,
config,
startTime
);
}

updateMetricInstruments(meter: Meter) {
@@ -46,6 +46,10 @@ import {
NormalizedRequest,
NormalizedResponse,
} from '../types';
import type {
ConverseStreamOutput,
TokenUsage,
} from '@aws-sdk/client-bedrock-runtime';
import {
hrTime,
hrTimeDuration,
@@ -93,7 +97,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
): RequestMetadata {
switch (request.commandName) {
case 'Converse':
return this.requestPreSpanHookConverse(request, config, diag);
return this.requestPreSpanHookConverse(request, config, diag, false);
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
case 'InvokeModel':
return this.requestPreSpanHookInvokeModel(request, config, diag);
}
@@ -106,7 +112,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
private requestPreSpanHookConverse(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
diag: DiagLogger,
isStream: boolean
): RequestMetadata {
let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT;
const spanAttributes: Attributes = {
@@ -142,6 +149,7 @@
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
@@ -328,6 +336,14 @@
config,
startTime
);
case 'ConverseStream':
return this.responseHookConverseStream(
response,
span,
tracer,
config,
startTime
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
}
@@ -342,6 +358,63 @@
) {
const { stopReason, usage } = response.data;

BedrockRuntimeServiceExtension.setStopReason(span, stopReason);
this.setUsage(response, span, usage, startTime);
}

private responseHookConverseStream(
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig,
startTime: HrTime
) {
return {
...response.data,
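// Note: this overwrites the stream from ...response.data above with a
// wrapped stream, so the span can record usage and stop reason and be
// ended once the caller finishes consuming it.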
stream: this.wrapConverseStreamResponse(
Contributor:

Does stream: this.wrapConverseStreamResponse(...) overwrite the stream from ...response.data?
I think it would be nice to have a comment calling this out.

Contributor Author:

Yup, added a comment about it.

response,
response.data.stream,
span,
startTime
),
};
}

private async *wrapConverseStreamResponse(
response: NormalizedResponse,
stream: AsyncIterable<ConverseStreamOutput>,
span: Span,
startTime: HrTime
) {
try {
let usage: TokenUsage | undefined;
for await (const item of stream) {
BedrockRuntimeServiceExtension.setStopReason(
span,
item.messageStop?.stopReason
);
usage = item.metadata?.usage;
yield item;
}
this.setUsage(response, span, usage, startTime);
} finally {
span.end();
}
}

private static setStopReason(span: Span, stopReason: string | undefined) {
  if (stopReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
  }
}

private setUsage(
response: NormalizedResponse,
span: Span,
usage: TokenUsage | undefined,
startTime: HrTime
) {
const sharedMetricAttrs: Attributes = {
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT,
@@ -371,10 +444,6 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
});
}
}

if (stopReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
}
}

private responseHookInvokeModel(
@@ -28,11 +28,12 @@
*/

import { getTestSpans } from '@opentelemetry/contrib-test-utils';
import { metricReader } from './load-instrumentation';
import { meterProvider, metricExporter } from './load-instrumentation';

import {
BedrockRuntimeClient,
ConverseCommand,
ConverseStreamCommand,
ConversationRole,
InvokeModelCommand,
} from '@aws-sdk/client-bedrock-runtime';
@@ -106,6 +107,9 @@ describe('Bedrock', () => {

afterEach(async function () {
nockDone();

await meterProvider.forceFlush();
metricExporter.reset();
});

describe('Converse', () => {
@@ -154,7 +158,131 @@
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});

const { resourceMetrics } = await metricReader.collect();
await meterProvider.forceFlush();
const [resourceMetrics] = metricExporter.getMetrics();
expect(resourceMetrics.scopeMetrics.length).toBe(1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
const tokenUsage = scopeMetrics.metrics.filter(
m => m.descriptor.name === 'gen_ai.client.token.usage'
);
expect(tokenUsage.length).toBe(1);
expect(tokenUsage[0].descriptor).toMatchObject({
name: 'gen_ai.client.token.usage',
type: 'HISTOGRAM',
description: 'Measures number of input and output tokens used',
unit: '{token}',
});
expect(tokenUsage[0].dataPoints.length).toBe(2);
expect(tokenUsage[0].dataPoints).toEqual(
expect.arrayContaining([
expect.objectContaining({
value: expect.objectContaining({
sum: 8,
}),
attributes: {
'gen_ai.system': 'aws.bedrock',
'gen_ai.operation.name': 'chat',
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
'gen_ai.token.type': 'input',
},
}),
expect.objectContaining({
value: expect.objectContaining({
sum: 10,
}),
attributes: {
'gen_ai.system': 'aws.bedrock',
'gen_ai.operation.name': 'chat',
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
'gen_ai.token.type': 'output',
},
}),
])
);

const operationDuration = scopeMetrics.metrics.filter(
m => m.descriptor.name === 'gen_ai.client.operation.duration'
);
expect(operationDuration.length).toBe(1);
expect(operationDuration[0].descriptor).toMatchObject({
name: 'gen_ai.client.operation.duration',
type: 'HISTOGRAM',
description: 'GenAI operation duration',
unit: 's',
});
expect(operationDuration[0].dataPoints.length).toBe(1);
expect(operationDuration[0].dataPoints).toEqual([
expect.objectContaining({
value: expect.objectContaining({
sum: expect.any(Number),
}),
attributes: {
'gen_ai.system': 'aws.bedrock',
'gen_ai.operation.name': 'chat',
'gen_ai.request.model': 'amazon.titan-text-lite-v1',
},
}),
]);
expect(
(operationDuration[0].dataPoints[0].value as any).sum
).toBeGreaterThan(0);
});
});

describe('ConverseStream', () => {
it('adds genai conventions', async () => {
const modelId = 'amazon.titan-text-lite-v1';
const messages = [
{
role: ConversationRole.USER,
content: [{ text: 'Say this is a test' }],
},
];
const inferenceConfig = {
maxTokens: 10,
temperature: 0.8,
topP: 1,
stopSequences: ['|'],
};

const command = new ConverseStreamCommand({
modelId,
messages,
inferenceConfig,
});

const response = await client.send(command);
const chunks: string[] = [];
for await (const item of response.stream!) {
const text = item.contentBlockDelta?.delta?.text;
if (text) {
chunks.push(text);
}
}
expect(chunks.join('')).toBe('Hi! How are you? How');

const testSpans: ReadableSpan[] = getTestSpans();
const converseSpans: ReadableSpan[] = testSpans.filter(
(s: ReadableSpan) => {
return s.name === 'chat amazon.titan-text-lite-v1';
}
);
expect(converseSpans.length).toBe(1);
expect(converseSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 8,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});

await meterProvider.forceFlush();
const [resourceMetrics] = metricExporter.getMetrics();
expect(resourceMetrics.scopeMetrics.length).toBe(1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
const tokenUsage = scopeMetrics.metrics.filter(
@@ -224,6 +352,7 @@
});
});

// TODO: Instrument InvokeModel
describe('InvokeModel', () => {
it('adds amazon titan model attributes to span', async () => {
const modelId = 'amazon.titan-text-express-v1';
@@ -21,12 +21,25 @@
* specific test. We instead instantiate a single instrumentation instance here to
* use within all tests.
*/
import {
  initMeterProvider,
  registerInstrumentationTesting,
} from '@opentelemetry/contrib-test-utils';
import { registerInstrumentationTesting } from '@opentelemetry/contrib-test-utils';
import {
  AggregationTemporality,
  InMemoryMetricExporter,
  MeterProvider,
  PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';
import { AwsInstrumentation } from '../src';

export const instrumentation = new AwsInstrumentation();
export const metricReader = initMeterProvider(instrumentation);
Contributor Author:

I realized this pattern seemed not to work for multiple tests. I tried passing DELTA temporality in the MetricReader constructor, since I thought it would fix it, but it didn't. So I changed to a pattern inspired by the one used in Java:

https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java#L122
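A sketch of the per-test flow this pattern enables (mirroring the test changes above; the assertions are placeholders):

it('collects metrics for a single test', async () => {
  // ...exercise the instrumented client...
  await meterProvider.forceFlush(); // push pending DELTA metrics to the exporter
  const [resourceMetrics] = metricExporter.getMetrics();
  // ...assert on resourceMetrics.scopeMetrics...
  metricExporter.reset(); // keep metrics from leaking into the next test
});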

export const metricExporter = new InMemoryMetricExporter(
AggregationTemporality.DELTA
);
export const meterProvider = new MeterProvider({
readers: [
new PeriodicExportingMetricReader({
exporter: metricExporter,
}),
],
});
instrumentation.setMeterProvider(meterProvider);
registerInstrumentationTesting(instrumentation);