diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-batch-unsampled-span-processor.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-batch-unsampled-span-processor.ts index ff87f5b0..3d6b89a9 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-batch-unsampled-span-processor.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-batch-unsampled-span-processor.ts @@ -2,17 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 // Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. -import { context, Context, diag, TraceFlags } from '@opentelemetry/api'; -import { - BindOnceFuture, - ExportResultCode, - getEnv, - globalErrorHandler, - suppressTracing, - unrefTimer, -} from '@opentelemetry/core'; -import { BufferConfig, ReadableSpan, Span, SpanExporter, SpanProcessor } from '@opentelemetry/sdk-trace-base'; +import { Context, TraceFlags } from '@opentelemetry/api'; +import { ReadableSpan, BufferConfig, Span } from '@opentelemetry/sdk-trace-base'; import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; +import { BatchSpanProcessorBase } from '@opentelemetry/sdk-trace-base/build/src/export/BatchSpanProcessorBase'; /** * This class is a customized version of the `BatchSpanProcessorBase` from the @@ -41,54 +34,16 @@ import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; * The rest of the behavior—batch processing, queuing, and exporting spans in * batches—is inherited from the base class and remains largely the same. */ -export class AwsBatchUnsampledSpanProcessor implements SpanProcessor { - private readonly _maxExportBatchSize: number; - private readonly _maxQueueSize: number; - private readonly _scheduledDelayMillis: number; - private readonly _exportTimeoutMillis: number; - - private _isExporting: boolean = false; - private _finishedSpans: ReadableSpan[] = []; - private _timer: NodeJS.Timeout | undefined; - private _shutdownOnce: BindOnceFuture; - private _droppedSpansCount: number = 0; - - constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) { - const env = getEnv(); - this._maxExportBatchSize = - typeof config?.maxExportBatchSize === 'number' ? config.maxExportBatchSize : env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE; - this._maxQueueSize = typeof config?.maxQueueSize === 'number' ? config.maxQueueSize : env.OTEL_BSP_MAX_QUEUE_SIZE; - this._scheduledDelayMillis = - typeof config?.scheduledDelayMillis === 'number' ? config.scheduledDelayMillis : env.OTEL_BSP_SCHEDULE_DELAY; - this._exportTimeoutMillis = - typeof config?.exportTimeoutMillis === 'number' ? config.exportTimeoutMillis : env.OTEL_BSP_EXPORT_TIMEOUT; - - this._shutdownOnce = new BindOnceFuture(this._shutdown, this); - - if (this._maxExportBatchSize > this._maxQueueSize) { - diag.warn( - 'BatchSpanProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize, setting maxExportBatchSize to match maxQueueSize' - ); - this._maxExportBatchSize = this._maxQueueSize; - } - } - - forceFlush(): Promise { - if (this._shutdownOnce.isCalled) { - return this._shutdownOnce.promise; - } - return this._flushAll(); - } - - onStart(span: Span, _parentContext: Context): void { +export class AwsBatchUnsampledSpanProcessor extends BatchSpanProcessorBase { + override onStart(span: Span, _parentContext: Context): void { if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 0) { span.setAttribute(AWS_ATTRIBUTE_KEYS.AWS_TRACE_FLAG_UNSAMPLED, true); return; } } - onEnd(span: ReadableSpan): void { - if (this._shutdownOnce.isCalled) { + override onEnd(span: ReadableSpan): void { + if ((this as any)._shutdownOnce.isCalled) { return; } @@ -96,156 +51,7 @@ export class AwsBatchUnsampledSpanProcessor implements SpanProcessor { return; } - this._addToBuffer(span); - } - - shutdown(): Promise { - return this._shutdownOnce.call(); - } - - private _shutdown() { - return Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return this._flushAll(); - }) - .then(() => { - return this._exporter.shutdown(); - }); - } - - /** Add a span in the buffer. */ - private _addToBuffer(span: ReadableSpan) { - if (this._finishedSpans.length >= this._maxQueueSize) { - // limit reached, drop span - - if (this._droppedSpansCount === 0) { - diag.debug('maxQueueSize reached, dropping spans'); - } - this._droppedSpansCount++; - - return; - } - - if (this._droppedSpansCount > 0) { - // some spans were dropped, log once with count of spans dropped - diag.warn(`Dropped ${this._droppedSpansCount} spans because maxQueueSize reached`); - this._droppedSpansCount = 0; - } - - this._finishedSpans.push(span); - this._maybeStartTimer(); - } - - /** - * Send all spans to the exporter respecting the batch size limit - * This function is used only on forceFlush or shutdown, - * for all other cases _flush should be used - * */ - private _flushAll(): Promise { - return new Promise((resolve, reject) => { - const promises = []; - // calculate number of batches - const count = Math.ceil(this._finishedSpans.length / this._maxExportBatchSize); - for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); - } - Promise.all(promises) - .then(() => { - resolve(); - }) - .catch(reject); - }); - } - - private _flushOneBatch(): Promise { - this._clearTimer(); - if (this._finishedSpans.length === 0) { - return Promise.resolve(); - } - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - // don't wait anymore for export, this way the next batch can start - reject(new Error('Timeout')); - }, this._exportTimeoutMillis); - // prevent downstream exporter calls from generating spans - context.with(suppressTracing(context.active()), () => { - // Reset the finished spans buffer here because the next invocations of the _flush method - // could pass the same finished spans to the exporter if the buffer is cleared - // outside the execution of this callback. - let spans: ReadableSpan[]; - if (this._finishedSpans.length <= this._maxExportBatchSize) { - spans = this._finishedSpans; - this._finishedSpans = []; - } else { - spans = this._finishedSpans.splice(0, this._maxExportBatchSize); - } - - const doExport = () => - this._exporter.export(spans, result => { - clearTimeout(timer); - if (result.code === ExportResultCode.SUCCESS) { - resolve(); - } else { - reject(result.error ?? new Error('BatchSpanProcessor: span export failed')); - } - }); - - let pendingResources: Array> | null = null; - for (let i = 0, len = spans.length; i < len; i++) { - const span = spans[i]; - if (span.resource.asyncAttributesPending && span.resource.waitForAsyncAttributes) { - pendingResources ??= []; - pendingResources.push(span.resource.waitForAsyncAttributes()); - } - } - - // Avoid scheduling a promise to make the behavior more predictable and easier to test - if (pendingResources === null) { - doExport(); - } else { - Promise.all(pendingResources).then(doExport, err => { - globalErrorHandler(err); - reject(err); - }); - } - }); - }); - } - - private _maybeStartTimer() { - if (this._isExporting) return; - const flush = () => { - this._isExporting = true; - this._flushOneBatch() - .finally(() => { - this._isExporting = false; - if (this._finishedSpans.length > 0) { - this._clearTimer(); - this._maybeStartTimer(); - } - }) - .catch(e => { - this._isExporting = false; - globalErrorHandler(e); - }); - }; - // we only wait if the queue doesn't have enough elements yet - if (this._finishedSpans.length >= this._maxExportBatchSize) { - return flush(); - } - if (this._timer !== undefined) return; - this._timer = setTimeout(() => flush(), this._scheduledDelayMillis); - unrefTimer(this._timer); - } - - private _clearTimer() { - if (this._timer !== undefined) { - clearTimeout(this._timer); - this._timer = undefined; - } + (this as any)._addToBuffer(span); } onShutdown(): void {} diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts index a9894728..8ddc39c5 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts @@ -65,10 +65,13 @@ const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT'; const METRIC_EXPORT_INTERVAL_CONFIG: string = 'OTEL_METRIC_EXPORT_INTERVAL'; const DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS: number = 60000; -const AWS_LAMBDA_FUNCTION_NAME_CONFIG: string = 'AWS_LAMBDA_FUNCTION_NAME'; +export const AWS_LAMBDA_FUNCTION_NAME_CONFIG: string = 'AWS_LAMBDA_FUNCTION_NAME'; const AWS_XRAY_DAEMON_ADDRESS_CONFIG: string = 'AWS_XRAY_DAEMON_ADDRESS'; const FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = 'T1S'; const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U'; +// Follow Python SDK Impl to set the max span batch size +// which will reduce the chance of UDP package size is larger than 64KB +const LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; /** * Aws Application Signals Config Provider creates a configuration object that can be provided to * the OTel NodeJS SDK for Auto Instrumentation with Application Signals Functionality. @@ -128,6 +131,9 @@ export class AwsOpentelemetryConfigurator { if (!resourceDetectorsFromEnv.includes('env')) { defaultDetectors.push(envDetectorSync); } + } else if (isLambdaEnvironment()) { + // If in Lambda environment, only keep env detector as default + defaultDetectors.push(envDetectorSync); } else { /* * envDetectorSync is used as opposed to envDetector (async), so it is guaranteed that the @@ -229,17 +235,6 @@ export class AwsOpentelemetryConfigurator { diag.info('AWS Application Signals enabled.'); - // Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda - // when Application Signals enabled - if (isLambdaEnvironment()) { - spanProcessors.push( - new AwsBatchUnsampledSpanProcessor( - new OTLPUdpSpanExporter(getXrayDaemonEndpoint(), FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX) - ) - ); - diag.info('Enabled batch unsampled span processor for Lambda environment.'); - } - let exportIntervalMillis: number = Number(process.env[METRIC_EXPORT_INTERVAL_CONFIG]); diag.debug(`AWS Application Signals Metrics export interval: ${exportIntervalMillis}`); @@ -256,12 +251,30 @@ export class AwsOpentelemetryConfigurator { exporter: applicationSignalsMetricExporter, exportIntervalMillis: exportIntervalMillis, }); - const meterProvider: MeterProvider = new MeterProvider({ - /** Resource associated with metric telemetry */ - resource: resource, - readers: [periodicExportingMetricReader], - }); - spanProcessors.push(AwsSpanMetricsProcessorBuilder.create(meterProvider, resource).build()); + + // Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda + // when Application Signals enabled + if (isLambdaEnvironment()) { + spanProcessors.push( + new AwsBatchUnsampledSpanProcessor( + new OTLPUdpSpanExporter(getXrayDaemonEndpoint(), FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX), + { + maxExportBatchSize: getSpanExportBatchSize(), + } + ) + ); + diag.info('Enabled batch unsampled span processor for Lambda environment.'); + } + + // Disable Application Metrics for Lambda environment + if (!isLambdaEnvironment()) { + const meterProvider: MeterProvider = new MeterProvider({ + /** Resource associated with metric telemetry */ + resource: resource, + readers: [periodicExportingMetricReader], + }); + spanProcessors.push(AwsSpanMetricsProcessorBuilder.create(meterProvider, resource).build()); + } } static customizeSampler(sampler: Sampler): Sampler { @@ -415,7 +428,11 @@ export class AwsSpanProcessorProvider { // eslint-disable-next-line @typescript-eslint/typedef let protocol = this.getOtlpProtocol(); - if (AwsOpentelemetryConfigurator.isApplicationSignalsEnabled() && isLambdaEnvironment()) { + // If `isLambdaEnvironment` is true, we will default to exporting OTel spans via `udp_exporter` to Fluxpump, + // regardless of whether `AppSignals` is true or false. + // However, if the customer has explicitly set the `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, + // we will continue using the `otlp_exporter` to send OTel traces to the specified endpoint. + if (!hasCustomOtlpTraceEndpoint() && isLambdaEnvironment()) { protocol = 'udp'; } switch (protocol) { @@ -521,7 +538,9 @@ export class AwsSpanProcessorProvider { if (exporter instanceof ConsoleSpanExporter) { return new SimpleSpanProcessor(configuredExporter); } else { - return new BatchSpanProcessor(configuredExporter); + return new BatchSpanProcessor(configuredExporter, { + maxExportBatchSize: getSpanExportBatchSize(), + }); } }); } @@ -616,11 +635,22 @@ function getSamplerProbabilityFromEnv(environment: Required): numbe return probability; } -function isLambdaEnvironment() { +function getSpanExportBatchSize() { + if (isLambdaEnvironment()) { + return LAMBDA_SPAN_EXPORT_BATCH_SIZE; + } + return undefined; +} + +export function isLambdaEnvironment() { // detect if running in AWS Lambda environment return process.env[AWS_LAMBDA_FUNCTION_NAME_CONFIG] !== undefined; } +function hasCustomOtlpTraceEndpoint() { + return process.env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'] !== undefined; +} + function getXrayDaemonEndpoint() { return process.env[AWS_XRAY_DAEMON_ADDRESS_CONFIG]; } diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-span-processing-util.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-span-processing-util.ts index c916276a..ff37978c 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-span-processing-util.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-span-processing-util.ts @@ -18,6 +18,7 @@ import { } from '@opentelemetry/semantic-conventions'; import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; import * as SQL_DIALECT_KEYWORDS_JSON from './configuration/sql_dialect_keywords.json'; +import { AWS_LAMBDA_FUNCTION_NAME_CONFIG, isLambdaEnvironment } from './aws-opentelemetry-configurator'; /** Utility class designed to support shared logic across AWS Span Processors. */ export class AwsSpanProcessingUtil { @@ -50,6 +51,9 @@ export class AwsSpanProcessingUtil { let operation: string = span.name; if (AwsSpanProcessingUtil.shouldUseInternalOperation(span)) { operation = AwsSpanProcessingUtil.INTERNAL_OPERATION; + } + if (isLambdaEnvironment()) { + operation = process.env[AWS_LAMBDA_FUNCTION_NAME_CONFIG] + '/Handler'; } else if (!AwsSpanProcessingUtil.isValidOperation(span, operation)) { operation = AwsSpanProcessingUtil.generateIngressOperation(span); } diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts index 155d7afc..4c673600 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/instrumentation-patch.ts @@ -7,6 +7,30 @@ import { AWS_ATTRIBUTE_KEYS } from '../aws-attribute-keys'; import { RequestMetadata } from '../third-party/otel/aws/services/ServiceExtension'; import { KinesisServiceExtension } from './aws/services/kinesis'; import { S3ServiceExtension } from './aws/services/s3'; +import { AwsLambdaInstrumentation } from '@opentelemetry/instrumentation-aws-lambda'; +import { + Context as OtelContext, + context as otelContext, + trace, + propagation, + TextMapGetter, + isSpanContextValid, + ROOT_CONTEXT, + diag, +} from '@opentelemetry/api'; +import { APIGatewayProxyEventHeaders, Context } from 'aws-lambda'; +import { AWSXRAY_TRACE_ID_HEADER, AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray'; + +export const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID'; +const awsPropagator = new AWSXRayPropagator(); +export const headerGetter: TextMapGetter = { + keys(carrier: any): string[] { + return Object.keys(carrier); + }, + get(carrier: any, key: string) { + return carrier[key]; + }, +}; export function applyInstrumentationPatches(instrumentations: Instrumentation[]): void { /* @@ -18,7 +42,7 @@ export function applyInstrumentationPatches(instrumentations: Instrumentation[]) Where possible, automated testing should be run to catch upstream changes resulting in broken patches */ - instrumentations.forEach(instrumentation => { + instrumentations.forEach((instrumentation, index) => { if (instrumentation.instrumentationName === '@opentelemetry/instrumentation-aws-sdk') { // Access private property servicesExtensions of AwsInstrumentation // eslint-disable-next-line @typescript-eslint/ban-ts-comment @@ -29,10 +53,48 @@ export function applyInstrumentationPatches(instrumentations: Instrumentation[]) services.set('Kinesis', new KinesisServiceExtension()); patchSqsServiceExtension(services.get('SQS')); } + } else if (instrumentation.instrumentationName === '@opentelemetry/instrumentation-aws-lambda') { + diag.debug('Overriding aws lambda instrumentation'); + const lambdaInstrumentation = new AwsLambdaInstrumentation({ + eventContextExtractor: customExtractor, + disableAwsContextPropagation: true, + }); + instrumentations[index] = lambdaInstrumentation; } }); } +/* + * This function `customExtractor` is used to extract SpanContext for AWS Lambda functions. + * It first attempts to extract the trace context from the AWS X-Ray header, which is stored in the Lambda environment variables. + * If a valid span context is extracted from the environment, it uses this as the parent context for the function's tracing. + * If the X-Ray header is missing or invalid, it falls back to extracting trace context from the Lambda handler's event headers. + * If neither approach succeeds, it defaults to using the root Otel context, ensuring the function is still instrumented for tracing. + */ +export const customExtractor = (event: any, _handlerContext: Context): OtelContext => { + let parent: OtelContext | undefined = undefined; + const lambdaTraceHeader = process.env[traceContextEnvironmentKey]; + if (lambdaTraceHeader) { + parent = awsPropagator.extract( + otelContext.active(), + { [AWSXRAY_TRACE_ID_HEADER]: lambdaTraceHeader }, + headerGetter + ); + } + if (parent) { + const spanContext = trace.getSpan(parent)?.spanContext(); + if (spanContext && isSpanContextValid(spanContext)) { + return parent; + } + } + const httpHeaders = event.headers || {}; + const extractedContext = propagation.extract(otelContext.active(), httpHeaders, headerGetter); + if (trace.getSpan(extractedContext)?.spanContext()) { + return extractedContext; + } + return ROOT_CONTEXT; +}; + /* * This patch extends the existing upstream extension for SQS. Extensions allow for custom logic for adding * service-specific information to spans, such as attributes. Specifically, we are adding logic to add diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts index 39b944a5..bca98c2d 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts @@ -27,11 +27,12 @@ import { customBuildSamplerFromEnv, } from '../src/aws-opentelemetry-configurator'; import { AwsSpanMetricsProcessor } from '../src/aws-span-metrics-processor'; -import { OTLPUdpSpanExporter } from '../src/otlp-udp-exporter'; import { setAwsDefaultEnvironmentVariables } from '../src/register'; import { AwsXRayRemoteSampler } from '../src/sampler/aws-xray-remote-sampler'; import { AwsXraySamplingClient } from '../src/sampler/aws-xray-sampling-client'; import { GetSamplingRulesResponse } from '../src/sampler/remote-sampler.types'; +import { OTLPUdpSpanExporter } from '../src/otlp-udp-exporter'; +import { AwsBatchUnsampledSpanProcessor } from '../src/aws-batch-unsampled-span-processor'; // Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts describe('AwsOpenTelemetryConfiguratorTest', () => { @@ -370,6 +371,34 @@ describe('AwsOpenTelemetryConfiguratorTest', () => { delete process.env.AWS_XRAY_DAEMON_ADDRESS; }); + it('tests configureOTLP on Lambda with ApplicationSignals False', () => { + process.env.AWS_LAMBDA_FUNCTION_NAME = 'TestFunction'; + process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED = 'False'; + process.env.OTEL_TRACES_EXPORTER = 'otlp'; + process.env.AWS_XRAY_DAEMON_ADDRESS = 'www.test.com:2222'; + const spanExporter: SpanExporter = AwsSpanProcessorProvider.configureOtlp(); + expect(spanExporter).toBeInstanceOf(OTLPUdpSpanExporter); + expect((spanExporter as OTLPUdpSpanExporter)['_endpoint']).toBe('www.test.com:2222'); + delete process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED; + delete process.env.AWS_LAMBDA_FUNCTION_NAME; + delete process.env.OTEL_TRACES_EXPORTER; + delete process.env.AWS_XRAY_DAEMON_ADDRESS; + }); + + it('Test CustomizeSpanProcessors for Lambda', () => { + process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED = 'True'; + process.env.AWS_LAMBDA_FUNCTION_NAME = 'TestFunction'; + const spanProcessors: SpanProcessor[] = []; + AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessors, Resource.empty()); + expect(spanProcessors.length).toEqual(2); + const firstProcessor: SpanProcessor = spanProcessors[0]; + expect(firstProcessor).toBeInstanceOf(AttributePropagatingSpanProcessor); + const secondProcessor: SpanProcessor = spanProcessors[1]; + expect(secondProcessor).toBeInstanceOf(AwsBatchUnsampledSpanProcessor); + delete process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED; + delete process.env.AWS_LAMBDA_FUNCTION_NAME; + }); + function validateConfiguratorEnviron() { // Set by register.ts expect('http/protobuf').toEqual(process.env.OTEL_EXPORTER_OTLP_PROTOCOL); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-span-processing-util.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-span-processing-util.test.ts index bfd3e4d5..68d69c24 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-span-processing-util.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-span-processing-util.test.ts @@ -368,6 +368,15 @@ describe('AwsSpanProcessingUtilTest', () => { expect(AwsSpanProcessingUtil.MAX_KEYWORD_LENGTH).toBeGreaterThanOrEqual(keyword.length); }); }); + + it('testGetIngressOperationForLambda', () => { + process.env.AWS_LAMBDA_FUNCTION_NAME = 'TestFunction'; + const validName: string = 'ValidName'; + (spanDataMock as any).name = validName; + (spanDataMock as any).kind = SpanKind.SERVER; + const actualOperation: string = AwsSpanProcessingUtil.getIngressOperation(spanDataMock); + expect(actualOperation).toEqual('TestFunction/Handler'); + }); }); function createMockSpanContext(): SpanContext { diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/instrumentation-patch.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/instrumentation-patch.test.ts index cc26f857..0265b7da 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/instrumentation-patch.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/instrumentation-patch.test.ts @@ -1,20 +1,30 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Attributes, diag } from '@opentelemetry/api'; +import { Attributes, diag, Context as OtelContext, trace, propagation, Span } from '@opentelemetry/api'; import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'; import { Instrumentation } from '@opentelemetry/instrumentation'; import { AwsInstrumentation, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk'; +import { AwsLambdaInstrumentation, AwsLambdaInstrumentationConfig } from '@opentelemetry/instrumentation-aws-lambda'; import { expect } from 'expect'; import { AWS_ATTRIBUTE_KEYS } from '../../src/aws-attribute-keys'; import { RequestMetadata, ServiceExtension } from '../../src/third-party/otel/aws/services/ServiceExtension'; -import { applyInstrumentationPatches } from './../../src/patches/instrumentation-patch'; +import { applyInstrumentationPatches, customExtractor, headerGetter } from './../../src/patches/instrumentation-patch'; +import * as sinon from 'sinon'; +import { AWSXRAY_TRACE_ID_HEADER, AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray'; +import { Context } from 'aws-lambda'; +import { SinonStub } from 'sinon'; const _STREAM_NAME: string = 'streamName'; const _BUCKET_NAME: string = 'bucketName'; const _QUEUE_NAME: string = 'queueName'; const _QUEUE_URL: string = 'https://sqs.us-east-1.amazonaws.com/123412341234/queueName'; +const mockHeaders = { + 'x-test-header': 'test-value', + 'content-type': 'application/json', +}; + const UNPATCHED_INSTRUMENTATIONS: Instrumentation[] = getNodeAutoInstrumentations(); const PATCHED_INSTRUMENTATIONS: Instrumentation[] = getNodeAutoInstrumentations(); @@ -114,6 +124,14 @@ describe('InstrumentationPatchTest', () => { expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toEqual(_QUEUE_NAME); }); + it('Lambda with custom eventContextExtractor patching', () => { + const patchedAwsSdkInstrumentation: AwsLambdaInstrumentation = + extractLambdaInstrumentation(PATCHED_INSTRUMENTATIONS); + expect( + (patchedAwsSdkInstrumentation.getConfig() as AwsLambdaInstrumentationConfig).eventContextExtractor + ).not.toBeUndefined(); + }); + function extractAwsSdkInstrumentation(instrumentations: Instrumentation[]): AwsInstrumentation { const filteredInstrumentations: Instrumentation[] = instrumentations.filter( instrumentation => instrumentation.instrumentationName === '@opentelemetry/instrumentation-aws-sdk' @@ -186,4 +204,129 @@ describe('InstrumentationPatchTest', () => { const requestMetadata: RequestMetadata = serviceExtension.requestPreSpanHook(requestInput, {}, diag); return requestMetadata.spanAttributes || {}; } + + function extractLambdaInstrumentation(instrumentations: Instrumentation[]): AwsLambdaInstrumentation { + const filteredInstrumentations: Instrumentation[] = instrumentations.filter( + instrumentation => instrumentation.instrumentationName === '@opentelemetry/instrumentation-aws-lambda' + ); + expect(filteredInstrumentations.length).toEqual(1); + return filteredInstrumentations[0] as AwsLambdaInstrumentation; + } +}); + +describe('customExtractor', () => { + const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID'; + const MOCK_XRAY_TRACE_ID = '8a3c60f7d188f8fa79d48a391a778fa6'; + const MOCK_XRAY_TRACE_ID_STR = '1-8a3c60f7-d188f8fa79d48a391a778fa6'; + const MOCK_XRAY_PARENT_SPAN_ID = '53995c3f42cd8ad8'; + const MOCK_XRAY_LAMBDA_LINEAGE = 'Lineage=01cfa446:0'; + + const TRACE_ID_VERSION = '1'; // Assuming TRACE_ID_VERSION is defined somewhere in the code + + // Common part of the XRAY trace context + const MOCK_XRAY_TRACE_CONTEXT_COMMON = `Root=${TRACE_ID_VERSION}-${MOCK_XRAY_TRACE_ID_STR};Parent=${MOCK_XRAY_PARENT_SPAN_ID}`; + + // Different versions of the XRAY trace context + const MOCK_XRAY_TRACE_CONTEXT_SAMPLED = `${MOCK_XRAY_TRACE_CONTEXT_COMMON};Sampled=1;${MOCK_XRAY_LAMBDA_LINEAGE}`; + // const MOCK_XRAY_TRACE_CONTEXT_PASSTHROUGH = ( + // `Root=${TRACE_ID_VERSION}-${MOCK_XRAY_TRACE_ID_STR.slice(0, TRACE_ID_FIRST_PART_LENGTH)}` + + // `-${MOCK_XRAY_TRACE_ID_STR.slice(TRACE_ID_FIRST_PART_LENGTH)};${MOCK_XRAY_LAMBDA_LINEAGE}` + // ); + + // Create the W3C Trace Context (Sampled) + const MOCK_W3C_TRACE_CONTEXT_SAMPLED = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'; + + // // W3C Trace State + const MOCK_W3C_TRACE_STATE_KEY = 'vendor_specific_key'; + const MOCK_W3C_TRACE_STATE_VALUE = 'test_value'; + const MOCK_TRACE_STATE = `${MOCK_W3C_TRACE_STATE_KEY}=${MOCK_W3C_TRACE_STATE_VALUE},foo=1,bar=2`; + + let awsPropagatorStub: SinonStub; + let traceGetSpanStub: SinonStub; + // let propagationStub: SinonStub; + + beforeEach(() => { + // Clear environment variables before each test + delete process.env[traceContextEnvironmentKey]; + }); + + afterEach(() => { + // Restore original methods after each test to ensure stubs don't affect other tests + sinon.restore(); + }); + + it('should extract context from lambda trace header when present', () => { + const mockLambdaTraceHeader = MOCK_XRAY_TRACE_CONTEXT_SAMPLED; + process.env[traceContextEnvironmentKey] = mockLambdaTraceHeader; + + const mockParentContext = {} as OtelContext; + + // Partial mock of the Span object + const mockSpan: Partial = { + spanContext: sinon.stub().returns({ + traceId: MOCK_XRAY_TRACE_ID, + spanId: MOCK_XRAY_PARENT_SPAN_ID, + }), + }; + + // Stub awsPropagator.extract to return the mockParentContext + awsPropagatorStub = sinon.stub(AWSXRayPropagator.prototype, 'extract').returns(mockParentContext); + + // Stub trace.getSpan to return the mock span + traceGetSpanStub = sinon.stub(trace, 'getSpan').returns(mockSpan as Span); + + // Call the customExtractor function + const event = { headers: {} }; + const result = customExtractor(event, {} as Context); + + // Assertions + expect(awsPropagatorStub.calledOnce).toBe(true); + expect( + awsPropagatorStub.calledWith( + sinon.match.any, + { [AWSXRAY_TRACE_ID_HEADER]: mockLambdaTraceHeader }, + sinon.match.any + ) + ).toBe(true); + expect(traceGetSpanStub.calledOnce).toBe(true); + expect(result).toEqual(mockParentContext); // Should return the parent context when valid + }); + + it('should extract context from HTTP headers when lambda trace header is not present', () => { + delete process.env[traceContextEnvironmentKey]; + const event = { + headers: { + traceparent: MOCK_W3C_TRACE_CONTEXT_SAMPLED, + tracestate: MOCK_TRACE_STATE, + }, + }; + const mockExtractedContext = { + getValue: function () { + return undefined; + }, // Empty function that returns undefined + } as unknown as OtelContext; + + const propagationStub = sinon.stub(propagation, 'extract').returns(mockExtractedContext); + + // Call the customExtractor function + const mockHttpHeaders = event.headers; + customExtractor(event, {} as Context); + + expect(propagationStub.calledWith(sinon.match.any, mockHttpHeaders, sinon.match.any)).toBe(true); + }); + + it('should return all header keys from the carrier', () => { + const keys = headerGetter.keys(mockHeaders); + expect(keys).toEqual(['x-test-header', 'content-type']); + }); + + it('should return the correct header value for a given key', () => { + const headerValue = headerGetter.get(mockHeaders, 'x-test-header'); + expect(headerValue).toBe('test-value'); + }); + + it('should return undefined for a key that does not exist', () => { + const headerValue = headerGetter.get(mockHeaders, 'non-existent-header'); + expect(headerValue).toBeUndefined(); + }); }); diff --git a/lambda-layer/README.md b/lambda-layer/README.md index 2d6b09c0..c75fee8d 100644 --- a/lambda-layer/README.md +++ b/lambda-layer/README.md @@ -28,10 +28,8 @@ anymore. ## Steps to Deploy this Lambda function -Go to `${root}/lambda-layer/sample-apps/aws-sdk` director, - -1. run `npm install`. -2. run `terraform init` and `terraform apply`. +1. Go to `${root}/lambda-layer/sample-apps/aws-sdk` director, run `npm install`. +2. Go to `${root}/lambda-layer/terraform/lambda`, run `terraform init` and `terraform apply`. The lambda function(`aws-opentelemetry-distro-nodejs`) will be initialized and the URL for an API Gateway invoking the Lambda will be displayed at the end. Send a request to the URL in a browser or using curl to execute the function. Then, diff --git a/lambda-layer/packages/layer/scripts/otel-instrument b/lambda-layer/packages/layer/scripts/otel-instrument index fbdb4bd6..dd317a32 100644 --- a/lambda-layer/packages/layer/scripts/otel-instrument +++ b/lambda-layer/packages/layer/scripts/otel-instrument @@ -1,12 +1,30 @@ #!/bin/bash export NODE_OPTIONS="${NODE_OPTIONS} --require /opt/wrapper.js" export LAMBDA_RESOURCE_ATTRIBUTES="cloud.region=$AWS_REGION,cloud.provider=aws,faas.name=$AWS_LAMBDA_FUNCTION_NAME,faas.version=$AWS_LAMBDA_FUNCTION_VERSION,faas.instance=$AWS_LAMBDA_LOG_STREAM_NAME,aws.log.group.names=$AWS_LAMBDA_LOG_GROUP_NAME"; -export OTEL_NODE_ENABLED_INSTRUMENTATIONS="http,aws-lambda,aws-sdk" +export OTEL_NODE_ENABLED_INSTRUMENTATIONS="aws-lambda,aws-sdk" + +# - If OTEL_EXPORTER_OTLP_PROTOCOL is not set by user, the default exporting protocol is http/protobuf. +if [ -z "${OTEL_EXPORTER_OTLP_PROTOCOL}" ]; then + export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +fi + +# - Set the service name +if [ -z "${OTEL_SERVICE_NAME}" ]; then + export OTEL_SERVICE_NAME=$AWS_LAMBDA_FUNCTION_NAME; +fi + +# - Set the propagators +if [[ -z "$OTEL_PROPAGATORS" ]]; then + export OTEL_PROPAGATORS="tracecontext,baggage,xray" +fi # - Set Application Signals configuration -if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then +if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then + export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true"; +fi + +if [ -z "${OTEL_METRICS_EXPORTER}" ]; then export OTEL_METRICS_EXPORTER="none"; - export OTEL_LOGS_EXPORTER="none"; fi # - Append Lambda Resource Attributes to OTel Resource Attribute List @@ -16,10 +34,6 @@ else export OTEL_RESOURCE_ATTRIBUTES="$LAMBDA_RESOURCE_ATTRIBUTES,$OTEL_RESOURCE_ATTRIBUTES"; fi -# - Set the service name -if [ -z "${OTEL_SERVICE_NAME}" ]; then - export OTEL_SERVICE_NAME=$AWS_LAMBDA_FUNCTION_NAME; -fi if [[ $OTEL_RESOURCE_ATTRIBUTES != *"service.name="* ]]; then export OTEL_RESOURCE_ATTRIBUTES="service.name=${AWS_LAMBDA_FUNCTION_NAME},${OTEL_RESOURCE_ATTRIBUTES}" fi diff --git a/lambda-layer/sample-apps/aws-sdk/src/index.ts b/lambda-layer/sample-apps/aws-sdk/src/index.ts index 5b24caa1..6dbc216f 100644 --- a/lambda-layer/sample-apps/aws-sdk/src/index.ts +++ b/lambda-layer/sample-apps/aws-sdk/src/index.ts @@ -13,6 +13,9 @@ exports.handler = async (event: APIGatewayProxyEvent, context: Context) => { const result = await s3.listBuckets().promise(); + console.log('Fetched OTel Resource Attrs:' + process.env.OTEL_RESOURCE_ATTRIBUTES); + console.log('Fetched X-Ray Trace Header:' + process.env['_X_AMZN_TRACE_ID']); + const response: APIGatewayProxyResult = { statusCode: 200, body: `Hello lambda - found ${result.Buckets?.length || 0} buckets`,