Skip to content

Fix Redpanda Migrator bundle output #3428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.56.0 - TBD

### Added

- Field `input_bundle_label` added to the `redpanda_migrator_bundle` output. (@mihaitodor)

## 4.55.1 - 2025-05-19

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ output:
redpanda_migrator: {} # No default (required)
schema_registry: {} # No default (required)
translate_schema_ids: false
input_bundle_label: ""
```

All-in-one output which writes messages and schemas to a Kafka or Redpanda cluster. This output is meant to be used
Expand Down Expand Up @@ -67,4 +68,13 @@ when it already contains some schemas which differ from the ones being migrated.

*Default*: `false`

=== `input_bundle_label`

Specify the redpanda_migrator_bundle input label if one is assigned to it.


*Type*: `string`

*Default*: `""`


Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mapping: |

let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle")

let redpandaMigrator = this.redpanda_migrator.assign({"output_resource": "%s_redpanda_migrator_output".format($labelPrefix)})
let redpandaMigrator = this.redpanda_migrator

let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "topics", "regexp_topics", "consumer_group", "topic_lag_refresh_period", "client_id", "rack_id", "tls", "sasl")

Expand Down Expand Up @@ -155,7 +155,6 @@ tests:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
processors:
- mapping: meta input_label = "redpanda_migrator_input"
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input
Expand Down Expand Up @@ -199,7 +198,6 @@ tests:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
processors:
- mapping: meta input_label = "redpanda_migrator_input"
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input
Expand All @@ -225,7 +223,6 @@ tests:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
processors:
- mapping: meta input_label = "redpanda_migrator_input"
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,18 @@ fields:
Allow the target Schema Registry instance to allocate different schema IDs for migrated schemas. This is useful
when it already contains some schemas which differ from the ones being migrated.

- name: input_bundle_label
type: string
kind: scalar
default: ""
description: |
Specify the redpanda_migrator_bundle input label if one is assigned to it.

mapping: |
#!blobl

let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle")
let inputBundleLabel = this.input_bundle_label.not_empty().or("redpanda_migrator_bundle")

if this.translate_schema_ids == true && this.schema_registry.length() == 0 {
root = throw("Schema ID translation requires schema_registry to be configured")
Expand Down Expand Up @@ -75,7 +83,7 @@ mapping: |
]
},
"translate_schema_ids": this.translate_schema_ids.or(false),
"input_resource": "%s_redpanda_migrator_input".format($labelPrefix)
"input_resource": "%s_redpanda_migrator_input".format($inputBundleLabel)
}
)

Expand All @@ -100,7 +108,7 @@ mapping: |
this.schema_registry.assign({
"subject": "${! @schema_registry_subject }",
"max_in_flight": $srMaxInFlight,
"input_resource": "%s_schema_registry_input".format($labelPrefix),
"input_resource": "%s_schema_registry_input".format($inputBundleLabel),
"translate_ids": this.translate_schema_ids.or(false)
})
}
Expand Down Expand Up @@ -225,6 +233,71 @@ tests:
- output:
reject: ${! @fallback_error }

- name: Migrate messages, offsets and schemas when setting a label on the bundle input
config:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topic_prefix: dest.
max_in_flight: 1
schema_registry:
url: http://localhost:8081
max_in_flight: 1
input_bundle_label: source

expected:
switch:
cases:
- check: metadata("input_label") == "redpanda_migrator_input"
output:
label: redpanda_migrator_bundle_redpanda_migrator_output
redpanda_migrator:
key: ${! metadata("kafka_key") }
max_in_flight: 1
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
partitioner: manual
seed_brokers:
- 127.0.0.1:9092
timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) }
topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) }
topic_prefix: "dest."
metadata:
include_patterns:
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
translate_schema_ids: false
input_resource: source_redpanda_migrator_input
schema_registry_output_resource: redpanda_migrator_bundle_schema_registry_output
processors:
- mapping: |
meta input_label = deleted()
- check: metadata("input_label") == "redpanda_migrator_offsets_input"
output:
label: redpanda_migrator_bundle_redpanda_migrator_offsets_output
redpanda_migrator_offsets:
seed_brokers:
- 127.0.0.1:9092
offset_topic_prefix: "dest."
- check: metadata("input_label") == "schema_registry_input"
output:
fallback:
- label: redpanda_migrator_bundle_schema_registry_output
schema_registry:
subject: ${! @schema_registry_subject }
url: http://localhost:8081
max_in_flight: 1
translate_ids: false
input_resource: source_schema_registry_input
- switch:
cases:
- check: '@fallback_error == "request returned status: 422"'
output:
drop: {}
processors:
- log:
message: |
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
- output:
reject: ${! @fallback_error }

- name: Migrate messages, offsets and schemas with schema ID translation
config:
redpanda_migrator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
)

//go:embed redpanda_migrator_bundle_input.tmpl.yaml
//go:embed input_redpanda_migrator_bundle.tmpl.yaml
var redpandaMigratorInputTemplate []byte

//go:embed redpanda_migrator_bundle_output.tmpl.yaml
//go:embed output_redpanda_migrator_bundle.tmpl.yaml
var redpandaMigratorOutputTemplate []byte

func init() {
Expand Down