feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845

Open · wants to merge 3 commits into main

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts
@@ -101,7 +101,9 @@
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
case 'InvokeModel':
return this.requestPreSpanHookInvokeModel(request, config, diag);
return this.requestPreSpanHookInvokeModel(request, config, diag, false);
case 'InvokeModelWithResponseStream':
return this.requestPreSpanHookInvokeModel(request, config, diag, true);
}

return {
@@ -157,7 +159,8 @@
private requestPreSpanHookInvokeModel(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
diag: DiagLogger,
isStream: boolean
): RequestMetadata {
let spanName: string | undefined;
const spanAttributes: Attributes = {
@@ -312,6 +315,7 @@
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
@@ -346,6 +350,13 @@
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
case 'InvokeModelWithResponseStream':
return this.responseHookInvokeModelWithResponseStream(
response,
span,
tracer,
config
);
}
}

@@ -579,4 +590,134 @@
}
}
}

private async responseHookInvokeModelWithResponseStream(
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig
): Promise<any> {
const stream = response.data?.body;
const modelId = response.request.commandInput?.modelId;
if (!stream || !span.isRecording()) return;

Reviewer comment (Contributor):
Suggested change:
- if (!stream || !span.isRecording()) return;
+ if (!stream) return;
!span.isRecording() is already checked before responseHookInvokeModelWithResponseStream() is called.

const wrappedStream = instrumentAsyncIterable(
stream,
async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
const parsedChunk = parseChunk(chunk?.chunk?.bytes);

if (!parsedChunk) return;

if (modelId.includes('amazon.titan')) {
recordTitanAttributes(parsedChunk);
} else if (modelId.includes('anthropic.claude')) {
recordClaudeAttributes(parsedChunk);
} else if (modelId.includes('amazon.nova')) {
recordNovaAttributes(parsedChunk);
}
}
);
// Replace the original response body with our instrumented stream.
// - Defers span.end() until the entire stream is consumed
// This ensures downstream consumers still receive the full stream correctly,
// while OpenTelemetry can record span attributes from streamed data.
response.data.body = (async function* () {
try {
for await (const item of wrappedStream) {
yield item;
}
} finally {
span.end();
}
})();
return response.data;

// Tap into the stream at the chunk level without modifying the chunk itself.
Reviewer comment (Contributor): Wouldn't it be more efficient to declare these following functions outside of this member function scope? (A sketch of that refactor follows this file's diff.)

function instrumentAsyncIterable<T>(
stream: AsyncIterable<T>,
onChunk: (chunk: T) => void
): AsyncIterable<T> {
return {
[Symbol.asyncIterator]: async function* () {
for await (const chunk of stream) {
onChunk(chunk);
yield chunk;
}
},
};
}

function parseChunk(bytes?: Uint8Array): any {
if (!bytes || !(bytes instanceof Uint8Array)) return null;
try {
const str = Buffer.from(bytes).toString('utf-8');
return JSON.parse(str);
} catch (err) {
console.warn('Failed to parse streamed chunk', err);
return null;

(Codecov / codecov/patch: added lines #L656 - #L657 were not covered by tests.)
}
}

function recordNovaAttributes(parsedChunk: any) {
if (parsedChunk.metadata?.usage !== undefined) {
if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.metadata.usage.inputTokens
);
}
if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.metadata.usage.outputTokens
);
}
}
if (parsedChunk.messageStop?.stopReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.messageStop.stopReason,
]);
}
}

function recordClaudeAttributes(parsedChunk: any) {
if (parsedChunk.message?.usage?.input_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.message.usage.input_tokens
);
}
if (parsedChunk.message?.usage?.output_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.message.usage.output_tokens
);
}
if (parsedChunk.delta?.stop_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.delta.stop_reason,
]);
}
}

function recordTitanAttributes(parsedChunk: any) {
if (parsedChunk.inputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.inputTextTokenCount
);
}
if (parsedChunk.totalOutputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.totalOutputTextTokenCount
);
}
if (parsedChunk.completionReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.completionReason,
]);
}
}
}
}
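
On the reviewer question above about moving the helper functions out of the member function: a rough sketch of what that refactor could look like, assuming the helpers live at module scope and receive the Span explicitly instead of capturing it by closure. The attribute-constant import path is an assumption matching how the gen_ai constants are already used in this file; this is illustrative only, not part of the PR.

```typescript
// Sketch only (not part of this PR): stream helpers hoisted to module scope.
// `span` is passed explicitly because it is no longer captured by closure.
import type { Span } from '@opentelemetry/api';
// Assumed import path for the constant already referenced in bedrock-runtime.ts.
import { ATTR_GEN_AI_USAGE_INPUT_TOKENS } from '@opentelemetry/semantic-conventions/incubating';

function instrumentAsyncIterable<T>(
  stream: AsyncIterable<T>,
  onChunk: (chunk: T) => void
): AsyncIterable<T> {
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const chunk of stream) {
        onChunk(chunk);
        yield chunk;
      }
    },
  };
}

function parseChunk(bytes?: Uint8Array): any {
  if (!bytes || !(bytes instanceof Uint8Array)) return null;
  try {
    return JSON.parse(Buffer.from(bytes).toString('utf-8'));
  } catch {
    return null;
  }
}

function recordTitanAttributes(span: Span, parsedChunk: any) {
  if (parsedChunk.inputTextTokenCount !== undefined) {
    span.setAttribute(
      ATTR_GEN_AI_USAGE_INPUT_TOKENS,
      parsedChunk.inputTextTokenCount
    );
  }
  // Output tokens and completionReason would be handled the same way as in the
  // PR, with `span` taken as a parameter rather than from the enclosing scope.
}

// The member function would then call e.g. recordTitanAttributes(span, parsedChunk).
```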
@@ -36,6 +36,7 @@ import {
ConverseStreamCommand,
ConversationRole,
InvokeModelCommand,
InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';
import { AwsCredentialIdentity } from '@aws-sdk/types';
import * as path from 'path';
@@ -79,6 +80,7 @@ const sanitizeRecordings = (scopes: Definition[]) => {
describe('Bedrock', () => {
nockBack.fixtures = path.join(__dirname, 'mock-responses');
let credentials: AwsCredentialIdentity | undefined;

if (nockBack.currentMode === 'dryrun') {
credentials = {
accessKeyId: 'testing',
@@ -642,4 +644,184 @@ describe('Bedrock', () => {
});
});
});

describe('InvokeModelWithStreams', () => {
it('adds amazon titan model attributes to span', async () => {
const modelId = 'amazon.titan-text-lite-v1';
const prompt = '\n\nHuman: Hello, How are you today? \n\nAssistant:';

const body = {
inputText: prompt,
textGenerationConfig: {
maxTokenCount: 10,
temperature: 0.8,
topP: 1,
stopSequences: ['|'],
},
};
const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(body),
contentType: 'application/json',
accept: 'application/json',
});

const response = await client.send(command);
console.log('response', response);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
collectedText += parsed.outputText;
}
}
expect(collectedText).toBe(" Hello there! I'm doing well. Thank you");

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 13,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['LENGTH'],
});
});
it('adds claude model attributes to span', async () => {
const modelId = 'anthropic.claude-3-5-sonnet-20240620-v1:0';
const prompt = '\n\nHuman: Hello, How are you today? \n\nAssistant:';

const body = {
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 12,
top_k: 250,
stop_sequences: ['|'],
temperature: 0.8,
top_p: 1,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: prompt,
},
],
},
],
};

const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(body),
contentType: 'application/json',
accept: 'application/json',
});

const response = await client.send(command);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
if (
parsed.type === 'content_block_delta' &&
parsed.delta?.type === 'text_delta'
) {
collectedText += parsed.delta.text;
}
}
}

expect(collectedText).toBe(
"Hello! I'm doing well, thank you for asking."
);

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 12,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 22,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 1,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});
});

it('adds amazon nova model attributes to span', async () => {
const modelId = 'amazon.nova-pro-v1:0';
const prompt = 'Say this is a test';
const nativeRequest: any = {
messages: [{ role: 'user', content: [{ text: prompt }] }],
inferenceConfig: {
max_new_tokens: 10,
temperature: 0.8,
top_p: 1,
stopSequences: ['|'],
},
};
const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(nativeRequest),
});

const response = await client.send(command);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
if (parsed.contentBlockDelta?.delta) {
collectedText += parsed.contentBlockDelta?.delta.text;
}
}
}

expect(collectedText).toBe(
"Certainly! If you're indicating that this interaction"
);

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 5,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});
});
});

function getInvokeModelWithResponseStreamSpans(): ReadableSpan[] {
return getTestSpans().filter((s: ReadableSpan) => {
return s.name === 'BedrockRuntime.InvokeModelWithResponseStream';
});
}

function decodeChunk(chunk: any) {
return Buffer.from(chunk.chunk.bytes).toString('utf-8');
}
});
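
As the tests above illustrate, the instrumented response.body is still consumed as a plain AsyncIterable; the span is ended, and its token-usage and finish-reason attributes become visible, only after the consumer has drained the stream. A minimal consumer-side sketch of that behavior, assuming a configured BedrockRuntimeClient and a Titan request body mirroring the first test (not part of the PR; region and model ID are illustrative):

```typescript
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';

async function main() {
  const client = new BedrockRuntimeClient({ region: 'us-east-1' });

  const response = await client.send(
    new InvokeModelWithResponseStreamCommand({
      modelId: 'amazon.titan-text-lite-v1',
      body: JSON.stringify({ inputText: 'Hello, How are you today?' }),
      contentType: 'application/json',
      accept: 'application/json',
    })
  );

  let text = '';
  if (!response.body) return;
  for await (const event of response.body) {
    // Each event carries a JSON payload in event.chunk.bytes, decoded the same way the tests do.
    if (event.chunk?.bytes instanceof Uint8Array) {
      const parsed = JSON.parse(Buffer.from(event.chunk.bytes).toString('utf-8'));
      text += parsed.outputText ?? '';
    }
  }
  // Once this loop finishes, the instrumentation's finally block runs and ends the span,
  // so the streamed attributes are recorded before the span is exported.
  console.log(text);
}

main().catch(console.error);
```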