diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 68f6939190..c6d9eb9305 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex } telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) if err != nil { logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) } diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index 2eb9fd5e4e..16883e1804 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { - eventTypes := []EventType{ - Platform, - // Function, - // Extension, - } - +func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { bufferingConfig := BufferingCfg{ MaxItems: 1000, MaxBytes: 256 * 1024, diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index fe2c2745f1..ebd7b9b4c0 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -1,10 +1,10 @@ # Telemetry API Receiver -| Status | | -| ------------------------ |-----------------| -| Stability | [alpha] | -| Supported pipeline types | traces | -| Distributions | [extension] | +| Status | | +| ------------------------ |--------------| +| Stability | [alpha] | +| Supported pipeline types | traces, logs | +| Distributions | [extension] | This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup. @@ -15,11 +15,25 @@ Supported events: ## Configuration -There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration: +| Field | Default | Description | +|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| `port` | 4325 | HTTP server port to receive Telemetry API data. | +| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | + ```yaml receivers: telemetryapi: + telemetryapi/1: + port: 4326 + telemetryapi/2: + port: 4326 + types: + - platform + - function + telemetryapi/3: + port: 4326 + types: ["platform", "function"] ``` [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 86b5250196..b51ef1ed57 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -14,12 +14,23 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import ( + "fmt" +) + // Config defines the configuration for the various elements of the receiver agent. type Config struct { extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { + for _, t := range cfg.Types { + if t != platform && t != function && t != extension { + return fmt.Errorf("unknown extension type: %s", t) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 7f3969dd7c..f6500bebde 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -15,11 +15,130 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" ) +func TestLoadConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + id component.ID + expected component.Config + }{ + { + id: component.NewID(component.MustNewType("telemetryapi")), + expected: NewFactory("extensionID").CreateDefaultConfig(), + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.id.String(), func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory("extensionID") + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + require.Equal(t, tt.expected, cfg) + }) + } +} + func TestValidate(t *testing.T) { testCases := []struct { desc string @@ -31,6 +150,13 @@ func TestValidate(t *testing.T) { cfg: &Config{}, expectedErr: nil, }, + { + desc: "invalid config", + cfg: &Config{ + Types: []string{"invalid"}, + }, + expectedErr: fmt.Errorf("unknown extension type: invalid"), + }, } for _, tc := range testCases { diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index c89dcbeac8..ced897d9f9 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -18,14 +18,19 @@ import ( "context" "errors" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" ) const ( - typeStr = "telemetryapi" - stability = component.StabilityLevelDevelopment + typeStr = "telemetryapi" + stability = component.StabilityLevelDevelopment + defaultPort = 4325 + platform = "platform" + function = "function" + extension = "extension" ) var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") @@ -37,9 +42,12 @@ func NewFactory(extensionID string) receiver.Factory { func() component.Config { return &Config{ extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, } }, - receiver.WithTraces(createTracesReceiver, stability)) + receiver.WithTraces(createTracesReceiver, stability), + receiver.WithLogs(createLogsReceiver, stability)) } func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { @@ -47,6 +55,23 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r if !ok { return nil, errConfigNotTelemetryAPI } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) + return r, nil +} - return newTelemetryAPIReceiver(cfg, next, params) +func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) + return r, nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index 97961ea437..e0cfcd25eb 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test"} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go new file mode 100644 index 0000000000..b297d81586 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package sharedcomponent exposes util functionality for receivers and exporters +// that need to share state between different signal types instances such as net.Listener or os.File. +package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// SharedComponents a map that keeps reference of all created instances for a given configuration, +// and ensures that the shared state is started and stopped only once. +type SharedComponents struct { + comps map[any]*SharedComponent +} + +// NewSharedComponents returns a new empty SharedComponents. +func NewSharedComponents() *SharedComponents { + return &SharedComponents{ + comps: make(map[any]*SharedComponent), + } +} + +// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// and adds it to the map of references. +func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent { + if c, ok := scs.comps[key]; ok { + return c + } + newComp := &SharedComponent{ + Component: create(), + removeFunc: func() { + delete(scs.comps, key) + }, + } + scs.comps[key] = newComp + return newComp +} + +// SharedComponent ensures that the wrapped component is started and stopped only once. +// When stopped it is removed from the SharedComponents map. +type SharedComponent struct { + component.Component + + startOnce sync.Once + stopOnce sync.Once + removeFunc func() +} + +// Unwrap returns the original component. +func (r *SharedComponent) Unwrap() component.Component { + return r.Component +} + +// Start implements component.Component. +func (r *SharedComponent) Start(ctx context.Context, host component.Host) error { + var err error + r.startOnce.Do(func() { + err = r.Component.Start(ctx, host) + }) + return err +} + +// Shutdown implements component.Component. +func (r *SharedComponent) Shutdown(ctx context.Context) error { + var err error + r.stopOnce.Do(func() { + err = r.Component.Shutdown(ctx) + r.removeFunc() + }) + return err +} diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go new file mode 100644 index 0000000000..dad4886c17 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sharedcomponent + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +var id = component.MustNewID("test") + +func TestNewSharedComponents(t *testing.T) { + comps := NewSharedComponents() + assert.Len(t, comps.comps, 0) +} + +type mockComponent struct { + component.StartFunc + component.ShutdownFunc +} + +func TestSharedComponents_GetOrAdd(t *testing.T) { + nop := &mockComponent{} + createNop := func() component.Component { return nop } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createNop) + assert.Len(t, comps.comps, 1) + assert.Same(t, nop, got.Unwrap()) + assert.Same(t, got, comps.GetOrAdd(id, createNop)) + + // Shutdown nop will remove + assert.NoError(t, got.Shutdown(context.Background())) + assert.Len(t, comps.comps, 0) + assert.NotSame(t, got, comps.GetOrAdd(id, createNop)) +} + +func TestSharedComponent(t *testing.T) { + wantErr := errors.New("my error") + calledStart := 0 + calledStop := 0 + comp := &mockComponent{ + StartFunc: func(_ context.Context, _ component.Host) error { + calledStart++ + return wantErr + }, + ShutdownFunc: func(_ context.Context) error { + calledStop++ + return wantErr + }, + } + createComp := func() component.Component { return comp } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createComp) + assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // Second time is not called anymore. + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + assert.Equal(t, wantErr, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) + // Second time is not called anymore. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) +} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4df8b7764d..9bc30b3393 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,36 +24,43 @@ import ( "math/rand" "net/http" "os" + "strconv" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const defaultListenerPort = "4325" const initialQueueSize = 5 +const timeFormatLayout = "2006-01-02T15:04:05.000Z" +const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPIReceiver struct { httpServer *http.Server logger *zap.Logger queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later - nextConsumer consumer.Traces + nextTraces consumer.Traces + nextLogs consumer.Logs lastPlatformStartTime string lastPlatformEndTime string extensionID string + port int + types []telemetryapi.EventType resource pcommon.Resource } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress() + address := listenOnAddress(r.port) r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() @@ -64,10 +71,12 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err + if len(r.types) > 0 { + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } } return nil } @@ -147,12 +156,26 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { - err := r.nextConsumer.ConsumeTraces(context.Background(), td) - if err == nil { - r.lastPlatformEndTime = "" - r.lastPlatformStartTime = "" - } else { - r.logger.Error("error receiving traces", zap.Error(err)) + if r.nextTraces != nil { + err := r.nextTraces.ConsumeTraces(context.Background(), td) + if err == nil { + r.lastPlatformEndTime = "" + r.lastPlatformStartTime = "" + } else { + r.logger.Error("error receiving traces", zap.Error(err)) + } + } + } + } + + // Logs + if r.nextLogs != nil { + if logs, err := r.createLogs(slice); err == nil { + if logs.LogRecordCount() > 0 { + err := r.nextLogs.ConsumeLogs(context.Background(), logs) + if err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } } } } @@ -161,26 +184,118 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(scopeName) + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(timeFormatLayout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if t, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + } + if level, ok := record["level"].(string); ok { + logRecord.SetSeverityNumber(severityTextToNumber(level)) + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } + } + } + } + return log, nil +} + +func severityTextToNumber(severityText string) plog.SeverityNumber { + mapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, + } + if ans, ok := mapping[strings.ToUpper(severityText)]; ok { + return ans + } else { + return plog.SeverityNumberUnspecified + } +} + +func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { + r.nextTraces = next +} + +func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) { + r.nextLogs = next +} + func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) { traceData := ptrace.NewTraces() rs := traceData.ResourceSpans().AppendEmpty() r.resource.CopyTo(rs.Resource()) ss := rs.ScopeSpans().AppendEmpty() - ss.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + ss.Scope().SetName(scopeName) span := ss.Spans().AppendEmpty() span.SetTraceID(newTraceID()) span.SetSpanID(newSpanID()) span.SetName("platform.initRuntimeDone") span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) - layout := "2006-01-02T15:04:05.000Z" - startTime, err := time.Parse(layout, start) + startTime, err := time.Parse(timeFormatLayout, start) if err != nil { return ptrace.Traces{}, err } span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - endTime, err := time.Parse(layout, end) + endTime, err := time.Parse(timeFormatLayout, end) if err != nil { return ptrace.Traces{}, err } @@ -190,9 +305,8 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace func newTelemetryAPIReceiver( cfg *Config, - next consumer.Traces, set receiver.CreateSettings, -) (*telemetryAPIReceiver, error) { +) *telemetryAPIReceiver { envResourceMap := map[string]string{ "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": semconv.AttributeFaaSMaxMemory, "AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion, @@ -212,22 +326,36 @@ func newTelemetryAPIReceiver( r.Attributes().PutStr(resourceAttribute, val) } } + + subscribedTypes := []telemetryapi.EventType{} + for _, val := range cfg.Types { + switch val { + case "platform": + subscribedTypes = append(subscribedTypes, telemetryapi.Platform) + case "function": + subscribedTypes = append(subscribedTypes, telemetryapi.Function) + case "extension": + subscribedTypes = append(subscribedTypes, telemetryapi.Extension) + } + } + return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - nextConsumer: next, - extensionID: cfg.extensionID, - resource: r, - }, nil + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, + } } -func listenOnAddress() string { +func listenOnAddress(port int) string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") var addr string if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort + addr = ":" + strconv.Itoa(port) } else { - addr = "sandbox.localdomain:" + defaultListenerPort + addr = "sandbox.localdomain:" + strconv.Itoa(port) } return addr diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 7f7b46572f..6c4df40b68 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -19,11 +19,15 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" ) func TestListenOnAddress(t *testing.T) { @@ -34,7 +38,7 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, "sandbox.localdomain:4325", addr) }, }, @@ -42,7 +46,7 @@ func TestListenOnAddress(t *testing.T) { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, ":4325", addr) }, }, @@ -61,11 +65,17 @@ func (c *mockConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro return nil } +func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { + return nil +} + func (c *mockConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } func TestHandler(t *testing.T) { + t.Parallel() + testCases := []struct { desc string body string @@ -99,12 +109,11 @@ func TestHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { consumer := mockConsumer{} - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - &consumer, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) + r.registerTracesConsumer(&consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -146,12 +155,10 @@ func TestCreatePlatformInitSpan(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - nil, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) td, err := r.createPlatformInitSpan(tc.start, tc.end) if tc.expectError { require.Error(t, err) @@ -161,3 +168,300 @@ func TestCreatePlatformInitSpan(t *testing.T) { }) } } + +func TestCreateLogs(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedContainsRequestId bool + expectedRequestId string + expectedSeverityNumber plog.SeverityNumber + expectError bool + }{ + { + desc: "no slice", + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "Invalid Timestamp", + slice: []event{ + { + Time: "invalid", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectError: true, + }, + { + desc: "function text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "function json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + expectedSeverityText: "Info", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "extension json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Info", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension json anything", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "anything", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Unspecified", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r := newTelemetryAPIReceiver( + &Config{}, + receivertest.NewNopCreateSettings(), + ) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, tc.expectedContainsRequestId, ok) + if ok { + require.Equal(t, tc.expectedRequestId, requestId.Str()) + } + require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) + require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} + +func TestSeverityTextToNumber(t *testing.T) { + t.Parallel() + + testCases := []struct { + level string + number plog.SeverityNumber + }{ + { + level: "TRACE", + number: plog.SeverityNumberTrace, + }, + { + level: "TRACE2", + number: plog.SeverityNumberTrace2, + }, + { + level: "TRACE3", + number: plog.SeverityNumberTrace3, + }, + { + level: "TRACE4", + number: plog.SeverityNumberTrace4, + }, + { + level: "DEBUG2", + number: plog.SeverityNumberDebug2, + }, + { + level: "DEBUG3", + number: plog.SeverityNumberDebug3, + }, + { + level: "DEBUG4", + number: plog.SeverityNumberDebug4, + }, + { + level: "INFO", + number: plog.SeverityNumberInfo, + }, + { + level: "INFO2", + number: plog.SeverityNumberInfo2, + }, + { + level: "INFO3", + number: plog.SeverityNumberInfo3, + }, + { + level: "INFO4", + number: plog.SeverityNumberInfo4, + }, + { + level: "WARN", + number: plog.SeverityNumberWarn, + }, + { + level: "WARN2", + number: plog.SeverityNumberWarn2, + }, + { + level: "WARN3", + number: plog.SeverityNumberWarn3, + }, + { + level: "WARN4", + number: plog.SeverityNumberWarn4, + }, + { + level: "ERROR", + number: plog.SeverityNumberError, + }, + { + level: "ERROR2", + number: plog.SeverityNumberError2, + }, + { + level: "ERROR3", + number: plog.SeverityNumberError3, + }, + { + level: "ERROR4", + number: plog.SeverityNumberError4, + }, + { + level: "FATAL", + number: plog.SeverityNumberFatal, + }, + { + level: "FATAL2", + number: plog.SeverityNumberFatal2, + }, + { + level: "FATAL3", + number: plog.SeverityNumberFatal3, + }, + { + level: "FATAL4", + number: plog.SeverityNumberFatal4, + }, + { + level: "CRITICAL", + number: plog.SeverityNumberFatal, + }, + { + level: "ALL", + number: plog.SeverityNumberTrace, + }, + { + level: "UNKNOWN", + number: plog.SeverityNumberUnspecified, + }, + } + for _, tc := range testCases { + require.Equal(t, tc.number, severityTextToNumber(tc.level)) + + } +} diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml new file mode 100644 index 0000000000..b0935f779c --- /dev/null +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -0,0 +1,34 @@ +telemetryapi: +telemetryapi/1: + port: 12345 +telemetryapi/2: + port: "12345" +telemetryapi/3: + port: 12345 + types: ["platform"] +telemetryapi/4: + port: 12345 + types: ["function"] +telemetryapi/5: + port: 12345 + types: ["extension"] +telemetryapi/6: + port: 12345 + types: ["platform", "function"] +telemetryapi/7: + port: 12345 + types: ["platform", "extension"] +telemetryapi/8: + port: 12345 + types: ["function", "extension"] +telemetryapi/9: + port: 12345 + types: [] +telemetryapi/10: + port: 12345 + types: + - function + - extension +telemetryapi/11: + port: 12345 + types: [function, extension]