-
Notifications
You must be signed in to change notification settings - Fork 208
Feature: Generate function and extension logs via Telemetry API receiver #1347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
bd5dd05
63558a8
1ae4fdd
0f74cbe
0c54292
dd2e317
13a99af
4ff5876
730cfc9
999a7ce
1190cb5
165cdda
26a3c68
16f92d1
2575871
a297035
2d33076
cdea000
166628a
2e03ffe
3b7cda5
1bc05fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,14 +18,19 @@ import ( | |||||||||||||
"context" | ||||||||||||||
"errors" | ||||||||||||||
|
||||||||||||||
"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" | ||||||||||||||
"go.opentelemetry.io/collector/component" | ||||||||||||||
"go.opentelemetry.io/collector/consumer" | ||||||||||||||
"go.opentelemetry.io/collector/receiver" | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
const ( | ||||||||||||||
typeStr = "telemetryapi" | ||||||||||||||
stability = component.StabilityLevelDevelopment | ||||||||||||||
typeStr = "telemetryapi" | ||||||||||||||
stability = component.StabilityLevelDevelopment | ||||||||||||||
defaultPort = 4325 | ||||||||||||||
platform = "platform" | ||||||||||||||
function = "function" | ||||||||||||||
extension = "extension" | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") | ||||||||||||||
|
@@ -37,16 +42,36 @@ func NewFactory(extensionID string) receiver.Factory { | |||||||||||||
func() component.Config { | ||||||||||||||
return &Config{ | ||||||||||||||
extensionID: extensionID, | ||||||||||||||
Port: defaultPort, | ||||||||||||||
Types: []string{platform, function, extension}, | ||||||||||||||
} | ||||||||||||||
}, | ||||||||||||||
receiver.WithTraces(createTracesReceiver, stability)) | ||||||||||||||
receiver.WithTraces(createTracesReceiver, stability), | ||||||||||||||
receiver.WithLogs(createLogsReceiver, stability)) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { | ||||||||||||||
cfg, ok := rConf.(*Config) | ||||||||||||||
if !ok { | ||||||||||||||
return nil, errConfigNotTelemetryAPI | ||||||||||||||
} | ||||||||||||||
r := receivers.GetOrAdd(cfg, func() component.Component { | ||||||||||||||
return newTelemetryAPIReceiver(cfg, params) | ||||||||||||||
}) | ||||||||||||||
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would want more data protection in this file than I see in the top-level variable for
Suggested change
And if that works we don't have to rely on copying There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and actually this is a bit fragile. this means that you cannot use this receiver in lets say, more than one pipeline. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking to reuse the http server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Can you take a look to the updated change and let me know your feedback? Thanks! |
||||||||||||||
return r, nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
return newTelemetryAPIReceiver(cfg, next, params) | ||||||||||||||
func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { | ||||||||||||||
cfg, ok := rConf.(*Config) | ||||||||||||||
if !ok { | ||||||||||||||
return nil, errConfigNotTelemetryAPI | ||||||||||||||
} | ||||||||||||||
r := receivers.GetOrAdd(cfg, func() component.Component { | ||||||||||||||
return newTelemetryAPIReceiver(cfg, params) | ||||||||||||||
}) | ||||||||||||||
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) | ||||||||||||||
return r, nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
var receivers = sharedcomponent.NewSharedComponents() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// Package sharedcomponent exposes util functionality for receivers and exporters | ||
// that need to share state between different signal types instances such as net.Listener or os.File. | ||
package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
) | ||
|
||
// SharedComponents a map that keeps reference of all created instances for a given configuration, | ||
// and ensures that the shared state is started and stopped only once. | ||
type SharedComponents struct { | ||
comps map[any]*SharedComponent | ||
} | ||
|
||
// NewSharedComponents returns a new empty SharedComponents. | ||
func NewSharedComponents() *SharedComponents { | ||
return &SharedComponents{ | ||
comps: make(map[any]*SharedComponent), | ||
} | ||
} | ||
|
||
// GetOrAdd returns the already created instance if exists, otherwise creates a new instance | ||
// and adds it to the map of references. | ||
func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent { | ||
if c, ok := scs.comps[key]; ok { | ||
return c | ||
} | ||
newComp := &SharedComponent{ | ||
Component: create(), | ||
removeFunc: func() { | ||
delete(scs.comps, key) | ||
}, | ||
} | ||
scs.comps[key] = newComp | ||
return newComp | ||
} | ||
|
||
// SharedComponent ensures that the wrapped component is started and stopped only once. | ||
// When stopped it is removed from the SharedComponents map. | ||
type SharedComponent struct { | ||
component.Component | ||
|
||
startOnce sync.Once | ||
stopOnce sync.Once | ||
removeFunc func() | ||
} | ||
|
||
// Unwrap returns the original component. | ||
func (r *SharedComponent) Unwrap() component.Component { | ||
return r.Component | ||
} | ||
|
||
// Start implements component.Component. | ||
func (r *SharedComponent) Start(ctx context.Context, host component.Host) error { | ||
var err error | ||
r.startOnce.Do(func() { | ||
err = r.Component.Start(ctx, host) | ||
}) | ||
return err | ||
} | ||
|
||
// Shutdown implements component.Component. | ||
func (r *SharedComponent) Shutdown(ctx context.Context) error { | ||
var err error | ||
r.stopOnce.Do(func() { | ||
err = r.Component.Shutdown(ctx) | ||
r.removeFunc() | ||
}) | ||
return err | ||
} |
Uh oh!
There was an error while loading. Please reload this page.