diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts index dac8e45d..56c66133 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts @@ -31,6 +31,7 @@ import { import { Aggregation, AggregationSelector, + AggregationTemporality, InstrumentType, MeterProvider, PeriodicExportingMetricReader, @@ -61,14 +62,17 @@ import { OTLPUdpSpanExporter } from './otlp-udp-exporter'; import { AwsXRayRemoteSampler } from './sampler/aws-xray-remote-sampler'; // This file is generated via `npm run compile` import { LIB_VERSION } from './version'; +import { AWSCloudWatchEMFExporter } from './exporter/otlp/aws/metrics/otlp-aws-emf-exporter'; -const XRAY_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$'; +const AWS_TRACES_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$'; +const AWS_LOGS_OTLP_ENDPOINT_PATTERN = '^https://logs\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/logs$'; const APPLICATION_SIGNALS_ENABLED_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_ENABLED'; const APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG: string = 'OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT'; const METRIC_EXPORT_INTERVAL_CONFIG: string = 'OTEL_METRIC_EXPORT_INTERVAL'; const DEFAULT_METRIC_EXPORT_INTERVAL_MILLIS: number = 60000; export const AWS_LAMBDA_FUNCTION_NAME_CONFIG: string = 'AWS_LAMBDA_FUNCTION_NAME'; +export const AGENT_OBSERVABILITY_ENABLED = 'AGENT_OBSERVABILITY_ENABLED'; const AWS_XRAY_DAEMON_ADDRESS_CONFIG: string = 'AWS_XRAY_DAEMON_ADDRESS'; const FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = 'T1S'; const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U'; @@ -77,6 +81,17 @@ const FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = 'T1U'; const LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; export const LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT: string = 'LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT'; +const AWS_OTLP_LOGS_GROUP_HEADER = 'x-aws-log-group'; +const AWS_OTLP_LOGS_STREAM_HEADER = 'x-aws-log-stream'; +const AWS_EMF_METRICS_NAMESPACE = 'x-aws-metric-namespace'; + +interface OtlpLogHeaderSetting { + logGroup?: string; + logStream?: string; + namespace?: string; + isValid: boolean; +} + /** * Aws Application Signals Config Provider creates a configuration object that can be provided to * the OTel NodeJS SDK for Auto Instrumentation with Application Signals Functionality. @@ -97,6 +112,7 @@ export class AwsOpentelemetryConfigurator { private sampler: Sampler; private spanProcessors: SpanProcessor[]; private propagator: TextMapPropagator; + private metricReader: PeriodicExportingMetricReader | undefined; /** * The constructor will setup the AwsOpentelemetryConfigurator object to be able to provide a @@ -180,6 +196,10 @@ export class AwsOpentelemetryConfigurator { const awsSpanProcessorProvider: AwsSpanProcessorProvider = new AwsSpanProcessorProvider(this.resource); this.spanProcessors = awsSpanProcessorProvider.getSpanProcessors(); AwsOpentelemetryConfigurator.customizeSpanProcessors(this.spanProcessors, this.resource); + + if (checkEmfExporterEnabled()) { + this.metricReader = AwsOpentelemetryConfigurator.createEmfExporterMetricReader(); + } } private customizeVersions(autoResource: Resource): Resource { @@ -187,7 +207,7 @@ export class AwsOpentelemetryConfigurator { const DISTRO_VERSION: string = LIB_VERSION; autoResource.attributes[SEMRESATTRS_TELEMETRY_AUTO_VERSION] = DISTRO_VERSION + '-aws'; diag.debug( - `@aws/aws-distro-opentelemetry-node-autoinstrumentation - version: ${autoResource.attributes[SEMRESATTRS_TELEMETRY_AUTO_VERSION]}` + `enhanced-adot-node-autoinstrumentation - version: ${autoResource.attributes[SEMRESATTRS_TELEMETRY_AUTO_VERSION]}` ); return autoResource; } @@ -211,6 +231,10 @@ export class AwsOpentelemetryConfigurator { textMapPropagator: this.propagator, }; + if (this.metricReader) { + config.metricReader = this.metricReader; + } + return config; } @@ -223,13 +247,7 @@ export class AwsOpentelemetryConfigurator { return isApplicationSignalsEnabled.toLowerCase() === 'true'; } - static customizeSpanProcessors(spanProcessors: SpanProcessor[], resource: Resource): void { - if (!AwsOpentelemetryConfigurator.isApplicationSignalsEnabled()) { - return; - } - - diag.info('AWS Application Signals enabled.'); - + static geMetricExportInterval(): number { let exportIntervalMillis: number = Number(process.env[METRIC_EXPORT_INTERVAL_CONFIG]); diag.debug(`AWS Application Signals Metrics export interval: ${exportIntervalMillis}`); @@ -239,13 +257,23 @@ export class AwsOpentelemetryConfigurator { diag.info(`AWS Application Signals metrics export interval capped to ${exportIntervalMillis}`); } + return exportIntervalMillis; + } + + static customizeSpanProcessors(spanProcessors: SpanProcessor[], resource: Resource): void { + if (!AwsOpentelemetryConfigurator.isApplicationSignalsEnabled()) { + return; + } + + diag.info('AWS Application Signals enabled.'); + spanProcessors.push(AttributePropagatingSpanProcessorBuilder.create().build()); const applicationSignalsMetricExporter: PushMetricExporter = ApplicationSignalsExporterProvider.Instance.createExporter(); const periodicExportingMetricReader: PeriodicExportingMetricReader = new PeriodicExportingMetricReader({ exporter: applicationSignalsMetricExporter, - exportIntervalMillis: exportIntervalMillis, + exportIntervalMillis: AwsOpentelemetryConfigurator.geMetricExportInterval(), }); // Register BatchUnsampledSpanProcessor to export unsampled traces in Lambda @@ -281,6 +309,22 @@ export class AwsOpentelemetryConfigurator { } } + static createEmfExporterMetricReader() { + const headersResult = validateLogsHeaders(); + if (!headersResult.logGroup) { + diag.warn('Log group is not set. Set Log Group with OTEL_EXPORTER_OTLP_LOGS_HEADERS='); + return; + } + + const emfExporter = createEmfExporter(headersResult.logGroup, headersResult.logStream, headersResult.namespace); + const periodicExportingMetricReader = new PeriodicExportingMetricReader({ + exporter: emfExporter, + exportIntervalMillis: AwsOpentelemetryConfigurator.geMetricExportInterval(), + }); + + return periodicExportingMetricReader; + } + static customizeSampler(sampler: Sampler): Sampler { if (AwsOpentelemetryConfigurator.isApplicationSignalsEnabled()) { return AlwaysRecordSampler.create(sampler); @@ -428,7 +472,7 @@ export class AwsSpanProcessorProvider { private resource: Resource; static configureOtlp(): SpanExporter { - const otlp_exporter_traces_endpoint = process.env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT']; + const otlpExporterTracesEndpoint = process.env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT']; // eslint-disable-next-line @typescript-eslint/typedef let protocol = this.getOtlpProtocol(); @@ -445,9 +489,9 @@ export class AwsSpanProcessorProvider { case 'http/json': return new OTLPHttpTraceExporter(); case 'http/protobuf': - if (otlp_exporter_traces_endpoint && isXrayOtlpEndpoint(otlp_exporter_traces_endpoint)) { + if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) { diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter'); - return new OTLPAwsSpanExporter(otlp_exporter_traces_endpoint); + return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint); } return new OTLPProtoTraceExporter(); case 'udp': @@ -455,9 +499,9 @@ export class AwsSpanProcessorProvider { return new OTLPUdpSpanExporter(getXrayDaemonEndpoint(), FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX); default: diag.warn(`Unsupported OTLP traces protocol: ${protocol}. Using http/protobuf.`); - if (otlp_exporter_traces_endpoint && isXrayOtlpEndpoint(otlp_exporter_traces_endpoint)) { + if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) { diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter'); - return new OTLPAwsSpanExporter(otlp_exporter_traces_endpoint); + return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint); } return new OTLPProtoTraceExporter(); } @@ -647,6 +691,8 @@ function getSamplerProbabilityFromEnv(environment: Required): numbe return probability; } +// END The OpenTelemetry Authors code + function getSpanExportBatchSize() { if (isLambdaEnvironment()) { return LAMBDA_SPAN_EXPORT_BATCH_SIZE; @@ -667,8 +713,95 @@ function getXrayDaemonEndpoint() { return process.env[AWS_XRAY_DAEMON_ADDRESS_CONFIG]; } -function isXrayOtlpEndpoint(otlpEndpoint: string | undefined) { - return otlpEndpoint && new RegExp(XRAY_OTLP_ENDPOINT_PATTERN).test(otlpEndpoint.toLowerCase()); +/** + * Determines if the given endpoint is either the AWS OTLP Traces or Logs endpoint. + */ + +export function isAwsOtlpEndpoint(otlpEndpoint: string, service: string): boolean { + const pattern = service === 'xray' ? AWS_TRACES_OTLP_ENDPOINT_PATTERN : AWS_LOGS_OTLP_ENDPOINT_PATTERN; + + return new RegExp(pattern).test(otlpEndpoint.toLowerCase()); +} + +export function checkEmfExporterEnabled(): boolean { + const exporterValue = process.env.OTEL_METRICS_EXPORTER; + if (exporterValue === undefined) { + return false; + } + + const exporters = exporterValue.split(',').map(exporter => exporter.trim()); + + const index = exporters.indexOf('awsemf'); + if (index === -1) { + return false; + } + + exporters.splice(index, 1); + + const newValue = exporters ? exporters.join(',') : undefined; + + if (typeof newValue === 'string' && newValue !== '') { + process.env.OTEL_METRICS_EXPORTER = newValue; + } else { + delete process.env.OTEL_METRICS_EXPORTER; + } + + return true; } -// END The OpenTelemetry Authors code +export function createEmfExporter( + logGroupName: string, + logStreamName?: string, + namespace?: string +): AWSCloudWatchEMFExporter { + return new AWSCloudWatchEMFExporter(namespace, logGroupName, logStreamName, AggregationTemporality.DELTA, {}); +} + +/** + * Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to + * AWS OTLP Logs endpoint. + */ +export function validateLogsHeaders(): OtlpLogHeaderSetting { + const logHeaders = process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS; + + let logGroup: string | undefined = undefined; + let logStream: string | undefined = undefined; + let namespace: string | undefined = undefined; + + if (!logHeaders) { + diag.warn( + 'Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to include x-aws-log-group and x-aws-log-stream' + ); + return { isValid: false }; + } + + for (const pair of logHeaders.split(',')) { + const splitIndex = pair.indexOf('='); + if (splitIndex > -1) { + const key = pair.substring(0, splitIndex); + const value = pair.substring(splitIndex + 1); + + if (key === AWS_OTLP_LOGS_GROUP_HEADER && value !== '') { + logGroup = value; + } else if (key === AWS_OTLP_LOGS_STREAM_HEADER && value !== '') { + logStream = value; + } else if (key === AWS_EMF_METRICS_NAMESPACE && value !== '') { + namespace = value; + } + } + } + + const isValid = !!logGroup && !!logStream; + if (!isValid) { + diag.warn( + 'Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to have values for x-aws-log-group and x-aws-log-stream' + ); + } + + return { + logGroup: logGroup, + logStream: logStream, + namespace: namespace, + isValid: isValid, + }; +} diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.ts new file mode 100644 index 00000000..882404e7 --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.ts @@ -0,0 +1,590 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/** + * OpenTelemetry EMF (Embedded Metric Format) Exporter for CloudWatch. + * This exporter converts OTel metrics into CloudWatch EMF format. + */ +import { Attributes, diag, HrTime } from '@opentelemetry/api'; +import { CloudWatchLogsClientConfig, PutLogEventsCommandInput, CloudWatchLogs } from '@aws-sdk/client-cloudwatch-logs'; +import { + Aggregation, + AggregationTemporality, + DataPoint, + DataPointType, + GaugeMetricData, + InstrumentType, + PushMetricExporter, + ResourceMetrics, +} from '@opentelemetry/sdk-metrics'; +import { Resource } from '@opentelemetry/resources'; +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import * as Crypto from 'crypto'; + +// Constants for CloudWatch Logs limits +export const CW_MAX_EVENT_PAYLOAD_BYTES = 256 * 1024; // 256KB +export const CW_MAX_REQUEST_EVENT_COUNT = 10000; +export const CW_PER_EVENT_HEADER_BYTES = 26; +export const BATCH_FLUSH_INTERVAL = 60 * 1000; +export const CW_MAX_REQUEST_PAYLOAD_BYTES = 1 * 1024 * 1024; // 1MB +export const CW_TRUNCATED_SUFFIX = '[Truncated...]'; +export const CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000; // 14 days in milliseconds +export const CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000; // 2 hours in milliseconds + +export interface MetricRecord { + name: string; + unit: string; + description: string; + timestamp: number; + attributes: Attributes; + + // Only one of the following should be defined + sumData?: number; + value?: number; +} + +interface EMFLog { + _aws: _Aws; + [key: `otel.resource.${string}`]: string; + [metricName: string]: any; // Can be any, but usually will be used for Metric Record Data + Version: string; +} + +interface _Aws { + CloudWatchMetrics: CloudWatchMetric[]; + Timestamp: number; +} + +interface CloudWatchMetric { + Namespace: string; + Dimensions: string[][]; + Metrics: Metric[]; +} + +interface Metric { + Name: string; + Unit?: string; +} + +interface LogEvent { + message: string; + timestamp: number; +} + +/** + * OpenTelemetry metrics exporter for CloudWatch EMF format. + * + * This exporter converts OTel metrics into CloudWatch EMF logs which are then + * sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics + * from the EMF logs. + */ +export class AWSCloudWatchEMFExporter implements PushMetricExporter { + private namespace: string; + private logGroupName: string; + private logStreamName: string; + private aggregationTemporality: AggregationTemporality; + + private logsClient: CloudWatchLogs; + private logStreamExists: boolean; + private logStreamExistsPromise: Promise; + + private EMF_SUPPORTED_UNITS: Set = new Set([ + 'Seconds', + 'Microseconds', + 'Milliseconds', + 'Bytes', + 'Kilobytes', + 'Megabytes', + 'Gigabytes', + 'Terabytes', + 'Bits', + 'Kilobits', + 'Megabits', + 'Gigabits', + 'Terabits', + 'Percent', + 'Count', + 'Bytes/Second', + 'Kilobytes/Second', + 'Megabytes/Second', + 'Gigabytes/Second', + 'Terabytes/Second', + 'Bits/Second', + 'Kilobits/Second', + 'Megabits/Second', + 'Gigabits/Second', + 'Terabits/Second', + 'Count/Second', + 'None', + ]); + + // OTel to CloudWatch unit mapping + private UNIT_MAPPING: Map = new Map( + Object.entries({ + '1': '', + ns: '', + ms: 'Milliseconds', + s: 'Seconds', + us: 'Microseconds', + By: 'Bytes', + bit: 'Bits', + }) + ); + + /** + * Initialize the CloudWatch EMF exporter. + * + * @param namespace CloudWatch namespace for metrics + * @param logGroupName CloudWatch log group name + * @param logStreamName Optional CloudWatch log stream name (auto-generated if not provided) + * @param AggregationTemporality Optional AggregationTemporality to indicate the way additive quantities are expressed + * @param cloudwatchLogsConfig Optional CloudWatch Logs Client Configuration. Configure region here if needed explicitly. + */ + constructor( + namespace: string = 'default', + logGroupName: string, + logStreamName?: string, + aggregationTemporality: AggregationTemporality = AggregationTemporality.DELTA, + cloudwatchLogsConfig: CloudWatchLogsClientConfig = {} + ) { + this.namespace = namespace; + this.logGroupName = logGroupName; + this.logStreamName = logStreamName || this.generateLogStreamName(); + this.aggregationTemporality = aggregationTemporality; + + this.logsClient = new CloudWatchLogs(cloudwatchLogsConfig); + + // Determine that Log group/stream exists asynchronously. The Constructor cannot wait on async + // operations, so whether or not the group/stream actually exists will be determined later. + this.logStreamExists = false; + this.logStreamExistsPromise = this.ensureLogGroupExists().then(async () => { + await this.ensureLogStreamExists(); + }); + } + + /** + * Generate a unique log stream name. + * + * @returns {string} + */ + private generateLogStreamName(): string { + const uniqueId = Crypto.randomUUID().substring(0, 8); + return `otel-js-${uniqueId}`; + } + + /** + * Ensure the log group exists, create if it doesn't. + */ + private async ensureLogGroupExists() { + try { + await this.logsClient.createLogGroup({ + logGroupName: this.logGroupName, + }); + diag.info(`Created log group: ${this.logGroupName}`); + } catch (e: unknown) { + if (e instanceof Error && e.name === 'ResourceAlreadyExistsException') { + diag.info(`Log group ${this.logGroupName} already exists.`); + } else { + diag.error(`Error occurred when creating log group ${this.logGroupName}: ${e}`); + throw e; + } + } + } + + /** + * Ensure the log stream exists, create if it doesn't. + */ + private async ensureLogStreamExists() { + try { + await this.logsClient.createLogStream({ + logGroupName: this.logGroupName, + logStreamName: this.logStreamName, + }); + diag.info(`Created log stream: ${this.logStreamName}`); + this.logStreamExists = true; + } catch (e: unknown) { + if (e instanceof Error && e.name === 'ResourceAlreadyExistsException') { + diag.info(`Log stream ${this.logStreamName} already exists.`); + } else { + diag.error(`Error occurred when creating log stream "${this.logStreamName}": ${e}`); + throw e; + } + } + } + + /** + * Get CloudWatch unit from unit in MetricRecord + * + * @param record Metric Record + * @returns {string | undefined} + */ + private getUnit(record: MetricRecord): string | undefined { + const unit = record.unit; + + if (this.EMF_SUPPORTED_UNITS.has(unit)) { + return unit; + } + + return this.UNIT_MAPPING.get(unit); + } + + /** + * Extract dimension names from attributes. + * For now, use all attributes as dimensions for the dimension selection logic. + * + * @param attributes OpenTelemetry Attributes to extract Dimension Names from + * @returns {string[]} + */ + private getDimensionNames(attributes: Attributes): string[] { + return Object.keys(attributes); + } + + /** + * Create a hashable key from attributes for grouping metrics. + * + * @param attributes OpenTelemetry Attributes used to create an attributes key + * @returns {string} + */ + private getAttributesKey(attributes: Attributes): string { + // Sort the attributes to ensure consistent keys + const sortedAttrs = Object.entries(attributes).sort(); + // Create a string representation of the attributes + return sortedAttrs.toString(); + } + + /** + * Normalize an OpenTelemetry timestamp to milliseconds for CloudWatch. + * + * @param hrTime Datapoint timestamp + * @returns {number} Timestamp in milliseconds + */ + private normalizeTimestamp(hrTime: HrTime): number { + // Convert from second and nanoseconds to milliseconds + const secondsToMillis = hrTime[0] * 1000; + const nanosToMillis = Math.floor(hrTime[1] / 1_000_000); + return secondsToMillis + nanosToMillis; + } + + /** + * Create a base metric record with instrument information. + * + * @param metricName Name of the metric + * @param metricUnit Unit of the metric + * @param metricDescription Description of the metric + * @param timestamp Normalized end epoch timestamp when metric data was collected + * @param attributes Attributes of the metric data + * @returns {MetricRecord} + */ + private createMetricRecord( + metricName: string, + metricUnit: string, + metricDescription: string, + timestamp: number, + attributes: Attributes + ): MetricRecord { + const record: MetricRecord = { + name: metricName, + unit: metricUnit, + description: metricDescription, + timestamp, + attributes, + }; + + return record; + } + + /** + * Convert a Gauge metric datapoint to a metric record. + * + * @param metric Gauge Metric Data + * @param dataPoint The datapoint to convert + * @returns {MetricRecord} + */ + private convertGauge(metric: GaugeMetricData, dataPoint: DataPoint): MetricRecord { + const timestampMs = this.normalizeTimestamp(dataPoint.endTime); + // Create base record + const metricRecord: MetricRecord = this.createMetricRecord( + metric.descriptor.name, + metric.descriptor.unit, + metric.descriptor.description, + timestampMs, + dataPoint.attributes + ); + metricRecord.value = dataPoint.value; // For Gauge, set the value directly + + return metricRecord; + } + + /** + * Group metric record by attributes and timestamp. + * + * @param record The metric record + * @param timestampMs The timestamp in milliseconds + * @returns {[string, number]} Values for the key to group metrics + */ + private groupByAttributesAndTimestamp(record: MetricRecord): [string, number] { + // Create a key for grouping based on attributes + const attrsKey = this.getAttributesKey(record.attributes); + return [attrsKey, record.timestamp]; + } + + /** + * Create EMF log from metric records. + * metricRecords is already grouped by attributes, so this + * function creates a single EMF Log for these records. + * + * @param metricRecords List of MetricRecords + * @param resource + * @param timestamp + * @returns {EMFLog} + */ + private createEmfLog( + metricRecords: MetricRecord[], + resource: Resource, + timestamp: number | undefined = undefined + ): EMFLog { + // Start with base structure + const emfLog: EMFLog = { + _aws: { + Timestamp: timestamp || Date.now(), + CloudWatchMetrics: [], + }, + Version: '1', + }; + + // Add resource attributes to EMF log but not as dimensions + if (resource && resource.attributes) { + for (const [key, value] of Object.entries(resource.attributes)) { + emfLog[`otel.resource.${key}`] = value?.toString() ?? 'undefined'; + } + } + // Initialize collections for dimensions and metrics + // Attributes of each record in the list should be the same + const allAttributes: Attributes = metricRecords.length > 0 ? metricRecords[0].attributes : {}; + const metricDefinitions = []; + + // Process each metric record + for (const record of metricRecords) { + const metricName = record.name; + // Skip processing if metric name is falsy + if (!metricName) { + continue; + } + + // Handle Gauge metrics - Store value directly in emfLog + // TODO: Handle metrics other than for GAUGE + if (record.value) { + emfLog[metricName] = record.value; + } else { + diag.debug(`Skipping metric ${metricName} as it does not have valid metric value`); + continue; + } + + // Create metric data + const metricData: Metric = { + Name: metricName, + }; + + const unit = this.getUnit(record); + if (unit) { + metricData.Unit = unit; + } + // Add to metric definitions list + metricDefinitions.push(metricData); + } + // Get dimension names from collected attributes + const dimensionNames = this.getDimensionNames(allAttributes); + + // Add attribute values to the root of the EMF log + for (const [name, value] of Object.entries(allAttributes)) { + emfLog[name] = value?.toString() ?? 'undefined'; + } + + // Add the single dimension set to CloudWatch Metrics if we have dimensions and metrics + if (dimensionNames && metricDefinitions) { + emfLog._aws.CloudWatchMetrics.push({ + Namespace: this.namespace, + Dimensions: [dimensionNames], + Metrics: metricDefinitions, + }); + } + + return emfLog; + } + + /** + * Method to handle safely pushing a MetricRecord into a Map of a Map of a list of MetricRecords + * + * @param groupedMetrics + * @param groupAttribute + * @param groupTimestamp + * @param record + */ + private pushMetricRecordIntoGroupedMetrics( + groupedMetrics: Map>, + groupAttribute: string, + groupTimestamp: number, + record: MetricRecord + ) { + let metricsGroupedByAttribute = groupedMetrics.get(groupAttribute); + if (!metricsGroupedByAttribute) { + metricsGroupedByAttribute = new Map(); + groupedMetrics.set(groupAttribute, metricsGroupedByAttribute); + } + + let metricsGroupedByAttributeAndTimestamp = metricsGroupedByAttribute.get(groupTimestamp); + if (!metricsGroupedByAttributeAndTimestamp) { + metricsGroupedByAttributeAndTimestamp = []; + metricsGroupedByAttribute.set(groupTimestamp, metricsGroupedByAttributeAndTimestamp); + } + metricsGroupedByAttributeAndTimestamp.push(record); + } + + /** + * Export metrics as EMF logs to CloudWatch. + * Groups metrics by attributes and timestamp before creating EMF logs. + * + * @param resourceMetrics Resource Metrics data containing scope metrics + * @param resultCallback callback for when the export has completed + * @returns {Promise} + */ + public async export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void) { + try { + if (!resourceMetrics) { + resultCallback({ code: ExportResultCode.SUCCESS }); + return; + } + + // Process all metrics from resource metrics their scope metrics + // The resource is now part of each resource_metrics object + const resource = resourceMetrics.resource; + + for (const scopeMetrics of resourceMetrics.scopeMetrics /*resource_metrics.scope_metrics*/) { + // Map of maps to group metrics by attributes and timestamp + // Keys: (attributes_key, timestamp_ms) + // Value: list of metric records + const groupedMetrics = new Map>(); + + // Process all metrics in this scope + for (const metric of scopeMetrics.metrics) { + // Convert metrics to a format compatible with create_emf_log + // Process metric.dataPoints for different metric types + // TODO: Handle DataPointTypes other than GAUGE + if (metric.dataPointType === DataPointType.GAUGE) { + for (const dataPoint of metric.dataPoints) { + const record = this.convertGauge(metric, dataPoint); + const [groupAttribute, groupTimestamp] = this.groupByAttributesAndTimestamp(record); + this.pushMetricRecordIntoGroupedMetrics(groupedMetrics, groupAttribute, groupTimestamp, record); + } + } else { + // This else block should never run, all metric types are accounted for above + diag.debug(`Unsupported Metric Type in metric: ${metric}`); + } + } + + const sendLogEventPromises: Promise[] = []; + // Now process each group separately to create one EMF log per group + groupedMetrics.forEach((metricsRecordsGroupedByAttribute: Map, attrsKey: string) => { + // metricRecords is grouped by attribute and timestamp + metricsRecordsGroupedByAttribute.forEach((metricRecords: MetricRecord[], timestampMs: number) => { + if (metricRecords) { + diag.debug( + `Creating EMF log for group with ${ + metricRecords.length + } metrics. Timestamp: ${timestampMs}, Attributes: ${attrsKey.substring(0, 100)}...` + ); + + // Create EMF log for this batch of metrics with the group's timestamp + const emfLog = this.createEmfLog(metricRecords, resource, Number(timestampMs)); + + // Convert to JSON + const logEvent = { + message: JSON.stringify(emfLog), + timestamp: timestampMs, + }; + + // Send to CloudWatch Logs + sendLogEventPromises.push(this.sendLogEvent(logEvent)); + } + }); + }); + await Promise.all(sendLogEventPromises); + } + + resultCallback({ code: ExportResultCode.SUCCESS }); + } catch (e) { + diag.error(`Failed to export metrics: ${e}`); + const exportResult: ExportResult = { code: ExportResultCode.FAILED }; + if (e instanceof Error) { + exportResult.error = e; + } + resultCallback(exportResult); + } + } + + /** + * Send a log event to CloudWatch Logs. + * + * This function implements the same logic as the Go version in the OTel Collector. + * It batches log events according to CloudWatch Logs constraints and sends them + * when the batch is full or spans more than 24 hours. + * + * @param logEvent The log event to send + * @returns {Promise} + */ + private async sendLogEvent(logEvent: LogEvent) { + // Prepare the PutLogEvents request + const putLogEventsInput: PutLogEventsCommandInput = { + logStreamName: this.logStreamName, + logEvents: [logEvent], + logGroupName: this.logGroupName, + }; + + try { + if (!this.logStreamExists) { + // Must perform logStreamExistsPromise check here because promises cannot be "awaited" in constructor. + // Once the logStreamExistsPromise has resolved, this.logStreamExists will be true and this code block will be skipped. + await this.logStreamExistsPromise; + } + + // Make the PutLogEvents call + await this.logsClient.putLogEvents(putLogEventsInput); + + diag.debug('Successfully sent log event'); + } catch (e) { + diag.error(`Failed to send log events: ${e}`); + throw e; + } + } + + /** + * Force flush any pending metrics. + */ + public async forceFlush() { + diag.debug('AWSCloudWatchEMFExporter force flushes the bufferred metrics'); + } + + /** + * Shutdown the exporter after force flush. + * + * @returns {Promise} + */ + public async shutdown() { + await this.forceFlush(); + diag.debug('AWSCloudWatchEMFExporter shutdown called'); + return Promise.resolve(); + } + + selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality { + return this.aggregationTemporality; + } + + selectAggregation(instrumentType: InstrumentType): Aggregation { + switch (instrumentType) { + case InstrumentType.HISTOGRAM: { + return Aggregation.ExponentialHistogram(); + } + } + return Aggregation.Default(); + } +} diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/utils.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/utils.ts index 0fd74c0d..4e31bd73 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/utils.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/utils.ts @@ -1,6 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +import { diag } from '@opentelemetry/api'; + export const getNodeVersion = () => { const nodeVersion = process.versions.node; const versionParts = nodeVersion.split('.'); @@ -17,3 +19,23 @@ export const getNodeVersion = () => { return majorVersion; }; + +/** + * Get AWS region from environment or boto3 session. + * Returns the AWS region in the following priority order: + * 1. AWS_REGION environment variable + * 2. AWS_DEFAULT_REGION environment variable + * 3. undefined if no region can be determined + */ +export const getAwsRegionFromEnvironment = (): string | undefined => { + const region = process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION; + if (region) { + return region; + } + + diag.warn( + 'AWS region not found in environment variables (AWS_REGION, AWS_DEFAULT_REGION). Please set AWS_REGION environment variable explicitly.' + ); + + return undefined; +}; diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts index e4d3d83b..1bf6d189 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/aws-opentelemetry-configurator.test.ts @@ -34,7 +34,11 @@ import { ApplicationSignalsExporterProvider, AwsOpentelemetryConfigurator, AwsSpanProcessorProvider, + checkEmfExporterEnabled, + createEmfExporter, customBuildSamplerFromEnv, + isAwsOtlpEndpoint, + validateLogsHeaders, } from '../src/aws-opentelemetry-configurator'; import { AwsSpanMetricsProcessor } from '../src/aws-span-metrics-processor'; import { OTLPUdpSpanExporter } from '../src/otlp-udp-exporter'; @@ -42,6 +46,8 @@ import { setAwsDefaultEnvironmentVariables } from '../src/register'; import { AwsXRayRemoteSampler } from '../src/sampler/aws-xray-remote-sampler'; import { AwsXraySamplingClient } from '../src/sampler/aws-xray-sampling-client'; import { GetSamplingRulesResponse } from '../src/sampler/remote-sampler.types'; +import { CloudWatchLogs } from '@aws-sdk/client-cloudwatch-logs'; +import { AWSCloudWatchEMFExporter } from '../src/exporter/otlp/aws/metrics/otlp-aws-emf-exporter'; // Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts describe('AwsOpenTelemetryConfiguratorTest', () => { @@ -667,4 +673,52 @@ describe('AwsOpenTelemetryConfiguratorTest', () => { // Cleanup delete process.env.OTEL_EXPORTER_OTLP_TRACES_PROTOCOL; }); + + it('testCheckEmfExporterEnabled', () => { + process.env.OTEL_METRICS_EXPORTER = 'first,awsemf,third'; + checkEmfExporterEnabled(); + expect(process.env.OTEL_METRICS_EXPORTER).toEqual('first,third'); + }); + + it('testCreateEmfExporter', () => { + sinon.stub(CloudWatchLogs.prototype, 'describeLogGroups').callsFake(input => { + return { logGroups: [] }; + }); + sinon.stub(CloudWatchLogs.prototype, 'createLogGroup').callsFake(input => { + return {}; + }); + sinon.stub(CloudWatchLogs.prototype, 'createLogStream').callsFake(input => { + return {}; + }); + + const exporter = createEmfExporter('test-log-group', undefined, 'TestNamespace'); + + expect(exporter).toBeInstanceOf(AWSCloudWatchEMFExporter); + sinon.restore(); + }); + + it('testIsAwsOtlpEndpoint', () => { + expect(isAwsOtlpEndpoint('https://xray.us-east-1.amazonaws.com/v1/traces', 'xray')).toBeTruthy(); + expect(isAwsOtlpEndpoint('https://lambda.us-east-1.amazonaws.com/v1/traces', 'xray')).toBeFalsy(); + expect(isAwsOtlpEndpoint('https://logs.us-east-1.amazonaws.com/v1/logs', 'logs')).toBeTruthy(); + expect(isAwsOtlpEndpoint('https://lambda.us-east-1.amazonaws.com/v1/logs', 'logs')).toBeFalsy(); + }); + + it('testValidateLogsHeaders', () => { + process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS = + 'x-aws-log-group=/test/log/group/name,x-aws-log-stream=test_log_stream_name,x-aws-metric-namespace=TEST_NAMESPACE'; + let headerSettings = validateLogsHeaders(); + expect(headerSettings).toEqual({ + logGroup: '/test/log/group/name', + logStream: 'test_log_stream_name', + namespace: 'TEST_NAMESPACE', + isValid: true, + }); + + delete process.env.OTEL_EXPORTER_OTLP_LOGS_HEADERS; + headerSettings = validateLogsHeaders(); + expect(headerSettings).toEqual({ + isValid: false, + }); + }); }); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/common/test-utils.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/common/test-utils.test.ts new file mode 100644 index 00000000..cdf04829 --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/common/test-utils.test.ts @@ -0,0 +1,20 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +export const AWS_AUTH_PATH = '../../../../../src/exporter/otlp/aws/common/aws-authenticator'; +export const AWS_SPAN_EXPORTER_PATH = '../../../../../src/exporter/otlp/aws/traces/otlp-aws-span-exporter'; +export const AWS_LOG_EXPORTER_PATH = '../../../../../src/exporter/otlp/aws/logs/otlp-aws-log-exporter'; + +export const SIGNATURE_V4_MODULE = '@smithy/signature-v4'; +export const CREDENTIAL_PROVIDER_MODULE = '@aws-sdk/credential-provider-node'; +export const SHA_256_MODULE = '@aws-crypto/sha256-js'; +export const AWS_HTTP_MODULE = '@smithy/protocol-http'; + +export const AWS_OTLP_TRACES_ENDPOINT = 'https://xray.us-east-1.amazonaws.com'; +export const AWS_OTLP_TRACES_ENDPOINT_PATH = '/v1/traces'; + +export const AWS_OTLP_LOGS_ENDPOINT = 'https://logs.us-east-1.amazonaws.com'; +export const AWS_OTLP_LOGS_ENDPOINT_PATH = '/v1/logs'; + +export const AUTHORIZATION_HEADER = 'authorization'; +export const X_AMZ_DATE_HEADER = 'x-amz-date'; +export const X_AMZ_SECURITY_TOKEN_HEADER = 'x-amz-security-token'; diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.test.ts new file mode 100644 index 00000000..3529483b --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/otlp/aws/metrics/otlp-aws-emf-exporter.test.ts @@ -0,0 +1,537 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { ValueType } from '@opentelemetry/api'; +import { Resource } from '@opentelemetry/resources'; +import expect from 'expect'; +import * as sinon from 'sinon'; +import { + CloudWatchLogs, +} from '@aws-sdk/client-cloudwatch-logs'; +import { + AWSCloudWatchEMFExporter, + MetricRecord, +} from '../../../../../src/exporter/otlp/aws/metrics/otlp-aws-emf-exporter'; +import { + Aggregation, + AggregationTemporality, + DataPoint, + DataPointType, + GaugeMetricData, + InstrumentType, + ResourceMetrics, +} from '@opentelemetry/sdk-metrics'; +import { ExportResultCode } from '@opentelemetry/core'; + +describe('TestAWSCloudWatchEMFExporter', () => { + /* Test AWSCloudWatchEMFExporter class. */ + let exporter: AWSCloudWatchEMFExporter; + + beforeEach(() => { + // Stub CloudWatchLogs to avoid AWS calls + sinon.stub(CloudWatchLogs.prototype, 'describeLogGroups').callsFake(input => { + return { logGroups: [] }; + }); + sinon.stub(CloudWatchLogs.prototype, 'createLogGroup').callsFake(input => { + return {}; + }); + sinon.stub(CloudWatchLogs.prototype, 'createLogStream').callsFake(input => { + return {}; + }); + + exporter = new AWSCloudWatchEMFExporter( + 'TestNamespace', + 'test-log-group', + undefined, + AggregationTemporality.DELTA, + {} + ); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('TestInitialization', () => { + /* Test exporter initialization. */ + expect(exporter['namespace']).toEqual('TestNamespace'); + expect(exporter['logGroupName']).not.toBeUndefined(); + expect(exporter['logStreamName']).not.toBeUndefined(); + expect(exporter['aggregationTemporality']).not.toBeUndefined(); + }); + + it('TestInitializationWithCustomParams', () => { + /* Test exporter initialization with custom parameters. */ + + const newExporter = new AWSCloudWatchEMFExporter( + 'CustomNamespace', + 'custom-log-group', + 'custom-stream', + AggregationTemporality.DELTA, + {} + ); + + expect(newExporter['namespace']).toEqual('CustomNamespace'); + expect(newExporter['logGroupName']).toEqual('custom-log-group'); + expect(newExporter['logStreamName']).toEqual('custom-stream'); + }); + + it('TestGetUnitMapping', () => { + /* Test unit mapping functionality. */ + // Test known units + expect( + exporter['getUnit']({ + name: 'testName', + unit: 'ms', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toEqual('Milliseconds'); + expect( + exporter['getUnit']({ + name: 'testName', + unit: 's', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toEqual('Seconds'); + expect( + exporter['getUnit']({ + name: 'testName', + unit: 'By', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toEqual('Bytes'); + expect( + exporter['getUnit']({ + name: 'testName', + unit: '%', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toBeUndefined(); + + // Test unknown unit + expect( + exporter['getUnit']({ + name: 'testName', + unit: 'unknown', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toBeUndefined(); + + // Test empty unit (should return undefined due to falsy check) + expect( + exporter['getUnit']({ + name: 'testName', + unit: '', + description: 'testDescription', + timestamp: Date.now(), + attributes: {}, + }) + ).toBeUndefined(); + }); + + it('TestGetDimensionNames', () => { + /* Test dimension names extraction. */ + const attributes = { 'service.name': 'test-service', env: 'prod', region: 'us-east-1' }; + + const result = exporter['getDimensionNames'](attributes); + + // Should return all attribute keys + expect(result).toContain('service.name'); + expect(result).toContain('env'); + expect(result).toContain('region'); + }); + + it('TestGetAttributesKey', () => { + /* Test attributes key generation. */ + const attributes = { service: 'test', env: 'prod' }; + + const result = exporter['getAttributesKey'](attributes); + + // Should be a string representation of sorted attributes + expect(typeof result).toEqual('string'); + expect(result).toContain('service'); + expect(result).toContain('test'); + expect(result).toContain('env'); + expect(result).toContain('prod'); + }); + + it('TestGetAttributesKeyConsistent', () => { + /* Test that attributes key generation is consistent. */ + // Same attributes in different order should produce same key + const attrs1 = { b: '2', a: '1' }; + const attrs2 = { a: '1', b: '2' }; + + const key1 = exporter['getAttributesKey'](attrs1); + const key2 = exporter['getAttributesKey'](attrs2); + + expect(key1).toEqual(key2); + }); + + it('TestGroupByAttributesAndTimestamp', () => { + /* Test grouping by attributes and timestamp. */ + const record: MetricRecord = { + name: 'test_metric', + unit: 'ms', + description: 'test description', + timestamp: Date.now(), + attributes: { env: 'test' }, + }; + + const result = exporter['groupByAttributesAndTimestamp'](record); + + // Should return a tuple with attributes key and timestamp + expect(result.length).toEqual(2); + expect(typeof result[0]).toEqual('string'); + expect(typeof result[1]).toEqual('number'); + expect(result[1]).toEqual(record.timestamp); + }); + + it('TestGenerateLogStreamName', () => { + /* Test log stream name generation. */ + const name1 = exporter['generateLogStreamName'](); + const name2 = exporter['generateLogStreamName'](); + + // Should generate unique names + expect(name1).not.toEqual(name2); + expect(name1.startsWith('otel-js-')).toBeTruthy(); + expect(name2.startsWith('otel-js-')).toBeTruthy(); + }); + + it('TestNormalizeTimestamp', () => { + /* Test timestamp normalization. */ + const timestampNs = 1609459200000000000; // 2021-01-01 00:00:00 in nanoseconds + const expectedMs = 1609459200000; // Same time in milliseconds + + const result = exporter['normalizeTimestamp']([0, timestampNs]); + expect(result).toEqual(expectedMs); + }); + + it('TestCreateMetricRecord', () => { + /* Test metric record creation. */ + const record = exporter['createMetricRecord']('test_metric', 'Count', 'Test description', Date.now(), {}); + + expect(record).not.toBeUndefined(); + expect(record).not.toBeUndefined(); + expect(record.name).toEqual('test_metric'); + expect(record.unit).toEqual('Count'); + expect(record.description).toEqual('Test description'); + }); + + it('TestConvertGauge', () => { + const dp: DataPoint = { + startTime: [0, 0], + endTime: [1, 3_000_000], + attributes: { key: 'value' }, + value: 42.5, + }; + + /* Test gauge conversion. */ + const metric: GaugeMetricData = { + dataPointType: DataPointType.GAUGE, + descriptor: { + name: 'test_gauge_metric_data', + unit: 'Count', + description: 'Gauge description', + valueType: ValueType.DOUBLE, + type: InstrumentType.GAUGE, + }, + dataPoints: [dp], + aggregationTemporality: AggregationTemporality.DELTA, + }; + + const record = exporter['convertGauge'](metric, dp); + + expect(record).not.toBeUndefined(); + expect(record.name).toEqual('test_gauge_metric_data'); + expect(record.value).toEqual(42.5); + expect(record.attributes).toEqual({ key: 'value' }); + expect(record.timestamp).toEqual(1003); + }); + + it('TestCreateEmfLog', () => { + /* Test EMF log creation. */ + // Create test records + const gaugeRecord: MetricRecord = { + ...exporter['createMetricRecord']('gauge_metric', 'Count', 'Gauge', Date.now(), { env: 'test' }), + value: 50.0, + }; + + // TODO: Test Sum metric record + + const records = [gaugeRecord]; + const resource = new Resource({ 'service.name': 'test-service' }); + + const result = exporter['createEmfLog'](records, resource); + + expect(result).toHaveProperty('_aws'); + expect(result._aws.CloudWatchMetrics[0].Namespace).toEqual('TestNamespace'); + expect(result._aws.CloudWatchMetrics[0].Dimensions[0][0]).toEqual('env'); + expect(result._aws.CloudWatchMetrics[0].Metrics[0].Name).toEqual('gauge_metric'); + expect(result._aws.CloudWatchMetrics[0].Metrics[0].Unit).toEqual('Count'); + expect(result).toHaveProperty('Version', '1'); + expect(result['otel.resource.service.name']).toEqual('test-service'); // toHaveProperty() doesn't work with '.' + expect(result).toHaveProperty('gauge_metric', 50); + expect(result).toHaveProperty('env', 'test'); + + // Sanity check that the result is JSON serializable, and doesn't throw error + JSON.stringify(result); + }); + + it('TestExportSuccess', done => { + /* Test successful export. */ + // Mock CloudWatch Logs client + sinon.stub(exporter['logsClient'], 'putLogEvents').callsFake(input => { + return { nextSequenceToken: '12345' }; + }); + + // Create empty metrics data to test basic export flow + const resourceMetricsData: ResourceMetrics = { + resource: new Resource({}), + scopeMetrics: [], + }; + + exporter.export(resourceMetricsData, result => { + expect(result.code).toEqual(ExportResultCode.SUCCESS); + done(); + }); + }); + + it('TestExportSuccessWithManyResourceMetrics', done => { + /* Test successful export. */ + // Mock CloudWatch Logs client + sinon.stub(exporter['logsClient'], 'putLogEvents').callsFake(input => { + return { nextSequenceToken: '12345' }; + }); + + // Create empty metrics data to test basic export flow + const resourceMetricsData: ResourceMetrics = { + resource: new Resource({}), + scopeMetrics: [ + { + scope: { + name: 'test', + }, + metrics: [ + { + dataPoints: [ + { + startTime: [0, 0], + endTime: [1, 1], + value: 3, + attributes: {}, + }, + ], + dataPointType: DataPointType.GAUGE, + descriptor: { + name: 'descriptorName', + description: 'descriptionName', + unit: 'ms', + type: InstrumentType.GAUGE, + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.DELTA, + }, + ], + }, + { + scope: { + name: 'test', + }, + metrics: [ + { + dataPoints: [ + { + startTime: [0, 0], + endTime: [1, 1], + value: 3, + attributes: {}, + }, + ], + isMonotonic: true, + dataPointType: DataPointType.SUM, + descriptor: { + name: 'descriptorName', + description: 'descriptionName', + unit: 'ms', + type: InstrumentType.COUNTER, + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.DELTA, + }, + ], + }, + { + scope: { + name: 'test', + }, + metrics: [ + { + dataPoints: [ + { + startTime: [0, 0], + endTime: [1, 1], + value: { + buckets: { + boundaries: [], + counts: [], + }, + sum: 7, + count: 3, + min: 1, + max: 5, + }, + attributes: {}, + }, + ], + dataPointType: DataPointType.HISTOGRAM, + descriptor: { + name: 'descriptorName', + description: 'descriptionName', + unit: 'ms', + type: InstrumentType.HISTOGRAM, + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.DELTA, + }, + ], + }, + { + scope: { + name: 'test', + }, + metrics: [ + { + dataPoints: [ + { + startTime: [0, 0], + endTime: [1, 1], + value: { + count: 2, + sum: 8, + scale: 1, + zeroCount: 0, + positive: { + offset: 0, + bucketCounts: [1, 2], + }, + negative: { + offset: 0, + bucketCounts: [1, 2], + }, + min: 2, + max: 6, + }, + attributes: {}, + }, + ], + dataPointType: DataPointType.EXPONENTIAL_HISTOGRAM, + descriptor: { + name: 'descriptorName', + description: 'descriptionName', + unit: 'ms', + type: InstrumentType.HISTOGRAM, + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.DELTA, + }, + ], + }, + ], + }; + + exporter.export(resourceMetricsData, result => { + expect(result.code).toEqual(ExportResultCode.SUCCESS); + done(); + }); + }); + + it('TestExportFailure', done => { + /* Test export failure handling. */ + // Create metrics data that will cause an exception during iteration + const metricsData: ResourceMetrics = { + resource: new Resource({}), + scopeMetrics: [undefined as any], // will cause an error to throw + }; + + exporter.export(metricsData, result => { + expect(result.code).toEqual(ExportResultCode.FAILED); + done(); + }); + }); + + it('TestForceF', async () => { + await expect(exporter.forceFlush()).resolves.not.toThrow(); + }); + + it('TestShutdown', async () => { + /* Test shutdown functionality. */ + + const forceFlushStub = sinon.stub(exporter, 'forceFlush'); + + // Ensure this call doesn't reject + await exporter.shutdown(); + + sinon.assert.calledOnce(forceFlushStub); + }); + + it('TestSelectAggregationTemporality', async () => { + // Default is AggregationTemporality.DELTA + expect(exporter.selectAggregationTemporality(InstrumentType.HISTOGRAM)).toEqual(AggregationTemporality.DELTA); + }); + + it('TestSelectAggregation', async () => { + // Should return ExponentialHistogram Aggregation for HISTOGRAM InstrumentType + expect(exporter.selectAggregation(InstrumentType.HISTOGRAM)).toEqual(Aggregation.ExponentialHistogram()); + + // Should return Default Aggregation for other InstrumentType + expect(exporter.selectAggregation(InstrumentType.COUNTER)).toEqual(Aggregation.Default()); + expect(exporter.selectAggregation(InstrumentType.GAUGE)).toEqual(Aggregation.Default()); + }); + + it('TestEnsureLogGroupExists', async () => { + exporter['logGroupName'] = 'groupName'; + + (exporter['logsClient'].createLogGroup as any).callsFake(async () => { + const err = new Error('SpecifiedError'); + err.name = 'ResourceAlreadyExistsException'; + throw err; + }); + await exporter['ensureLogGroupExists'](); + + (exporter['logsClient'].createLogGroup as any).callsFake(async () => { + throw Error('SomeError'); + }); + await expect(async () => { + return exporter['ensureLogGroupExists'](); + }).rejects.toThrow('SomeError'); + }); + + it('TestEnsureLogStreamExists', async () => { + exporter['logGroupName'] = 'groupName'; + exporter['logStreamName'] = 'streamName'; + + (exporter['logsClient'].createLogStream as any).callsFake(async () => { + const err = new Error('SpecifiedError'); + err.name = 'ResourceAlreadyExistsException'; + throw err; + }); + await exporter['ensureLogStreamExists'](); + + (exporter['logsClient'].createLogStream as any).callsFake(async () => { + throw Error('SomeError'); + }); + await expect(async () => { + return exporter['ensureLogStreamExists'](); + }).rejects.toThrow('SomeError'); + }); +}); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/utils.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/utils.test.ts new file mode 100644 index 00000000..aacea5a5 --- /dev/null +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/utils.test.ts @@ -0,0 +1,24 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import expect from 'expect'; +import { getAwsRegionFromEnvironment } from '../src/utils'; + +describe('Utils', function () { + beforeEach(() => { + delete process.env.AWS_REGION; + delete process.env.AWS_DEFAULT_REGION; + }); + + it('Test getAwsRegion from AWS_REGION env var', () => { + process.env.AWS_REGION = 'us-west-2'; + process.env.AWS_DEFAULT_REGION = 'eu-west-1'; + expect(getAwsRegionFromEnvironment()).toEqual('us-west-2'); + }); + + it('Test getAwsRegion from AWS_REGION env var', () => { + delete process.env.AWS_REGION; + process.env.AWS_DEFAULT_REGION = 'eu-west-1'; + expect(getAwsRegionFromEnvironment()).toEqual('eu-west-1'); + }); +});