-
Notifications
You must be signed in to change notification settings - Fork 580
feat(instrumentation-aws-sdk): add gen ai conventions for converse stream span #2769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
7608585
938e1c2
7b351e1
22ea86d
fc4e015
409b434
4db49c3
d9c2938
4838419
c908fe9
c8e8796
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,10 @@ import { | |
export interface RequestMetadata { | ||
// isIncoming - if true, then the operation callback / promise should be bind with the operation's span | ||
isIncoming: boolean; | ||
// isStream - if true, then the response is a stream so the span should not be ended by the middleware. | ||
// the ServiceExtension must end the span itself, generally by wrapping the stream and ending after it is | ||
// consumed. | ||
isStream?: boolean; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know if the GenAI SIG discussed/documented wanting this behaviour of ending the span after the full stream is consumed? I've seen opinions vary when discussing HTTP streaming. See the guidance for HTTP client span duration here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-client-span-duration Is there any equivalent in the Python GenAI-related instrumentations? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the link, that's interesting indeed in terms of response streaming. There isn't any guideline on gen ai spans, however I feel it's sort of implied by the conventions for token usage - there really isn't a way to populate them without keeping the span until the end of the stream. While the duration could keep streaming out of it, that would then need to override the end time of that span with the earlier timestamp, which I don't think the JS SDK supports. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Correct, it does not support that. The (documented) behaviour then would be that the Span duration would be just up until the first response from the server, effectively TTFB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah do you mean that we should use TTFB here? That would mean we couldn't record usage information though. BTW, I realized that this might be closer to RPC than HTTP which has some specification for streaming https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#message-event
I think this also implies the overall span is for the whole stream. FWIW python keeps the span for the entire stream too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, I did not mean to imply what this "should" do. I don't have a strong opinion one way or the other. The HTTP span guidance from https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-client-span-duration says:
I only meant to say that if it is decided to handle this stream by having the span end just on the initial response that this should be documented.
Sounds good to me to have the intention for the JS instrumentation to be the same. |
||
spanAttributes?: SpanAttributes; | ||
spanKind?: SpanKind; | ||
spanName?: string; | ||
|
@@ -47,13 +51,14 @@ export interface ServiceExtension { | |
// called before request is sent, and after span is started | ||
requestPostSpanHook?: (request: NormalizedRequest) => void; | ||
|
||
// called after response is received. If value is returned, it replaces the response output. | ||
responseHook?: ( | ||
response: NormalizedResponse, | ||
span: Span, | ||
tracer: Tracer, | ||
config: AwsSdkInstrumentationConfig, | ||
startTime: HrTime | ||
) => void; | ||
) => any | undefined; | ||
|
||
updateMetricInstruments?: (meter: Meter) => void; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,10 @@ import { | |
NormalizedRequest, | ||
NormalizedResponse, | ||
} from '../types'; | ||
import type { | ||
ConverseStreamOutput, | ||
TokenUsage, | ||
} from '@aws-sdk/client-bedrock-runtime'; | ||
trentm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import { | ||
hrTime, | ||
hrTimeDuration, | ||
|
@@ -93,7 +97,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
): RequestMetadata { | ||
switch (request.commandName) { | ||
case 'Converse': | ||
return this.requestPreSpanHookConverse(request, config, diag); | ||
return this.requestPreSpanHookConverse(request, config, diag, false); | ||
case 'ConverseStream': | ||
return this.requestPreSpanHookConverse(request, config, diag, true); | ||
case 'InvokeModel': | ||
return this.requestPreSpanHookInvokeModel(request, config, diag); | ||
} | ||
|
@@ -106,7 +112,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
private requestPreSpanHookConverse( | ||
request: NormalizedRequest, | ||
config: AwsSdkInstrumentationConfig, | ||
diag: DiagLogger | ||
diag: DiagLogger, | ||
isStream: boolean | ||
): RequestMetadata { | ||
let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT; | ||
const spanAttributes: Attributes = { | ||
|
@@ -142,6 +149,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
return { | ||
spanName, | ||
isIncoming: false, | ||
isStream, | ||
spanAttributes, | ||
}; | ||
} | ||
|
@@ -328,6 +336,14 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
config, | ||
startTime | ||
); | ||
case 'ConverseStream': | ||
return this.responseHookConverseStream( | ||
response, | ||
span, | ||
tracer, | ||
config, | ||
startTime | ||
); | ||
case 'InvokeModel': | ||
return this.responseHookInvokeModel(response, span, tracer, config); | ||
} | ||
|
@@ -342,6 +358,62 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
) { | ||
const { stopReason, usage } = response.data; | ||
|
||
BedrockRuntimeServiceExtension.setStopReason(span, stopReason); | ||
this.setUsage(response, span, usage, startTime); | ||
} | ||
|
||
private responseHookConverseStream( | ||
response: NormalizedResponse, | ||
span: Span, | ||
tracer: Tracer, | ||
config: AwsSdkInstrumentationConfig, | ||
startTime: HrTime | ||
) { | ||
return { | ||
...response.data, | ||
stream: this.wrapConverseStreamResponse( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup - added a comment about it |
||
response, | ||
response.data.stream, | ||
span, | ||
startTime | ||
), | ||
}; | ||
} | ||
|
||
private async *wrapConverseStreamResponse( | ||
response: NormalizedResponse, | ||
stream: AsyncIterable<ConverseStreamOutput>, | ||
span: Span, | ||
startTime: HrTime | ||
) { | ||
try { | ||
let usage: TokenUsage | undefined; | ||
for await (const item of stream) { | ||
BedrockRuntimeServiceExtension.setStopReason( | ||
span, | ||
item.messageStop?.stopReason | ||
); | ||
usage = item.metadata?.usage; | ||
yield item; | ||
} | ||
this.setUsage(response, span, usage, startTime); | ||
} finally { | ||
span.end(); | ||
} | ||
} | ||
|
||
private static setStopReason(span: Span, stopReason: string | undefined) { | ||
if (stopReason !== undefined) { | ||
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); | ||
} | ||
} | ||
|
||
private setUsage( | ||
response: NormalizedResponse, | ||
span: Span, | ||
usage: TokenUsage | undefined, | ||
startTime: HrTime | ||
) { | ||
const sharedMetricAttrs: Attributes = { | ||
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK, | ||
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT, | ||
|
@@ -371,10 +443,6 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |
}); | ||
} | ||
} | ||
|
||
if (stopReason !== undefined) { | ||
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]); | ||
} | ||
} | ||
|
||
private responseHookInvokeModel( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,12 +21,25 @@ | |
* specific test. We instead instantiate a single instrumentation instance here to | ||
* use within all tests. | ||
*/ | ||
import { registerInstrumentationTesting } from '@opentelemetry/contrib-test-utils'; | ||
import { | ||
initMeterProvider, | ||
registerInstrumentationTesting, | ||
} from '@opentelemetry/contrib-test-utils'; | ||
AggregationTemporality, | ||
InMemoryMetricExporter, | ||
MeterProvider, | ||
PeriodicExportingMetricReader, | ||
} from '@opentelemetry/sdk-metrics'; | ||
import { AwsInstrumentation } from '../src'; | ||
|
||
export const instrumentation = new AwsInstrumentation(); | ||
export const metricReader = initMeterProvider(instrumentation); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realized this pattern seemed to not work for multiple tests. I did try passing |
||
export const metricExporter = new InMemoryMetricExporter( | ||
AggregationTemporality.DELTA | ||
); | ||
export const meterProvider = new MeterProvider({ | ||
readers: [ | ||
new PeriodicExportingMetricReader({ | ||
exporter: metricExporter, | ||
}), | ||
], | ||
}); | ||
instrumentation.setMeterProvider(meterProvider); | ||
registerInstrumentationTesting(instrumentation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered whether this should be done for the user hook too but didn't think there's enough use case for it. Currently the change is only internal since AFAIK, users can't define service extensions