Skip to content

Optimize Instrumentation Configure for Lambda cases #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
// Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.

import { context, Context, diag, TraceFlags } from '@opentelemetry/api';
import {
BindOnceFuture,
ExportResultCode,
getEnv,
globalErrorHandler,
suppressTracing,
unrefTimer,
} from '@opentelemetry/core';
import { BufferConfig, ReadableSpan, Span, SpanExporter, SpanProcessor } from '@opentelemetry/sdk-trace-base';
import { Context, TraceFlags } from '@opentelemetry/api';
import { ReadableSpan, BufferConfig, Span } from '@opentelemetry/sdk-trace-base';
import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys';
import { BatchSpanProcessorBase } from '@opentelemetry/sdk-trace-base/build/src/export/BatchSpanProcessorBase';

/**
* This class is a customized version of the `BatchSpanProcessorBase` from the
Expand Down Expand Up @@ -41,211 +34,24 @@ import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys';
* The rest of the behavior—batch processing, queuing, and exporting spans in
* batches—is inherited from the base class and remains largely the same.
*/
export class AwsBatchUnsampledSpanProcessor implements SpanProcessor {
private readonly _maxExportBatchSize: number;
private readonly _maxQueueSize: number;
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;

private _isExporting: boolean = false;
private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
const env = getEnv();
this._maxExportBatchSize =
typeof config?.maxExportBatchSize === 'number' ? config.maxExportBatchSize : env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE;
this._maxQueueSize = typeof config?.maxQueueSize === 'number' ? config.maxQueueSize : env.OTEL_BSP_MAX_QUEUE_SIZE;
this._scheduledDelayMillis =
typeof config?.scheduledDelayMillis === 'number' ? config.scheduledDelayMillis : env.OTEL_BSP_SCHEDULE_DELAY;
this._exportTimeoutMillis =
typeof config?.exportTimeoutMillis === 'number' ? config.exportTimeoutMillis : env.OTEL_BSP_EXPORT_TIMEOUT;

this._shutdownOnce = new BindOnceFuture(this._shutdown, this);

if (this._maxExportBatchSize > this._maxQueueSize) {
diag.warn(
'BatchSpanProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize, setting maxExportBatchSize to match maxQueueSize'
);
this._maxExportBatchSize = this._maxQueueSize;
}
}

forceFlush(): Promise<void> {
if (this._shutdownOnce.isCalled) {
return this._shutdownOnce.promise;
}
return this._flushAll();
}

onStart(span: Span, _parentContext: Context): void {
export class AwsBatchUnsampledSpanProcessor extends BatchSpanProcessorBase<BufferConfig> {
override onStart(span: Span, _parentContext: Context): void {
if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 0) {
span.setAttribute(AWS_ATTRIBUTE_KEYS.AWS_TRACE_FLAG_UNSAMPLED, true);
return;
}
}

onEnd(span: ReadableSpan): void {
if (this._shutdownOnce.isCalled) {
override onEnd(span: ReadableSpan): void {
if ((this as any)._shutdownOnce.isCalled) {
return;
}

if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 1) {
return;
}

this._addToBuffer(span);
}

shutdown(): Promise<void> {
return this._shutdownOnce.call();
}

private _shutdown() {
return Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return this._flushAll();
})
.then(() => {
return this._exporter.shutdown();
});
}

/** Add a span in the buffer. */
private _addToBuffer(span: ReadableSpan) {
if (this._finishedSpans.length >= this._maxQueueSize) {
// limit reached, drop span

if (this._droppedSpansCount === 0) {
diag.debug('maxQueueSize reached, dropping spans');
}
this._droppedSpansCount++;

return;
}

if (this._droppedSpansCount > 0) {
// some spans were dropped, log once with count of spans dropped
diag.warn(`Dropped ${this._droppedSpansCount} spans because maxQueueSize reached`);
this._droppedSpansCount = 0;
}

this._finishedSpans.push(span);
this._maybeStartTimer();
}

/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flush should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(this._finishedSpans.length / this._maxExportBatchSize);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
if (this._finishedSpans.length === 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
// don't wait anymore for export, this way the next batch can start
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
// prevent downstream exporter calls from generating spans
context.with(suppressTracing(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
// could pass the same finished spans to the exporter if the buffer is cleared
// outside the execution of this callback.
let spans: ReadableSpan[];
if (this._finishedSpans.length <= this._maxExportBatchSize) {
spans = this._finishedSpans;
this._finishedSpans = [];
} else {
spans = this._finishedSpans.splice(0, this._maxExportBatchSize);
}

const doExport = () =>
this._exporter.export(spans, result => {
clearTimeout(timer);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
reject(result.error ?? new Error('BatchSpanProcessor: span export failed'));
}
});

let pendingResources: Array<Promise<void>> | null = null;
for (let i = 0, len = spans.length; i < len; i++) {
const span = spans[i];
if (span.resource.asyncAttributesPending && span.resource.waitForAsyncAttributes) {
pendingResources ??= [];
pendingResources.push(span.resource.waitForAsyncAttributes());
}
}

// Avoid scheduling a promise to make the behavior more predictable and easier to test
if (pendingResources === null) {
doExport();
} else {
Promise.all(pendingResources).then(doExport, err => {
globalErrorHandler(err);
reject(err);
});
}
});
});
}

private _maybeStartTimer() {
if (this._isExporting) return;
const flush = () => {
this._isExporting = true;
this._flushOneBatch()
.finally(() => {
this._isExporting = false;
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
.catch(e => {
this._isExporting = false;
globalErrorHandler(e);
});
};
// we only wait if the queue doesn't have enough elements yet
if (this._finishedSpans.length >= this._maxExportBatchSize) {
return flush();
}
if (this._timer !== undefined) return;
this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
unrefTimer(this._timer);
}

private _clearTimer() {
if (this._timer !== undefined) {
clearTimeout(this._timer);
this._timer = undefined;
}
(this as any)._addToBuffer(span);
}

onShutdown(): void {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS
const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT';
const METRIC_EXPORT_INTERVAL_CONFIG: string = 'OTEL_METRIC_EXPORT_INTERVAL';
const DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS: number = 60000;
const AWS_LAMBDA_FUNCTION_NAME_CONFIG: string = 'AWS_LAMBDA_FUNCTION_NAME';
export const AWS_LAMBDA_FUNCTION_NAME_CONFIG: string = 'AWS_LAMBDA_FUNCTION_NAME';
const AWS_XRAY_DAEMON_ADDRESS_CONFIG: string = 'AWS_XRAY_DAEMON_ADDRESS';
const FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = 'T1S';
const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U';
// Follow the Python SDK implementation to set the max span batch size,
// which reduces the chance that the UDP packet size exceeds 64KB.
const LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10;
/**
* Aws Application Signals Config Provider creates a configuration object that can be provided to
* the OTel NodeJS SDK for Auto Instrumentation with Application Signals Functionality.
Expand Down Expand Up @@ -128,6 +131,9 @@ export class AwsOpentelemetryConfigurator {
if (!resourceDetectorsFromEnv.includes('env')) {
defaultDetectors.push(envDetectorSync);
}
} else if (isLambdaEnvironment()) {
// If in Lambda environment, only keep env detector as default
defaultDetectors.push(envDetectorSync);
} else {
/*
* envDetectorSync is used as opposed to envDetector (async), so it is guaranteed that the
Expand Down Expand Up @@ -229,17 +235,6 @@ export class AwsOpentelemetryConfigurator {

diag.info('AWS Application Signals enabled.');

// Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda
// when Application Signals enabled
if (isLambdaEnvironment()) {
spanProcessors.push(
new AwsBatchUnsampledSpanProcessor(
new OTLPUdpSpanExporter(getXrayDaemonEndpoint(), FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX)
)
);
diag.info('Enabled batch unsampled span processor for Lambda environment.');
}

let exportIntervalMillis: number = Number(process.env[METRIC_EXPORT_INTERVAL_CONFIG]);
diag.debug(`AWS Application Signals Metrics export interval: ${exportIntervalMillis}`);

Expand All @@ -256,12 +251,30 @@ export class AwsOpentelemetryConfigurator {
exporter: applicationSignalsMetricExporter,
exportIntervalMillis: exportIntervalMillis,
});
const meterProvider: MeterProvider = new MeterProvider({
/** Resource associated with metric telemetry */
resource: resource,
readers: [periodicExportingMetricReader],
});
spanProcessors.push(AwsSpanMetricsProcessorBuilder.create(meterProvider, resource).build());

// Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda
// when Application Signals enabled
if (isLambdaEnvironment()) {
spanProcessors.push(
new AwsBatchUnsampledSpanProcessor(
new OTLPUdpSpanExporter(getXrayDaemonEndpoint(), FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX),
{
maxExportBatchSize: getSpanExportBatchSize(),
}
)
);
diag.info('Enabled batch unsampled span processor for Lambda environment.');
}

// Disable Application Metrics for Lambda environment
if (!isLambdaEnvironment()) {
const meterProvider: MeterProvider = new MeterProvider({
/** Resource associated with metric telemetry */
resource: resource,
readers: [periodicExportingMetricReader],
});
spanProcessors.push(AwsSpanMetricsProcessorBuilder.create(meterProvider, resource).build());
}
}

static customizeSampler(sampler: Sampler): Sampler {
Expand Down Expand Up @@ -415,7 +428,11 @@ export class AwsSpanProcessorProvider {
// eslint-disable-next-line @typescript-eslint/typedef
let protocol = this.getOtlpProtocol();

if (AwsOpentelemetryConfigurator.isApplicationSignalsEnabled() && isLambdaEnvironment()) {
// If `isLambdaEnvironment` is true, we will default to exporting OTel spans via `udp_exporter` to Fluxpump,
// regardless of whether `AppSignals` is true or false.
// However, if the customer has explicitly set the `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`,
// we will continue using the `otlp_exporter` to send OTel traces to the specified endpoint.
if (!hasCustomOtlpTraceEndpoint() && isLambdaEnvironment()) {
protocol = 'udp';
}
switch (protocol) {
Expand Down Expand Up @@ -521,7 +538,9 @@ export class AwsSpanProcessorProvider {
if (exporter instanceof ConsoleSpanExporter) {
return new SimpleSpanProcessor(configuredExporter);
} else {
return new BatchSpanProcessor(configuredExporter);
return new BatchSpanProcessor(configuredExporter, {
maxExportBatchSize: getSpanExportBatchSize(),
});
}
});
}
Expand Down Expand Up @@ -616,11 +635,22 @@ function getSamplerProbabilityFromEnv(environment: Required<ENVIRONMENT>): numbe
return probability;
}

function isLambdaEnvironment() {
// Span export batch size override: in Lambda we use a small batch
// (LAMBDA_SPAN_EXPORT_BATCH_SIZE) to keep UDP datagrams under 64KB;
// elsewhere `undefined` lets the processor fall back to its default.
function getSpanExportBatchSize() {
  return isLambdaEnvironment() ? LAMBDA_SPAN_EXPORT_BATCH_SIZE : undefined;
}

/**
 * Returns true when the process is running inside an AWS Lambda function.
 * The Lambda runtime always sets the function-name environment variable,
 * so its presence (env values are strings when set) is used as the signal.
 */
export function isLambdaEnvironment() {
  return typeof process.env[AWS_LAMBDA_FUNCTION_NAME_CONFIG] === 'string';
}

// True when the user explicitly configured an OTLP traces endpoint via the
// standard OTel environment variable (used to opt out of the UDP exporter).
function hasCustomOtlpTraceEndpoint() {
  const configuredEndpoint = process.env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'];
  return configuredEndpoint !== undefined;
}

// Address of the local X-Ray/CloudWatch agent UDP daemon, if configured;
// undefined when the environment variable is not set.
function getXrayDaemonEndpoint() {
  const daemonAddress = process.env[AWS_XRAY_DAEMON_ADDRESS_CONFIG];
  return daemonAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '@opentelemetry/semantic-conventions';
import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys';
import * as SQL_DIALECT_KEYWORDS_JSON from './configuration/sql_dialect_keywords.json';
import { AWS_LAMBDA_FUNCTION_NAME_CONFIG, isLambdaEnvironment } from './aws-opentelemetry-configurator';

/** Utility class designed to support shared logic across AWS Span Processors. */
export class AwsSpanProcessingUtil {
Expand Down Expand Up @@ -50,6 +51,9 @@ export class AwsSpanProcessingUtil {
let operation: string = span.name;
if (AwsSpanProcessingUtil.shouldUseInternalOperation(span)) {
operation = AwsSpanProcessingUtil.INTERNAL_OPERATION;
}
if (isLambdaEnvironment()) {
operation = process.env[AWS_LAMBDA_FUNCTION_NAME_CONFIG] + '/Handler';
} else if (!AwsSpanProcessingUtil.isValidOperation(span, operation)) {
operation = AwsSpanProcessingUtil.generateIngressOperation(span);
}
Expand Down
Loading
Loading