diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts index 159a27d5ca..353667a8d4 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts @@ -402,13 +402,17 @@ export class AwsInstrumentation extends InstrumentationBase { - span.end(); + if (!requestMetadata.isStream) { + span.end(); + } }); promiseWithResponseLogic .then(res => { diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts index 1adc9bcd3a..fc30898002 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts @@ -31,6 +31,10 @@ import { export interface RequestMetadata { // isIncoming - if true, then the operation callback / promise should be bind with the operation's span isIncoming: boolean; + // isStream - if true, then the response is a stream so the span should not be ended by the middleware. + // the ServiceExtension must end the span itself, generally by wrapping the stream and ending after it is + // consumed. + isStream?: boolean; spanAttributes?: SpanAttributes; spanKind?: SpanKind; spanName?: string; @@ -47,13 +51,14 @@ export interface ServiceExtension { // called before request is sent, and after span is started requestPostSpanHook?: (request: NormalizedRequest) => void; + // called after response is received. If a value is returned, it replaces the response output. 
responseHook?: ( response: NormalizedResponse, span: Span, tracer: Tracer, config: AwsSdkInstrumentationConfig, startTime: HrTime - ) => void; + ) => any | undefined; updateMetricInstruments?: (meter: Meter) => void; } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts index 939dc881b0..50fb3508d3 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts @@ -68,7 +68,14 @@ export class ServicesExtensions implements ServiceExtension { startTime: HrTime ) { const serviceExtension = this.services.get(response.request.serviceName); - serviceExtension?.responseHook?.(response, span, tracer, config, startTime); + + return serviceExtension?.responseHook?.( + response, + span, + tracer, + config, + startTime + ); } updateMetricInstruments(meter: Meter) { diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts index b7c5f83972..f15a91ad39 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts @@ -46,6 +46,10 @@ import { NormalizedRequest, NormalizedResponse, } from '../types'; +import type { + ConverseStreamOutput, + TokenUsage, +} from '@aws-sdk/client-bedrock-runtime'; import { hrTime, hrTimeDuration, @@ -93,7 +97,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { ): RequestMetadata { switch (request.commandName) { case 'Converse': - return this.requestPreSpanHookConverse(request, config, diag); + return this.requestPreSpanHookConverse(request, config, diag, false); + case 'ConverseStream': + return 
this.requestPreSpanHookConverse(request, config, diag, true); case 'InvokeModel': return this.requestPreSpanHookInvokeModel(request, config, diag); } @@ -106,7 +112,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { private requestPreSpanHookConverse( request: NormalizedRequest, config: AwsSdkInstrumentationConfig, - diag: DiagLogger + diag: DiagLogger, + isStream: boolean ): RequestMetadata { let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT; const spanAttributes: Attributes = { @@ -142,6 +149,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { return { spanName, isIncoming: false, + isStream, spanAttributes, }; } @@ -328,6 +336,14 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { config, startTime ); + case 'ConverseStream': + return this.responseHookConverseStream( + response, + span, + tracer, + config, + startTime + ); case 'InvokeModel': return this.responseHookInvokeModel(response, span, tracer, config); } @@ -342,6 +358,64 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { ) { const { stopReason, usage } = response.data; + BedrockRuntimeServiceExtension.setStopReason(span, stopReason); + this.setUsage(response, span, usage, startTime); + } + + private responseHookConverseStream( + response: NormalizedResponse, + span: Span, + tracer: Tracer, + config: AwsSdkInstrumentationConfig, + startTime: HrTime + ) { + return { + ...response.data, + // Wrap and replace the response stream to allow processing events to telemetry + // before yielding to the user. 
+ stream: this.wrapConverseStreamResponse( + response, + response.data.stream, + span, + startTime + ), + }; + } + + private async *wrapConverseStreamResponse( + response: NormalizedResponse, + stream: AsyncIterable<ConverseStreamOutput>, + span: Span, + startTime: HrTime + ) { + try { + let usage: TokenUsage | undefined; + for await (const item of stream) { + BedrockRuntimeServiceExtension.setStopReason( + span, + item.messageStop?.stopReason + ); + usage = item.metadata?.usage ?? usage; // events without metadata must not clear a usage already seen + yield item; + } + this.setUsage(response, span, usage, startTime); + } finally { + span.end(); + } + } + + private static setStopReason(span: Span, stopReason: string | undefined) { + if (stopReason !== undefined) { + span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); + } + } + + private setUsage( + response: NormalizedResponse, + span: Span, + usage: TokenUsage | undefined, + startTime: HrTime + ) { const sharedMetricAttrs: Attributes = { [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK, [ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT, @@ -371,10 +445,6 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { }); } } - - if (stopReason !== undefined) { - span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); - } } private responseHookInvokeModel( diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts index 7d220cc63b..b01ed09010 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts @@ -28,11 +28,12 @@ */ import { getTestSpans } from '@opentelemetry/contrib-test-utils'; -import { metricReader } from './load-instrumentation'; +import { meterProvider, metricExporter } from './load-instrumentation'; import { BedrockRuntimeClient, ConverseCommand, + ConverseStreamCommand, ConversationRole, 
InvokeModelCommand, } from '@aws-sdk/client-bedrock-runtime'; @@ -106,6 +107,9 @@ describe('Bedrock', () => { afterEach(async function () { nockDone(); + + await meterProvider.forceFlush(); + metricExporter.reset(); }); describe('Converse', () => { @@ -154,7 +158,131 @@ describe('Bedrock', () => { [ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'], }); - const { resourceMetrics } = await metricReader.collect(); + await meterProvider.forceFlush(); + const [resourceMetrics] = metricExporter.getMetrics(); + expect(resourceMetrics.scopeMetrics.length).toBe(1); + const scopeMetrics = resourceMetrics.scopeMetrics[0]; + const tokenUsage = scopeMetrics.metrics.filter( + m => m.descriptor.name === 'gen_ai.client.token.usage' + ); + expect(tokenUsage.length).toBe(1); + expect(tokenUsage[0].descriptor).toMatchObject({ + name: 'gen_ai.client.token.usage', + type: 'HISTOGRAM', + description: 'Measures number of input and output tokens used', + unit: '{token}', + }); + expect(tokenUsage[0].dataPoints.length).toBe(2); + expect(tokenUsage[0].dataPoints).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + value: expect.objectContaining({ + sum: 8, + }), + attributes: { + 'gen_ai.system': 'aws.bedrock', + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'amazon.titan-text-lite-v1', + 'gen_ai.token.type': 'input', + }, + }), + expect.objectContaining({ + value: expect.objectContaining({ + sum: 10, + }), + attributes: { + 'gen_ai.system': 'aws.bedrock', + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'amazon.titan-text-lite-v1', + 'gen_ai.token.type': 'output', + }, + }), + ]) + ); + + const operationDuration = scopeMetrics.metrics.filter( + m => m.descriptor.name === 'gen_ai.client.operation.duration' + ); + expect(operationDuration.length).toBe(1); + expect(operationDuration[0].descriptor).toMatchObject({ + name: 'gen_ai.client.operation.duration', + type: 'HISTOGRAM', + description: 'GenAI operation duration', + unit: 's', + }); + 
expect(operationDuration[0].dataPoints.length).toBe(1); + expect(operationDuration[0].dataPoints).toEqual([ + expect.objectContaining({ + value: expect.objectContaining({ + sum: expect.any(Number), + }), + attributes: { + 'gen_ai.system': 'aws.bedrock', + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'amazon.titan-text-lite-v1', + }, + }), + ]); + expect( + (operationDuration[0].dataPoints[0].value as any).sum + ).toBeGreaterThan(0); + }); + }); + + describe('ConverseStream', () => { + it('adds genai conventions', async () => { + const modelId = 'amazon.titan-text-lite-v1'; + const messages = [ + { + role: ConversationRole.USER, + content: [{ text: 'Say this is a test' }], + }, + ]; + const inferenceConfig = { + maxTokens: 10, + temperature: 0.8, + topP: 1, + stopSequences: ['|'], + }; + + const command = new ConverseStreamCommand({ + modelId, + messages, + inferenceConfig, + }); + + const response = await client.send(command); + const chunks: string[] = []; + for await (const item of response.stream!) { + const text = item.contentBlockDelta?.delta?.text; + if (text) { + chunks.push(text); + } + } + expect(chunks.join('')).toBe('Hi! How are you? 
How'); + + const testSpans: ReadableSpan[] = getTestSpans(); + const converseSpans: ReadableSpan[] = testSpans.filter( + (s: ReadableSpan) => { + return s.name === 'chat amazon.titan-text-lite-v1'; + } + ); + expect(converseSpans.length).toBe(1); + expect(converseSpans[0].attributes).toMatchObject({ + [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK, + [ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT, + [ATTR_GEN_AI_REQUEST_MODEL]: modelId, + [ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10, + [ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8, + [ATTR_GEN_AI_REQUEST_TOP_P]: 1, + [ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'], + [ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 8, + [ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10, + [ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'], + }); + + await meterProvider.forceFlush(); + const [resourceMetrics] = metricExporter.getMetrics(); expect(resourceMetrics.scopeMetrics.length).toBe(1); const scopeMetrics = resourceMetrics.scopeMetrics[0]; const tokenUsage = scopeMetrics.metrics.filter( diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/load-instrumentation.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/load-instrumentation.ts index 531ce19b82..0942f2f9ba 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/load-instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/load-instrumentation.ts @@ -21,12 +21,25 @@ * specific test. We instead instantiate a single instrumentation instance here to * use within all tests. 
*/ +import { registerInstrumentationTesting } from '@opentelemetry/contrib-test-utils'; import { - initMeterProvider, - registerInstrumentationTesting, -} from '@opentelemetry/contrib-test-utils'; + AggregationTemporality, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader, +} from '@opentelemetry/sdk-metrics'; import { AwsInstrumentation } from '../src'; export const instrumentation = new AwsInstrumentation(); -export const metricReader = initMeterProvider(instrumentation); +export const metricExporter = new InMemoryMetricExporter( + AggregationTemporality.DELTA +); +export const meterProvider = new MeterProvider({ + readers: [ + new PeriodicExportingMetricReader({ + exporter: metricExporter, + }), + ], +}); +instrumentation.setMeterProvider(meterProvider); registerInstrumentationTesting(instrumentation); diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json new file mode 100644 index 0000000000..2fd8efdee3 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/mock-responses/bedrock-conversestream-adds-genai-conventions.json @@ -0,0 +1,42 @@ +[ + { + "scope": "https://bedrock-runtime.us-east-1.amazonaws.com:443", + "method": "POST", + "path": "/model/amazon.titan-text-lite-v1/converse-stream", + "body": { + "inferenceConfig": { + "maxTokens": 10, + "stopSequences": [ + "|" + ], + "temperature": 0.8, + "topP": 1 + }, + "messages": [ + { + "content": [ + { + "text": "Say this is a test" + } + ], + "role": "user" + } + ] + }, + "status": 200, + "response": 
"00000081000000526cc176930b3a6576656e742d7479706507000c6d65737361676553746172740d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a2261626364222c22726f6c65223a22617373697374616e74227df512a005000000c600000057f67806450b3a6576656e742d74797065070011636f6e74656e74426c6f636b44656c74610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2264656c7461223a7b2274657874223a2248692120486f772061726520796f753f20486f77227d2c2270223a226162636465666768696a6b6c6d6e6f70717273747576777879227da88f22a40000009500000056fecc83c80b3a6576656e742d74797065070010636f6e74656e74426c6f636b53746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2270223a226162636465666768696a6b6c6d6e6f7071227d8dc2956d00000097000000511a68450b0b3a6576656e742d7479706507000b6d65737361676553746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a226162636465666768696a6b6c6d6e6f7071727374222c2273746f70526561736f6e223a226d61785f746f6b656e73227ddb5bf387000000ce0000004ea263e5440b3a6576656e742d747970650700086d657461646174610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b226d657472696373223a7b226c6174656e63794d73223a3736357d2c2270223a226162636465666768696a6b6c6d6e6f222c227573616765223a7b22696e707574546f6b656e73223a382c226f7574707574546f6b656e73223a31302c22746f74616c546f6b656e73223a31387d7d98eada7f", + "rawHeaders": [ + "Date", + "Fri, 21 Mar 2025 02:04:20 GMT", + "Content-Type", + "application/vnd.amazon.eventstream", + "Transfer-Encoding", + "chunked", + "Connection", + "keep-alive", + "x-amzn-RequestId", + "c01898c3-00ef-43e3-b015-e7458e9afc84" + ], + "responseIsBinary": true + } +] \ No newline at end of file