From bd5dd05d4b199c610a5e168a32f5172354ea3a82 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 16:44:30 -0700 Subject: [PATCH 01/16] Added WithLogs and its handling --- collector/internal/lifecycle/manager.go | 2 +- collector/internal/telemetryapi/client.go | 8 +- .../receiver/telemetryapireceiver/README.md | 10 +- .../receiver/telemetryapireceiver/factory.go | 23 ++- .../sharedcomponent/sharedcomponent.go | 76 ++++++++ .../sharedcomponent/sharedcomponent_test.go | 72 ++++++++ .../receiver/telemetryapireceiver/receiver.go | 166 +++++++++++++++--- .../telemetryapireceiver/receiver_test.go | 164 ++++++++++++++++- .../receiver/telemetryapireceiver/types.go | 6 +- 9 files changed, 483 insertions(+), 44 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go create mode 100644 collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 68f6939190..c6d9eb9305 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex } telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) if err != nil { logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) } diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index db57573fc2..08a0c67f05 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -46,13 +46,7 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { - eventTypes := []EventType{ - Platform, - // Function, - // Extension, - } - +func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { bufferingConfig := BufferingCfg{ MaxItems: 1000, MaxBytes: 256 * 1024, diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index fe2c2745f1..e51b420f2d 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -1,10 +1,10 @@ # Telemetry API Receiver -| Status | | -| ------------------------ |-----------------| -| Stability | [alpha] | -| Supported pipeline types | traces | -| Distributions | [extension] | +| Status | | +| ------------------------ |--------------| +| Stability | [alpha] | +| Supported pipeline types | traces, logs | +| Distributions | [extension] | This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup. diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index c89dcbeac8..63c8b2ce23 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -18,6 +18,7 @@ import ( "context" "errors" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" @@ -39,7 +40,8 @@ func NewFactory(extensionID string) receiver.Factory { extensionID: extensionID, } }, - receiver.WithTraces(createTracesReceiver, stability)) + receiver.WithTraces(createTracesReceiver, stability), + receiver.WithLogs(createLogsReceiver, stability)) } func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { @@ -47,6 +49,23 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r if !ok { return nil, errConfigNotTelemetryAPI } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) + return r, nil +} - return newTelemetryAPIReceiver(cfg, next, params) +func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) + return r, nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go new file mode 100644 index 0000000000..b297d81586 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package sharedcomponent exposes util functionality for receivers and exporters +// that need to share state between different signal types instances such as net.Listener or os.File. +package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// SharedComponents a map that keeps reference of all created instances for a given configuration, +// and ensures that the shared state is started and stopped only once. +type SharedComponents struct { + comps map[any]*SharedComponent +} + +// NewSharedComponents returns a new empty SharedComponents. +func NewSharedComponents() *SharedComponents { + return &SharedComponents{ + comps: make(map[any]*SharedComponent), + } +} + +// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// and adds it to the map of references. +func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent { + if c, ok := scs.comps[key]; ok { + return c + } + newComp := &SharedComponent{ + Component: create(), + removeFunc: func() { + delete(scs.comps, key) + }, + } + scs.comps[key] = newComp + return newComp +} + +// SharedComponent ensures that the wrapped component is started and stopped only once. +// When stopped it is removed from the SharedComponents map. +type SharedComponent struct { + component.Component + + startOnce sync.Once + stopOnce sync.Once + removeFunc func() +} + +// Unwrap returns the original component. +func (r *SharedComponent) Unwrap() component.Component { + return r.Component +} + +// Start implements component.Component. +func (r *SharedComponent) Start(ctx context.Context, host component.Host) error { + var err error + r.startOnce.Do(func() { + err = r.Component.Start(ctx, host) + }) + return err +} + +// Shutdown implements component.Component. +func (r *SharedComponent) Shutdown(ctx context.Context) error { + var err error + r.stopOnce.Do(func() { + err = r.Component.Shutdown(ctx) + r.removeFunc() + }) + return err +} diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go new file mode 100644 index 0000000000..dad4886c17 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sharedcomponent + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +var id = component.MustNewID("test") + +func TestNewSharedComponents(t *testing.T) { + comps := NewSharedComponents() + assert.Len(t, comps.comps, 0) +} + +type mockComponent struct { + component.StartFunc + component.ShutdownFunc +} + +func TestSharedComponents_GetOrAdd(t *testing.T) { + nop := &mockComponent{} + createNop := func() component.Component { return nop } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createNop) + assert.Len(t, comps.comps, 1) + assert.Same(t, nop, got.Unwrap()) + assert.Same(t, got, comps.GetOrAdd(id, createNop)) + + // Shutdown nop will remove + assert.NoError(t, got.Shutdown(context.Background())) + assert.Len(t, comps.comps, 0) + assert.NotSame(t, got, comps.GetOrAdd(id, createNop)) +} + +func TestSharedComponent(t *testing.T) { + wantErr := errors.New("my error") + calledStart := 0 + calledStop := 0 + comp := &mockComponent{ + StartFunc: func(_ context.Context, _ component.Host) error { + calledStart++ + return wantErr + }, + ShutdownFunc: func(_ context.Context) error { + calledStop++ + return wantErr + }, + } + createComp := func() component.Component { return comp } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createComp) + assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // Second time is not called anymore. + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + assert.Equal(t, wantErr, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) + // Second time is not called anymore. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) +} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4df8b7764d..87e5623053 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,15 +24,17 @@ import ( "math/rand" "net/http" "os" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" @@ -40,12 +42,15 @@ import ( const defaultListenerPort = "4325" const initialQueueSize = 5 +const timeFormatLayout = "2006-01-02T15:04:05.000Z" +const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPIReceiver struct { httpServer *http.Server logger *zap.Logger queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later - nextConsumer consumer.Traces + nextTraces consumer.Traces + nextLogs consumer.Logs lastPlatformStartTime string lastPlatformEndTime string extensionID string @@ -64,7 +69,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -147,12 +152,26 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { - err := r.nextConsumer.ConsumeTraces(context.Background(), td) - if err == nil { - r.lastPlatformEndTime = "" - r.lastPlatformStartTime = "" - } else { - r.logger.Error("error receiving traces", zap.Error(err)) + if r.nextTraces != nil { + err := r.nextTraces.ConsumeTraces(context.Background(), td) + if err == nil { + r.lastPlatformEndTime = "" + r.lastPlatformStartTime = "" + } else { + r.logger.Error("error receiving traces", zap.Error(err)) + } + } + } + } + + // Logs + if r.nextLogs != nil { + if logs, err := r.createLogs(slice); err == nil { + if logs.LogRecordCount() > 0 { + err := r.nextLogs.ConsumeLogs(context.Background(), logs) + if err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } } } } @@ -161,26 +180,133 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(scopeName) + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(timeFormatLayout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + } + if level, ok := record["level"].(string); ok { + level = strings.ToUpper(level) + logRecord.SetSeverityText(level) + switch level { + case "TRACE": + logRecord.SetSeverityNumber(1) + case "TRACE2": + logRecord.SetSeverityNumber(2) + case "TRACE3": + logRecord.SetSeverityNumber(3) + case "TRACE4": + logRecord.SetSeverityNumber(4) + case "DEBUG": + logRecord.SetSeverityNumber(5) + case "DEBUG2": + logRecord.SetSeverityNumber(6) + case "DEBUG3": + logRecord.SetSeverityNumber(7) + case "DEBUG4": + logRecord.SetSeverityNumber(8) + case "INFO": + logRecord.SetSeverityNumber(9) + case "INFO2": + logRecord.SetSeverityNumber(10) + case "INFO3": + logRecord.SetSeverityNumber(11) + case "INFO4": + logRecord.SetSeverityNumber(12) + case "WARN": + logRecord.SetSeverityNumber(13) + case "WARN2": + logRecord.SetSeverityNumber(14) + case "WARN3": + logRecord.SetSeverityNumber(15) + case "WARN4": + logRecord.SetSeverityNumber(16) + case "ERROR": + logRecord.SetSeverityNumber(17) + case "ERROR2": + logRecord.SetSeverityNumber(18) + case "ERROR3": + logRecord.SetSeverityNumber(19) + case "ERROR4": + logRecord.SetSeverityNumber(20) + case "FATAL": + logRecord.SetSeverityNumber(21) + case "FATAL2": + logRecord.SetSeverityNumber(22) + case "FATAL3": + logRecord.SetSeverityNumber(23) + case "FATAL4": + logRecord.SetSeverityNumber(24) + default: + } + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } + } + } + } + return log, nil +} + +func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { + r.nextTraces = next +} + +func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) { + r.nextLogs = next +} + func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) { traceData := ptrace.NewTraces() rs := traceData.ResourceSpans().AppendEmpty() r.resource.CopyTo(rs.Resource()) ss := rs.ScopeSpans().AppendEmpty() - ss.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + ss.Scope().SetName(scopeName) span := ss.Spans().AppendEmpty() span.SetTraceID(newTraceID()) span.SetSpanID(newSpanID()) span.SetName("platform.initRuntimeDone") span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) - layout := "2006-01-02T15:04:05.000Z" - startTime, err := time.Parse(layout, start) + startTime, err := time.Parse(timeFormatLayout, start) if err != nil { return ptrace.Traces{}, err } span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - endTime, err := time.Parse(layout, end) + endTime, err := time.Parse(timeFormatLayout, end) if err != nil { return ptrace.Traces{}, err } @@ -190,9 +316,8 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace func newTelemetryAPIReceiver( cfg *Config, - next consumer.Traces, set receiver.CreateSettings, -) (*telemetryAPIReceiver, error) { +) *telemetryAPIReceiver { envResourceMap := map[string]string{ "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": semconv.AttributeFaaSMaxMemory, "AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion, @@ -213,12 +338,11 @@ func newTelemetryAPIReceiver( } } return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - nextConsumer: next, - extensionID: cfg.extensionID, - resource: r, - }, nil + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + resource: r, + } } func listenOnAddress() string { diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 7f7b46572f..c47ed95459 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -19,9 +19,12 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -99,12 +102,11 @@ func TestHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { consumer := mockConsumer{} - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - &consumer, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) + r.registerTracesConsumer(consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -146,9 +148,8 @@ func TestCreatePlatformInitSpan(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - nil, receivertest.NewNopCreateSettings(), ) require.NoError(t, err) @@ -161,3 +162,156 @@ func TestCreatePlatformInitSpan(t *testing.T) { }) } } + +func TestCreateLogs(t *testing.T) { + testCases := []struct { + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedContainsRequestId bool + expectedRequestId string + expectedSeverityNumber plog.SeverityNumber + expectError bool + }{ + { + desc: "no slice", + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "Invalid Timestamp", + slice: []event{ + { + Time: "invalid", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectError: true, + }, + { + desc: "function text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "function json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "extension json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r := newTelemetryAPIReceiver( + &Config{}, + receivertest.NewNopCreateSettings(), + ) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, tc.expectedContainsRequestId, ok) + if ok { + require.Equal(t, tc.expectedRequestId, requestId.Str()) + } + require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) + require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/types.go b/collector/receiver/telemetryapireceiver/types.go index 40bbc6ff94..fdcb4e3f28 100644 --- a/collector/receiver/telemetryapireceiver/types.go +++ b/collector/receiver/telemetryapireceiver/types.go @@ -15,7 +15,7 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" type event struct { - Time string `json:"time"` - Type string `json:"type"` - Record map[string]any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` } From 63558a84ad65c52701a5c7af8b45c5d58da542df Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 17:47:47 -0700 Subject: [PATCH 02/16] nits --- collector/receiver/telemetryapireceiver/receiver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 87e5623053..00b9cb050b 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -201,8 +201,8 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { - if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + if t, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) } else { r.logger.Error("error parsing time", zap.Error(err)) return plog.Logs{}, err From 1ae4fddcc9f5fe1a15af9f88f9fd95ae86d9de2b Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 19:26:15 -0700 Subject: [PATCH 03/16] Added extensions --- collector/receiver/telemetryapireceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 00b9cb050b..4b8d3a97c2 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -69,7 +69,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err From 0f74cbe0934b4bf1b42a6c59a85d9b97728ddf11 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 27 May 2024 17:54:18 -0700 Subject: [PATCH 04/16] Fixed unit tests --- collector/receiver/telemetryapireceiver/receiver.go | 2 +- .../receiver/telemetryapireceiver/receiver_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4b8d3a97c2..2d791ddba3 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -210,7 +210,6 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } if level, ok := record["level"].(string); ok { level = strings.ToUpper(level) - logRecord.SetSeverityText(level) switch level { case "TRACE": logRecord.SetSeverityNumber(1) @@ -262,6 +261,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.SetSeverityNumber(24) default: } + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } if requestId, ok := record["requestId"].(string); ok { logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index c47ed95459..6fb953565c 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" ) func TestListenOnAddress(t *testing.T) { @@ -64,6 +65,10 @@ func (c *mockConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro return nil } +func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { + return nil +} + func (c *mockConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } @@ -106,7 +111,7 @@ func TestHandler(t *testing.T) { &Config{}, receivertest.NewNopCreateSettings(), ) - r.registerTracesConsumer(consumer) + r.registerTracesConsumer(&consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -152,7 +157,6 @@ func TestCreatePlatformInitSpan(t *testing.T) { &Config{}, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) td, err := r.createPlatformInitSpan(tc.start, tc.end) if tc.expectError { require.Error(t, err) @@ -231,7 +235,7 @@ func TestCreateLogs(t *testing.T) { expectedBody: "Hello world, I am a function!", expectedContainsRequestId: true, expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", - expectedSeverityText: "INFO", + expectedSeverityText: "Info", expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, @@ -273,7 +277,7 @@ func TestCreateLogs(t *testing.T) { expectedBody: "Hello world, I am an extension!", expectedContainsRequestId: true, expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "INFO", + expectedSeverityText: "Info", expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, From 0c5429261c72340f85f5744c31ee1986e0a2ac09 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 27 May 2024 17:56:05 -0700 Subject: [PATCH 05/16] Added unit test cases --- .../telemetryapireceiver/receiver_test.go | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 6fb953565c..d43d17dfa9 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -281,6 +281,30 @@ func TestCreateLogs(t *testing.T) { expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, + { + desc: "extension json anything", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "anything", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Unspecified", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { From dd2e317eb5ad6c391b6a06bd20889266bab5eff1 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 20 Jun 2024 14:18:44 -0700 Subject: [PATCH 06/16] Added config (#26) --- .../receiver/telemetryapireceiver/README.md | 16 ++- .../receiver/telemetryapireceiver/config.go | 11 ++ .../telemetryapireceiver/config_test.go | 128 ++++++++++++++++++ .../receiver/telemetryapireceiver/factory.go | 10 +- .../telemetryapireceiver/factory_test.go | 2 +- .../receiver/telemetryapireceiver/receiver.go | 29 +++- .../telemetryapireceiver/receiver_test.go | 4 +- .../telemetryapireceiver/testdata/config.yaml | 34 +++++ 8 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/testdata/config.yaml diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index e51b420f2d..9a94a96244 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -15,11 +15,25 @@ Supported events: ## Configuration -There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration: +| Field | Default | Description | +|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| `port` | 4235 | HTTP server port to receive Telemetry API data. | +| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | + ```yaml receivers: telemetryapi: + telemetryapi/1: + port: 4326 + telemetryapi/2: + port: 4326 + types: + - platform + - function + telemetryapi/3: + port: 4326 + types: ["platform", "function"] ``` [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 86b5250196..b51ef1ed57 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -14,12 +14,23 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import ( + "fmt" +) + // Config defines the configuration for the various elements of the receiver agent. type Config struct { extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { + for _, t := range cfg.Types { + if t != platform && t != function && t != extension { + return fmt.Errorf("unknown extension type: %s", t) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 7f3969dd7c..fbee9ed924 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -15,12 +15,133 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" ) +func TestLoadConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + id component.ID + expected component.Config + }{ + { + id: component.NewID(component.MustNewType("telemetryapi")), + expected: NewFactory("extensionID").CreateDefaultConfig(), + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.id.String(), func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory("extensionID") + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + require.Equal(t, tt.expected, cfg) + }) + } +} + func TestValidate(t *testing.T) { + t.Parallel() + testCases := []struct { desc string cfg *Config @@ -31,6 +152,13 @@ func TestValidate(t *testing.T) { cfg: &Config{}, expectedErr: nil, }, + { + desc: "invalid config", + cfg: &Config{ + Types: []string{"invalid"}, + }, + expectedErr: fmt.Errorf("unknown extension type: invalid"), + }, } for _, tc := range testCases { diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 63c8b2ce23..ced897d9f9 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -25,8 +25,12 @@ import ( ) const ( - typeStr = "telemetryapi" - stability = component.StabilityLevelDevelopment + typeStr = "telemetryapi" + stability = component.StabilityLevelDevelopment + defaultPort = 4325 + platform = "platform" + function = "function" + extension = "extension" ) var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") @@ -38,6 +42,8 @@ func NewFactory(extensionID string) receiver.Factory { func() component.Config { return &Config{ extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, } }, receiver.WithTraces(createTracesReceiver, stability), diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index 97961ea437..e0cfcd25eb 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test"} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 2d791ddba3..237a5b2922 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "os" + "strconv" "strings" "time" @@ -40,7 +41,6 @@ import ( "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const defaultListenerPort = "4325" const initialQueueSize = 5 const timeFormatLayout = "2006-01-02T15:04:05.000Z" const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" @@ -54,11 +54,13 @@ type telemetryAPIReceiver struct { lastPlatformStartTime string lastPlatformEndTime string extensionID string + port int + types []telemetryapi.EventType resource pcommon.Resource } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress() + address := listenOnAddress(r.port) r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() @@ -69,7 +71,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -337,21 +339,36 @@ func newTelemetryAPIReceiver( r.Attributes().PutStr(resourceAttribute, val) } } + + subscribedTypes := []telemetryapi.EventType{} + for _, val := range cfg.Types { + switch val { + case "platform": + subscribedTypes = append(subscribedTypes, telemetryapi.Platform) + case "function": + subscribedTypes = append(subscribedTypes, telemetryapi.Function) + case "extension": + subscribedTypes = append(subscribedTypes, telemetryapi.Extension) + } + } + return &telemetryAPIReceiver{ logger: set.Logger, queue: queue.New(initialQueueSize), extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, resource: r, } } -func listenOnAddress() string { +func listenOnAddress(port int) string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") var addr string if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort + addr = ":" + strconv.Itoa(port) } else { - addr = "sandbox.localdomain:" + defaultListenerPort + addr = "sandbox.localdomain:" + strconv.Itoa(port) } return addr diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index d43d17dfa9..fd1bb5e2a3 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -38,7 +38,7 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, "sandbox.localdomain:4325", addr) }, }, @@ -46,7 +46,7 @@ func TestListenOnAddress(t *testing.T) { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, ":4325", addr) }, }, diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml new file mode 100644 index 0000000000..b0935f779c --- /dev/null +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -0,0 +1,34 @@ +telemetryapi: +telemetryapi/1: + port: 12345 +telemetryapi/2: + port: "12345" +telemetryapi/3: + port: 12345 + types: ["platform"] +telemetryapi/4: + port: 12345 + types: ["function"] +telemetryapi/5: + port: 12345 + types: ["extension"] +telemetryapi/6: + port: 12345 + types: ["platform", "function"] +telemetryapi/7: + port: 12345 + types: ["platform", "extension"] +telemetryapi/8: + port: 12345 + types: ["function", "extension"] +telemetryapi/9: + port: 12345 + types: [] +telemetryapi/10: + port: 12345 + types: + - function + - extension +telemetryapi/11: + port: 12345 + types: [function, extension] From 4ff5876659f5234562bb8ec10d4d2e43cb891001 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 20 Jun 2024 16:26:51 -0700 Subject: [PATCH 07/16] Added severityTextToNumber function --- .../telemetryapireceiver/config_test.go | 2 - .../receiver/telemetryapireceiver/receiver.go | 87 ++++++------- .../telemetryapireceiver/receiver_test.go | 114 ++++++++++++++++++ 3 files changed, 149 insertions(+), 54 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index fbee9ed924..f6500bebde 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -140,8 +140,6 @@ func TestLoadConfig(t *testing.T) { } func TestValidate(t *testing.T) { - t.Parallel() - testCases := []struct { desc string cfg *Config diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 237a5b2922..b1c9eca5b9 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -211,58 +211,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } if level, ok := record["level"].(string); ok { - level = strings.ToUpper(level) - switch level { - case "TRACE": - logRecord.SetSeverityNumber(1) - case "TRACE2": - logRecord.SetSeverityNumber(2) - case "TRACE3": - logRecord.SetSeverityNumber(3) - case "TRACE4": - logRecord.SetSeverityNumber(4) - case "DEBUG": - logRecord.SetSeverityNumber(5) - case "DEBUG2": - logRecord.SetSeverityNumber(6) - case "DEBUG3": - logRecord.SetSeverityNumber(7) - case "DEBUG4": - logRecord.SetSeverityNumber(8) - case "INFO": - logRecord.SetSeverityNumber(9) - case "INFO2": - logRecord.SetSeverityNumber(10) - case "INFO3": - logRecord.SetSeverityNumber(11) - case "INFO4": - logRecord.SetSeverityNumber(12) - case "WARN": - logRecord.SetSeverityNumber(13) - case "WARN2": - logRecord.SetSeverityNumber(14) - case "WARN3": - logRecord.SetSeverityNumber(15) - case "WARN4": - logRecord.SetSeverityNumber(16) - case "ERROR": - logRecord.SetSeverityNumber(17) - case "ERROR2": - logRecord.SetSeverityNumber(18) - case "ERROR3": - logRecord.SetSeverityNumber(19) - case "ERROR4": - logRecord.SetSeverityNumber(20) - case "FATAL": - logRecord.SetSeverityNumber(21) - case "FATAL2": - logRecord.SetSeverityNumber(22) - case "FATAL3": - logRecord.SetSeverityNumber(23) - case "FATAL4": - logRecord.SetSeverityNumber(24) - default: - } + logRecord.SetSeverityNumber(severityTextToNumber(level)) logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } if requestId, ok := record["requestId"].(string); ok { @@ -282,6 +231,40 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { return log, nil } +func severityTextToNumber(severityText string) plog.SeverityNumber { + mapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + } + if ans, ok := mapping[strings.ToUpper(severityText)]; ok { + return ans + } else { + return plog.SeverityNumberUnspecified + } +} + func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { r.nextTraces = next } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index fd1bb5e2a3..e6ffe4571c 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -74,6 +74,8 @@ func (c *mockConsumer) Capabilities() consumer.Capabilities { } func TestHandler(t *testing.T) { + t.Parallel() + testCases := []struct { desc string body string @@ -168,6 +170,8 @@ func TestCreatePlatformInitSpan(t *testing.T) { } func TestCreateLogs(t *testing.T) { + t.Parallel() + testCases := []struct { desc string slice []event @@ -343,3 +347,113 @@ func TestCreateLogs(t *testing.T) { }) } } + +func TestSeverityTextToNumber(t *testing.T) { + t.Parallel() + + testCases := []struct { + level string + number plog.SeverityNumber + }{ + { + level: "TRACE", + number: plog.SeverityNumberTrace, + }, + { + level: "TRACE2", + number: plog.SeverityNumberTrace2, + }, + { + level: "TRACE3", + number: plog.SeverityNumberTrace3, + }, + { + level: "TRACE4", + number: plog.SeverityNumberTrace4, + }, + { + level: "DEBUG2", + number: plog.SeverityNumberDebug2, + }, + { + level: "DEBUG3", + number: plog.SeverityNumberDebug3, + }, + { + level: "DEBUG4", + number: plog.SeverityNumberDebug4, + }, + { + level: "INFO", + number: plog.SeverityNumberInfo, + }, + { + level: "INFO2", + number: plog.SeverityNumberInfo2, + }, + { + level: "INFO3", + number: plog.SeverityNumberInfo3, + }, + { + level: "INFO4", + number: plog.SeverityNumberInfo4, + }, + { + level: "WARN", + number: plog.SeverityNumberWarn, + }, + { + level: "WARN2", + number: plog.SeverityNumberWarn2, + }, + { + level: "WARN3", + number: plog.SeverityNumberWarn3, + }, + { + level: "WARN4", + number: plog.SeverityNumberWarn4, + }, + { + level: "ERROR", + number: plog.SeverityNumberError, + }, + { + level: "ERROR2", + number: plog.SeverityNumberError2, + }, + { + level: "ERROR3", + number: plog.SeverityNumberError3, + }, + { + level: "ERROR4", + number: plog.SeverityNumberError4, + }, + { + level: "FATAL", + number: plog.SeverityNumberFatal, + }, + { + level: "FATAL2", + number: plog.SeverityNumberFatal2, + }, + { + level: "FATAL3", + number: plog.SeverityNumberFatal3, + }, + { + level: "FATAL4", + number: plog.SeverityNumberFatal4, + }, + { + level: "UNKNOWN", + number: plog.SeverityNumberUnspecified, + }, + } + for _, tc := range testCases { + require.Equal(t, tc.number, severityTextToNumber(tc.level)) + + } +} From 730cfc92f33b3f3f55eec8f4dcbf94028aeabd16 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 15:29:09 -0700 Subject: [PATCH 08/16] Corrected README.md --- collector/receiver/telemetryapireceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index 9a94a96244..ebd7b9b4c0 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -17,7 +17,7 @@ Supported events: | Field | Default | Description | |---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| -| `port` | 4235 | HTTP server port to receive Telemetry API data. | +| `port` | 4325 | HTTP server port to receive Telemetry API data. | | `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | From 999a7cea6db1af6a25919f7d6fef5a81b15173af Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 16:08:48 -0700 Subject: [PATCH 09/16] Handled empty types array --- collector/receiver/telemetryapireceiver/receiver.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index b1c9eca5b9..c09f77798d 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -71,10 +71,12 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err + if len(r.types) > 0 { + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } } return nil } From 1190cb5f015506f9a9bdd75a4a5a9c76339ad2de Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 16:13:40 -0700 Subject: [PATCH 10/16] Added CRITICAL & ALL --- .../receiver/telemetryapireceiver/receiver.go | 50 ++++++++++--------- .../telemetryapireceiver/receiver_test.go | 8 +++ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index c09f77798d..9bc30b3393 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -235,30 +235,32 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { func severityTextToNumber(severityText string) plog.SeverityNumber { mapping := map[string]plog.SeverityNumber{ - "TRACE": plog.SeverityNumberTrace, - "TRACE2": plog.SeverityNumberTrace2, - "TRACE3": plog.SeverityNumberTrace3, - "TRACE4": plog.SeverityNumberTrace4, - "DEBUG": plog.SeverityNumberDebug, - "DEBUG2": plog.SeverityNumberDebug2, - "DEBUG3": plog.SeverityNumberDebug3, - "DEBUG4": plog.SeverityNumberDebug4, - "INFO": plog.SeverityNumberInfo, - "INFO2": plog.SeverityNumberInfo2, - "INFO3": plog.SeverityNumberInfo3, - "INFO4": plog.SeverityNumberInfo4, - "WARN": plog.SeverityNumberWarn, - "WARN2": plog.SeverityNumberWarn2, - "WARN3": plog.SeverityNumberWarn3, - "WARN4": plog.SeverityNumberWarn4, - "ERROR": plog.SeverityNumberError, - "ERROR2": plog.SeverityNumberError2, - "ERROR3": plog.SeverityNumberError3, - "ERROR4": plog.SeverityNumberError4, - "FATAL": plog.SeverityNumberFatal, - "FATAL2": plog.SeverityNumberFatal2, - "FATAL3": plog.SeverityNumberFatal3, - "FATAL4": plog.SeverityNumberFatal4, + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, } if ans, ok := mapping[strings.ToUpper(severityText)]; ok { return ans diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index e6ffe4571c..6c4df40b68 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -447,6 +447,14 @@ func TestSeverityTextToNumber(t *testing.T) { level: "FATAL4", number: plog.SeverityNumberFatal4, }, + { + level: "CRITICAL", + number: plog.SeverityNumberFatal, + }, + { + level: "ALL", + number: plog.SeverityNumberTrace, + }, { level: "UNKNOWN", number: plog.SeverityNumberUnspecified, From 16f92d12c9c93ed216ea5ac63ef09daa68e84445 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Wed, 17 Jul 2024 10:20:37 -0700 Subject: [PATCH 11/16] Removed invalid test case --- .../telemetryapireceiver/config_test.go | 24 +++++++------------ .../telemetryapireceiver/testdata/config.yaml | 18 +++++++------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index f6500bebde..ef5562d55c 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -45,14 +45,6 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{platform, function, extension}, - }, - }, - { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -60,7 +52,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -68,7 +60,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -76,7 +68,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -84,7 +76,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -92,7 +84,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -100,7 +92,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -108,7 +100,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), expected: &Config{ extensionID: "extensionID", Port: 12345, @@ -116,7 +108,7 @@ func TestLoadConfig(t *testing.T) { }, }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"), + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), expected: &Config{ extensionID: "extensionID", Port: 12345, diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml index b0935f779c..ce43e718f9 100644 --- a/collector/receiver/telemetryapireceiver/testdata/config.yaml +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -2,33 +2,31 @@ telemetryapi: telemetryapi/1: port: 12345 telemetryapi/2: - port: "12345" -telemetryapi/3: port: 12345 types: ["platform"] -telemetryapi/4: +telemetryapi/3: port: 12345 types: ["function"] -telemetryapi/5: +telemetryapi/4: port: 12345 types: ["extension"] -telemetryapi/6: +telemetryapi/5: port: 12345 types: ["platform", "function"] -telemetryapi/7: +telemetryapi/6: port: 12345 types: ["platform", "extension"] -telemetryapi/8: +telemetryapi/7: port: 12345 types: ["function", "extension"] -telemetryapi/9: +telemetryapi/8: port: 12345 types: [] -telemetryapi/10: +telemetryapi/9: port: 12345 types: - function - extension -telemetryapi/11: +telemetryapi/10: port: 12345 types: [function, extension] From a2970351701f14bac779ae41bce7a293a0325fed Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 22 Aug 2024 10:23:55 -0700 Subject: [PATCH 12/16] Fixed code after rebase --- .../receiver/telemetryapireceiver/factory.go | 8 ++- .../receiver/telemetryapireceiver/go.mod | 28 ++++++--- .../receiver/telemetryapireceiver/go.sum | 62 ++++++++++++------- .../receiver/telemetryapireceiver/receiver.go | 2 +- .../telemetryapireceiver/receiver_test.go | 11 ++-- 5 files changed, 73 insertions(+), 38 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index fcbf918a26..83ab96f23b 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -56,19 +56,21 @@ func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf c return nil, errConfigNotTelemetryAPI } r := receivers.GetOrAdd(cfg, func() component.Component { - return newTelemetryAPIReceiver(cfg, params) + t, _ := newTelemetryAPIReceiver(cfg, params) + return t }) r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) return r, nil } -func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { +func createLogsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { cfg, ok := rConf.(*Config) if !ok { return nil, errConfigNotTelemetryAPI } r := receivers.GetOrAdd(cfg, func() component.Component { - return newTelemetryAPIReceiver(cfg, params) + t, _ := newTelemetryAPIReceiver(cfg, params) + return t }) r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) return r, nil diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 9dc718eeb1..fb66eefafc 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -11,11 +11,12 @@ require ( github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.107.0 - go.opentelemetry.io/collector/consumer v0.106.1 - go.opentelemetry.io/collector/consumer/consumertest v0.106.1 + go.opentelemetry.io/collector/confmap v0.107.0 + go.opentelemetry.io/collector/consumer v0.107.0 + go.opentelemetry.io/collector/consumer/consumertest v0.107.0 go.opentelemetry.io/collector/pdata v1.13.0 - go.opentelemetry.io/collector/receiver v0.106.1 - go.opentelemetry.io/collector/semconv v0.106.1 + go.opentelemetry.io/collector/receiver v0.107.0 + go.opentelemetry.io/collector/semconv v0.107.0 go.uber.org/zap v1.27.0 ) @@ -25,9 +26,16 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect @@ -37,8 +45,10 @@ require ( github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.107.0 // indirect - go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.106.1 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 // indirect + go.opentelemetry.io/collector/featuregate v1.13.0 // indirect + go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect @@ -46,9 +56,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 864167af18..4998268d81 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -11,6 +11,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc= +github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= @@ -20,14 +22,26 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= +github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -54,27 +68,33 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector v0.106.1 h1:ZSQMpFGzFP3RILe1/+K80kCCT2ahn3MKt5e3u0Yz7Rs= +go.opentelemetry.io/collector v0.107.0 h1:C1Mng03iE73flGhEg795IFVlr3qhDLef5GESjIVtx5g= go.opentelemetry.io/collector/component v0.107.0 h1:3ReaEAtKwrPj7HrlKjEGBDKbBaxdRMPC2mfZ9b6zjXE= go.opentelemetry.io/collector/component v0.107.0/go.mod h1:1xMIYKvpnP7laipjgEw7kq1ozG7ySLkA0Evhr2Bp8M4= go.opentelemetry.io/collector/config/configtelemetry v0.107.0 h1:pSGd4FWQ/Up/Af+XZTR8JNneH/wmQ/TAU4Z16JHQeUc= go.opentelemetry.io/collector/config/configtelemetry v0.107.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= -go.opentelemetry.io/collector/consumer v0.106.1 h1:+AQ/Kmoc/g0WP8thwymNkXk1jeWsHDK6XyYfdezcxcc= -go.opentelemetry.io/collector/consumer v0.106.1/go.mod h1:oy6pR/v5o/N9cxsICskyt//bU8k8EG0JeOO1MTDfs5A= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 h1:uxQjWm2XE7d1OncQDM9tL1ha+otGt1HjoRYIcQRMOfQ= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1/go.mod h1:xQScBf9/PORFaYM6JVPOr7/TcRVEuKcW5XbAXfJByRs= -go.opentelemetry.io/collector/consumer/consumertest v0.106.1 h1:hDdFeVjCLIJ6iLfbiYcV9s+4iboFXbkJ/k3h09qusPw= -go.opentelemetry.io/collector/consumer/consumertest v0.106.1/go.mod h1:WRTYnQ8bYHQrEN6eJZ80oC4pNI7VeDRdsTZI6xs9o5M= +go.opentelemetry.io/collector/confmap v0.107.0 h1:M2o7jvQM9bnMU3pE2N6BK4KHYtSnvsSZkegUD89y8BU= +go.opentelemetry.io/collector/confmap v0.107.0/go.mod h1:9Fs/ZEIeiMa38VqkqIpn+JKQkcPf/lhAKA9fHu6c9GY= +go.opentelemetry.io/collector/consumer v0.107.0 h1:fF/+xyv9BfXQUvuJqkljrpzKyBQExDQt6zB5rzGyuHs= +go.opentelemetry.io/collector/consumer v0.107.0/go.mod h1:wgWpFes9sbnZ11XeJPSeutU8GJx6dT/gzSUqHpaZZQA= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 h1:SEP5rLm4KgBaELciRQO4m9U2q3xn16KGjpIw8zQn6Ik= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0/go.mod h1:Vi/aqlZjCBdGgGu+iOEfUyHvq2TJBar0WfsQSOMhR6Y= +go.opentelemetry.io/collector/consumer/consumertest v0.107.0 h1:BfjFHHAqbTmCN32akYvMhWKYC+ayHTX935/fRChwohM= +go.opentelemetry.io/collector/consumer/consumertest v0.107.0/go.mod h1:qNMedscdVyuxbV+wWUt4yGKQM3c0YEgQJTFeAtGZjRY= +go.opentelemetry.io/collector/featuregate v1.13.0 h1:rc84eCf5hesXQ8/bP6Zc15wqthbomfLBHmox5tT7AwM= +go.opentelemetry.io/collector/featuregate v1.13.0/go.mod h1:PsOINaGgTiFc+Tzu2K/X2jP+Ngmlp7YKGV1XrnBkH7U= +go.opentelemetry.io/collector/internal/globalgates v0.107.0 h1:PaD6WgQg80YTVxg8OF+YEqgI7WRd13wMu/R6GIG7uNU= +go.opentelemetry.io/collector/internal/globalgates v0.107.0/go.mod h1:hca7Tpzu6JmBrAOgmlyp/ZM6kxprPRMKqSYoq/Tdzjw= go.opentelemetry.io/collector/pdata v1.13.0 h1:eV3NQt2f1UcaibkziMvGTQI34LlpiYBUGp1yP0G/Cxw= go.opentelemetry.io/collector/pdata v1.13.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI= -go.opentelemetry.io/collector/pdata/pprofile v0.106.1 h1:nOLo25YnluNi+zAbU7G24RN86cJ1/EZJc6VEayBlOPo= -go.opentelemetry.io/collector/pdata/pprofile v0.106.1/go.mod h1:chr7lMJIzyXkccnPRkIPhyXtqLZLSReZYhwsggOGEfg= -go.opentelemetry.io/collector/pdata/testdata v0.106.1 h1:JUyLAwKD8o/9jgkBi16zOClxOyY028A7XIXHPV4mNmM= -go.opentelemetry.io/collector/pdata/testdata v0.106.1/go.mod h1:ghdz2RDEzsfigW0J+9oqA4fGmQJ/DJYUhE3vYU6JfhM= -go.opentelemetry.io/collector/receiver v0.106.1 h1:9kDLDJmInnz+AzAV9oV/UGMoc1+oI1pwMMs7+uMiJq4= -go.opentelemetry.io/collector/receiver v0.106.1/go.mod h1:3j9asWz7mqsgE77rPaNhlNQhRwgFhRynf0UEPs/4rkM= -go.opentelemetry.io/collector/semconv v0.106.1 h1:x0OSXrQCFinqZNUPTKrHU0gnbwngOVOPyhedQCDyDoQ= -go.opentelemetry.io/collector/semconv v0.106.1/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= +go.opentelemetry.io/collector/pdata/pprofile v0.107.0 h1:F25VZrEkSaneIBNcNI9LEBWf9nRC/WHKluSBTP0gKAA= +go.opentelemetry.io/collector/pdata/pprofile v0.107.0/go.mod h1:1GrwsKtgogRCt8aG/0lfJ037yDdFtYqF+OtJr+snxRQ= +go.opentelemetry.io/collector/pdata/testdata v0.107.0 h1:02CqvJrYjkrBlWDD+6yrByN1AhG2zT61OScLPhyyMwU= +go.opentelemetry.io/collector/pdata/testdata v0.107.0/go.mod h1:bqaeiDH1Lc5DFJXvjVHwO50x00TXj+oFre+EbOVeZXs= +go.opentelemetry.io/collector/receiver v0.107.0 h1:zfqvvYw5EmGsHT0WAfRyBv1WDN1uSXYRVNuHlYswTmQ= +go.opentelemetry.io/collector/receiver v0.107.0/go.mod h1:b29OEGTLMTit+2Xj8MA59PFbZVXpiTMGnVR0SuzqrI0= +go.opentelemetry.io/collector/semconv v0.107.0 h1:MrrUR4L4tu3IE1JxsxtT/PxjVUqvd6SC9d/dQzk/OxA= +go.opentelemetry.io/collector/semconv v0.107.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/prometheus v0.50.0 h1:2Ewsda6hejmbhGFyUvWZjUThC98Cf8Zy6g0zkIimOng= @@ -102,20 +122,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index d29952611f..d59776fbe5 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -346,7 +346,7 @@ func newTelemetryAPIReceiver( port: cfg.Port, types: subscribedTypes, resource: r, - } + }, nil } func listenOnAddress(port int) string { diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 94d95364aa..34f701a2ca 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -109,10 +109,11 @@ func TestHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { consumer := mockConsumer{} - r := newTelemetryAPIReceiver( + r, err := newTelemetryAPIReceiver( &Config{}, receivertest.NewNopSettings(), ) + require.NoError(t, err) r.registerTracesConsumer(&consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) @@ -155,10 +156,11 @@ func TestCreatePlatformInitSpan(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r := newTelemetryAPIReceiver( + r, err := newTelemetryAPIReceiver( &Config{}, receivertest.NewNopSettings(), ) + require.NoError(t, err) td, err := r.createPlatformInitSpan(tc.start, tc.end) if tc.expectError { require.Error(t, err) @@ -312,10 +314,11 @@ func TestCreateLogs(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r := newTelemetryAPIReceiver( + r, err := newTelemetryAPIReceiver( &Config{}, - receivertest.NewNopCreateSettings(), + receivertest.NewNopSettings(), ) + require.NoError(t, err) log, err := r.createLogs(tc.slice) if tc.expectError { require.Error(t, err) From 2d33076061b2805ed27326bc270b1cc97fcf58b7 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 22 Aug 2024 10:25:38 -0700 Subject: [PATCH 13/16] Updated README.md --- collector/receiver/telemetryapireceiver/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index ebd7b9b4c0..f7e7ebca61 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -27,12 +27,12 @@ receivers: telemetryapi/1: port: 4326 telemetryapi/2: - port: 4326 + port: 4327 types: - platform - function telemetryapi/3: - port: 4326 + port: 4328 types: ["platform", "function"] ``` From cdea000a4a0ed3d1192cf177b4e49eaee9a1bf62 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 22 Aug 2024 10:48:53 -0700 Subject: [PATCH 14/16] Used time.RFC3339 format --- collector/receiver/telemetryapireceiver/receiver.go | 9 ++++----- collector/receiver/telemetryapireceiver/receiver_test.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index d59776fbe5..df21c32456 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -42,7 +42,6 @@ import ( ) const initialQueueSize = 5 -const timeFormatLayout = "2006-01-02T15:04:05.000Z" const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPIReceiver struct { @@ -194,7 +193,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(timeFormatLayout, el.Time); err == nil { + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) } else { @@ -205,7 +204,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { - if t, err := time.Parse(timeFormatLayout, timestamp); err == nil { + if t, err := time.Parse(time.RFC3339, timestamp); err == nil { logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) } else { r.logger.Error("error parsing time", zap.Error(err)) @@ -290,12 +289,12 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace span.SetName("platform.initRuntimeDone") span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) - startTime, err := time.Parse(timeFormatLayout, start) + startTime, err := time.Parse(time.RFC3339, start) if err != nil { return ptrace.Traces{}, err } span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - endTime, err := time.Parse(timeFormatLayout, end) + endTime, err := time.Parse(time.RFC3339, end) if err != nil { return ptrace.Traces{}, err } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 34f701a2ca..21e1acb8ca 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -334,7 +334,7 @@ func TestCreateLogs(t *testing.T) { attr, ok := logRecord.Attributes().Get("type") require.True(t, ok) require.Equal(t, tc.expectedType, attr.Str()) - expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) + expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) require.NoError(t, err) require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) From 166628ad12f6633947d60ffaf982612ebf2572be Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Tue, 27 Aug 2024 16:48:41 -0700 Subject: [PATCH 15/16] Applied review comments --- .../telemetryapireceiver/config_test.go | 107 ++++++++---------- 1 file changed, 46 insertions(+), 61 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index ef5562d55c..025846e8cd 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -27,105 +27,90 @@ import ( func TestLoadConfig(t *testing.T) { t.Parallel() + // Helper function to create expected Config + createExpectedConfig := func(types []string) *Config { + return &Config{ + extensionID: "extensionID", + Port: 12345, + Types: types, + } + } + tests := []struct { + name string id component.ID expected component.Config }{ { + name: "default", id: component.NewID(component.MustNewType("telemetryapi")), expected: NewFactory("extensionID").CreateDefaultConfig(), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{platform, function, extension}, - }, + name: "all types", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: createExpectedConfig([]string{platform, function, extension}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{platform}, - }, + name: "platform only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: createExpectedConfig([]string{platform}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{function}, - }, + name: "function only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: createExpectedConfig([]string{function}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{extension}, - }, + name: "extension only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: createExpectedConfig([]string{extension}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{platform, function}, - }, + name: "platform and function", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: createExpectedConfig([]string{platform, function}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{platform, extension}, - }, + name: "platform and extension", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: createExpectedConfig([]string{platform, extension}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{function, extension}, - }, + name: "function and extension", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: createExpectedConfig([]string{function, extension}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{}, - }, + name: "empty types", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: createExpectedConfig([]string{}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{function, extension}, - }, + name: "function and extension (alternative syntax)", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: createExpectedConfig([]string{function, extension}), }, { - id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), - expected: &Config{ - extensionID: "extensionID", - Port: 12345, - Types: []string{function, extension}, - }, + name: "function and extension (another syntax)", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: createExpectedConfig([]string{function, extension}), }, } + for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) require.NoError(t, err) + factory := NewFactory("extensionID") cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) require.NoError(t, err) require.NoError(t, sub.Unmarshal(cfg)) require.NoError(t, component.ValidateConfig(cfg)) + require.Equal(t, tt.expected, cfg) }) } From 3b7cda52875572d7e82fee207657c85e3d5f5317 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Wed, 28 Aug 2024 09:13:19 -0700 Subject: [PATCH 16/16] Added WARNING, Updated test cases, Added String.ToUpper --- .../receiver/telemetryapireceiver/receiver.go | 3 +- .../telemetryapireceiver/receiver_test.go | 144 +++++------------- 2 files changed, 36 insertions(+), 111 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index df21c32456..6fbc8a6774 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -212,7 +212,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } if level, ok := record["level"].(string); ok { - logRecord.SetSeverityNumber(severityTextToNumber(level)) + logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } if requestId, ok := record["requestId"].(string); ok { @@ -260,6 +260,7 @@ func severityTextToNumber(severityText string) plog.SeverityNumber { "FATAL4": plog.SeverityNumberFatal4, "CRITICAL": plog.SeverityNumberFatal, "ALL": plog.SeverityNumberTrace, + "WARNING": plog.SeverityNumberWarn, } if ans, ok := mapping[strings.ToUpper(severityText)]; ok { return ans diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 21e1acb8ca..691d3fa7c1 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -354,117 +354,41 @@ func TestCreateLogs(t *testing.T) { func TestSeverityTextToNumber(t *testing.T) { t.Parallel() - testCases := []struct { - level string - number plog.SeverityNumber - }{ - { - level: "TRACE", - number: plog.SeverityNumberTrace, - }, - { - level: "TRACE2", - number: plog.SeverityNumberTrace2, - }, - { - level: "TRACE3", - number: plog.SeverityNumberTrace3, - }, - { - level: "TRACE4", - number: plog.SeverityNumberTrace4, - }, - { - level: "DEBUG2", - number: plog.SeverityNumberDebug2, - }, - { - level: "DEBUG3", - number: plog.SeverityNumberDebug3, - }, - { - level: "DEBUG4", - number: plog.SeverityNumberDebug4, - }, - { - level: "INFO", - number: plog.SeverityNumberInfo, - }, - { - level: "INFO2", - number: plog.SeverityNumberInfo2, - }, - { - level: "INFO3", - number: plog.SeverityNumberInfo3, - }, - { - level: "INFO4", - number: plog.SeverityNumberInfo4, - }, - { - level: "WARN", - number: plog.SeverityNumberWarn, - }, - { - level: "WARN2", - number: plog.SeverityNumberWarn2, - }, - { - level: "WARN3", - number: plog.SeverityNumberWarn3, - }, - { - level: "WARN4", - number: plog.SeverityNumberWarn4, - }, - { - level: "ERROR", - number: plog.SeverityNumberError, - }, - { - level: "ERROR2", - number: plog.SeverityNumberError2, - }, - { - level: "ERROR3", - number: plog.SeverityNumberError3, - }, - { - level: "ERROR4", - number: plog.SeverityNumberError4, - }, - { - level: "FATAL", - number: plog.SeverityNumberFatal, - }, - { - level: "FATAL2", - number: plog.SeverityNumberFatal2, - }, - { - level: "FATAL3", - number: plog.SeverityNumberFatal3, - }, - { - level: "FATAL4", - number: plog.SeverityNumberFatal4, - }, - { - level: "CRITICAL", - number: plog.SeverityNumberFatal, - }, - { - level: "ALL", - number: plog.SeverityNumberTrace, - }, - { - level: "UNKNOWN", - number: plog.SeverityNumberUnspecified, - }, + goldenMapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, + "WARNING": plog.SeverityNumberWarn, + } + for level, number := range goldenMapping { + require.Equal(t, number, severityTextToNumber(level)) } - for _, tc := range testCases { - require.Equal(t, tc.number, severityTextToNumber(tc.level)) + others := []string{"", "UNKNOWN", "other", "anything"} + for _, level := range others { + require.Equal(t, plog.SeverityNumberUnspecified, severityTextToNumber(level)) } }