From eccd7646cd4e6415c8fbcc0fc27dc69b0babb565 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 16 May 2025 20:30:43 +0100 Subject: [PATCH 1/4] Cleanup Redpanda Migrator file names Signed-off-by: Mihai Todor --- ...{redpanda_migrator_input.go => input_redpanda_migrator.go} | 0 ...put.tmpl.yaml => input_redpanda_migrator_bundle.tmpl.yaml} | 0 ...or_offsets_input.go => input_redpanda_migrator_offsets.go} | 0 .../{schema_registry_input.go => input_schema_registry.go} | 0 ...edpanda_migrator_output.go => output_redpanda_migrator.go} | 0 ...ut.tmpl.yaml => output_redpanda_migrator_bundle.tmpl.yaml} | 0 ..._offsets_output.go => output_redpanda_migrator_offsets.go} | 0 .../{schema_registry_output.go => output_schema_registry.go} | 0 internal/impl/kafka/{init.go => redpanda_migrator.go} | 4 ++-- 9 files changed, 2 insertions(+), 2 deletions(-) rename internal/impl/kafka/{redpanda_migrator_input.go => input_redpanda_migrator.go} (100%) rename internal/impl/kafka/{redpanda_migrator_bundle_input.tmpl.yaml => input_redpanda_migrator_bundle.tmpl.yaml} (100%) rename internal/impl/kafka/{redpanda_migrator_offsets_input.go => input_redpanda_migrator_offsets.go} (100%) rename internal/impl/kafka/{schema_registry_input.go => input_schema_registry.go} (100%) rename internal/impl/kafka/{redpanda_migrator_output.go => output_redpanda_migrator.go} (100%) rename internal/impl/kafka/{redpanda_migrator_bundle_output.tmpl.yaml => output_redpanda_migrator_bundle.tmpl.yaml} (100%) rename internal/impl/kafka/{redpanda_migrator_offsets_output.go => output_redpanda_migrator_offsets.go} (100%) rename internal/impl/kafka/{schema_registry_output.go => output_schema_registry.go} (100%) rename internal/impl/kafka/{init.go => redpanda_migrator.go} (91%) diff --git a/internal/impl/kafka/redpanda_migrator_input.go b/internal/impl/kafka/input_redpanda_migrator.go similarity index 100% rename from internal/impl/kafka/redpanda_migrator_input.go rename to internal/impl/kafka/input_redpanda_migrator.go diff --git a/internal/impl/kafka/redpanda_migrator_bundle_input.tmpl.yaml b/internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml similarity index 100% rename from internal/impl/kafka/redpanda_migrator_bundle_input.tmpl.yaml rename to internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml diff --git a/internal/impl/kafka/redpanda_migrator_offsets_input.go b/internal/impl/kafka/input_redpanda_migrator_offsets.go similarity index 100% rename from internal/impl/kafka/redpanda_migrator_offsets_input.go rename to internal/impl/kafka/input_redpanda_migrator_offsets.go diff --git a/internal/impl/kafka/schema_registry_input.go b/internal/impl/kafka/input_schema_registry.go similarity index 100% rename from internal/impl/kafka/schema_registry_input.go rename to internal/impl/kafka/input_schema_registry.go diff --git a/internal/impl/kafka/redpanda_migrator_output.go b/internal/impl/kafka/output_redpanda_migrator.go similarity index 100% rename from internal/impl/kafka/redpanda_migrator_output.go rename to internal/impl/kafka/output_redpanda_migrator.go diff --git a/internal/impl/kafka/redpanda_migrator_bundle_output.tmpl.yaml b/internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml similarity index 100% rename from internal/impl/kafka/redpanda_migrator_bundle_output.tmpl.yaml rename to internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml diff --git a/internal/impl/kafka/redpanda_migrator_offsets_output.go b/internal/impl/kafka/output_redpanda_migrator_offsets.go similarity index 100% rename from internal/impl/kafka/redpanda_migrator_offsets_output.go rename to internal/impl/kafka/output_redpanda_migrator_offsets.go diff --git a/internal/impl/kafka/schema_registry_output.go b/internal/impl/kafka/output_schema_registry.go similarity index 100% rename from internal/impl/kafka/schema_registry_output.go rename to internal/impl/kafka/output_schema_registry.go diff --git a/internal/impl/kafka/init.go b/internal/impl/kafka/redpanda_migrator.go similarity index 91% rename from internal/impl/kafka/init.go rename to internal/impl/kafka/redpanda_migrator.go index e9fa89db95..5e99b4f84e 100644 --- a/internal/impl/kafka/init.go +++ b/internal/impl/kafka/redpanda_migrator.go @@ -24,10 +24,10 @@ import ( _ "github.com/redpanda-data/benthos/v4/public/components/pure" ) -//go:embed redpanda_migrator_bundle_input.tmpl.yaml +//go:embed input_redpanda_migrator_bundle.tmpl.yaml var redpandaMigratorInputTemplate []byte -//go:embed redpanda_migrator_bundle_output.tmpl.yaml +//go:embed output_redpanda_migrator_bundle.tmpl.yaml var redpandaMigratorOutputTemplate []byte func init() { From 85f6af78c91b10972b230a0d07a22bfd600ee017 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 16 May 2025 20:38:11 +0100 Subject: [PATCH 2/4] Fix redpanda_migrator_bundle output template The output needs to know what custom label was set for the redpanda_migrator_bundle input, if any. Otherwise, it can't find the input components it needs to talk to. Signed-off-by: Mihai Todor --- CHANGELOG.md | 1 + .../outputs/redpanda_migrator_bundle.adoc | 10 +++ .../input_redpanda_migrator_bundle.tmpl.yaml | 5 +- .../output_redpanda_migrator_bundle.tmpl.yaml | 77 ++++++++++++++++++- 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39a3d69bc8..fc3d8b43bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file. - The `mcp-server` subcommand now supports the new streamable HTTP spec when the `address` flag is specified. (@Jeffail) - Field `poll_interval` added to the `redpanda_migrator_offsets` input. (@mihaitodor) - Field `consumer_group_offsets_poll_interval` added to the `redpanda_migrator_bundle` input. (@mihaitodor) +- Field `input_bundle_label` added to the `redpanda_migrator_bundle` output. (@mihaitodor) ### Fixed diff --git a/docs/modules/components/pages/outputs/redpanda_migrator_bundle.adoc b/docs/modules/components/pages/outputs/redpanda_migrator_bundle.adoc index f62ed4c198..ec1ed304e7 100644 --- a/docs/modules/components/pages/outputs/redpanda_migrator_bundle.adoc +++ b/docs/modules/components/pages/outputs/redpanda_migrator_bundle.adoc @@ -33,6 +33,7 @@ output: redpanda_migrator: {} # No default (required) schema_registry: {} # No default (required) translate_schema_ids: false + input_bundle_label: "" ``` All-in-one output which writes messages and schemas to a Kafka or Redpanda cluster. This output is meant to be used @@ -67,4 +68,13 @@ when it already contains some schemas which differ from the ones being migrated. *Default*: `false` +=== `input_bundle_label` + +Specify the redpanda_migrator_bundle input label if one is assigned to it. + + +*Type*: `string` + +*Default*: `""` + diff --git a/internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml b/internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml index 92eef95f3e..ae96b6e799 100644 --- a/internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml +++ b/internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml @@ -53,7 +53,7 @@ mapping: | let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle") - let redpandaMigrator = this.redpanda_migrator.assign({"output_resource": "%s_redpanda_migrator_output".format($labelPrefix)}) + let redpandaMigrator = this.redpanda_migrator let redpandaMigratorOffsets = this.redpanda_migrator.with( "seed_brokers", @@ -177,7 +177,6 @@ tests: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] consumer_group: "migrator" - output_resource: redpanda_migrator_bundle_redpanda_migrator_output processors: - mapping: meta input_label = "redpanda_migrator_input" - label: redpanda_migrator_bundle_redpanda_migrator_offsets_input @@ -222,7 +221,6 @@ tests: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] consumer_group: "migrator" - output_resource: redpanda_migrator_bundle_redpanda_migrator_output processors: - mapping: meta input_label = "redpanda_migrator_input" - label: redpanda_migrator_bundle_redpanda_migrator_offsets_input @@ -249,7 +247,6 @@ tests: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] consumer_group: "migrator" - output_resource: redpanda_migrator_bundle_redpanda_migrator_output processors: - mapping: meta input_label = "redpanda_migrator_input" - label: redpanda_migrator_bundle_redpanda_migrator_offsets_input diff --git a/internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml b/internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml index e6920ec365..c0f7cd5b3e 100644 --- a/internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml +++ b/internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml @@ -44,10 +44,18 @@ fields: Allow the target Schema Registry instance to allocate different schema IDs for migrated schemas. This is useful when it already contains some schemas which differ from the ones being migrated. + - name: input_bundle_label + type: string + kind: scalar + default: "" + description: | + Specify the redpanda_migrator_bundle input label if one is assigned to it. + mapping: | #!blobl let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle") + let inputBundleLabel = this.input_bundle_label.not_empty().or("redpanda_migrator_bundle") if this.translate_schema_ids == true && this.schema_registry.length() == 0 { root = throw("Schema ID translation requires schema_registry to be configured") @@ -75,7 +83,7 @@ mapping: | ] }, "translate_schema_ids": this.translate_schema_ids.or(false), - "input_resource": "%s_redpanda_migrator_input".format($labelPrefix) + "input_resource": "%s_redpanda_migrator_input".format($inputBundleLabel) } ) @@ -100,7 +108,7 @@ mapping: | this.schema_registry.assign({ "subject": "${! @schema_registry_subject }", "max_in_flight": $srMaxInFlight, - "input_resource": "%s_schema_registry_input".format($labelPrefix), + "input_resource": "%s_schema_registry_input".format($inputBundleLabel), "translate_ids": this.translate_schema_ids.or(false) }) } @@ -225,6 +233,71 @@ tests: - output: reject: ${! @fallback_error } + - name: Migrate messages, offsets and schemas when setting a label on the bundle input + config: + redpanda_migrator: + seed_brokers: [ "127.0.0.1:9092" ] + topic_prefix: dest. + max_in_flight: 1 + schema_registry: + url: http://localhost:8081 + max_in_flight: 1 + input_bundle_label: source + + expected: + switch: + cases: + - check: metadata("input_label") == "redpanda_migrator_input" + output: + label: redpanda_migrator_bundle_redpanda_migrator_output + redpanda_migrator: + key: ${! metadata("kafka_key") } + max_in_flight: 1 + partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } + partitioner: manual + seed_brokers: + - 127.0.0.1:9092 + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } + topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } + topic_prefix: "dest." + metadata: + include_patterns: + - ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*) + translate_schema_ids: false + input_resource: source_redpanda_migrator_input + schema_registry_output_resource: redpanda_migrator_bundle_schema_registry_output + processors: + - mapping: | + meta input_label = deleted() + - check: metadata("input_label") == "redpanda_migrator_offsets_input" + output: + label: redpanda_migrator_bundle_redpanda_migrator_offsets_output + redpanda_migrator_offsets: + seed_brokers: + - 127.0.0.1:9092 + offset_topic_prefix: "dest." + - check: metadata("input_label") == "schema_registry_input" + output: + fallback: + - label: redpanda_migrator_bundle_schema_registry_output + schema_registry: + subject: ${! @schema_registry_subject } + url: http://localhost:8081 + max_in_flight: 1 + translate_ids: false + input_resource: source_schema_registry_input + - switch: + cases: + - check: '@fallback_error == "request returned status: 422"' + output: + drop: {} + processors: + - log: + message: | + Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() } + - output: + reject: ${! @fallback_error } + - name: Migrate messages, offsets and schemas with schema ID translation config: redpanda_migrator: From 217c153dbf9aa360eed921b59be649bfca8ad403 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Tue, 3 Jun 2025 20:49:41 +0100 Subject: [PATCH 3/4] Update Redpanda Migrator sequence diagrams Signed-off-by: Mihai Todor --- internal/impl/kafka/redpanda_migrator.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/impl/kafka/redpanda_migrator.md b/internal/impl/kafka/redpanda_migrator.md index 6638df8241..db3fc42a9a 100644 --- a/internal/impl/kafka/redpanda_migrator.md +++ b/internal/impl/kafka/redpanda_migrator.md @@ -28,7 +28,7 @@ participant Offsets Input participant Offsets Output participant Destination -Source->>Offsets Input: O = Receive(__consumer_offsets) +Source->>Offsets Input: O = OffsetFetch() Source->>Offsets Input: X = ListEndOffsets(T, P) Source->>Offsets Input: Check X < O Source->>Offsets Input: TS = ReadTimestamp(T, P, O) @@ -49,7 +49,7 @@ participant Offsets Input participant Offsets Output participant Destination -Source->>Offsets Input: O = Receive(__consumer_offsets) +Source->>Offsets Input: O = OffsetFetch() Source->>Offsets Input: X = ListEndOffsets(T, P) Source->>Offsets Input: Check X == O Source->>Offsets Input: TS = ReadTimestamp(T, P, -1) From 6c3ccb36c8d829f3bf749967ffb3ecefc7965521 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Tue, 3 Jun 2025 20:50:08 +0100 Subject: [PATCH 4/4] Fix error messages in redpada_migrator_offsets output Signed-off-by: Mihai Todor --- .../impl/kafka/output_redpanda_migrator_offsets.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/impl/kafka/output_redpanda_migrator_offsets.go b/internal/impl/kafka/output_redpanda_migrator_offsets.go index d15ea74372..c405e4575f 100644 --- a/internal/impl/kafka/output_redpanda_migrator_offsets.go +++ b/internal/impl/kafka/output_redpanda_migrator_offsets.go @@ -310,12 +310,12 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service. if isHighWatermark && offset.Timestamp != -1 { offsets, err := w.client.ListEndOffsets(ctx, topic) if err != nil { - return fmt.Errorf("failed to list the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) + return fmt.Errorf("failed to list the high watermark for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) } highWatermark, ok := offsets.Lookup(topic, partition) if !ok { - return fmt.Errorf("failed to read the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) + return fmt.Errorf("failed to read the high watermark for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) } if highWatermark.Offset == offset.Offset+1 { offset.Offset = highWatermark.Offset @@ -333,14 +333,14 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service. offsetResponses, err := w.client.CommitOffsets(ctx, group, offsets) if err != nil { - return fmt.Errorf("failed to commit consumer offsets for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) + return fmt.Errorf("failed to commit consumer offsets for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) } if err := offsetResponses.Error(); err != nil { - return fmt.Errorf("committed consumer offsets returned an error for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) + return fmt.Errorf("committed consumer offsets returned an error for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err) } - w.mgr.Logger().Debugf("Wrote offset for topic %q and timestamp %d: %d", topic, offsetCommitTimestamp, offset.Offset) + w.mgr.Logger().Debugf("Wrote offset for topic %q and partition %d and timestamp %d: %d", topic, partition, offsetCommitTimestamp, offset.Offset) return nil }