Skip to content

fix: operator stability part2 #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/telemetry/v1alpha1/bridge_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ type BridgeSpec struct {

// BridgeStatus defines the observed state of Bridge
type BridgeStatus struct {
State State `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster,categories=telemetry-all
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Source Tenant",type=string,JSONPath=`.spec.sourceTenant`
//+kubebuilder:printcolumn:name="Target Tenant",type=string,JSONPath=`.spec.targetTenant`
//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state`

// Bridge is the Schema for the Bridges API
type Bridge struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
6 changes: 0 additions & 6 deletions config/crd/bases/telemetry.kube-logging.dev_bridges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
109 changes: 56 additions & 53 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -44,6 +43,10 @@ import (
"github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components"
)

var (
ErrTenantFailed = errors.New("tenant failed")
)

const (
requeueDelayOnFailedTenant = 20 * time.Second
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0"
Expand All @@ -55,20 +58,15 @@ type CollectorReconciler struct {
Scheme *runtime.Scheme
}

type TenantFailedError struct {
msg string
}

type BasicAuthClientAuthConfig struct {
Username string
Password string
}

func (e *TenantFailedError) Error() string { return e.msg }

func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, collector *v1alpha1.Collector) (otelcolconfgen.OtelColConfigInput, error) {
logger := log.FromContext(ctx)
tenantSubscriptionMap := make(map[string][]v1alpha1.NamespacedName)
var bridgesReferencedByTenant []v1alpha1.Bridge
subscriptionOutputMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName)

tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector)
Expand All @@ -81,15 +79,13 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}

for _, tenant := range tenants {

if tenant.Status.State == v1alpha1.StateFailed {
logger.Info("tenant %q is in failed state, retrying later", tenant.Name)
return otelcolconfgen.OtelColConfigInput{}, &TenantFailedError{msg: "tenant failed"}
logger.Info(fmt.Sprintf("tenant %q is in failed state, retrying later", tenant.Name))
return otelcolconfgen.OtelColConfigInput{}, ErrTenantFailed
}

subscriptionNames := tenant.Status.Subscriptions
tenantSubscriptionMap[tenant.Name] = subscriptionNames

for _, subsName := range subscriptionNames {
queriedSubs := &v1alpha1.Subscription{}
if err = r.Client.Get(ctx, types.NamespacedName(subsName), queriedSubs); err != nil {
Expand All @@ -98,12 +94,22 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}
subscriptions[subsName] = *queriedSubs
}

bridgeNames := tenant.Status.ConnectedBridges
for _, bridgeName := range bridgeNames {
queriedBridge := &v1alpha1.Bridge{}
if err = r.Client.Get(ctx, types.NamespacedName{Name: bridgeName}, queriedBridge); err != nil {
logger.Error(errors.WithStack(err), "failed getting bridges for tenant", "tenant", tenant.Name)
return otelcolconfgen.OtelColConfigInput{}, err
}

bridgesReferencedByTenant = append(bridgesReferencedByTenant, *queriedBridge)
}
}

for _, subscription := range subscriptions {
outputNames := subscription.Status.Outputs
subscriptionOutputMap[subscription.NamespacedName()] = outputNames

for _, outputName := range outputNames {
outputWithSecretData := components.OutputWithSecretData{}

Expand All @@ -117,29 +123,20 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
if err := r.populateSecretForOutput(ctx, queriedOutput, &outputWithSecretData); err != nil {
return otelcolconfgen.OtelColConfigInput{}, err
}

outputs = append(outputs, outputWithSecretData)
}
}

bridges, err := r.getBridges(ctx, client.ListOptions{})
if err != nil {
logger.Error(errors.WithStack(err), "failed listing bridges")
return otelcolconfgen.OtelColConfigInput{}, err
}

otelConfigInput := otelcolconfgen.OtelColConfigInput{
return otelcolconfgen.OtelColConfigInput{
Tenants: tenants,
Subscriptions: subscriptions,
Bridges: bridges,
Bridges: bridgesReferencedByTenant,
OutputsWithSecretData: outputs,
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
Debug: collector.Spec.Debug,
MemoryLimiter: *collector.Spec.MemoryLimiter,
}

return otelConfigInput, nil
}, nil
}

func (r *CollectorReconciler) populateSecretForOutput(ctx context.Context, queriedOutput *v1alpha1.Output, outputWithSecret *components.OutputWithSecretData) error {
Expand Down Expand Up @@ -182,10 +179,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
collector := &v1alpha1.Collector{}
logger.Info(fmt.Sprintf("getting collector: %q", req.Name))

if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil {
logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error",
"collector", req.Name)
return ctrl.Result{}, err
if err := r.Get(ctx, req.NamespacedName, collector); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

collector.Spec.SetDefaults()
Expand All @@ -194,22 +189,38 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector)
if err != nil {
if errors.Is(err, &TenantFailedError{}) {
if errors.Is(err, ErrTenantFailed) {
return ctrl.Result{RequeueAfter: requeueDelayOnFailedTenant}, err
}
return ctrl.Result{}, err
}

if err := otelConfigInput.ValidateConfig(); ignoreNoResourcesError(err) != nil {
collector.Status.State = v1alpha1.StateFailed
if err := otelConfigInput.ValidateConfig(); err != nil {
if errors.Is(err, otelcolconfgen.ErrNoResources) {
logger.Info(err.Error())
return ctrl.Result{}, nil
}
logger.Error(errors.WithStack(err), "invalid otel config input")

collector.Status.State = v1alpha1.StateFailed
if updateErr := r.updateStatus(ctx, collector); updateErr != nil {
logger.Error(errors.WithStack(updateErr), "failed updating collector status")
return ctrl.Result{}, errors.Append(err, updateErr)
}

return ctrl.Result{}, err
}

otelConfig := otelConfigInput.AssembleConfig(ctx)
if err := validator.ValidateAssembledConfig(otelConfig); err != nil {
collector.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "invalid otel config")

collector.Status.State = v1alpha1.StateFailed
if updateErr := r.updateStatus(ctx, collector); updateErr != nil {
logger.Error(errors.WithStack(updateErr), "failed updating collector status")
return ctrl.Result{}, errors.Append(err, updateErr)
}

return ctrl.Result{}, err
}

Expand Down Expand Up @@ -280,11 +291,15 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
resourceReconciler := reconciler.NewReconcilerWith(r.Client, reconciler.WithLog(logger))
_, err = resourceReconciler.ReconcileResource(&otelCollector, reconciler.StatePresent)
if err != nil {
logger.Error(errors.WithStack(err), "failed reconciling collector")

collector.Status.State = v1alpha1.StateFailed
if apierrors.IsConflict(err) {
logger.Info("conflict while creating otel collector, retrying")
return ctrl.Result{Requeue: true}, nil
if updateErr := r.updateStatus(ctx, collector); updateErr != nil {
logger.Error(errors.WithStack(updateErr), "failed updating collector status")
return ctrl.Result{}, errors.Append(err, updateErr)
}

return ctrl.Result{}, err
}

tenantNames := []string{}
Expand All @@ -296,9 +311,10 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
collector.Status.State = v1alpha1.StateReady
if !reflect.DeepEqual(originalCollectorStatus, collector.Status) {
logger.Info("collector status changed")
if err = r.Client.Status().Update(ctx, collector); err != nil {
logger.Error(errors.WithStack(err), "failed updating collector status")
return ctrl.Result{}, err

if updateErr := r.updateStatus(ctx, collector); updateErr != nil {
logger.Error(errors.WithStack(updateErr), "failed updating collector status")
return ctrl.Result{}, errors.Append(err, updateErr)
}
}

Expand Down Expand Up @@ -536,13 +552,8 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l
return tenantsForSelector.Items, nil
}

func (r *CollectorReconciler) getBridges(ctx context.Context, listOpts client.ListOptions) ([]v1alpha1.Bridge, error) {
var bridges v1alpha1.BridgeList
if err := r.Client.List(ctx, &bridges, &listOpts); client.IgnoreNotFound(err) != nil {
return nil, err
}

return bridges.Items, nil
func (r *CollectorReconciler) updateStatus(ctx context.Context, obj client.Object) error {
return r.Status().Update(ctx, obj)
}

func normalizeStringSlice(inputList []string) []string {
Expand All @@ -558,11 +569,3 @@ func normalizeStringSlice(inputList []string) []string {

return uniqueList
}

func ignoreNoResourcesError(err error) error {
if err == otelcolconfgen.ErrNoResources {
return nil
}

return err
}
33 changes: 26 additions & 7 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"slices"

"github.com/hashicorp/go-multierror"
Expand All @@ -33,7 +34,7 @@ import (
"golang.org/x/exp/maps"
)

var ErrNoResources = errors.New("there are no resources deployed that the controller can use")
var ErrNoResources = errors.New("there are no resources deployed that the collector(s) can use")

type OtelColConfigInput struct {
// These must only include resources that are selected by the collector, tenant labelselectors, and listed outputs in the subscriptions
Expand All @@ -49,6 +50,21 @@ type OtelColConfigInput struct {
Debug bool
}

func (cfgInput *OtelColConfigInput) IsEmpty() bool {
v := reflect.ValueOf(*cfgInput)
for i := range v.NumField() {
field := v.Field(i)
switch field.Kind() {
case reflect.Slice, reflect.Map:
if field.Len() != 0 {
return false
}
}
}

return true
}

func (cfgInput *OtelColConfigInput) generateExporters(ctx context.Context) map[string]any {
exporters := map[string]any{}
maps.Copy(exporters, exporter.GenerateMetricsExporters())
Expand Down Expand Up @@ -248,9 +264,14 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
}
}

extensionNames := make([]string, 0, len(extensions))
for k := range extensions {
extensionNames = append(extensionNames, k)
var extensionNames []string
if len(extensions) > 0 {
extensionNames = make([]string, 0, len(extensions))
for k := range extensions {
extensionNames = append(extensionNames, k)
}
} else {
extensionNames = nil
}

return otelv1beta1.Config{
Expand Down Expand Up @@ -320,16 +341,14 @@ func validateSubscriptionsAndBridges(tenants *[]v1alpha1.Tenant, subscriptions *
}

func (cfgInput *OtelColConfigInput) ValidateConfig() error {
if cfgInput == nil {
if cfgInput.IsEmpty() {
return ErrNoResources
}

var result *multierror.Error

if err := validateTenants(&cfgInput.Tenants); err != nil {
result = multierror.Append(result, err)
}

if err := validateSubscriptionsAndBridges(&cfgInput.Tenants, &cfgInput.Subscriptions, &cfgInput.Bridges); err != nil {
result = multierror.Append(result, err)
}
Expand Down
Loading
Loading