Skip to content

Commit 21fb30c

Browse files
committed
Fix redpanda_migrator_bundle output template
The output needs to know what custom label was set for the redpanda_migrator_bundle input, if any. Otherwise, it can't find the input components it needs to talk to. Signed-off-by: Mihai Todor <mihai.todor@contractor.redpanda.com>
1 parent c2b9017 commit 21fb30c

File tree

4 files changed

+92
-6
lines changed

4 files changed

+92
-6
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ Changelog
33

44
All notable changes to this project will be documented in this file.
55

6+
## 4.56.0 - TBD
7+
8+
### Added
9+
10+
- Field `input_bundle_label` added to the `redpanda_migrator_bundle` output. (@mihaitodor)
11+
612
## 4.55.1 - 2025-05-19
713

814
### Added

docs/modules/components/pages/outputs/redpanda_migrator_bundle.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ output:
3333
redpanda_migrator: {} # No default (required)
3434
schema_registry: {} # No default (required)
3535
translate_schema_ids: false
36+
input_bundle_label: ""
3637
```
3738
3839
All-in-one output which writes messages and schemas to a Kafka or Redpanda cluster. This output is meant to be used
@@ -67,4 +68,13 @@ when it already contains some schemas which differ from the ones being migrated.
6768
6869
*Default*: `false`
6970
71+
=== `input_bundle_label`
72+
73+
Specify the redpanda_migrator_bundle input label if one is assigned to it.
74+
75+
76+
*Type*: `string`
77+
78+
*Default*: `""`
79+
7080

internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ mapping: |
4646
4747
let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle")
4848
49-
let redpandaMigrator = this.redpanda_migrator.assign({"output_resource": "%s_redpanda_migrator_output".format($labelPrefix)})
49+
let redpandaMigrator = this.redpanda_migrator
5050
5151
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "topics", "regexp_topics", "consumer_group", "topic_lag_refresh_period", "client_id", "rack_id", "tls", "sasl")
5252
@@ -155,7 +155,6 @@ tests:
155155
seed_brokers: [ "127.0.0.1:9092" ]
156156
topics: [ "foobar" ]
157157
consumer_group: "migrator"
158-
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
159158
processors:
160159
- mapping: meta input_label = "redpanda_migrator_input"
161160
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input
@@ -199,7 +198,6 @@ tests:
199198
seed_brokers: [ "127.0.0.1:9092" ]
200199
topics: [ "foobar" ]
201200
consumer_group: "migrator"
202-
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
203201
processors:
204202
- mapping: meta input_label = "redpanda_migrator_input"
205203
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input
@@ -225,7 +223,6 @@ tests:
225223
seed_brokers: [ "127.0.0.1:9092" ]
226224
topics: [ "foobar" ]
227225
consumer_group: "migrator"
228-
output_resource: redpanda_migrator_bundle_redpanda_migrator_output
229226
processors:
230227
- mapping: meta input_label = "redpanda_migrator_input"
231228
- label: redpanda_migrator_bundle_redpanda_migrator_offsets_input

internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,18 @@ fields:
4444
Allow the target Schema Registry instance to allocate different schema IDs for migrated schemas. This is useful
4545
when it already contains some schemas which differ from the ones being migrated.
4646
47+
- name: input_bundle_label
48+
type: string
49+
kind: scalar
50+
default: ""
51+
description: |
52+
Specify the redpanda_migrator_bundle input label if one is assigned to it.
53+
4754
mapping: |
4855
#!blobl
4956
5057
let labelPrefix = @label.not_empty().or("redpanda_migrator_bundle")
58+
let inputBundleLabel = this.input_bundle_label.not_empty().or("redpanda_migrator_bundle")
5159
5260
if this.translate_schema_ids == true && this.schema_registry.length() == 0 {
5361
root = throw("Schema ID translation requires schema_registry to be configured")
@@ -75,7 +83,7 @@ mapping: |
7583
]
7684
},
7785
"translate_schema_ids": this.translate_schema_ids.or(false),
78-
"input_resource": "%s_redpanda_migrator_input".format($labelPrefix)
86+
"input_resource": "%s_redpanda_migrator_input".format($inputBundleLabel)
7987
}
8088
)
8189
@@ -100,7 +108,7 @@ mapping: |
100108
this.schema_registry.assign({
101109
"subject": "${! @schema_registry_subject }",
102110
"max_in_flight": $srMaxInFlight,
103-
"input_resource": "%s_schema_registry_input".format($labelPrefix),
111+
"input_resource": "%s_schema_registry_input".format($inputBundleLabel),
104112
"translate_ids": this.translate_schema_ids.or(false)
105113
})
106114
}
@@ -225,6 +233,71 @@ tests:
225233
- output:
226234
reject: ${! @fallback_error }
227235

236+
- name: Migrate messages, offsets and schemas when setting a label on the bundle input
237+
config:
238+
redpanda_migrator:
239+
seed_brokers: [ "127.0.0.1:9092" ]
240+
topic_prefix: dest.
241+
max_in_flight: 1
242+
schema_registry:
243+
url: http://localhost:8081
244+
max_in_flight: 1
245+
input_bundle_label: source
246+
247+
expected:
248+
switch:
249+
cases:
250+
- check: metadata("input_label") == "redpanda_migrator_input"
251+
output:
252+
label: redpanda_migrator_bundle_redpanda_migrator_output
253+
redpanda_migrator:
254+
key: ${! metadata("kafka_key") }
255+
max_in_flight: 1
256+
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
257+
partitioner: manual
258+
seed_brokers:
259+
- 127.0.0.1:9092
260+
timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) }
261+
topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) }
262+
topic_prefix: "dest."
263+
metadata:
264+
include_patterns:
265+
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
266+
translate_schema_ids: false
267+
input_resource: source_redpanda_migrator_input
268+
schema_registry_output_resource: redpanda_migrator_bundle_schema_registry_output
269+
processors:
270+
- mapping: |
271+
meta input_label = deleted()
272+
- check: metadata("input_label") == "redpanda_migrator_offsets_input"
273+
output:
274+
label: redpanda_migrator_bundle_redpanda_migrator_offsets_output
275+
redpanda_migrator_offsets:
276+
seed_brokers:
277+
- 127.0.0.1:9092
278+
offset_topic_prefix: "dest."
279+
- check: metadata("input_label") == "schema_registry_input"
280+
output:
281+
fallback:
282+
- label: redpanda_migrator_bundle_schema_registry_output
283+
schema_registry:
284+
subject: ${! @schema_registry_subject }
285+
url: http://localhost:8081
286+
max_in_flight: 1
287+
translate_ids: false
288+
input_resource: source_schema_registry_input
289+
- switch:
290+
cases:
291+
- check: '@fallback_error == "request returned status: 422"'
292+
output:
293+
drop: {}
294+
processors:
295+
- log:
296+
message: |
297+
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
298+
- output:
299+
reject: ${! @fallback_error }
300+
228301
- name: Migrate messages, offsets and schemas with schema ID translation
229302
config:
230303
redpanda_migrator:

0 commit comments

Comments
 (0)