Skip to content

Add AWS X-Ray Remote Sampler #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 16, 2024
Merged
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ build/
node_modules/

# Other
.git/
.git/
*.nyc_output/
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ npm run lint:fix
### Run unit tests

```shell
npm run tests
npm run test
```

### Test the local ADOT JS package with your own local NodeJS project
Expand Down
10 changes: 6 additions & 4 deletions aws-distro-opentelemetry-node-autoinstrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ node --require '@aws/aws-distro-opentelemetry-node-autoinstrumentation/register'
## Sample Environment Variables for Application Signals

```shell
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \
export OTEL_AWS_APPLICATION_SIGNALS_ENABLED=true \
export OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT=http://localhost:4316/v1/metrics \
export OTEL_PROPAGATORS=xray,tracecontext,b3,b3multi \
export OTEL_TRACES_EXPORTER=console,otlp \
export OTEL_TRACES_SAMPLER=always_on \
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4316/v1/traces \
export OTEL_TRACES_EXPORTER=console,otlp \
export OTEL_TRACES_SAMPLER=xray \
export OTEL_TRACES_SAMPLER_ARG=endpoint=http://localhost:2000,polling_interval=300 \
export OTEL_RESOURCE_ATTRIBUTES=service.name=test-adot-sdk-ec2-service-name \
export OTEL_AWS_APPLICATION_SIGNALS_ENABLED=true \
export OTEL_NODE_DISABLED_INSTRUMENTATIONS=fs
```
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { AlwaysRecordSampler } from './always-record-sampler';
import { AttributePropagatingSpanProcessorBuilder } from './attribute-propagating-span-processor-builder';
import { AwsMetricAttributesSpanExporterBuilder } from './aws-metric-attributes-span-exporter-builder';
import { AwsSpanMetricsProcessorBuilder } from './aws-span-metrics-processor-builder';
import { AwsXRayRemoteSampler } from './sampler/aws-xray-remote-sampler';

const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_ENABLED';
const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT';
Expand Down Expand Up @@ -195,6 +196,7 @@ export class AwsOpentelemetryConfigurator {
config = {
instrumentations: this.instrumentations,
resource: this.resource,
sampler: this.sampler,
idGenerator: this.idGenerator,
autoDetectResources: false,
};
Expand Down Expand Up @@ -252,7 +254,37 @@ export class AwsOpentelemetryConfigurator {
}

/**
 * Builds the trace sampler selected through the `OTEL_TRACES_SAMPLER` environment variable.
 *
 * When the variable is `xray`, an `AwsXRayRemoteSampler` is created. Its optional settings are read from
 * `OTEL_TRACES_SAMPLER_ARG`, a comma-separated list of `key=value` pairs:
 *  - `endpoint`: forwarded to the sampler as its endpoint
 *  - `polling_interval`: forwarded to the sampler as its polling interval; must be a finite number
 *
 * Entries without an `=` and entries with an unknown key are ignored. Whitespace around keys and values
 * is ignored. For every other sampler name the decision is delegated to `buildSamplerFromEnv()`.
 *
 * @param resource - Resource handed to the X-Ray remote sampler.
 * @returns The sampler chosen by the environment configuration.
 */
export function customBuildSamplerFromEnv(resource: Resource): Sampler {
  if (process.env.OTEL_TRACES_SAMPLER !== 'xray') {
    return buildSamplerFromEnv();
  }

  const samplerArgumentEnv: string | undefined = process.env.OTEL_TRACES_SAMPLER_ARG;
  let endpoint: string | undefined = undefined;
  let pollingInterval: number | undefined = undefined;

  if (samplerArgumentEnv !== undefined) {
    for (const arg of samplerArgumentEnv.split(',')) {
      // Split on the FIRST '=' only. `arg.split('=', 2)` would drop everything after a second '=',
      // which truncates endpoints that carry a query string (e.g. `endpoint=http://host/path?a=b`).
      const separatorIndex: number = arg.indexOf('=');
      if (separatorIndex === -1) {
        continue;
      }
      const key: string = arg.slice(0, separatorIndex).trim();
      const value: string = arg.slice(separatorIndex + 1).trim();

      if (key === 'endpoint') {
        endpoint = value;
      } else if (key === 'polling_interval') {
        const parsedInterval: number = Number(value);
        // Number('') is 0 and Number('Infinity') is Infinity, so an isNaN check alone is not enough
        if (value !== '' && Number.isFinite(parsedInterval)) {
          pollingInterval = parsedInterval;
        } else {
          pollingInterval = undefined;
          diag.error('polling_interval in OTEL_TRACES_SAMPLER_ARG must be a valid number');
        }
      }
    }
  }

  diag.debug(`XRay Sampler Endpoint: ${endpoint}`);
  diag.debug(`XRay Sampler Polling Interval: ${pollingInterval}`);
  return new AwsXRayRemoteSampler({ resource: resource, endpoint: endpoint, pollingInterval: pollingInterval });
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, DiagLogger, Link, SpanKind, diag } from '@opentelemetry/api';
import { ParentBasedSampler, Sampler, SamplingResult } from '@opentelemetry/sdk-trace-base';
import { AwsXraySamplingClient } from './aws-xray-sampling-client';
import { FallbackSampler } from './fallback-sampler';
import {
AwsXRayRemoteSamplerConfig,
GetSamplingRulesResponse,
GetSamplingTargetsBody,
GetSamplingTargetsResponse,
SamplingRuleRecord,
SamplingTargetDocument,
TargetMap,
} from './remote-sampler.types';
import { DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, RuleCache } from './rule-cache';
import { SamplingRuleApplier } from './sampling-rule-applier';

// Default interval between sampling-rule polls, in seconds (5 minutes).
// Used when the configured polling interval is undefined or too small.
const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 5 * 60;
// Endpoint used when the sampler configuration does not provide one. This is the default endpoint of the
// awsproxy extension: https://aws-otel.github.io/docs/getting-started/remote-sampling#enable-awsproxy-extension
const DEFAULT_AWS_PROXY_ENDPOINT: string = 'http://localhost:2000';

/**
 * Sampler that polls sampling rules and sampling targets from an HTTP endpoint and applies the
 * matching rule to each sampling decision.
 *
 * Two unref'd interval timers are started by the constructor: one that fetches sampling rules and one
 * that reports statistics and fetches sampling targets. While the rule cache is expired, or when no
 * rule matches, decisions are delegated to a parent-based fallback sampler.
 */
export class AwsXRayRemoteSampler implements Sampler {
  // Largest delay accepted by Node.js timers; larger (or NaN) delays are coerced to 1 ms
  private static readonly MAX_TIMER_DELAY_MILLIS: number = 2147483647;
  // Upper bound of the random jitter added to the rules polling interval
  private static readonly MAX_RULE_POLLING_JITTER_MILLIS: number = 5 * 1000;

  private rulePollingIntervalMillis: number;
  private targetPollingInterval: number;
  private awsProxyEndpoint: string;
  private ruleCache: RuleCache;
  private fallbackSampler: ParentBasedSampler;
  private samplerDiag: DiagLogger;
  private rulePoller: NodeJS.Timer | undefined;
  private targetPoller: NodeJS.Timer | undefined;
  private clientId: string;
  private rulePollingJitterMillis: number;
  private targetPollingJitterMillis: number;
  private samplingClient: AwsXraySamplingClient;

  /**
   * Creates the sampler and immediately starts both pollers.
   *
   * @param samplerConfig - Sampler settings. `pollingInterval` is in seconds; values that are missing,
   * below 10, not finite, or too large for a Node.js timer fall back to the default interval.
   * A falsy `endpoint` falls back to the default endpoint.
   */
  constructor(samplerConfig: AwsXRayRemoteSamplerConfig) {
    this.samplerDiag = diag;

    const requestedInterval: AwsXRayRemoteSamplerConfig['pollingInterval'] = samplerConfig.pollingInterval;
    // Leave room for the jitter so that interval + jitter never exceeds the timer limit
    const largestIntervalSeconds: number =
      (AwsXRayRemoteSampler.MAX_TIMER_DELAY_MILLIS - AwsXRayRemoteSampler.MAX_RULE_POLLING_JITTER_MILLIS) / 1000;

    if (requestedInterval == null || requestedInterval < 10) {
      this.samplerDiag.warn(
        `'pollingInterval' is undefined or too small. Defaulting to ${DEFAULT_RULES_POLLING_INTERVAL_SECONDS} seconds`
      );
      this.rulePollingIntervalMillis = DEFAULT_RULES_POLLING_INTERVAL_SECONDS * 1000;
    } else if (!Number.isFinite(requestedInterval) || requestedInterval > largestIntervalSeconds) {
      // NaN slips through the `< 10` comparison; NaN, Infinity and oversized delays would make
      // setInterval fire every millisecond.
      this.samplerDiag.warn(
        `'pollingInterval' is not a finite number or is too large. Defaulting to ${DEFAULT_RULES_POLLING_INTERVAL_SECONDS} seconds`
      );
      this.rulePollingIntervalMillis = DEFAULT_RULES_POLLING_INTERVAL_SECONDS * 1000;
    } else {
      this.rulePollingIntervalMillis = requestedInterval * 1000;
    }

    this.rulePollingJitterMillis = Math.random() * AwsXRayRemoteSampler.MAX_RULE_POLLING_JITTER_MILLIS;
    this.targetPollingInterval = this.getDefaultTargetPollingInterval();
    this.targetPollingJitterMillis = (Math.random() / 10) * 1000;

    this.awsProxyEndpoint = samplerConfig.endpoint ? samplerConfig.endpoint : DEFAULT_AWS_PROXY_ENDPOINT;
    this.fallbackSampler = new ParentBasedSampler({ root: new FallbackSampler() });
    this.clientId = this.generateClientId();
    this.ruleCache = new RuleCache(samplerConfig.resource);

    this.samplingClient = new AwsXraySamplingClient(this.awsProxyEndpoint, this.samplerDiag);

    // Start the Sampling Rules poller
    this.startSamplingRulesPoller();

    // Start the Sampling Targets poller where the first poll occurs after the default interval
    this.startSamplingTargetsPoller();
  }

  /** @returns The timer driving sampling-rule polling, if it has been started. */
  public getRulePoller(): NodeJS.Timer | undefined {
    return this.rulePoller;
  }

  /** @returns The sampling-rule polling interval in milliseconds (without jitter). */
  public getRulePollingIntervalMillis(): number {
    return this.rulePollingIntervalMillis;
  }

  /** @returns The current sampling-target polling interval in seconds (without jitter). */
  public getTargetPollingInterval(): number {
    return this.targetPollingInterval;
  }

  /** @returns The default sampling-target polling interval in seconds. */
  public getDefaultTargetPollingInterval(): number {
    return DEFAULT_TARGET_POLLING_INTERVAL_SECONDS;
  }

  /** @returns The client used to fetch sampling rules and targets. */
  public getSamplingClient(): AwsXraySamplingClient {
    return this.samplingClient;
  }

  /** @returns The cache holding the current sampling rules. */
  public getRuleCache(): RuleCache {
    return this.ruleCache;
  }

  /** @returns The random 24-character hex identifier generated for this sampler instance. */
  public getClientId(): string {
    return this.clientId;
  }

  /** @returns The endpoint the sampling client sends its requests to. */
  public getAwsProxyEndpoint(): string {
    return this.awsProxyEndpoint;
  }

  /**
   * Makes a sampling decision using the first matching cached rule.
   * Falls back to the fallback sampler when the rule cache is expired or no rule matches.
   */
  public shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: Attributes,
    links: Link[]
  ): SamplingResult {
    if (this.ruleCache.isExpired()) {
      this.samplerDiag.debug('Rule cache is expired, so using fallback sampling strategy');
      return this.fallbackSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
    }

    const matchedRule: SamplingRuleApplier | undefined = this.ruleCache.getMatchedRule(attributes);
    if (matchedRule) {
      return matchedRule.shouldSample(context, traceId, spanName, spanKind, attributes, links);
    }

    this.samplerDiag.debug(
      'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
    );
    return this.fallbackSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
  }

  public toString(): string {
    return 'AwsXRayRemoteSampler{remote sampling with AWS X-Ray}';
  }

  /** Fetches the rules once right away, then schedules the recurring rules poll. */
  private startSamplingRulesPoller(): void {
    // Execute first update
    this.getAndUpdateSamplingRules();
    // Update sampling rules every 5 minutes (or user-defined polling interval)
    this.rulePoller = setInterval(
      () => this.getAndUpdateSamplingRules(),
      this.rulePollingIntervalMillis + this.rulePollingJitterMillis
    );
    // Do not let the poller keep the process alive
    this.rulePoller.unref();
  }

  /** Schedules the recurring targets poll; the first poll happens after one full interval. */
  private startSamplingTargetsPoller(): void {
    // Update sampling targets every targetPollingInterval (usually 10 seconds)
    this.targetPoller = setInterval(
      () => this.getAndUpdateSamplingTargets(),
      this.targetPollingInterval * 1000 + this.targetPollingJitterMillis
    );
    // Do not let the poller keep the process alive
    this.targetPoller.unref();
  }

  /** Sends the current sampling statistics and applies the returned targets. */
  private getAndUpdateSamplingTargets(): void {
    const requestBody: GetSamplingTargetsBody = {
      SamplingStatisticsDocuments: this.ruleCache.createSamplingStatisticsDocuments(this.clientId),
    };

    this.samplingClient.fetchSamplingTargets(requestBody, this.updateSamplingTargets.bind(this));
  }

  /** Requests the sampling rules and applies the response to the rule cache. */
  private getAndUpdateSamplingRules(): void {
    this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this));
  }

  /**
   * Replaces the cached rules with the rules contained in the response.
   * Runs inside an HTTP response handler, so failures are logged instead of thrown.
   */
  private updateSamplingRules(responseObject: GetSamplingRulesResponse): void {
    if (!responseObject.SamplingRuleRecords) {
      this.samplerDiag.error('SamplingRuleRecords from GetSamplingRules request is not defined');
      return;
    }

    try {
      const samplingRules: SamplingRuleApplier[] = [];
      responseObject.SamplingRuleRecords.forEach((record: SamplingRuleRecord) => {
        if (record.SamplingRule) {
          samplingRules.push(new SamplingRuleApplier(record.SamplingRule, undefined));
        }
      });
      this.ruleCache.updateRules(samplingRules);
    } catch (error: unknown) {
      this.samplerDiag.error('Error occurred when updating Sampling Rules', error);
    }
  }

  /**
   * Applies the returned targets to the rule cache, reschedules the targets poller with the interval
   * reported by the cache, and triggers an immediate rules refresh when the cache asks for one.
   */
  private updateSamplingTargets(responseObject: GetSamplingTargetsResponse): void {
    try {
      const targetDocuments: TargetMap = {};

      // Create Target-Name-to-Target-Map from sampling targets response
      responseObject.SamplingTargetDocuments.forEach((newTarget: SamplingTargetDocument) => {
        targetDocuments[newTarget.RuleName] = newTarget;
      });

      // Update targets in the cache
      const [refreshSamplingRules, nextPollingInterval]: [boolean, number] = this.ruleCache.updateTargets(
        targetDocuments,
        responseObject.LastRuleModification
      );
      this.targetPollingInterval = nextPollingInterval;
      clearInterval(this.targetPoller);
      this.startSamplingTargetsPoller();

      if (refreshSamplingRules) {
        this.samplerDiag.debug('Performing out-of-band sampling rule polling to fetch updated rules.');
        clearInterval(this.rulePoller);
        this.startSamplingRulesPoller();
      }
    } catch (error: unknown) {
      // Pass the error along so the cause is not lost
      this.samplerDiag.debug('Error occurred when updating Sampling Targets', error);
    }
  }

  /** Generates a random string of 24 lowercase hexadecimal characters. */
  private generateClientId(): string {
    const hexChars: string[] = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];
    const clientIdArray: string[] = [];
    for (let _: number = 0; _ < 24; _ += 1) {
      clientIdArray.push(hexChars[Math.floor(Math.random() * hexChars.length)]);
    }
    return clientIdArray.join('');
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { DiagLogFunction, DiagLogger, context } from '@opentelemetry/api';
import { suppressTracing } from '@opentelemetry/core';
import * as http from 'http';
import { GetSamplingRulesResponse, GetSamplingTargetsBody, GetSamplingTargetsResponse } from './remote-sampler.types';

/**
 * Minimal HTTP client that POSTs to the `/GetSamplingRules` and `/SamplingTargets` paths of the
 * configured endpoint and hands the parsed JSON response to a callback.
 *
 * Failures (network errors, non-200 responses, invalid JSON, throwing callbacks) are logged and never
 * thrown, because requests complete asynchronously inside event handlers.
 */
export class AwsXraySamplingClient {
  private getSamplingRulesEndpoint: string;
  private samplingTargetsEndpoint: string;
  private samplerDiag: DiagLogger;

  /**
   * @param endpoint - Base URL of the sampling endpoint; trailing slashes are ignored.
   * @param samplerDiag - Logger used for diagnostics.
   */
  constructor(endpoint: string, samplerDiag: DiagLogger) {
    // Avoid producing `http://host:2000//GetSamplingRules` when the endpoint ends with '/'
    const baseEndpoint: string = endpoint.replace(/\/+$/, '');
    this.getSamplingRulesEndpoint = baseEndpoint + '/GetSamplingRules';
    this.samplingTargetsEndpoint = baseEndpoint + '/SamplingTargets';
    this.samplerDiag = samplerDiag;
  }

  /**
   * POSTs the statistics body to the sampling-targets endpoint.
   *
   * @param requestBody - Body that is JSON-serialized and sent with the request.
   * @param callback - Invoked with the parsed response on a successful (HTTP 200, non-empty) reply.
   */
  public fetchSamplingTargets(
    requestBody: GetSamplingTargetsBody,
    callback: (responseObject: GetSamplingTargetsResponse) => void
  ) {
    this.makeSamplingRequest<GetSamplingTargetsResponse>(
      this.samplingTargetsEndpoint,
      callback,
      // Wrap instead of passing the method unbound, so loggers relying on `this` keep working
      (message: string, ...args: unknown[]): void => this.samplerDiag.debug(message, ...args),
      JSON.stringify(requestBody)
    );
  }

  /**
   * POSTs an empty request to the sampling-rules endpoint.
   *
   * @param callback - Invoked with the parsed response on a successful (HTTP 200, non-empty) reply.
   */
  public fetchSamplingRules(callback: (responseObject: GetSamplingRulesResponse) => void) {
    this.makeSamplingRequest<GetSamplingRulesResponse>(
      this.getSamplingRulesEndpoint,
      callback,
      (message: string, ...args: unknown[]): void => this.samplerDiag.error(message, ...args)
    );
  }

  /**
   * Sends one POST request and dispatches the parsed JSON response.
   *
   * @param url - Full URL to post to.
   * @param callback - Receives the parsed response body.
   * @param logger - Log function used for request, parsing and callback failures.
   * @param requestBodyJsonString - Optional JSON body; when present the JSON content headers are set.
   */
  private makeSamplingRequest<T>(
    url: string,
    callback: (responseObject: T) => void,
    logger: DiagLogFunction,
    requestBodyJsonString?: string
  ): void {
    const options: http.RequestOptions = {
      method: 'POST',
      headers: {},
    };

    if (requestBodyJsonString) {
      options.headers = {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(requestBodyJsonString),
      };
    }

    // Ensure AWS X-Ray Sampler does not generate traces itself
    context.with(suppressTracing(context.active()), () => {
      const req: http.ClientRequest = http
        .request(url, options, response => {
          response.setEncoding('utf-8');
          let responseData: string = '';
          response.on('data', dataChunk => (responseData += dataChunk));
          // Without a listener, an aborted response would raise an unhandled 'error' event
          response.on('error', (error: unknown) => {
            logger(`Error occurred when reading the response from ${url}: ${error}`);
          });
          response.on('end', () => {
            if (response.statusCode === 200 && responseData.length > 0) {
              let responseObject: T | undefined = undefined;
              try {
                responseObject = JSON.parse(responseData) as T;
              } catch (e: unknown) {
                logger(`Error occurred when parsing responseData from ${url}`);
              }

              if (responseObject) {
                // A throwing callback must not escape the event handler as an uncaught exception
                try {
                  callback(responseObject);
                } catch (e: unknown) {
                  logger(`Error occurred when processing the response from ${url}: ${e}`);
                }
              }
            } else {
              this.samplerDiag.debug(`${url} Response Code is: ${response.statusCode}`);
              this.samplerDiag.debug(`${url} responseData is: ${responseData}`);
            }
          });
        })
        .on('error', (error: unknown) => {
          logger(`Error occurred when making an HTTP POST to ${url}: ${error}`);
        });
      if (requestBodyJsonString) {
        req.end(requestBodyJsonString);
      } else {
        req.end();
      }
    });
  }
}
Loading
Loading