From 3f6c05d9f6fba3ad0feefdf210f44051641fe03e Mon Sep 17 00:00:00 2001 From: Onecer Date: Wed, 10 Jul 2024 14:50:15 +0800 Subject: [PATCH 1/2] feat: fluentd output plugin kafka support rdkafka2 client Signed-off-by: Onecer --- ...logging.banzaicloud.io_clusteroutputs.yaml | 4 ++ .../crds/logging.banzaicloud.io_outputs.yaml | 4 ++ ...logging.banzaicloud.io_clusteroutputs.yaml | 4 ++ .../bases/logging.banzaicloud.io_outputs.yaml | 4 ++ docs/configuration/plugins/outputs/kafka.md | 11 +++++- pkg/sdk/logging/model/output/kafka.go | 17 ++++++-- pkg/sdk/logging/model/output/kafka_test.go | 39 +++++++++++++++++++ .../model/output/zz_generated.deepcopy.go | 5 +++ 8 files changed, 82 insertions(+), 6 deletions(-) diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml index 4a5f05e03..9179a1e0f 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml @@ -3424,6 +3424,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10821,6 +10823,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml index 222995229..7f24ee24f 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml @@ -3420,6 +3420,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10091,6 +10093,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml index 4a5f05e03..9179a1e0f 100644 --- a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml @@ -3424,6 +3424,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10821,6 +10823,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/config/crd/bases/logging.banzaicloud.io_outputs.yaml b/config/crd/bases/logging.banzaicloud.io_outputs.yaml index 222995229..7f24ee24f 100644 --- a/config/crd/bases/logging.banzaicloud.io_outputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_outputs.yaml @@ -3420,6 +3420,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: @@ -10091,6 +10093,8 @@ spec: type: string use_default_for_unknown_topic: type: boolean + use_rdkafka: + type: boolean username: properties: mountFrom: diff --git a/docs/configuration/plugins/outputs/kafka.md b/docs/configuration/plugins/outputs/kafka.md index 9ca2e115e..ba6b81656 100644 --- a/docs/configuration/plugins/outputs/kafka.md +++ b/docs/configuration/plugins/outputs/kafka.md @@ -33,7 +33,9 @@ spec: ## Configuration ## Kafka -Send your logs to Kafka +Send your logs to Kafka. +Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +-[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) ### ack_timeout (int, optional) {#kafka-ack_timeout} @@ -212,7 +214,7 @@ Client certificate key Verify certificate hostname -### sasl_over_ssl (bool, required) {#kafka-sasl_over_ssl} +### sasl_over_ssl (*bool, optional) {#kafka-sasl_over_ssl} SASL over SSL @@ -240,6 +242,11 @@ Use default for unknown topics Default: false +### use_rdkafka (bool, optional) {#kafka-use_rdkafka} + +Use rdkafka of the output plugin. + + ### username (*secret.Secret, optional) {#kafka-username} Username when using PLAIN/SCRAM SASL authentication diff --git a/pkg/sdk/logging/model/output/kafka.go b/pkg/sdk/logging/model/output/kafka.go index 732cea94d..475234e68 100644 --- a/pkg/sdk/logging/model/output/kafka.go +++ b/pkg/sdk/logging/model/output/kafka.go @@ -58,9 +58,12 @@ type _metaKafka interface{} //nolint:deadcode,unused // +kubebuilder:object:generate=true // +docName:"Kafka" -// Send your logs to Kafka +// Send your logs to Kafka. +// Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +// -[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) type KafkaOutputConfig struct { - + // Use rdkafka of the output plugin. + UseRdkafka bool `json:"use_rdkafka,omitempty"` // The list of all seed brokers, with their host and port information. Brokers string `json:"brokers"` // Topic Key (default: "topic") @@ -95,7 +98,7 @@ type KafkaOutputConfig struct { Idempotent bool `json:"idempotent,omitempty"` // SASL over SSL (default: true) // +kubebuilder:validation:Optional - SaslOverSSL bool `json:"sasl_over_ssl"` + SaslOverSSL *bool `json:"sasl_over_ssl,omitempty"` Principal string `json:"principal,omitempty"` Keytab *secret.Secret `json:"keytab,omitempty"` // Username when using PLAIN/SCRAM SASL authentication @@ -141,7 +144,10 @@ type KafkaOutputConfig struct { } func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id string) (types.Directive, error) { - const pluginType = "kafka2" + pluginType := "kafka2" + if e.UseRdkafka { + pluginType = "rdkafka2" + } kafka := &types.OutputPlugin{ PluginMeta: types.PluginMeta{ Type: pluginType, @@ -171,5 +177,8 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str kafka.SubDirectives = append(kafka.SubDirectives, format) } } + + // remove use_rdkafka from params, it is not a valid parameter for plugin config + delete(kafka.Params, "use_rdkafka") return kafka, nil } diff --git a/pkg/sdk/logging/model/output/kafka_test.go b/pkg/sdk/logging/model/output/kafka_test.go index f22a0f6ee..c49328736 100644 --- a/pkg/sdk/logging/model/output/kafka_test.go +++ b/pkg/sdk/logging/model/output/kafka_test.go @@ -62,3 +62,42 @@ buffer: test := render.NewOutputPluginTest(t, kafka) test.DiffResult(expected) } + +func TestRdkafka(t *testing.T) { + CONFIG := []byte(` +brokers: kafka-headless.kafka.svc.cluster.local:29092 +default_topic: topic +use_rdkafka: true +ssl_verify_hostname: false +format: + type: json +buffer: + timekey: 1m + timekey_wait: 30s + timekey_use_utc: true +`) + expected := ` + + @type rdkafka2 + @id test + brokers kafka-headless.kafka.svc.cluster.local:29092 + default_topic topic + ssl_verify_hostname false + + @type file + path /buffers/test.*.buffer + retry_forever true + timekey 1m + timekey_use_utc true + timekey_wait 30s + + + @type json + + +` + kafka := &output.KafkaOutputConfig{} + require.NoError(t, yaml.Unmarshal(CONFIG, kafka)) + test := render.NewOutputPluginTest(t, kafka) + test.DiffResult(expected) +} diff --git a/pkg/sdk/logging/model/output/zz_generated.deepcopy.go b/pkg/sdk/logging/model/output/zz_generated.deepcopy.go index df5cd8a0a..6c0ebc29d 100644 --- a/pkg/sdk/logging/model/output/zz_generated.deepcopy.go +++ b/pkg/sdk/logging/model/output/zz_generated.deepcopy.go @@ -706,6 +706,11 @@ func (in *KafkaOutputConfig) DeepCopyInto(out *KafkaOutputConfig) { (*out)[key] = val } } + if in.SaslOverSSL != nil { + in, out := &in.SaslOverSSL, &out.SaslOverSSL + *out = new(bool) + **out = **in + } if in.Keytab != nil { in, out := &in.Keytab, &out.Keytab *out = new(secret.Secret) From 561e69c1e93e30fcd12d65f6d9fc3a3654f7ce34 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Thu, 18 Jul 2024 08:59:49 +0200 Subject: [PATCH 2/2] rdkafka2: update image versions, add notes on availability Signed-off-by: Peter Wilcsinszky --- docs/configuration/plugins/outputs/gelf.md | 2 +- docs/configuration/plugins/outputs/kafka.md | 3 ++- e2e/common/helpers.go | 2 +- .../fluentd_aggregator_test.go | 2 +- e2e/fluentd-aggregator/fluentd_aggregator_test.go | 4 ++-- e2e/volumedrain/volumedrain_test.go | 2 +- pkg/sdk/logging/api/v1beta1/logging_types.go | 2 +- pkg/sdk/logging/model/output/gelf.go | 2 +- pkg/sdk/logging/model/output/kafka.go | 4 +++- 9 files changed, 13 insertions(+), 10 deletions(-) diff --git a/docs/configuration/plugins/outputs/gelf.md b/docs/configuration/plugins/outputs/gelf.md index a8d4b301d..c6bdceb57 100644 --- a/docs/configuration/plugins/outputs/gelf.md +++ b/docs/configuration/plugins/outputs/gelf.md @@ -13,7 +13,7 @@ generated_file: true ### buffer (*Buffer, optional) {#output config-buffer} -Available since ghcr.io/kube-logging/fluentd:v1.16-full-build.139 [Buffer](../buffer/) +Available since ghcr.io/kube-logging/fluentd:v1.16-4.8-full [Buffer](../buffer/) ### host (string, required) {#output config-host} diff --git a/docs/configuration/plugins/outputs/kafka.md b/docs/configuration/plugins/outputs/kafka.md index ba6b81656..5fd0e135c 100644 --- a/docs/configuration/plugins/outputs/kafka.md +++ b/docs/configuration/plugins/outputs/kafka.md @@ -35,6 +35,7 @@ spec: Send your logs to Kafka. Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +(Note: requires fluentd image version v1.16-4.9-full or higher) -[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) ### ack_timeout (int, optional) {#kafka-ack_timeout} @@ -244,7 +245,7 @@ Default: false ### use_rdkafka (bool, optional) {#kafka-use_rdkafka} -Use rdkafka of the output plugin. +Use rdkafka2 instead of the legacy kafka2 output plugin. This plugin requires fluentd image version v1.16-4.9-full or higher. ### username (*secret.Secret, optional) {#kafka-username} diff --git a/e2e/common/helpers.go b/e2e/common/helpers.go index 97d14ad2d..0f89a884b 100644 --- a/e2e/common/helpers.go +++ b/e2e/common/helpers.go @@ -125,7 +125,7 @@ func LoggingInfra( ControlNamespace: nsInfra, FluentdSpec: &v1beta1.FluentdSpec{ Image: v1beta1.ImageSpec{ - Tag: "v1.16-4.8-base", + Tag: "v1.16-4.9-base", }, DisablePvc: true, Resources: v12.ResourceRequirements{ diff --git a/e2e/fluentd-aggregator-namespacelabel/fluentd_aggregator_test.go b/e2e/fluentd-aggregator-namespacelabel/fluentd_aggregator_test.go index d22051994..3ff34ac07 100644 --- a/e2e/fluentd-aggregator-namespacelabel/fluentd_aggregator_test.go +++ b/e2e/fluentd-aggregator-namespacelabel/fluentd_aggregator_test.go @@ -103,7 +103,7 @@ func TestFluentdAggregator_NamespaceLabel(t *testing.T) { }, FluentdSpec: &v1beta1.FluentdSpec{ Image: v1beta1.ImageSpec{ - Tag: "v1.16-4.8-base", + Tag: "v1.16-4.9-base", }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ diff --git a/e2e/fluentd-aggregator/fluentd_aggregator_test.go b/e2e/fluentd-aggregator/fluentd_aggregator_test.go index 10aec4b4f..56cca8246 100644 --- a/e2e/fluentd-aggregator/fluentd_aggregator_test.go +++ b/e2e/fluentd-aggregator/fluentd_aggregator_test.go @@ -91,7 +91,7 @@ func TestFluentdAggregator_MultiWorker(t *testing.T) { }, FluentdSpec: &v1beta1.FluentdSpec{ Image: v1beta1.ImageSpec{ - Tag: "v1.16-4.8-base", + Tag: "v1.16-4.9-base", }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ @@ -247,7 +247,7 @@ func TestFluentdAggregator_ConfigChecks(t *testing.T) { }, FluentdSpec: &v1beta1.FluentdSpec{ Image: v1beta1.ImageSpec{ - Tag: "v1.16-4.8-base", + Tag: "v1.16-4.9-base", }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ diff --git a/e2e/volumedrain/volumedrain_test.go b/e2e/volumedrain/volumedrain_test.go index 2b0eded0b..77525d68a 100644 --- a/e2e/volumedrain/volumedrain_test.go +++ b/e2e/volumedrain/volumedrain_test.go @@ -89,7 +89,7 @@ func TestVolumeDrain_Downscale(t *testing.T) { }, FluentdSpec: &v1beta1.FluentdSpec{ Image: v1beta1.ImageSpec{ - Tag: "v1.16-4.8-base", + Tag: "v1.16-4.9-base", }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ diff --git a/pkg/sdk/logging/api/v1beta1/logging_types.go b/pkg/sdk/logging/api/v1beta1/logging_types.go index 9b7b4e5d2..3e6394d49 100644 --- a/pkg/sdk/logging/api/v1beta1/logging_types.go +++ b/pkg/sdk/logging/api/v1beta1/logging_types.go @@ -172,7 +172,7 @@ const ( DefaultFluentbitConfigReloaderImageRepository = "ghcr.io/kube-logging/config-reloader" DefaultFluentbitConfigReloaderImageTag = "v0.0.5" DefaultFluentdImageRepository = "ghcr.io/kube-logging/fluentd" - DefaultFluentdImageTag = "v1.16-4.8-full" + DefaultFluentdImageTag = "v1.16-4.9-full" DefaultFluentdBufferStorageVolumeName = "fluentd-buffer" DefaultFluentdDrainWatchImageRepository = "ghcr.io/kube-logging/fluentd-drain-watch" DefaultFluentdDrainWatchImageTag = "v0.2.1" diff --git a/pkg/sdk/logging/model/output/gelf.go b/pkg/sdk/logging/model/output/gelf.go index a893d725a..f33501392 100644 --- a/pkg/sdk/logging/model/output/gelf.go +++ b/pkg/sdk/logging/model/output/gelf.go @@ -49,7 +49,7 @@ type GELFOutputConfig struct { TLS *bool `json:"tls,omitempty"` // TLS options (default: {}). For details, see [https://github.com/graylog-labs/gelf-rb/blob/72916932b789f7a6768c3cdd6ab69a3c942dbcef/lib/gelf/transport/tcp_tls.rb#L7-L12](https://github.com/graylog-labs/gelf-rb/blob/72916932b789f7a6768c3cdd6ab69a3c942dbcef/lib/gelf/transport/tcp_tls.rb#L7-L12). TLSOptions map[string]string `json:"tls_options,omitempty"` - // Available since ghcr.io/kube-logging/fluentd:v1.16-full-build.139 + // Available since ghcr.io/kube-logging/fluentd:v1.16-4.8-full // +docLink:"Buffer,../buffer/" Buffer *Buffer `json:"buffer,omitempty"` } diff --git a/pkg/sdk/logging/model/output/kafka.go b/pkg/sdk/logging/model/output/kafka.go index 475234e68..f920ef2e6 100644 --- a/pkg/sdk/logging/model/output/kafka.go +++ b/pkg/sdk/logging/model/output/kafka.go @@ -16,6 +16,7 @@ package output import ( "github.com/cisco-open/operator-tools/pkg/secret" + "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/types" ) @@ -60,9 +61,10 @@ type _metaKafka interface{} //nolint:deadcode,unused // +docName:"Kafka" // Send your logs to Kafka. // Setting use_rdkafka to true opts for rdkafka2, which offers higher performance compared to ruby-kafka. +// (Note: requires fluentd image version v1.16-4.9-full or higher) // -[more info](https://github.com/fluent/fluent-plugin-kafka#output-plugin) type KafkaOutputConfig struct { - // Use rdkafka of the output plugin. + // Use rdkafka2 instead of the legacy kafka2 output plugin. This plugin requires fluentd image version v1.16-4.9-full or higher. UseRdkafka bool `json:"use_rdkafka,omitempty"` // The list of all seed brokers, with their host and port information. Brokers string `json:"brokers"`