From 99358c918132d33e3285569600b22094f92b467d Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Wed, 13 Nov 2024 11:42:15 +0100 Subject: [PATCH 1/2] feat: add batch-processor Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/tenant_types.go | 62 ++++- .../v1alpha1/zz_generated.deepcopy.go | 43 +++- .../telemetry.kube-logging.dev_tenants.yaml | 242 +++++++++++------- .../telemetry.kube-logging.dev_tenants.yaml | 242 +++++++++++------- .../telemetry/otel_conf_gen/otel_conf_gen.go | 16 +- .../components/processor/batch_processor.go | 38 +++ .../processor/transform_processor.go | 12 +- .../controller/telemetry/route_controller.go | 1 + internal/controller/telemetry/utils/utils.go | 10 + 9 files changed, 466 insertions(+), 200 deletions(-) create mode 100644 internal/controller/telemetry/pipeline/components/processor/batch_processor.go diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index d5d92cf8..d7dd339a 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -15,9 +15,49 @@ package v1alpha1 import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type Batch struct { + // Name of the Batch processor + Name string `json:"name,omitempty"` + + // From go.opentelemetry.io/collector/processor/batchprocessor + + // Timeout sets the time after which a batch will be sent regardless of size. + // When this is set to zero, batched data will be sent immediately. + Timeout time.Duration `json:"timeout,omitempty"` + + // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + // When this is set to zero, the batch size is ignored and data will be sent immediately + // subject to only send_batch_max_size. + SendBatchSize uint32 `json:"send_batch_size,omitempty"` + + // SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + // Larger batches are split into smaller units. + // Default value is 0, that means no maximum size. + SendBatchMaxSize uint32 `json:"send_batch_max_size,omitempty"` + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct batchers. If this setting is empty, + // a single batcher instance will be used. When this setting + // is not empty, one batcher will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will + // trigger a validation error. + MetadataKeys []string `json:"metadata_keys,omitempty"` + + // MetadataCardinalityLimit indicates the maximum number of + // batcher instances that will be created through a distinct + // combination of MetadataKeys. 
+ MetadataCardinalityLimit uint32 `json:"metadata_cardinality_limit,omitempty"` +} + // TransformStatement represents a single statement in a Transform processor // ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor type TransformStatement struct { @@ -43,6 +83,12 @@ type Transform struct { LogStatements []TransformStatement `json:"logStatements,omitempty"` } +// Processors defines the tenant level processors +type Processors struct { + Transform Transform `json:"transform,omitempty"` + Batch *Batch `json:"batch,omitempty"` +} + // RouteConfig defines the routing configuration for a tenant // it will be used to generate routing connectors type RouteConfig struct { @@ -60,10 +106,24 @@ type RouteConfig struct { type TenantSpec struct { SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"` LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"` - Transform Transform `json:"transform,omitempty"` + Processors Processors `json:"processors,omitempty"` RouteConfig RouteConfig `json:"routeConfig,omitempty"` } +func (t *TenantSpec) SetDefaults(tenantName string) { + if t.Processors.Batch == nil { + t.Processors.Batch = &Batch{ + Name: tenantName, + Timeout: 200 * time.Millisecond, + SendBatchSize: 8192, + } + } + + if len(t.Processors.Batch.MetadataKeys) > 0 && t.Processors.Batch.MetadataCardinalityLimit == 0 { + t.Processors.Batch.MetadataCardinalityLimit = 1000 + } +} + // TenantStatus defines the observed state of Tenant type TenantStatus struct { Subscriptions []NamespacedName `json:"subscriptions,omitempty"` diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index c4c1d15a..984fe1f7 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -77,6 +77,26 @@ func (in *BasicAuthConfig) DeepCopy() *BasicAuthConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Batch) DeepCopyInto(out *Batch) { + *out = *in + if in.MetadataKeys != nil { + in, out := &in.MetadataKeys, &out.MetadataKeys + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Batch. +func (in *Batch) DeepCopy() *Batch { + if in == nil { + return nil + } + out := new(Batch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BearerAuthConfig) DeepCopyInto(out *BearerAuthConfig) { *out = *in @@ -629,6 +649,27 @@ func (in *OutputStatus) DeepCopy() *OutputStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Processors) DeepCopyInto(out *Processors) { + *out = *in + in.Transform.DeepCopyInto(&out.Transform) + if in.Batch != nil { + in, out := &in.Batch, &out.Batch + *out = new(Batch) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Processors. +func (in *Processors) DeepCopy() *Processors { + if in == nil { + return nil + } + out := new(Processors) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *QueueSettings) DeepCopyInto(out *QueueSettings) { *out = *in @@ -886,7 +927,7 @@ func (in *TenantSpec) DeepCopyInto(out *TenantSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - in.Transform.DeepCopyInto(&out.Transform) + in.Processors.DeepCopyInto(&out.Processors) in.RouteConfig.DeepCopyInto(&out.RouteConfig) } diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index ab253d49..d21bd73d 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,153 @@ spec: type: object x-kubernetes-map-type: atomic type: array + processors: + description: Processors defines the tenant level processors + properties: + batch: + properties: + metadata_cardinality_limit: + description: |- + MetadataCardinalityLimit indicates the maximum number of + batcher instances that will be created through a distinct + combination of MetadataKeys. + format: int32 + type: integer + metadata_keys: + description: |- + MetadataKeys is a list of client.Metadata keys that will be + used to form distinct batchers. If this setting is empty, + a single batcher instance will be used. When this setting + is not empty, one batcher will be used per distinct + combination of values for the listed metadata keys. + + Empty value and unset metadata are treated as distinct cases. + + Entries are case-insensitive. Duplicated entries will + trigger a validation error. + items: + type: string + type: array + name: + description: Name of the Batch processor + type: string + send_batch_max_size: + description: |- + SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + Larger batches are split into smaller units. + Default value is 0, that means no maximum size. + format: int32 + type: integer + send_batch_size: + description: |- + SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + When this is set to zero, the batch size is ignored and data will be sent immediately + subject to only send_batch_max_size. + format: int32 + type: integer + timeout: + description: |- + Timeout sets the time after which a batch will be sent regardless of size. + When this is set to zero, batched data will be sent immediately. 
+ format: int64 + type: integer + type: object + transform: + description: Transform represents the Transform processor, which + modifies telemetry based on its configuration + properties: + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + logStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + metricStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + name: + description: Name of the Transform processor + type: string + traceStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + type: object + type: object routeConfig: description: |- RouteConfig defines the routing configuration for a tenant @@ -177,101 +324,6 @@ spec: type: object x-kubernetes-map-type: atomic type: array - transform: - description: Transform represents the Transform processor, which modifies - telemetry based on its configuration - properties: - errorMode: - description: |- - ErrorMode specifies how errors are handled while processing a statement - vaid options are: ignore, silent, propagate; (default: propagate) - enum: - - ignore - - silent - - propagate - type: string - logStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - metricStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - name: - 
description: Name of the Transform processor - type: string - traceStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - type: object type: object status: description: TenantStatus defines the observed state of Tenant diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index ab253d49..d21bd73d 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,153 @@ spec: type: object x-kubernetes-map-type: atomic type: array + processors: + description: Processors defines the tenant level processors + properties: + batch: + properties: + metadata_cardinality_limit: + description: |- + MetadataCardinalityLimit indicates the maximum number of + batcher instances that will be created through a distinct + combination of MetadataKeys. + format: int32 + type: integer + metadata_keys: + description: |- + MetadataKeys is a list of client.Metadata keys that will be + used to form distinct batchers. If this setting is empty, + a single batcher instance will be used. When this setting + is not empty, one batcher will be used per distinct + combination of values for the listed metadata keys. + + Empty value and unset metadata are treated as distinct cases. + + Entries are case-insensitive. Duplicated entries will + trigger a validation error. + items: + type: string + type: array + name: + description: Name of the Batch processor + type: string + send_batch_max_size: + description: |- + SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + Larger batches are split into smaller units. + Default value is 0, that means no maximum size. + format: int32 + type: integer + send_batch_size: + description: |- + SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + When this is set to zero, the batch size is ignored and data will be sent immediately + subject to only send_batch_max_size. + format: int32 + type: integer + timeout: + description: |- + Timeout sets the time after which a batch will be sent regardless of size. + When this is set to zero, batched data will be sent immediately. 
+ format: int64 + type: integer + type: object + transform: + description: Transform represents the Transform processor, which + modifies telemetry based on its configuration + properties: + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + logStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + metricStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + name: + description: Name of the Transform processor + type: string + traceStatements: + items: + description: |- + TransformStatement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + type: object + type: object routeConfig: description: |- RouteConfig defines the routing configuration for a tenant @@ -177,101 +324,6 @@ spec: type: object x-kubernetes-map-type: atomic type: array - transform: - description: Transform represents the Transform processor, which modifies - telemetry based on its configuration - properties: - errorMode: - description: |- - ErrorMode specifies how errors are handled while processing a statement - vaid options are: ignore, silent, propagate; (default: propagate) - enum: - - ignore - - silent - - propagate - type: string - logStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - metricStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - name: - 
description: Name of the Transform processor - type: string - traceStatements: - items: - description: |- - TransformStatement represents a single statement in a Transform processor - ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor - properties: - conditions: - items: - type: string - type: array - context: - enum: - - resource - - scope - - span - - spanevent - - metric - - datapoint - - log - type: string - statements: - items: - type: string - type: array - type: object - type: array - type: object type: object status: description: TenantStatus defines the observed state of Tenant diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 9da1803e..11e2bc97 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -30,6 +30,7 @@ import ( "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components/extension" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components/processor" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components/receiver" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "golang.org/x/exp/maps" ) @@ -86,8 +87,13 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any { processors[fmt.Sprintf("attributes/tenant_%s", tenant.Name)] = processor.GenerateTenantAttributeProcessor(tenant.Name) // Add a transform processor if the tenant has one - if tenant.Spec.Transform.Name != "" { - processors[fmt.Sprintf("transform/%s", tenant.Spec.Transform.Name)] = processor.GenerateTransformProcessorForTenant(tenant) + if tenant.Spec.Processors.Transform.Name != "" { + processors[fmt.Sprintf("transform/%s", tenant.Spec.Processors.Transform.Name)] = processor.GenerateTransformProcessorForTenant(tenant) + } + + // Add a batch processor if the tenant has one + if tenant.Spec.Processors.Batch != nil && tenant.Spec.Processors.Batch.Name != "" { + processors[fmt.Sprintf("batch/%s", tenant.Spec.Processors.Batch.Name)] = processor.GenerateBatchProcessor(*tenant.Spec.Processors.Batch) } } @@ -200,7 +206,13 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b output := cfgInput.OutputsWithSecretData[idx] receivers := []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)} + processors := []string{fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)} + queriedTenant := utils.GetTenantByName(cfgInput.Tenants, tenant) + if queriedTenant != nil && queriedTenant.Spec.Processors.Batch != nil { + processors = append(processors, fmt.Sprintf("batch/%s", queriedTenant.Spec.Processors.Batch.Name)) + } + var exporters []string if output.Output.Spec.OTLPGRPC != nil { diff --git a/internal/controller/telemetry/pipeline/components/processor/batch_processor.go b/internal/controller/telemetry/pipeline/components/processor/batch_processor.go new file mode 100644 index 00000000..787b0bb0 --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/processor/batch_processor.go @@ -0,0 +1,38 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance 
with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" +) + +func GenerateBatchProcessor(batchProcessor v1alpha1.Batch) map[string]any { + batchProcessorResult := make(map[string]any) + batchProcessorResult["timeout"] = batchProcessor.Timeout.String() + if batchProcessor.SendBatchSize != 0 { + batchProcessorResult["send_batch_size"] = batchProcessor.SendBatchSize + } + if batchProcessor.SendBatchMaxSize != 0 { + batchProcessorResult["send_batch_max_size"] = batchProcessor.SendBatchMaxSize + } + if len(batchProcessor.MetadataKeys) != 0 { + batchProcessorResult["metadata_keys"] = batchProcessor.MetadataKeys + } + if batchProcessor.MetadataCardinalityLimit != 0 { + batchProcessorResult["metadata_cardinality_limit"] = batchProcessor.MetadataCardinalityLimit + } + + return batchProcessorResult +} diff --git a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go index 23d52a68..c5136503 100644 --- a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go +++ b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go @@ -37,17 +37,17 @@ type TransformProcessor struct { func GenerateTransformProcessorForTenant(tenant v1alpha1.Tenant) TransformProcessor { return TransformProcessor{ - ErrorMode: components.ErrorMode(tenant.Spec.Transform.ErrorMode), - TraceStatements: convertAPIStatements(tenant.Spec.Transform.TraceStatements), - MetricStatements: convertAPIStatements(tenant.Spec.Transform.MetricStatements), - LogStatements: convertAPIStatements(tenant.Spec.Transform.LogStatements), + ErrorMode: components.ErrorMode(tenant.Spec.Processors.Transform.ErrorMode), + TraceStatements: convertAPIStatements(tenant.Spec.Processors.Transform.TraceStatements), + MetricStatements: convertAPIStatements(tenant.Spec.Processors.Transform.MetricStatements), + LogStatements: convertAPIStatements(tenant.Spec.Processors.Transform.LogStatements), } } func GenerateTransformProcessorForTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, tenants []v1alpha1.Tenant) { for _, tenant := range tenants { - if tenant.Name == tenantName && tenant.Spec.Transform.Name != "" { - pipeline.Processors = append(pipeline.Processors, fmt.Sprintf("transform/%s", tenant.Spec.Transform.Name)) + if tenant.Name == tenantName && tenant.Spec.Processors.Transform.Name != "" { + pipeline.Processors = append(pipeline.Processors, fmt.Sprintf("transform/%s", tenant.Spec.Processors.Transform.Name)) } } } diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index 1a3df3ea..a79cf70e 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -113,6 +113,7 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl subscription.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(errors.New("no valid outputs for subscription")), "no 
valid outputs for subscription", "subscription", subscription.NamespacedName().String()) } else { + tenant.Spec.SetDefaults(tenant.Name) subscription.Status.State = v1alpha1.StateReady } subscription.Status.Outputs = validOutputs diff --git a/internal/controller/telemetry/utils/utils.go b/internal/controller/telemetry/utils/utils.go index 13d6af62..fa78513f 100644 --- a/internal/controller/telemetry/utils/utils.go +++ b/internal/controller/telemetry/utils/utils.go @@ -26,3 +26,13 @@ func SortNamespacedNames(names []v1alpha1.NamespacedName) { return strings.Compare(a.String(), b.String()) }) } + +func GetTenantByName(tenants []v1alpha1.Tenant, name string) *v1alpha1.Tenant { + for _, tenant := range tenants { + if tenant.Name == name { + return &tenant + } + } + + return nil +} From 5b44d7164783a7f33be4235e03576c3883a69ca4 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Wed, 13 Nov 2024 11:43:19 +0100 Subject: [PATCH 2/2] test: add batch-processor test Signed-off-by: Bence Csati --- .../examples/tenant-to-tenant-routing/pipeline.yaml | 13 +++++++------ .../tenants_with_bridges/tenants_with_bridges.yaml | 13 +++++++------ .../otel_col_conf_test_fixtures/complex.yaml | 11 +++++++++++ .../telemetry/otel_conf_gen/otel_conf_gen_test.go | 5 +++++ 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index 48165ae7..4e3b80dc 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -15,12 +15,13 @@ metadata: collector: cluster name: shared spec: - transform: - name: parse-nginx - logStatements: - - context: log - statements: - - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) + processors: + transform: + name: parse-nginx + logStatements: + - context: log + statements: + - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) logSourceNamespaceSelectors: - matchLabels: tenant: shared diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml index 48c5d8af..971d6119 100644 --- a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -41,12 +41,13 @@ metadata: collector: cluster name: shared spec: - transform: - name: parse-nginx - logStatements: - - context: log - statements: - - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) + processors: + transform: + name: parse-nginx + logStatements: + - context: log + statements: + - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) logSourceNamespaceSelectors: - matchLabels: tenant: shared diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index 2bfb5eb0..9536dd89 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -122,6 +122,12 @@ processors: - action: insert key: tenant value: example-tenant-b + batch/example-tenant-a: + send_batch_size: 8192 + timeout: 200ms + batch/example-tenant-b: + send_batch_size: 8192 + timeout: 200ms deltatocumulative: {} k8sattributes: auth_type: serviceAccount @@ -160,6 +166,7 @@ service: processors: - memory_limiter - 
attributes/exporter_name_loki-test-output + - batch/example-tenant-a receivers: - routing/subscription_example-tenant-a-ns_subscription-example-1_outputs logs/output_example-tenant-a-ns_subscription-example-1_collector_otlp-test-output: @@ -169,6 +176,7 @@ service: processors: - memory_limiter - attributes/exporter_name_otlp-test-output + - batch/example-tenant-a receivers: - routing/subscription_example-tenant-a-ns_subscription-example-1_outputs logs/output_example-tenant-a-ns_subscription-example-2_collector_otlp-test-output-2: @@ -178,6 +186,7 @@ service: processors: - memory_limiter - attributes/exporter_name_otlp-test-output-2 + - batch/example-tenant-a receivers: - routing/subscription_example-tenant-a-ns_subscription-example-2_outputs logs/output_example-tenant-b-ns_subscription-example-3_collector_fluentforward-test-output: @@ -187,6 +196,7 @@ service: processors: - memory_limiter - attributes/exporter_name_fluentforward-test-output + - batch/example-tenant-b receivers: - routing/subscription_example-tenant-b-ns_subscription-example-3_outputs logs/output_example-tenant-b-ns_subscription-example-3_collector_otlp-test-output-2: @@ -196,6 +206,7 @@ service: processors: - memory_limiter - attributes/exporter_name_otlp-test-output-2 + - batch/example-tenant-b receivers: - routing/subscription_example-tenant-b-ns_subscription-example-3_outputs logs/tenant_example-tenant-a: diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index c702559a..5917938c 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -299,6 +299,11 @@ func TestOtelColConfComplex(t *testing.T) { }, } + // Set Batch processor defaults + for i, tenant := range inputCfg.Tenants { + inputCfg.Tenants[i].Spec.SetDefaults(tenant.Name) + } + // Config // The receiver and exporter entries are not serialized because of tags on the underlying data structure. The tests won't contain them, this is a known issue.
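
Note (illustrative, not part of the patch): a minimal Tenant manifest exercising the new processors.batch field could look like the sketch below. The tenant name, labels, and namespace selectors are placeholders, and the timeout/send_batch_size values simply mirror the defaults applied by TenantSpec.SetDefaults; because Timeout is a time.Duration serialized as an integer in the generated CRD, the value is expressed in nanoseconds here.

apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Tenant
metadata:
  labels:
    collector: cluster
  name: example-tenant
spec:
  processors:
    batch:
      name: example-tenant        # rendered as the collector processor ID "batch/example-tenant"
      timeout: 200000000          # 200ms in nanoseconds; emitted as "timeout: 200ms" in the collector config
      send_batch_size: 8192       # flush once this many items are buffered
      send_batch_max_size: 16384  # optional: split batches larger than this
  subscriptionNamespaceSelectors:
    - matchLabels:
        tenant: example-tenant
  logSourceNamespaceSelectors:
    - matchLabels:
        tenant: example-tenant

If processors.batch is omitted entirely, SetDefaults fills in a batch processor named after the tenant with a 200ms timeout and a send_batch_size of 8192, and the generated output pipelines reference it as batch/<tenant-name>, as the complex.yaml fixture above shows.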