feat: batch processor #101

Merged · 2 commits · Nov 18, 2024
38 changes: 38 additions & 0 deletions api/telemetry/v1alpha1/output_types.go
@@ -22,12 +22,50 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Batch struct {
// From go.opentelemetry.io/collector/processor/batchprocessor

// Timeout sets the time after which a batch will be sent regardless of size.
// When this is set to zero, batched data will be sent immediately.
// +kubebuilder:validation:Format=duration
Timeout string `json:"timeout,omitempty"`

// SendBatchSize is the size of a batch which, once reached, will trigger the batch to be sent.
// When this is set to zero, the batch size is ignored and data will be sent immediately,
// subject only to send_batch_max_size.
SendBatchSize uint32 `json:"send_batch_size,omitempty"`

// SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
// Larger batches are split into smaller units.
// The default value is 0, which means no maximum size.
SendBatchMaxSize uint32 `json:"send_batch_max_size,omitempty"`

// MetadataKeys is a list of client.Metadata keys that will be
// used to form distinct batchers. If this setting is empty,
// a single batcher instance will be used. When this setting
// is not empty, one batcher will be used per distinct
// combination of values for the listed metadata keys.
//
// Empty value and unset metadata are treated as distinct cases.
//
// Entries are case-insensitive. Duplicated entries will
// trigger a validation error.
MetadataKeys []string `json:"metadata_keys,omitempty"`

// MetadataCardinalityLimit indicates the maximum number of
// batcher instances that will be created through a distinct
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `json:"metadata_cardinality_limit,omitempty"`
}

// OutputSpec defines the desired state of Output
type OutputSpec struct {
OTLPGRPC *OTLPGRPC `json:"otlp,omitempty"`
Fluentforward *Fluentforward `json:"fluentforward,omitempty"`
OTLPHTTP *OTLPHTTP `json:"otlphttp,omitempty"`
Authentication *OutputAuth `json:"authentication,omitempty"`
Batch *Batch `json:"batch,omitempty"`
}

type OutputAuth struct {
25 changes: 25 additions & 0 deletions api/telemetry/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -86,6 +86,51 @@ spec:
type: string
type: object
type: object
batch:
properties:
metadata_cardinality_limit:
description: |-
MetadataCardinalityLimit indicates the maximum number of
batcher instances that will be created through a distinct
combination of MetadataKeys.
format: int32
type: integer
metadata_keys:
description: |-
MetadataKeys is a list of client.Metadata keys that will be
used to form distinct batchers. If this setting is empty,
a single batcher instance will be used. When this setting
is not empty, one batcher will be used per distinct
combination of values for the listed metadata keys.

Empty value and unset metadata are treated as distinct cases.

Entries are case-insensitive. Duplicated entries will
trigger a validation error.
items:
type: string
type: array
send_batch_max_size:
description: |-
SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
Larger batches are split into smaller units.
The default value is 0, which means no maximum size.
format: int32
type: integer
send_batch_size:
description: |-
SendBatchSize is the size of a batch which, once reached, will trigger the batch to be sent.
When this is set to zero, the batch size is ignored and data will be sent immediately,
subject only to send_batch_max_size.
format: int32
type: integer
timeout:
description: |-
Timeout sets the time after which a batch will be sent regardless of size.
When this is set to zero, batched data will be sent immediately.
format: duration
type: string
type: object
fluentforward:
properties:
compress_gzip:
Expand Down
45 changes: 45 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_outputs.yaml
@@ -86,6 +86,51 @@ spec:
type: string
type: object
type: object
batch:
properties:
metadata_cardinality_limit:
description: |-
MetadataCardinalityLimit indicates the maximum number of
batcher instances that will be created through a distinct
combination of MetadataKeys.
format: int32
type: integer
metadata_keys:
description: |-
MetadataKeys is a list of client.Metadata keys that will be
used to form distinct batchers. If this setting is empty,
a single batcher instance will be used. When this setting
is not empty, one batcher will be used per distinct
combination of values for the listed metadata keys.

Empty value and unset metadata are treated as distinct cases.

Entries are case-insensitive. Duplicated entries will
trigger a validation error.
items:
type: string
type: array
send_batch_max_size:
description: |-
SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
Larger batches are split into smaller units.
The default value is 0, which means no maximum size.
format: int32
type: integer
send_batch_size:
description: |-
SendBatchSize is the size of a batch which, once reached, will trigger the batch to be sent.
When this is set to zero, the batch size is ignored and data will be sent immediately,
subject only to send_batch_max_size.
format: int32
type: integer
timeout:
description: |-
Timeout sets the time after which a batch will be sent regardless of size.
When this is set to zero, batched data will be sent immediately.
format: duration
type: string
type: object
fluentforward:
properties:
compress_gzip:
6 changes: 6 additions & 0 deletions e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml
@@ -89,6 +89,9 @@ metadata:
name: otlp-test-output-database
namespace: collector
spec:
batch:
send_batch_size: 8192
timeout: 200ms
otlp:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
tls:
@@ -132,6 +135,9 @@ metadata:
name: otlp-test-output-web
namespace: collector
spec:
batch:
send_batch_size: 8192
timeout: 200ms
otlp:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
tls:
2 changes: 1 addition & 1 deletion internal/controller/telemetry/collector_controller.go
@@ -49,7 +49,7 @@ var (

const (
requeueDelayOnFailedTenant = 20 * time.Second
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0"
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0-dev1"
)

// CollectorReconciler reconciles a Collector object
@@ -122,6 +122,14 @@ processors:
- action: insert
key: tenant
value: example-tenant-b
batch/otlp-test-output:
metadata_cardinality_limit: 100
metadata_keys:
- key1
- key2
send_batch_max_size: 4096
send_batch_size: 512
timeout: 5s
deltatocumulative: {}
k8sattributes:
auth_type: serviceAccount
@@ -169,6 +177,7 @@ service:
processors:
- memory_limiter
- attributes/exporter_name_otlp-test-output
- batch/otlp-test-output
receivers:
- routing/subscription_example-tenant-a-ns_subscription-example-1_outputs
logs/output_example-tenant-a-ns_subscription-example-2_collector_otlp-test-output-2:
@@ -263,3 +272,9 @@ service:
telemetry:
metrics:
level: detailed
readers:
- pull:
exporter:
prometheus:
host: ""
port: 8888
54 changes: 43 additions & 11 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
@@ -97,6 +97,11 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any {

for _, output := range cfgInput.OutputsWithSecretData {
processors[fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)] = processor.GenerateOutputExporterNameProcessor(components.GetExporterNameForOutput(output.Output))

// Add a batch processor if the output has one
if output.Output.Spec.Batch != nil {
processors[fmt.Sprintf("batch/%s", output.Output.Name)] = processor.GenerateBatchProcessorForOutput(*output.Output.Spec.Batch)
}
}

return processors
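Reviewer note: processor.GenerateBatchProcessorForOutput is referenced here but is not part of this diff. A minimal sketch of what it presumably does follows — the module import path and the omit-unset-fields behaviour are assumptions; only the field names and the batch/<output-name> processor key come from the diff and the expected test config.

// Sketch only — not the actual implementation. Assumes the processor package
// maps the CRD Batch fields directly onto the collector batch processor's
// config keys, leaving unset values out of the generated map.
package processor

import (
	// Module path assumed; Batch is the struct added in output_types.go.
	"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
)

func GenerateBatchProcessorForOutput(batch v1alpha1.Batch) map[string]any {
	config := map[string]any{}

	if batch.Timeout != "" {
		config["timeout"] = batch.Timeout
	}
	if batch.SendBatchSize > 0 {
		config["send_batch_size"] = batch.SendBatchSize
	}
	if batch.SendBatchMaxSize > 0 {
		config["send_batch_max_size"] = batch.SendBatchMaxSize
	}
	if len(batch.MetadataKeys) > 0 {
		config["metadata_keys"] = batch.MetadataKeys
	}
	if batch.MetadataCardinalityLimit > 0 {
		config["metadata_cardinality_limit"] = batch.MetadataCardinalityLimit
	}

	return config
}

Under that assumption, the test fixture's values (timeout: 5s, send_batch_size: 512, send_batch_max_size: 4096, two metadata keys) would yield the batch/otlp-test-output block shown in the expected collector config above.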
@@ -201,6 +206,15 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b

receivers := []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}
processors := []string{fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)}

// NOTE: The order of the processors is important.
// The batch processor should be defined in the pipeline after the memory_limiter as well as any sampling processors.
// This is because batching should happen after any data drops such as sampling.
// ref: https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor#recommended-processors
if output.Output.Spec.Batch != nil {
processors = append(processors, fmt.Sprintf("batch/%s", output.Output.Name))
}

var exporters []string

if output.Output.Spec.OTLPGRPC != nil {
@@ -228,6 +242,34 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
return namedPipelines
}

func (cfgInput *OtelColConfigInput) generateTelemetry() map[string]any {
telemetry := map[string]interface{}{
"metrics": map[string]interface{}{
"level": "detailed",
"readers": []map[string]interface{}{
{
"pull": map[string]interface{}{
"exporter": map[string]interface{}{
"prometheus": map[string]interface{}{
"host": "",
"port": 8888,
},
},
},
},
},
},
}

if cfgInput.Debug {
telemetry["logs"] = map[string]string{
"level": "debug",
}
}

return telemetry
}

func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1beta1.Config {
exporters := cfgInput.generateExporters(ctx)

@@ -241,17 +283,7 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be

pipelines := cfgInput.generateNamedPipelines()

telemetry := make(map[string]any)

telemetry["metrics"] = map[string]string{
"level": "detailed",
}

if cfgInput.Debug {
telemetry["logs"] = map[string]string{
"level": "debug",
}
}
telemetry := cfgInput.generateTelemetry()

if _, ok := processors["memory_limiter"]; ok {
for name, pipeline := range pipelines {
@@ -208,6 +208,13 @@ func TestOtelColConfComplex(t *testing.T) {
},
},
},
Batch: &v1alpha1.Batch{
Timeout: "5s",
SendBatchSize: 512,
SendBatchMaxSize: 4096,
MetadataKeys: []string{"key1", "key2"},
MetadataCardinalityLimit: 100,
},
},
},
},
@@ -36,8 +36,9 @@ func createDecoderConfig(result interface{}, hooks ...mapstructure.DecodeHookFun

// decodeID converts string to component.ID or pipeline.ID
func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) {
// contrib components sometimes does not follow the type/name format
contribComponents := map[string]bool{
// occasionally components don't follow the type/name format
// in such cases, we need to handle them separately
exceptionComponents := map[string]bool{
"debug": true,
"deltatocumulative": true,
"memory_limiter": true,
@@ -49,7 +50,7 @@ func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{}
switch to {
case reflect.TypeOf(component.ID{}):
if len(parts) != 2 {
if contribComponents[parts[0]] {
if exceptionComponents[parts[0]] {
return component.MustNewID(parts[0]), nil
}
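Reviewer note (not part of the diff): the intended contract of decodeID for the two ID shapes, written as a test sketch. The package name is assumed, and the outcome of the two-part branch — not shown in this hunk — is assumed to follow the collector's usual type/name convention.

package otelconfgen // package name assumed

import (
	"reflect"
	"testing"

	"go.opentelemetry.io/collector/component"
)

// TestDecodeIDShapes sketches the two ID shapes decodeID is expected to accept.
func TestDecodeIDShapes(t *testing.T) {
	// Bare IDs (no "/name" part) are only accepted for the listed exception components.
	got, err := decodeID(reflect.TypeOf(""), reflect.TypeOf(component.ID{}), "memory_limiter")
	if err != nil || got != component.MustNewID("memory_limiter") {
		t.Fatalf("bare ID: got %v, err %v", got, err)
	}

	// type/name IDs, such as the per-output batch processors added in this PR,
	// are assumed to decode into a typed ID with a name.
	got, err = decodeID(reflect.TypeOf(""), reflect.TypeOf(component.ID{}), "batch/otlp-test-output")
	if err != nil || got != component.MustNewIDWithName("batch", "otlp-test-output") {
		t.Fatalf("type/name ID: got %v, err %v", got, err)
	}
}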

@@ -119,7 +119,7 @@ type PrometheusExporterConfig struct {
func GenerateMetricsExporters() map[string]any {
defaultPrometheusExporterConfig := PrometheusExporterConfig{
HTTPServerConfig: HTTPServerConfig{
Endpoint: "0.0.0.0:9999",
Endpoint: ":9999",
},
}
