Skip to content

Add AWS X-Ray Remote Sampler #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ node --require '@aws/aws-distro-opentelemetry-node-autoinstrumentation/register'
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \
export OTEL_PROPAGATORS=xray,tracecontext,b3,b3multi \
export OTEL_TRACES_EXPORTER=console,otlp \
export OTEL_TRACES_SAMPLER=always_on \
export OTEL_TRACES_SAMPLER=xray \
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4316/v1/traces \
export OTEL_RESOURCE_ATTRIBUTES=service.name=test-adot-sdk-ec2-service-name \
export OTEL_AWS_APPLICATION_SIGNALS_ENABLED=true \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import { AlwaysRecordSampler } from './always-record-sampler';
import { AttributePropagatingSpanProcessorBuilder } from './attribute-propagating-span-processor-builder';
import { AwsMetricAttributesSpanExporterBuilder } from './aws-metric-attributes-span-exporter-builder';
import { AwsSpanMetricsProcessorBuilder } from './aws-span-metrics-processor-builder';
import { AwsXRayRemoteSampler } from './sampler';

const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_ENABLED';
const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT';
Expand Down Expand Up @@ -187,6 +188,7 @@ export class AwsOpentelemetryConfigurator {
config = {
instrumentations: this.instrumentations,
resource: this.resource,
sampler: this.sampler,
idGenerator: this.idGenerator,
autoDetectResources: false,
};
Expand Down Expand Up @@ -244,7 +246,37 @@ export class AwsOpentelemetryConfigurator {
}

/**
 * Builds the trace sampler selected through environment variables.
 *
 * When `OTEL_TRACES_SAMPLER` is `xray`, an `AwsXRayRemoteSampler` is created. Its optional settings are
 * read from `OTEL_TRACES_SAMPLER_ARG`, a comma-separated list of `key=value` pairs. The recognized keys
 * are `endpoint` and `polling_interval`; unknown keys and entries without an `=` are ignored.
 * For every other value of `OTEL_TRACES_SAMPLER` the result of `buildSamplerFromEnv()` is returned.
 *
 * @param resource - Resource passed to the X-Ray remote sampler.
 * @returns The sampler selected by the environment.
 */
export function customBuildSamplerFromEnv(resource: Resource): Sampler {
  if (process.env.OTEL_TRACES_SAMPLER !== 'xray') {
    return buildSamplerFromEnv();
  }

  const samplerArgumentEnv: string | undefined = process.env.OTEL_TRACES_SAMPLER_ARG;
  let endpoint: string | undefined = undefined;
  let pollingInterval: number | undefined = undefined;

  if (samplerArgumentEnv !== undefined) {
    for (const arg of samplerArgumentEnv.split(',')) {
      // Split on the FIRST '=' only. `arg.split('=', 2)` would drop everything after a second '=',
      // truncating values such as an endpoint URL that carries a query string.
      const separatorIndex: number = arg.indexOf('=');
      if (separatorIndex === -1) {
        continue;
      }
      const key: string = arg.substring(0, separatorIndex);
      const value: string = arg.substring(separatorIndex + 1);

      if (key === 'endpoint') {
        endpoint = value;
      } else if (key === 'polling_interval') {
        const parsedInterval: number = Number(value);
        if (isNaN(parsedInterval)) {
          // Leave the interval undefined so the sampler applies its own default.
          pollingInterval = undefined;
          diag.error('polling_interval in OTEL_TRACES_SAMPLER_ARG must be a valid number');
        } else {
          pollingInterval = parsedInterval;
        }
      }
    }
  }

  diag.debug(`XRay Sampler Endpoint: ${endpoint}`);
  diag.debug(`XRay Sampler Polling Interval: ${pollingInterval}`);
  return new AwsXRayRemoteSampler({ resource: resource, endpoint: endpoint, pollingInterval: pollingInterval });
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, DiagLogger, Link, SpanKind, diag } from '@opentelemetry/api';
import { ParentBasedSampler, Sampler, SamplingResult } from '@opentelemetry/sdk-trace-base';
import { AwsXraySamplingClient } from './aws-xray-sampling-client';
import { FallbackSampler } from './fallback-sampler';
import {
AwsXRayRemoteSamplerConfig,
GetSamplingRulesResponse,
GetSamplingTargetsBody,
GetSamplingTargetsResponse,
SamplingRuleRecord,
SamplingTargetDocument,
} from './remote-sampler.types';
import { DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, RuleCache, TargetMap } from './rule-cache';
import { SamplingRuleApplier } from './sampling-rule-applier';

/** Default period between sampling rule refreshes: 300 seconds, i.e. 5 minutes. */
const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 300;
/**
 * Default endpoint for awsproxy.
 * See https://aws-otel.github.io/docs/getting-started/remote-sampling#enable-awsproxy-extension
 */
const DEFAULT_AWS_PROXY_ENDPOINT: string = 'http://localhost:2000';

/**
 * Sampler whose decisions come from sampling rules and targets fetched over HTTP.
 *
 * On construction it requests the sampling rules and sampling targets once and then keeps polling both
 * on unref'd interval timers. While the rule cache reports itself expired, or when no cached rule
 * matches, decisions are delegated to a parent-based fallback sampler.
 */
export class AwsXRayRemoteSampler implements Sampler {
  private rulePollingIntervalMillis: number;
  private targetPollingInterval: number;
  private awsProxyEndpoint: string;
  private ruleCache: RuleCache;
  private fallbackSampler: ParentBasedSampler;
  private samplerDiag: DiagLogger;
  private rulePoller: NodeJS.Timer | undefined;
  private targetPoller: NodeJS.Timer | undefined;
  private clientId: string;
  private rulePollingJitterMillis: number;
  private targetPollingJitterMillis: number;
  private samplingClient: AwsXraySamplingClient;

  /**
   * @param samplerConfig - `resource` is handed to the rule cache, `endpoint` overrides the default proxy
   * endpoint, and `pollingInterval` (seconds) overrides the default rule polling interval when it is
   * defined and not below 10.
   */
  constructor(samplerConfig: AwsXRayRemoteSamplerConfig) {
    const { endpoint, pollingInterval, resource } = samplerConfig;

    this.samplerDiag = diag.createComponentLogger({ namespace: '@aws-observability/aws-xray-remote-sampler' });

    let rulePollingIntervalSeconds: number = DEFAULT_RULES_POLLING_INTERVAL_SECONDS;
    if (pollingInterval == null || pollingInterval < 10) {
      this.samplerDiag.warn(
        `'pollingInterval' is undefined or too small. Defaulting to ${DEFAULT_RULES_POLLING_INTERVAL_SECONDS} seconds`
      );
    } else {
      rulePollingIntervalSeconds = pollingInterval;
    }
    this.rulePollingIntervalMillis = rulePollingIntervalSeconds * 1000;

    // Random offsets added to the timer periods: up to 5 s for rules, up to 100 ms for targets.
    this.rulePollingJitterMillis = Math.random() * 5 * 1000;
    this.targetPollingInterval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS;
    this.targetPollingJitterMillis = (Math.random() / 10) * 1000;

    this.awsProxyEndpoint = endpoint || DEFAULT_AWS_PROXY_ENDPOINT;
    this.fallbackSampler = new ParentBasedSampler({ root: new FallbackSampler() });
    this.clientId = this.generateClientId();
    this.ruleCache = new RuleCache(resource);
    this.samplingClient = new AwsXraySamplingClient(this.awsProxyEndpoint, this.samplerDiag);

    // Rules first (immediate fetch + timer), then one immediate targets fetch followed by its timer.
    this.startSamplingRulesPoller();
    this.getAndUpdateSamplingTargets();
    this.startSamplingTargetsPoller();
  }

  /**
   * Decides with the first matching cached rule, falling back to the fallback sampler when the
   * rule cache is expired or no rule matches.
   */
  public shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: Attributes,
    links: Link[]
  ): SamplingResult {
    if (!this.ruleCache.isExpired()) {
      const ruleApplier: SamplingRuleApplier | undefined = this.ruleCache.getMatchedRule(attributes);
      if (ruleApplier) {
        return ruleApplier.shouldSample(context, traceId, spanName, spanKind, attributes, links);
      }

      this.samplerDiag.debug(
        'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
      );
    }

    return this.fallbackSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
  }

  public toString(): string {
    return 'AwsXRayRemoteSampler{remote sampling with AWS X-Ray}';
  }

  /** Fetches the rules immediately, then re-fetches them on a repeating, unref'd timer. */
  private startSamplingRulesPoller(): void {
    this.getAndUpdateSamplingRules();

    // Period is the configured (or default) rule interval plus this instance's jitter.
    const periodMillis: number = this.rulePollingIntervalMillis + this.rulePollingJitterMillis;
    this.rulePoller = setInterval(() => this.getAndUpdateSamplingRules(), periodMillis);
    this.rulePoller.unref();
  }

  /** Starts the repeating, unref'd timer that re-fetches sampling targets (no immediate fetch). */
  private startSamplingTargetsPoller(): void {
    // targetPollingInterval is expressed in seconds.
    const periodMillis: number = this.targetPollingInterval * 1000 + this.targetPollingJitterMillis;
    this.targetPoller = setInterval(() => this.getAndUpdateSamplingTargets(), periodMillis);
    this.targetPoller.unref();
  }

  /** Sends the cache's statistics documents and applies the returned targets. */
  private getAndUpdateSamplingTargets(): void {
    const statisticsDocuments = this.ruleCache.createSamplingStatisticsDocuments(this.clientId);
    const requestBody: GetSamplingTargetsBody = { SamplingStatisticsDocuments: statisticsDocuments };
    const onTargets = this.updateSamplingTargets.bind(this);

    this.samplingClient.fetchSamplingTargets(requestBody, onTargets);
  }

  /** Requests the sampling rules and applies the response to the rule cache. */
  private getAndUpdateSamplingRules(): void {
    const onRules = this.updateSamplingRules.bind(this);
    this.samplingClient.fetchSamplingRules(onRules);
  }

  /** Replaces the cached rules with one applier per record that carries a `SamplingRule`. */
  private updateSamplingRules(responseObject: GetSamplingRulesResponse): void {
    const ruleRecords = responseObject.SamplingRuleRecords;
    if (!ruleRecords) {
      this.samplerDiag.error('SamplingRuleRecords from GetSamplingRules request is not defined');
      return;
    }

    const ruleAppliers: SamplingRuleApplier[] = [];
    ruleRecords.forEach((ruleRecord: SamplingRuleRecord) => {
      if (ruleRecord.SamplingRule) {
        ruleAppliers.push(new SamplingRuleApplier(ruleRecord.SamplingRule, undefined));
      }
    });
    this.ruleCache.updateRules(ruleAppliers);
  }

  /**
   * Applies a targets response to the rule cache, restarts the targets timer with the interval the
   * cache returns, and restarts the rules poller when the cache asks for a rule refresh.
   * Any error raised while doing so is reported at debug level.
   */
  private updateSamplingTargets(responseObject: GetSamplingTargetsResponse): void {
    try {
      // Index the returned targets by rule name.
      const targetsByRuleName: TargetMap = {};
      responseObject.SamplingTargetDocuments.forEach((targetDocument: SamplingTargetDocument) => {
        targetsByRuleName[targetDocument.RuleName] = targetDocument;
      });

      const [shouldRefreshRules, nextIntervalSeconds]: [boolean, number] = this.ruleCache.updateTargets(
        targetsByRuleName,
        responseObject.LastRuleModification
      );

      // Re-arm the targets timer so the new interval takes effect.
      this.targetPollingInterval = nextIntervalSeconds;
      clearInterval(this.targetPoller);
      this.startSamplingTargetsPoller();

      if (shouldRefreshRules) {
        this.samplerDiag.debug('Performing out-of-band sampling rule polling to fetch updated rules.');
        clearInterval(this.rulePoller);
        this.startSamplingRulesPoller();
      }
    } catch (error: unknown) {
      this.samplerDiag.debug('Error occurred when updating Sampling Targets');
    }
  }

  /** Produces a random identifier made of 24 lowercase hexadecimal characters. */
  private generateClientId(): string {
    const hexAlphabet: string = '0123456789abcdef';
    let clientId: string = '';
    while (clientId.length < 24) {
      clientId += hexAlphabet.charAt(Math.floor(Math.random() * hexAlphabet.length));
    }
    return clientId;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { DiagLogFunction, DiagLogger, context } from '@opentelemetry/api';
import { suppressTracing } from '@opentelemetry/core';
import * as http from 'http';
import { GetSamplingRulesResponse, GetSamplingTargetsBody, GetSamplingTargetsResponse } from './remote-sampler.types';

/**
 * Small HTTP client for the two sampling endpoints, `/GetSamplingRules` and `/SamplingTargets`,
 * located under the base endpoint given to the constructor.
 */
export class AwsXraySamplingClient {
  private getSamplingRulesEndpoint: string;
  private samplingTargetsEndpoint: string;
  private samplerDiag: DiagLogger;

  /**
   * @param endpoint - Base URL; the request paths are appended to it verbatim.
   * @param samplerDiag - Logger used for all diagnostics emitted by this client.
   */
  constructor(endpoint: string, samplerDiag: DiagLogger) {
    this.getSamplingRulesEndpoint = endpoint + '/GetSamplingRules';
    this.samplingTargetsEndpoint = endpoint + '/SamplingTargets';
    this.samplerDiag = samplerDiag;
  }

  /**
   * POSTs `requestBody` as JSON to the sampling targets endpoint.
   *
   * @param requestBody - Body to serialize and send.
   * @param callback - Invoked with the parsed response on HTTP 200 with a non-empty, valid JSON body.
   * Request and parse failures are logged at debug level and the callback is not invoked.
   */
  public fetchSamplingTargets(
    requestBody: GetSamplingTargetsBody,
    callback: (responseObject: GetSamplingTargetsResponse) => void
  ): void {
    this.makeSamplingRequest<GetSamplingTargetsResponse>(
      this.samplingTargetsEndpoint,
      callback,
      // Wrap instead of passing `this.samplerDiag.debug` directly: a detached method loses its `this`,
      // and component loggers read their namespace from `this` when logging.
      (message: string) => this.samplerDiag.debug(message),
      JSON.stringify(requestBody)
    );
  }

  /**
   * POSTs (without a body) to the sampling rules endpoint.
   *
   * @param callback - Invoked with the parsed response on HTTP 200 with a non-empty, valid JSON body.
   * Request and parse failures are logged at error level and the callback is not invoked.
   */
  public fetchSamplingRules(callback: (responseObject: GetSamplingRulesResponse) => void): void {
    this.makeSamplingRequest<GetSamplingRulesResponse>(
      this.getSamplingRulesEndpoint,
      callback,
      // Wrapped for the same reason as in fetchSamplingTargets: keep the logger's `this` binding.
      (message: string) => this.samplerDiag.error(message)
    );
  }

  /**
   * Issues the POST request and dispatches the parsed response.
   *
   * @param url - Full request URL.
   * @param callback - Receives the parsed JSON body when the status is 200 and the body is non-empty.
   * @param logger - Receives request-error and JSON-parse-error messages.
   * @param requestBodyJsonString - Optional JSON payload; when present, content headers are set.
   */
  private makeSamplingRequest<T>(
    url: string,
    callback: (responseObject: T) => void,
    logger: DiagLogFunction,
    requestBodyJsonString?: string
  ): void {
    const options = {
      method: 'POST',
      headers: {},
    };

    if (requestBodyJsonString) {
      options.headers = {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(requestBodyJsonString),
      };
    }

    // Ensure AWS X-Ray Sampler does not generate traces itself
    context.with(suppressTracing(context.active()), () => {
      const req = http
        .request(url, options, response => {
          response.setEncoding('utf-8');
          let responseData = '';
          response.on('data', dataChunk => (responseData += dataChunk));
          response.on('end', () => {
            if (response.statusCode === 200 && responseData.length > 0) {
              let responseObject: T | undefined = undefined;
              try {
                responseObject = JSON.parse(responseData) as T;
              } catch (e: unknown) {
                logger(`Error occurred when parsing responseData from ${url}`);
              }

              if (responseObject) {
                callback(responseObject);
              }
            } else {
              this.samplerDiag.debug(`${url} Response Code is: ${response.statusCode}`);
              this.samplerDiag.debug(`${url} responseData is: ${responseData}`);
            }
          });
        })
        .on('error', (error: unknown) => {
          logger(`Error occurred when making an HTTP POST to ${url}: ${error}`);
        });
      if (requestBodyJsonString) {
        req.end(requestBodyJsonString);
      } else {
        req.end();
      }
    });
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
import { Sampler, SamplingDecision, SamplingResult, TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-base';
import { RateLimitingSampler } from './rate-limiting-sampler';

/**
 * FallbackSampler samples 1 req/sec and an additional 5% of requests.
 *
 * The rate-limiting sampler (constructed with 1) is consulted first; only when it answers NOT_RECORD is
 * the decision delegated to a `TraceIdRatioBasedSampler` constructed with 0.05.
 */
export class FallbackSampler implements Sampler {
  private fixedRateSampler: TraceIdRatioBasedSampler;
  private rateLimitingSampler: RateLimitingSampler;

  constructor() {
    this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05);
    this.rateLimitingSampler = new RateLimitingSampler(1);
  }

  /**
   * @returns The rate-limiting sampler's result unless its decision is NOT_RECORD, in which case the
   * fixed-rate sampler's result for `traceId` is returned.
   */
  shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: Attributes,
    links: Link[]
  ): SamplingResult {
    const samplingResult: SamplingResult = this.rateLimitingSampler.shouldSample(
      context,
      traceId,
      spanName,
      spanKind,
      attributes,
      links
    );

    if (samplingResult.decision !== SamplingDecision.NOT_RECORD) {
      return samplingResult;
    }

    return this.fixedRateSampler.shouldSample(context, traceId);
  }

  public toString(): string {
    // The closing brace was missing, leaving the description unbalanced.
    return 'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
export * from './aws-xray-remote-sampler';
export { AwsXRayRemoteSamplerConfig } from './remote-sampler.types';
Loading
Loading