Skip to content

feat: add batch processor #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion api/telemetry/v1alpha1/tenant_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,49 @@
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Batch struct {
// Name of the Batch processor
Name string `json:"name,omitempty"`

// From go.opentelemetry.io/collector/processor/batchprocessor

// Timeout sets the time after which a batch will be sent regardless of size.
// When this is set to zero, batched data will be sent immediately.
Timeout time.Duration `json:"timeout,omitempty"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
// When this is set to zero, the batch size is ignored and data will be sent immediately
// subject to only send_batch_max_size.
SendBatchSize uint32 `json:"send_batch_size,omitempty"`

// SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
// Larger batches are split into smaller units.
// Default value is 0, that means no maximum size.
SendBatchMaxSize uint32 `json:"send_batch_max_size,omitempty"`

// MetadataKeys is a list of client.Metadata keys that will be
// used to form distinct batchers. If this setting is empty,
// a single batcher instance will be used. When this setting
// is not empty, one batcher will be used per distinct
// combination of values for the listed metadata keys.
//
// Empty value and unset metadata are treated as distinct cases.
//
// Entries are case-insensitive. Duplicated entries will
// trigger a validation error.
MetadataKeys []string `json:"metadata_keys,omitempty"`

// MetadataCardinalityLimit indicates the maximum number of
// batcher instances that will be created through a distinct
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `json:"metadata_cardinality_limit,omitempty"`
}

// TransformStatement represents a single statement in a Transform processor
// ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor
type TransformStatement struct {
Expand All @@ -43,6 +83,12 @@ type Transform struct {
LogStatements []TransformStatement `json:"logStatements,omitempty"`
}

// Processors defines the tenant level processors
type Processors struct {
Transform Transform `json:"transform,omitempty"`
Batch *Batch `json:"batch,omitempty"`
}

// RouteConfig defines the routing configuration for a tenant
// it will be used to generate routing connectors
type RouteConfig struct {
Expand All @@ -60,10 +106,24 @@ type RouteConfig struct {
type TenantSpec struct {
SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"`
LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"`
Transform Transform `json:"transform,omitempty"`
Processors Processors `json:"processors,omitempty"`
RouteConfig RouteConfig `json:"routeConfig,omitempty"`
}

func (t *TenantSpec) SetDefaults(tenantName string) {
if t.Processors.Batch == nil {
t.Processors.Batch = &Batch{
Name: tenantName,
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
}
}

if len(t.Processors.Batch.MetadataKeys) > 0 && t.Processors.Batch.MetadataCardinalityLimit == 0 {
t.Processors.Batch.MetadataCardinalityLimit = 1000
}
}

// TenantStatus defines the observed state of Tenant
type TenantStatus struct {
Subscriptions []NamespacedName `json:"subscriptions,omitempty"`
Expand Down
43 changes: 42 additions & 1 deletion api/telemetry/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading