diff --git a/.chloggen/42256.yaml b/.chloggen/42256.yaml new file mode 100644 index 0000000000000..7f8df040331fe --- /dev/null +++ b/.chloggen/42256.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: extension/health_check + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added extension.healthcheck.disableCompatibilityWrapper feature gate to enable v2 component status reporting in healthcheckextension while maintaining backward compatibility by default." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42256] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/healthcheckextension/README.md b/extension/healthcheckextension/README.md index 7b2bb17ecaf5e..ce5688650ce0c 100644 --- a/extension/healthcheckextension/README.md +++ b/extension/healthcheckextension/README.md @@ -1,5 +1,11 @@ # Health Check +> ℹ️ **Migration Notice** ℹ️ +> +> This extension is migrating to use component status reporting for health checks +> while maintaining full backward compatibility. See the [Backward Compatibility](#backward-compatibility) +> section for details about feature gates and migration options. + > ⚠️⚠️⚠️ **Warning** ⚠️⚠️⚠️ > > The `check_collector_pipeline` feature of this extension is not working as expected. It @@ -53,5 +59,45 @@ extensions: unhealthy: I'm bad! ``` -The full list of settings exposed for this exporter is documented in [config.go](./config.go) +The full list of settings exposed for this exporter is documented in [LegacyConfig in config.go](../../internal/healthcheck/internal/http/config.go#L24) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). + +## Backward Compatibility + +[Linked issue](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/42256). + +This extension maintains full backward compatibility with the original Ready/NotReady behavior by keeping the legacy implementation active unless a feature gate is enabled. + +### Feature Gate: `extension.healthcheck.disableCompatibilityWrapper` + +- **Default**: Disabled (false) +- **Purpose**: Switches the extension to the shared healthcheck implementation that reports status from component events +- **When enabled**: Health status is determined by component status events (v2 behavior) +- **When disabled**: Ready/NotReady calls directly control health endpoint status using the legacy implementation + +#### Usage + +To use the new event-driven behavior: + +```bash +# Set the feature gate to true +--feature-gates=extension.healthcheck.disableCompatibilityWrapper=true +``` + +#### Migration Timeline + +1. **Current**: Compatibility wrapper enabled by default - no breaking changes. +2. **Future**: Feature gate will be removed, compatibility wrapper will be permanently disabled. +3. **Recommended**: Test your setup with the feature gate enabled to prepare for future versions. + +#### Ready/NotReady Behavior + +**Legacy Implementation (Default)** +- `Ready()` → Health endpoint returns 200 OK +- `NotReady()` → Health endpoint returns 503 Service Unavailable +- Behavior identical to original extension + +**Shared Healthcheck Implementation (Feature gate enabled)** +- `Ready()`/`NotReady()` → Used for pipeline lifecycle only +- Health status determined by component status events +- Behavior similar to healthcheckv2extension diff --git a/extension/healthcheckextension/config.go b/extension/healthcheckextension/config.go index cd9526d7f21f8..50dca13561184 100644 --- a/extension/healthcheckextension/config.go +++ b/extension/healthcheckextension/config.go @@ -4,71 +4,12 @@ package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" import ( - "errors" - "strings" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/confighttp" -) - -type ResponseBodySettings struct { - // Healthy represents the body of the response returned when the collector is healthy. - // The default value is "" - Healthy string `mapstructure:"healthy"` - - // Unhealthy represents the body of the response returned when the collector is unhealthy. - // The default value is "" - Unhealthy string `mapstructure:"unhealthy"` -} - -// Config has the configuration for the extension enabling the health check -// extension, used to report the health status of the service. -type Config struct { - confighttp.ServerConfig `mapstructure:",squash"` - - // Path represents the path the health check service will serve. - // The default path is "/". - Path string `mapstructure:"path"` - - // ResponseBody represents the body of the response returned by the health check service. - // This overrides the default response that it would return. - ResponseBody *ResponseBodySettings `mapstructure:"response_body"` - - // CheckCollectorPipeline contains the list of settings of collector pipeline health check - CheckCollectorPipeline checkCollectorPipelineSettings `mapstructure:"check_collector_pipeline"` -} - -var _ component.Config = (*Config)(nil) -var ( - errNoEndpointProvided = errors.New("bad config: endpoint must be specified") - errInvalidExporterFailureThresholdProvided = errors.New("bad config: exporter_failure_threshold expects a positive number") - errInvalidPath = errors.New("bad config: path must start with /") + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) -// Validate checks if the extension configuration is valid -func (cfg *Config) Validate() error { - _, err := time.ParseDuration(cfg.CheckCollectorPipeline.Interval) - if err != nil { - return err - } - if cfg.Endpoint == "" { - return errNoEndpointProvided - } - if cfg.CheckCollectorPipeline.ExporterFailureThreshold <= 0 { - return errInvalidExporterFailureThresholdProvided - } - if !strings.HasPrefix(cfg.Path, "/") { - return errInvalidPath - } - return nil -} +// Config is an alias to the shared healthcheck.Config to keep the extensions in lockstep. +// This ensures complete compatibility and eliminates the need for translation layers. +type Config = healthcheck.Config -type checkCollectorPipelineSettings struct { - // Enabled indicates whether to not enable collector pipeline check. - Enabled bool `mapstructure:"enabled"` - // Interval the time range to check healthy status of collector pipeline - Interval string `mapstructure:"interval"` - // ExporterFailureThreshold is the threshold of exporter failure numbers during the Interval - ExporterFailureThreshold int `mapstructure:"exporter_failure_threshold"` -} +// Type alias for backward compatibility +type ResponseBodySettings = healthcheck.ResponseBodyConfig diff --git a/extension/healthcheckextension/config_test.go b/extension/healthcheckextension/config_test.go index 474d42a3ba153..ccb72a9170ff6 100644 --- a/extension/healthcheckextension/config_test.go +++ b/extension/healthcheckextension/config_test.go @@ -6,20 +6,26 @@ package healthcheckextension import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/confmap/xconfmap" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/featuregate" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) -func TestLoadConfig(t *testing.T) { +func TestLoadConfigLegacy(t *testing.T) { t.Parallel() tests := []struct { @@ -34,38 +40,39 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, "1"), expected: &Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: "localhost:13", - TLS: configoptional.Some(configtls.ServerConfig{ - Config: configtls.Config{ - CAFile: "/path/to/ca", - CertFile: "/path/to/cert", - KeyFile: "/path/to/key", - }, - }), + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "localhost:13", + TLS: configoptional.Some(configtls.ServerConfig{ + Config: configtls.Config{ + CAFile: "/path/to/ca", + CertFile: "/path/to/cert", + KeyFile: "/path/to/key", + }, + }), + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - ResponseBody: nil, }, }, { id: component.NewIDWithName(metadata.Type, "missingendpoint"), - expectedErr: errNoEndpointProvided, - }, - { - id: component.NewIDWithName(metadata.Type, "invalidthreshold"), - expectedErr: errInvalidExporterFailureThresholdProvided, + expectedErr: healthcheck.ErrHTTPEndpointRequired, }, { id: component.NewIDWithName(metadata.Type, "invalidpath"), - expectedErr: errInvalidPath, + expectedErr: healthcheck.ErrInvalidPath, }, { id: component.NewIDWithName(metadata.Type, "response-body"), expected: func() component.Config { cfg := NewFactory().CreateDefaultConfig().(*Config) - cfg.ResponseBody = &ResponseBodySettings{ + cfg.ResponseBody = &healthcheck.ResponseBodyConfig{ Healthy: "I'm OK", Unhealthy: "I'm not well", } @@ -73,12 +80,12 @@ func TestLoadConfig(t *testing.T) { }(), }, } + for _, tt := range tests { t.Run(tt.id.String(), func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) require.NoError(t, err) - factory := NewFactory() - cfg := factory.CreateDefaultConfig() + cfg := NewFactory().CreateDefaultConfig() sub, err := cm.Sub(tt.id.String()) require.NoError(t, err) require.NoError(t, sub.Unmarshal(cfg)) @@ -91,3 +98,78 @@ func TestLoadConfig(t *testing.T) { }) } } + +func TestLoadConfigV2WithoutGate(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + + cfg := NewFactory().CreateDefaultConfig() + sub, err := cm.Sub("health_check/v2-http-only") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + assert.NotNil(t, cfg.(*Config).HTTPConfig) + + // Without the feature gate, v2 config is ignored and legacy extension is created. + // This allows users to have both legacy and v2 configs for easier migration. + f := NewFactory() + ext, err := f.Create(t.Context(), extensiontest.NewNopSettings(f.Type()), cfg) + require.NoError(t, err) + assert.IsType(t, &healthCheckExtension{}, ext, "should create legacy extension when gate is disabled") +} + +func TestLoadConfigV2WithGate(t *testing.T) { + prev := disableCompatibilityWrapperGate.IsEnabled() + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), prev)) + }) + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + + cfg := NewFactory().CreateDefaultConfig().(*Config) + sub, err := cm.Sub("health_check/v2-both-protocols") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + assert.NoError(t, xconfmap.Validate(cfg)) + assert.Equal(t, &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "localhost:13133", + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, + }, + HTTPConfig: &healthcheck.HTTPConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "localhost:13133", + }, + Status: healthcheck.PathConfig{ + Enabled: true, + Path: "/status", + }, + Config: healthcheck.PathConfig{ + Enabled: true, + Path: "/config", + }, + }, + GRPCConfig: &healthcheck.GRPCConfig{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: "localhost:13132", + Transport: confignet.TransportTypeTCP, + }, + }, + }, + ComponentHealthConfig: &healthcheck.ComponentHealthConfig{ + IncludePermanent: true, + IncludeRecoverable: true, + RecoveryDuration: time.Minute, + }, + }, cfg) +} diff --git a/extension/healthcheckextension/factory.go b/extension/healthcheckextension/factory.go index 9303fa9be2cd4..27ef9734e55b9 100644 --- a/extension/healthcheckextension/factory.go +++ b/extension/healthcheckextension/factory.go @@ -9,13 +9,25 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/featuregate" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) const defaultPort = 13133 +// Feature gate that switches the extension to the shared healthcheck implementation +var disableCompatibilityWrapperGate = featuregate.GlobalRegistry().MustRegister( + "extension.healthcheck.disableCompatibilityWrapper", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("Switch to the shared healthcheck implementation powered by component status events"), + featuregate.WithRegisterFromVersion("v0.138.0"), + featuregate.WithRegisterToVersion("v0.143.0"), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/42256"), +) + // NewFactory creates a factory for HealthCheck extension. func NewFactory() extension.Factory { return extension.NewFactory( @@ -28,25 +40,48 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.EndpointForPort(defaultPort), + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.EndpointForPort(defaultPort), + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", } } -func createExtension(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { +func createExtension(ctx context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { config := cfg.(*Config) - return newServer(*config, set.TelemetrySettings), nil -} + if disableCompatibilityWrapperGate.IsEnabled() { + // When feature gate is enabled, use v2 implementation directly. + // The feature gate controls behavior, not the presence of v2 config fields. + config.UseV2 = true -// defaultCheckCollectorPipelineSettings returns the default settings for CheckCollectorPipeline. -func defaultCheckCollectorPipelineSettings() checkCollectorPipelineSettings { - return checkCollectorPipelineSettings{ - Enabled: false, - Interval: "5m", - ExporterFailureThreshold: 5, + // If no v2 config is set, create HTTP config from legacy settings for backward compatibility + if config.HTTPConfig == nil && config.GRPCConfig == nil { + config.HTTPConfig = &healthcheck.HTTPConfig{ + ServerConfig: config.ServerConfig, + Status: healthcheck.PathConfig{ + Enabled: true, + Path: config.Path, + }, + Config: healthcheck.PathConfig{ + Enabled: false, + Path: "/config", + }, + } + } + + return healthcheck.NewHealthCheckExtension(ctx, *config, set), nil } + + // Feature gate disabled: use legacy implementation. + // V2 config fields (HTTPConfig/GRPCConfig) are ignored even if present in the config. + // This allows users to have both configs present for easier migration. + return newServer(*config, set.TelemetrySettings), nil } diff --git a/extension/healthcheckextension/factory_test.go b/extension/healthcheckextension/factory_test.go index 5e36b67915445..1ea9cd3516240 100644 --- a/extension/healthcheckextension/factory_test.go +++ b/extension/healthcheckextension/factory_test.go @@ -4,6 +4,9 @@ package healthcheckextension import ( + "net" + "net/http" + "runtime" "testing" "github.com/stretchr/testify/assert" @@ -11,31 +14,89 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/featuregate" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) func TestFactory_CreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() - assert.Equal(t, &Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: "localhost:13133", + expected := &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "localhost:13133", + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - }, cfg) + } + assert.Equal(t, expected, cfg) assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestFactory_CreateLegacyExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = testutil.GetAvailableLocalAddress(t) + ext, err := createExtension(t.Context(), extensiontest.NewNopSettings(extensiontest.NopType), cfg) require.NoError(t, err) - require.NotNil(t, ext) + require.IsType(t, &healthCheckExtension{}, ext) } -func TestFactory_Create(t *testing.T) { +func TestLegacyExtensionLifecycle(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = testutil.GetAvailableLocalAddress(t) + + hcExt, err := createExtension(t.Context(), extensiontest.NewNopSettings(extensiontest.NopType), cfg) + require.NoError(t, err) + + require.NoError(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(t.Context())) }) + + // Give a chance for the server goroutine to run. + runtime.Gosched() + + client := &http.Client{} + url := "http://" + cfg.Endpoint + cfg.Path + + resp, err := client.Get(url) + require.NoError(t, err) + require.NotNil(t, resp) + require.NoError(t, resp.Body.Close()) +} + +func TestLegacyExtensionPortAlreadyInUse(t *testing.T) { + endpoint := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", endpoint) + require.NoError(t, err) + defer ln.Close() + + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = endpoint + + hcExt, err := createExtension(t.Context(), extensiontest.NewNopSettings(extensiontest.NopType), cfg) + require.NoError(t, err) + + require.Error(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) +} + +func TestFactory_CreateV2Extension(t *testing.T) { + prev := disableCompatibilityWrapperGate.IsEnabled() + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), prev)) + }) + cfg := createDefaultConfig().(*Config) cfg.Endpoint = testutil.GetAvailableLocalAddress(t) ext, err := createExtension(t.Context(), extensiontest.NewNopSettings(extensiontest.NopType), cfg) require.NoError(t, err) - require.NotNil(t, ext) + require.IsType(t, &healthcheck.HealthCheckExtension{}, ext) } diff --git a/extension/healthcheckextension/go.mod b/extension/healthcheckextension/go.mod index 016055c6c9c27..ea6b3a2f387f0 100644 --- a/extension/healthcheckextension/go.mod +++ b/extension/healthcheckextension/go.mod @@ -4,11 +4,14 @@ go 1.24.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.138.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck v0.138.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.44.0 go.opentelemetry.io/collector/component/componentstatus v0.138.0 go.opentelemetry.io/collector/component/componenttest v0.138.0 + go.opentelemetry.io/collector/config/configgrpc v0.138.0 go.opentelemetry.io/collector/config/confighttp v0.138.0 + go.opentelemetry.io/collector/config/confignet v1.44.0 go.opentelemetry.io/collector/config/configoptional v1.44.0 go.opentelemetry.io/collector/config/configtls v1.44.0 go.opentelemetry.io/collector/confmap v1.44.0 @@ -16,6 +19,7 @@ require ( go.opentelemetry.io/collector/extension v1.44.0 go.opentelemetry.io/collector/extension/extensioncapabilities v0.138.0 go.opentelemetry.io/collector/extension/extensiontest v0.138.0 + go.opentelemetry.io/collector/featuregate v1.44.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -43,6 +47,8 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/mostynb/go-grpc-compression v1.2.3 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.138.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rs/cors v1.11.1 // indirect @@ -54,11 +60,11 @@ require ( go.opentelemetry.io/collector/config/configopaque v1.44.0 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.44.0 // indirect go.opentelemetry.io/collector/extension/extensionmiddleware v0.138.0 // indirect - go.opentelemetry.io/collector/featuregate v1.44.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.138.0 // indirect go.opentelemetry.io/collector/pdata v1.44.0 // indirect go.opentelemetry.io/collector/pipeline v1.44.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/log v0.14.0 // indirect @@ -72,7 +78,7 @@ require ( golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/grpc v1.76.0 // indirect google.golang.org/protobuf v1.36.10 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -80,8 +86,12 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck => ../../internal/healthcheck + retract ( v0.76.2 v0.76.1 v0.65.0 ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status => ../../pkg/status diff --git a/extension/healthcheckextension/go.sum b/extension/healthcheckextension/go.sum index ca9015781ea6e..f9c3091b347c9 100644 --- a/extension/healthcheckextension/go.sum +++ b/extension/healthcheckextension/go.sum @@ -62,6 +62,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= +github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -91,10 +93,14 @@ go.opentelemetry.io/collector/config/configauth v1.44.0 h1:zYur6VJyHFtJW/1MSKyRa go.opentelemetry.io/collector/config/configauth v1.44.0/go.mod h1:8arPf8HFVkhKabgDsKqTggm081s71IYF8LogcGlHUeY= go.opentelemetry.io/collector/config/configcompression v1.44.0 h1:AaNpVYWFrmWKGnZdJCuVSlY3STSm0UBTuZU13aavvlQ= go.opentelemetry.io/collector/config/configcompression v1.44.0/go.mod h1:ZlnKaXFYL3HVMUNWVAo/YOLYoxNZo7h8SrQp3l7GV00= +go.opentelemetry.io/collector/config/configgrpc v0.138.0 h1:kY0vTvurV0PkeaJG/otkBrMNk6RGJk9n8s+5PpZJcGg= +go.opentelemetry.io/collector/config/configgrpc v0.138.0/go.mod h1:xOQCBmGksJxU/OUr28jxVTttS3x6Nc1IgkcbJU9MOoI= go.opentelemetry.io/collector/config/confighttp v0.138.0 h1:6NaoRNwwS+Hci8XC+oxGH2njZTw/hm3Bv66TsvpBip8= go.opentelemetry.io/collector/config/confighttp v0.138.0/go.mod h1:0NKEeugQ7zQ/q6REMqxNPOrkYH8LdpUm6e9OlzMbfZg= go.opentelemetry.io/collector/config/configmiddleware v1.44.0 h1:lXIF5YMZi9hmyInvmGimmKKMtukSJP4CfvyKaLyIbUg= go.opentelemetry.io/collector/config/configmiddleware v1.44.0/go.mod h1:7f+1+cmt4spFY3Gs14XB/04RSsDYG7ycTzvNJbeayPY= +go.opentelemetry.io/collector/config/confignet v1.44.0 h1:2bjbOxUz4z1XHSGF6UJxygdxdpG2vPf+SOh2UDww7zQ= +go.opentelemetry.io/collector/config/confignet v1.44.0/go.mod h1:4jJWdoe1MmpqxMzxrIILcS5FK2JPocXYZGUvv5ZQVKE= go.opentelemetry.io/collector/config/configopaque v1.44.0 h1:bfpNfe42k7SEREJZ2l3jI0EKjCUqKslvlY3o4OGYhGg= go.opentelemetry.io/collector/config/configopaque v1.44.0/go.mod h1:9uzLyGsWX0FtPWkomQXqLtblmSHgJFaM4T0gMBrCma0= go.opentelemetry.io/collector/config/configoptional v1.44.0 h1:Jaq8V5JBVsdKQ275QkBuCYUMmZnlNMoCFatryRius2I= @@ -127,10 +133,16 @@ go.opentelemetry.io/collector/internal/telemetry v0.138.0 h1:xHHYlPh1vVvr+ip0ct2 go.opentelemetry.io/collector/internal/telemetry v0.138.0/go.mod h1:evqf71fdIMXdQEofbs1bVnBUzfF6zysLMLR9bEAS9Xw= go.opentelemetry.io/collector/pdata v1.44.0 h1:q/EfWDDKrSaf4hjTIzyPeg1ZcCRg1Uj7VTFnGfNVdk8= go.opentelemetry.io/collector/pdata v1.44.0/go.mod h1:LnsjYysFc3AwMVh6KGNlkGKJUF2ReuWxtD9Hb3lSMZk= +go.opentelemetry.io/collector/pdata/pprofile v0.138.0 h1:ElnIPJK8jVzHYSnzbIVjg/v2Yq8iVLUKf7kB00zUFlE= +go.opentelemetry.io/collector/pdata/pprofile v0.138.0/go.mod h1:M7/5+Q4LohEkEB38kHhFu3S3XCA1eGSGz5uSXvNyMlM= +go.opentelemetry.io/collector/pdata/testdata v0.138.0 h1:6geeGQ4Rsb88OARLcACKn09PVIbhExaNJ1aC9OVLZaw= +go.opentelemetry.io/collector/pdata/testdata v0.138.0/go.mod h1:4wvgY+KTP7ohJVd1/pb8UIKb2TA/girsZbGTKqM5e20= go.opentelemetry.io/collector/pipeline v1.44.0 h1:EFdFBg3Wm2BlMtQbUeork5a4KFpS6haInSr+u/dk8rg= go.opentelemetry.io/collector/pipeline v1.44.0/go.mod h1:xUrAqiebzYbrgxyoXSkk6/Y3oi5Sy3im2iCA51LwUAI= go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 h1:aBKdhLVieqvwWe9A79UHI/0vgp2t/s2euY8X59pGRlw= go.opentelemetry.io/contrib/bridges/otelzap v0.13.0/go.mod h1:SYqtxLQE7iINgh6WFuVi2AI70148B8EI35DSk0Wr8m4= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG0FI8OiXhBfcRtqqHcZcka+gU3cskNuf05R18= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= @@ -196,8 +208,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= diff --git a/extension/healthcheckextension/healthcheckextension_test.go b/extension/healthcheckextension/healthcheckextension_test.go index b3ce8db38844e..5ef8a9c86f56c 100644 --- a/extension/healthcheckextension/healthcheckextension_test.go +++ b/extension/healthcheckextension/healthcheckextension_test.go @@ -4,6 +4,7 @@ package healthcheckextension import ( + "context" "io" "net" "net/http" @@ -14,8 +15,11 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.opentelemetry.io/collector/extension/extensiontest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) const ( @@ -31,7 +35,7 @@ func ensureServerRunning(url string) func() bool { } type teststep struct { - step func(*healthCheckExtension) error + step func(extensioncapabilities.PipelineWatcher) error expectedStatusCode int expectedBody string } @@ -39,18 +43,23 @@ type teststep struct { func TestHealthCheckExtensionUsage(t *testing.T) { tests := []struct { name string - config Config + config *Config teststeps []teststep }{ { name: "WithoutCheckCollectorPipeline", - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config: &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - ResponseBody: nil, }, teststeps: []teststep{ { @@ -58,12 +67,12 @@ func TestHealthCheckExtensionUsage(t *testing.T) { expectedBody: expectedBodyNotReady, }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.Ready() }, expectedStatusCode: http.StatusOK, expectedBody: expectedBodyReady, }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.NotReady() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.NotReady() }, expectedStatusCode: http.StatusServiceUnavailable, expectedBody: expectedBodyNotReady, }, @@ -71,36 +80,51 @@ func TestHealthCheckExtensionUsage(t *testing.T) { }, { name: "WithCustomizedPathWithoutCheckCollectorPipeline", - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config: &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/health", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/health", }, teststeps: []teststep{ { expectedStatusCode: http.StatusServiceUnavailable, }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.Ready() }, expectedStatusCode: http.StatusOK, }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.NotReady() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.NotReady() }, expectedStatusCode: http.StatusServiceUnavailable, }, }, }, { name: "WithBothCustomResponseBodyWithoutCheckCollectorPipeline", - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config: &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/", + ResponseBody: &healthcheck.ResponseBodyConfig{ + Healthy: "ALL OK", + Unhealthy: "NOT OK", + }, + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - ResponseBody: &ResponseBodySettings{Healthy: "ALL OK", Unhealthy: "NOT OK"}, }, teststeps: []teststep{ { @@ -108,12 +132,12 @@ func TestHealthCheckExtensionUsage(t *testing.T) { expectedBody: "NOT OK", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.Ready() }, expectedStatusCode: http.StatusOK, expectedBody: "ALL OK", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.NotReady() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.NotReady() }, expectedStatusCode: http.StatusServiceUnavailable, expectedBody: "NOT OK", }, @@ -121,13 +145,21 @@ func TestHealthCheckExtensionUsage(t *testing.T) { }, { name: "WithHealthyCustomResponseBodyWithoutCheckCollectorPipeline", - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config: &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/", + ResponseBody: &healthcheck.ResponseBodyConfig{ + Healthy: "ALL OK", + }, + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - ResponseBody: &ResponseBodySettings{Healthy: "ALL OK"}, }, teststeps: []teststep{ { @@ -135,12 +167,12 @@ func TestHealthCheckExtensionUsage(t *testing.T) { expectedBody: "", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.Ready() }, expectedStatusCode: http.StatusOK, expectedBody: "ALL OK", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.NotReady() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.NotReady() }, expectedStatusCode: http.StatusServiceUnavailable, expectedBody: "", }, @@ -148,13 +180,21 @@ func TestHealthCheckExtensionUsage(t *testing.T) { }, { name: "WithUnhealthyCustomResponseBodyWithoutCheckCollectorPipeline", - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config: &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/", + ResponseBody: &healthcheck.ResponseBodyConfig{ + Unhealthy: "NOT OK", + }, + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - ResponseBody: &ResponseBodySettings{Unhealthy: "NOT OK"}, }, teststeps: []teststep{ { @@ -162,12 +202,12 @@ func TestHealthCheckExtensionUsage(t *testing.T) { expectedBody: "NOT OK", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.Ready() }, expectedStatusCode: http.StatusOK, expectedBody: "", }, { - step: func(hcExt *healthCheckExtension) error { return hcExt.NotReady() }, + step: func(hcExt extensioncapabilities.PipelineWatcher) error { return hcExt.NotReady() }, expectedStatusCode: http.StatusServiceUnavailable, expectedBody: "NOT OK", }, @@ -177,11 +217,18 @@ func TestHealthCheckExtensionUsage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - hcExt := newServer(tt.config, componenttest.NewNopTelemetrySettings()) + // Use context.Background() instead of t.Context() because createExtension starts + // a background goroutine that needs to stay alive for the duration of the test. + // t.Context() may be cancelled immediately causing the goroutine to exit prematurely. + //nolint:usetesting + hcExt, err := createExtension(context.Background(), extensiontest.NewNopSettings(extensiontest.NopType), tt.config) + require.NoError(t, err) require.NotNil(t, hcExt) - require.NoError(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(t.Context())) }) + //nolint:usetesting + require.NoError(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) + //nolint:usetesting + t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(context.Background())) }) // Give a chance for the server goroutine to run. runtime.Gosched() @@ -190,9 +237,13 @@ func TestHealthCheckExtensionUsage(t *testing.T) { client := &http.Client{} url := "http://" + tt.config.Endpoint + tt.config.Path + // Cast to PipelineWatcher for step functions + pw, ok := hcExt.(extensioncapabilities.PipelineWatcher) + require.True(t, ok, "extension must implement PipelineWatcher") + for _, ts := range tt.teststeps { if ts.step != nil { - require.NoError(t, ts.step(hcExt)) + require.NoError(t, ts.step(pw)) } resp, err := client.Get(url) @@ -212,73 +263,29 @@ func TestHealthCheckExtensionUsage(t *testing.T) { } } -func TestHealthCheckExtensionPortAlreadyInUse(t *testing.T) { - endpoint := testutil.GetAvailableLocalAddress(t) - - // This needs to be ":port" because health checks also tries to connect to ":port". - // To avoid the pop-up "accept incoming network connections" health check should be changed - // to accept an address. - ln, err := net.Listen("tcp", endpoint) - require.NoError(t, err) - defer ln.Close() - - config := Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: endpoint, - }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - } - hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) - require.NotNil(t, hcExt) - - require.Error(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) -} - -func TestHealthCheckMultipleStarts(t *testing.T) { - config := Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), - }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - } - - hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) - require.NotNil(t, hcExt) - - require.NoError(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(t.Context())) }) - - require.Error(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) -} - -func TestHealthCheckMultipleShutdowns(t *testing.T) { - config := Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), - }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), - Path: "/", - } - - hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) - require.NotNil(t, hcExt) - - require.NoError(t, hcExt.Start(t.Context(), componenttest.NewNopHost())) - require.NoError(t, hcExt.Shutdown(t.Context())) - require.NoError(t, hcExt.Shutdown(t.Context())) -} - func TestHealthCheckShutdownWithoutStart(t *testing.T) { - config := Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.GetAvailableLocalAddress(t), + config := &Config{ + LegacyConfig: healthcheck.HTTPLegacyConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + Path: "/", + CheckCollectorPipeline: &healthcheck.CheckCollectorPipelineConfig{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + }, }, - CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), } - hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) + // Use context.Background() instead of t.Context() because createExtension starts + // a background goroutine that needs to stay alive for the duration of the test. + // t.Context() may be cancelled immediately causing the goroutine to exit prematurely. + //nolint:usetesting + hcExt, err := createExtension(context.Background(), extensiontest.NewNopSettings(extensiontest.NopType), config) + require.NoError(t, err) require.NotNil(t, hcExt) - require.NoError(t, hcExt.Shutdown(t.Context())) + //nolint:usetesting + require.NoError(t, hcExt.Shutdown(context.Background())) } diff --git a/extension/healthcheckextension/integration_test.go b/extension/healthcheckextension/integration_test.go index 4d1b167951f23..da35a063e2928 100644 --- a/extension/healthcheckextension/integration_test.go +++ b/extension/healthcheckextension/integration_test.go @@ -9,41 +9,118 @@ import ( "io" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension/extensioncapabilities" "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/featuregate" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck" ) -func Test_SimpleHealthCheck(t *testing.T) { +func TestLegacyReadyNotReadyBehavior(t *testing.T) { + transport := &http.Transport{ + DisableKeepAlives: true, + } + client := &http.Client{ + Transport: transport, + } + t.Cleanup(func() { + transport.CloseIdleConnections() + }) + f := NewFactory() port := testutil.GetAvailablePort(t) cfg := f.CreateDefaultConfig().(*Config) cfg.Endpoint = fmt.Sprintf("localhost:%d", port) - e, err := f.Create(t.Context(), extensiontest.NewNopSettings(f.Type()), cfg) - require.NoError(t, err) - err = e.Start(t.Context(), componenttest.NewNopHost()) + ext, err := f.Create(t.Context(), extensiontest.NewNopSettings(f.Type()), cfg) require.NoError(t, err) + require.IsType(t, &healthCheckExtension{}, ext) + + require.NoError(t, ext.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { - require.NoError(t, e.Shutdown(t.Context())) + require.NoError(t, ext.Shutdown(t.Context())) }) - resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/", port)) + + resp, err := client.Get(fmt.Sprintf("http://localhost:%d/", port)) require.NoError(t, err) - assert.Equal(t, "503 Service Unavailable", resp.Status) - var buf bytes.Buffer - _, err = io.Copy(&buf, resp.Body) + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) require.NoError(t, err) - assert.JSONEq(t, `{"status":"Server not available","upSince":"0001-01-01T00:00:00Z","uptime":""}`, buf.String()) - err = e.(*healthCheckExtension).Ready() + require.NoError(t, resp.Body.Close()) + assert.JSONEq(t, `{"status":"Server not available","upSince":"0001-01-01T00:00:00Z","uptime":""}`, string(body)) + + pipelineWatcher, ok := ext.(extensioncapabilities.PipelineWatcher) + require.True(t, ok) + + require.NoError(t, pipelineWatcher.Ready()) + + assert.Eventually(t, func() bool { + checkResp, checkErr := client.Get(fmt.Sprintf("http://localhost:%d/", port)) + if checkErr != nil { + return false + } + defer checkResp.Body.Close() + return checkResp.StatusCode == http.StatusOK + }, 150*time.Millisecond, 5*time.Millisecond) + + resp, err = client.Get(fmt.Sprintf("http://localhost:%d/", port)) + require.NoError(t, err) + buf := bytes.NewBuffer(nil) + _, err = io.Copy(buf, resp.Body) require.NoError(t, err) - resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/", port)) + require.NoError(t, resp.Body.Close()) + assert.Contains(t, buf.String(), `"status":"Server available"`) + + require.NoError(t, pipelineWatcher.NotReady()) + + assert.Eventually(t, func() bool { + checkResp, checkErr := client.Get(fmt.Sprintf("http://localhost:%d/", port)) + if checkErr != nil { + return false + } + defer checkResp.Body.Close() + return checkResp.StatusCode == http.StatusServiceUnavailable + }, 150*time.Millisecond, 5*time.Millisecond) +} + +func TestV2ExtensionEnabledByGate(t *testing.T) { + prev := disableCompatibilityWrapperGate.IsEnabled() + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(disableCompatibilityWrapperGate.ID(), prev)) + }) + + transport := &http.Transport{ + DisableKeepAlives: true, + } + client := &http.Client{ + Transport: transport, + } + t.Cleanup(func() { + transport.CloseIdleConnections() + }) + + f := NewFactory() + port := testutil.GetAvailablePort(t) + cfg := f.CreateDefaultConfig().(*Config) + cfg.Endpoint = fmt.Sprintf("localhost:%d", port) + ext, err := f.Create(t.Context(), extensiontest.NewNopSettings(f.Type()), cfg) require.NoError(t, err) - assert.Equal(t, "200 OK", resp.Status) - buf.Reset() - _, err = io.Copy(&buf, resp.Body) + require.IsType(t, &healthcheck.HealthCheckExtension{}, ext) + + require.NoError(t, ext.Start(t.Context(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, ext.Shutdown(t.Context())) + }) + + resp, err := client.Get(fmt.Sprintf("http://localhost:%d/", port)) require.NoError(t, err) - assert.Contains(t, buf.String(), `{"status":"Server available","upSince":"`) + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + require.NoError(t, resp.Body.Close()) } diff --git a/extension/healthcheckextension/internal/healthcheck/handler.go b/extension/healthcheckextension/internal/healthcheck/handler.go index 59ae67b15cc91..deedd255e5a1d 100644 --- a/extension/healthcheckextension/internal/healthcheck/handler.go +++ b/extension/healthcheckextension/internal/healthcheck/handler.go @@ -104,11 +104,11 @@ func (hc *HealthCheck) Handler() http.Handler { w.Header().Set("Content-Type", "application/json") w.WriteHeader(template.statusCode) - _, _ = w.Write(hc.createRespBody(hcState, template)) + _, _ = w.Write(createRespBody(hcState, template)) }) } -func (*HealthCheck) createRespBody(state state, template healthCheckResponse) []byte { +func createRespBody(state state, template healthCheckResponse) []byte { resp := template // clone if state.status == Ready { resp.UpSince = state.upSince diff --git a/extension/healthcheckextension/testdata/config.yaml b/extension/healthcheckextension/testdata/config.yaml index 460543bdfc349..ee14eee89c32a 100644 --- a/extension/healthcheckextension/testdata/config.yaml +++ b/extension/healthcheckextension/testdata/config.yaml @@ -32,3 +32,54 @@ health_check/response-body: response_body: healthy: I'm OK unhealthy: I'm not well +health_check/v2-http-only: + http: + endpoint: "localhost:13133" + status: + enabled: true + path: "/status" + config: + enabled: false + path: "/config" +health_check/v2-grpc-only: + grpc: + endpoint: "localhost:13132" + transport: tcp +health_check/v2-both-protocols: + http: + endpoint: "localhost:13133" + status: + enabled: true + path: "/status" + config: + enabled: true + path: "/config" + grpc: + endpoint: "localhost:13132" + transport: tcp + component_health: + include_permanent_errors: true + include_recoverable_errors: true + recovery_duration: 1m +health_check/v2-missing-protocol: +health_check/v2-http-missing-endpoint: + http: + endpoint: "" +health_check/v2-grpc-missing-endpoint: + grpc: + endpoint: "" +health_check/v2-invalid-status-path: + http: + endpoint: "localhost:13133" + status: + enabled: true + path: "invalid" +health_check/v2-invalid-config-path: + http: + endpoint: "localhost:13133" + status: + enabled: true + path: "/status" + config: + enabled: true + path: "invalid" diff --git a/internal/healthcheck/config.go b/internal/healthcheck/config.go index 4e7bc51988704..9b1abe4d8a8f6 100644 --- a/internal/healthcheck/config.go +++ b/internal/healthcheck/config.go @@ -5,6 +5,7 @@ package healthcheck // import "github.com/open-telemetry/opentelemetry-collector import ( "errors" + "fmt" "strings" "go.opentelemetry.io/collector/component" @@ -13,7 +14,6 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/confmap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck/internal/common" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck/internal/grpc" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck/internal/http" @@ -27,6 +27,7 @@ type ( GRPCConfig = grpc.Config ComponentHealthConfig = common.ComponentHealthConfig CheckCollectorPipelineConfig = http.CheckCollectorPipelineConfig + ResponseBodyConfig = http.ResponseBodyConfig ) const ( @@ -43,6 +44,11 @@ var ( ErrInvalidPath = errors.New("path must start with /") ) +// endpointForPort returns a localhost endpoint for the given port. +func endpointForPort(port int) string { + return fmt.Sprintf("localhost:%d", port) +} + // Config has the configuration for the extension enabling the health check // extension, used to report the health status of the service. type Config struct { @@ -98,11 +104,43 @@ func (c *Config) Validate() error { // Unmarshal a confmap.Conf into the config struct. func (c *Config) Unmarshal(conf *confmap.Conf) error { + // Initialize with default values to enable unmarshaling into nested structs. + // For healthcheckextension: the feature gate determines behavior, not these fields. + // For healthcheckv2extension: these fields control which protocols are enabled. + // We conditionally initialize and then clear to preserve "user specified" vs "not specified". + if conf.IsSet(httpConfigKey) { + c.HTTPConfig = &http.Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: endpointForPort(DefaultHTTPPort), + }, + Status: http.PathConfig{ + Enabled: true, + Path: "/status", + }, + Config: http.PathConfig{ + Enabled: false, + Path: "/config", + }, + } + } + if conf.IsSet(grpcConfigKey) { + c.GRPCConfig = &grpc.Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: endpointForPort(DefaultGRPCPort), + Transport: confignet.TransportTypeTCP, + }, + }, + } + } + err := conf.Unmarshal(c) if err != nil { return err } + // Clear configs that weren't actually set in the confmap. + // This preserves the distinction between "user didn't specify" vs "user specified with defaults". if !conf.IsSet(httpConfigKey) { c.HTTPConfig = nil } @@ -118,13 +156,13 @@ func NewDefaultConfig() component.Config { return &Config{ LegacyConfig: http.LegacyConfig{ ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.EndpointForPort(DefaultHTTPPort), + Endpoint: endpointForPort(DefaultHTTPPort), }, Path: "/", }, HTTPConfig: &http.Config{ ServerConfig: confighttp.ServerConfig{ - Endpoint: testutil.EndpointForPort(DefaultHTTPPort), + Endpoint: endpointForPort(DefaultHTTPPort), }, Status: http.PathConfig{ Enabled: true, @@ -138,7 +176,7 @@ func NewDefaultConfig() component.Config { GRPCConfig: &grpc.Config{ ServerConfig: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: testutil.EndpointForPort(DefaultGRPCPort), + Endpoint: endpointForPort(DefaultGRPCPort), Transport: "tcp", }, }, diff --git a/internal/healthcheck/internal/http/server_test.go b/internal/healthcheck/internal/http/server_test.go index b30399c6c8bda..8bee085877d67 100644 --- a/internal/healthcheck/internal/http/server_test.go +++ b/internal/healthcheck/internal/http/server_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" "os" "path/filepath" "strings" @@ -21,6 +22,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/pipeline" + "go.uber.org/goleak" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/healthcheck/internal/common" @@ -153,6 +155,16 @@ type teststep struct { } func TestStatus(t *testing.T) { + // These goroutines are part of the http.Client's connection pool management. + // They don't accept context.Context and are managed by the transport's lifecycle, + // not our test lifecycle. They'll be cleaned up when the transport is garbage collected. + opts := []goleak.Option{ + goleak.IgnoreCurrent(), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("net/http.(*persistConn).readLoop"), + } + goleak.VerifyNone(t, opts...) + var server *Server traces := testhelpers.NewPipelineMetadata(pipeline.SignalTraces) metrics := testhelpers.NewPipelineMetadata(pipeline.SignalMetrics) @@ -2945,16 +2957,31 @@ func TestStatus(t *testing.T) { ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - defer func() { require.NoError(t, server.Shutdown(t.Context())) }() + ts := httptest.NewServer(server.mux) + + // Ensure cleanup happens in the correct order + defer func() { + ts.Close() + http.DefaultTransport.(*http.Transport).CloseIdleConnections() + require.NoError(t, server.Shutdown(t.Context())) + }() - var url string + var path string if tc.legacyConfig.UseV2 { - url = fmt.Sprintf("http://%s%s", tc.config.Endpoint, tc.config.Status.Path) + path = tc.config.Status.Path } else { - url = fmt.Sprintf("http://%s%s", tc.legacyConfig.Endpoint, tc.legacyConfig.Path) + path = tc.legacyConfig.Path } + url := ts.URL + path - client := &http.Client{} + // Create a custom client with aggressive timeouts + client := &http.Client{ + Timeout: 100 * time.Millisecond, + Transport: &http.Transport{ + DisableKeepAlives: true, + MaxConnsPerHost: 1, + }, + } for _, ts := range tc.teststeps { if ts.step != nil { @@ -2983,6 +3010,7 @@ func TestStatus(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) + defer resp.Body.Close() assert.Contains(t, string(body), ts.expectedBody) @@ -3046,6 +3074,16 @@ func assertStatusSimple( } func TestConfig(t *testing.T) { + // These goroutines are part of the http.Client's connection pool management. + // They don't accept context.Context and are managed by the transport's lifecycle, + // not our test lifecycle. They'll be cleaned up when the transport is garbage collected. + opts := []goleak.Option{ + goleak.IgnoreCurrent(), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("net/http.(*persistConn).readLoop"), + } + goleak.VerifyNone(t, opts...) + var server *Server confMap, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) require.NoError(t, err) @@ -3123,10 +3161,27 @@ func TestConfig(t *testing.T) { ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - defer func() { require.NoError(t, server.Shutdown(t.Context())) }() + ts := httptest.NewServer(server.mux) + + // Ensure cleanup happens in the correct order + defer func() { + ts.Close() + require.NoError(t, server.Shutdown(t.Context())) + }() + + // Use a single client for all requests in this test + transport := &http.Transport{ + DisableKeepAlives: true, + MaxIdleConnsPerHost: -1, + DisableCompression: true, + MaxConnsPerHost: -1, + IdleConnTimeout: 1 * time.Millisecond, + TLSHandshakeTimeout: 1 * time.Millisecond, + } + client := &http.Client{Transport: transport} + defer transport.CloseIdleConnections() - client := &http.Client{} - url := fmt.Sprintf("http://%s%s", tc.config.Endpoint, tc.config.Config.Path) + url := ts.URL + tc.config.Config.Path if tc.setup != nil { tc.setup() @@ -3138,6 +3193,7 @@ func TestConfig(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, tc.expectedBody, body) }) } diff --git a/internal/tidylist/tidylist.txt b/internal/tidylist/tidylist.txt index f323aaf731e82..966afee7c90b1 100644 --- a/internal/tidylist/tidylist.txt +++ b/internal/tidylist/tidylist.txt @@ -157,9 +157,9 @@ extension/encoding/textencodingextension extension/encoding/zipkinencodingextension extension/googleclientauthextension extension/headerssetterextension -extension/healthcheckextension pkg/status internal/healthcheck +extension/healthcheckextension extension/healthcheckv2extension extension/httpforwarderextension extension/jaegerremotesampling