Skip to content

Commit 796b3d6

Browse files
authored
Support producer serializer config in Python kafka connector (#357)
1 parent 7760d41 commit 796b3d6

File tree

4 files changed

+200
-11
lines changed

4 files changed

+200
-11
lines changed

langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,16 @@ name: "Exclamation processor"
2121
topics:
2222
- name: TEST_TOPIC_0
2323
creation-mode: create-if-not-exists
24+
schema:
25+
type: string
26+
keySchema:
27+
type: string
2428
- name: TEST_TOPIC_1
2529
creation-mode: create-if-not-exists
30+
schema:
31+
type: string
32+
keySchema:
33+
type: string
2634
pipeline:
2735
- name: "Process using Python"
2836
id: "test-python-processor"

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,29 @@
2626
from langstream import Record, CommitCallback, SimpleRecord
2727
from .kafka_serialization import (
2828
STRING_SERIALIZER,
29-
DOUBLE_SERIALIZER,
30-
LONG_SERIALIZER,
3129
BOOLEAN_SERIALIZER,
30+
SHORT_SERIALIZER,
31+
INTEGER_SERIALIZER,
32+
LONG_SERIALIZER,
33+
FLOAT_SERIALIZER,
34+
DOUBLE_SERIALIZER,
35+
BYTEARRAY_SERIALIZER,
3236
)
3337
from .topic_connector import TopicConsumer, TopicProducer
3438

3539
STRING_DESERIALIZER = StringDeserializer()
3640

41+
# Maps the Java serializer class names accepted in Kafka producer configs
# ("key.serializer" / "value.serializer") to their Python counterparts.
_SERIALIZATION_PACKAGE = "org.apache.kafka.common.serialization."

SERIALIZERS = {
    _SERIALIZATION_PACKAGE + class_name: serializer
    for class_name, serializer in (
        ("StringSerializer", STRING_SERIALIZER),
        ("BooleanSerializer", BOOLEAN_SERIALIZER),
        ("ShortSerializer", SHORT_SERIALIZER),
        ("IntegerSerializer", INTEGER_SERIALIZER),
        ("LongSerializer", LONG_SERIALIZER),
        ("FloatSerializer", FLOAT_SERIALIZER),
        ("DoubleSerializer", DOUBLE_SERIALIZER),
        ("ByteArraySerializer", BYTEARRAY_SERIALIZER),
    )
}
51+
3752

3853
def apply_default_configuration(streaming_cluster, configs):
3954
if "admin" in streaming_cluster["configuration"]:
@@ -308,8 +323,8 @@ class KafkaTopicProducer(TopicProducer):
308323
def __init__(self, configs):
309324
self.configs = configs.copy()
310325
self.topic = self.configs.pop("topic")
311-
self.key_serializer = self.configs.pop("key.serializer")
312-
self.value_serializer = self.configs.pop("value.serializer")
326+
self.key_serializer = SERIALIZERS[self.configs.pop("key.serializer")]
327+
self.value_serializer = SERIALIZERS[self.configs.pop("value.serializer")]
313328
self.producer: Optional[Producer] = None
314329
self.commit_callback: Optional[CommitCallback] = None
315330
self.delivery_failure: Optional[Exception] = None
@@ -342,8 +357,8 @@ def write(self, records: List[Record]):
342357
)
343358
self.producer.produce(
344359
self.topic,
345-
value=STRING_SERIALIZER(record.value()),
346-
key=STRING_SERIALIZER(record.key()),
360+
value=self.value_serializer(record.value()),
361+
key=self.key_serializer(record.key()),
347362
headers=headers,
348363
on_delivery=self.on_delivery,
349364
)

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_serialization.py

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,46 @@ def __call__(self, obj, ctx=None):
6060
return b"\x01" if obj else b"\x00"
6161

6262

63+
class ShortSerializer(Serializer):
    """
    Serializes int to int16 bytes.

    See Also:
        `ShortSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java>`_
    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Serializes int as int16 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if obj is not an int or does not fit in an
            int16

        Returns:
            int16 bytes if obj is not None, else None
        """

        if obj is None:
            return None

        try:
            # ">h" = big-endian signed 16-bit, matching the Java ShortSerializer
            return _struct.pack(">h", obj)
        except _struct.error as e:
            raise SerializationError(str(e))
98+
99+
63100
class LongSerializer(Serializer):
64101
"""
65-
Serializes int to int32 bytes.
102+
Serializes int to int64 bytes.
66103
67104
See Also:
68105
`LongSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java>`_
@@ -97,8 +134,83 @@ def __call__(self, obj, ctx=None):
97134
raise SerializationError(str(e))
98135

99136

137+
class FloatSerializer(Serializer):
    """
    Serializes float to IEEE 754 binary32.

    See Also:
        `FloatSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/FloatSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if an error occurs during serialization.

        Returns:
            IEEE 754 binary32 bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            # ">f" = big-endian 32-bit float, matching the Java FloatSerializer
            return _struct.pack(">f", obj)
        except _struct.error as e:
            raise SerializationError(str(e))
171+
172+
173+
class ByteArraySerializer(Serializer):
    """
    Serializes bytes.

    See Also:
        `ByteArraySerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/ByteArraySerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Passes a ``bytes`` payload through unchanged.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if obj is not a bytes instance.

        Returns:
            the bytes
        """

        # Kafka Null maps to Python None.
        if obj is None:
            return None

        # Already wire-format: hand the bytes straight back.
        if isinstance(obj, bytes):
            return obj

        raise SerializationError(f"ByteArraySerializer cannot serialize {obj}")
207+
208+
100209
# Module-level singleton instances, one per supported serializer; these are
# the values referenced by the Java-class-name lookup table in
# kafka_connection.py.
STRING_SERIALIZER = StringSerializer()
BOOLEAN_SERIALIZER = BooleanSerializer()
SHORT_SERIALIZER = ShortSerializer()
INTEGER_SERIALIZER = IntegerSerializer()
LONG_SERIALIZER = LongSerializer()
FLOAT_SERIALIZER = FloatSerializer()
DOUBLE_SERIALIZER = DoubleSerializer()
BYTEARRAY_SERIALIZER = ByteArraySerializer()

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_kafka_connection.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def test_kafka_topic_connection():
8282
agentId: testAgentId
8383
configuration:
8484
className: langstream_runtime.tests.test_kafka_connection.TestSuccessProcessor
85-
""" # noqa
85+
""" # noqa: E501
8686

8787
config = yaml.safe_load(config_yaml)
8888

@@ -241,7 +241,7 @@ def test_kafka_dlq():
241241
errorHandlerConfiguration:
242242
retries: 5
243243
onFailure: dead-letter
244-
""" # noqa
244+
""" # noqa: E501
245245

246246
config = yaml.safe_load(config_yaml)
247247

@@ -293,6 +293,60 @@ def test_producer_error():
293293
sink.write([SimpleRecord("will fail")])
294294

295295

296+
def test_producer_serializers():
    """For each supported Java serializer class name, a produced record's key
    and value must arrive on the topic as the expected wire bytes."""
    with KafkaContainer(image=KAFKA_IMAGE) as container:
        bootstrap_server = container.get_bootstrap_server()

        consumer = Consumer(
            {
                "bootstrap.servers": bootstrap_server,
                "group.id": "foo",
                "auto.offset.reset": "earliest",
            }
        )
        consumer.subscribe([OUTPUT_TOPIC])

        config_yaml = f"""
        streamingCluster:
            type: kafka
            configuration:
                admin:
                    bootstrap.servers: {bootstrap_server}
        """

        config = yaml.safe_load(config_yaml)

        try:
            for serializer, record_value, message_value in [
                ("StringSerializer", "test", b"test"),
                ("BooleanSerializer", True, b"\x01"),
                ("ShortSerializer", 42, b"\x00\x2A"),
                ("IntegerSerializer", 42, b"\x00\x00\x00\x2A"),
                ("LongSerializer", 42, b"\x00\x00\x00\x00\x00\x00\x00\x2A"),
                ("FloatSerializer", 42.0, b"\x42\x28\x00\x00"),
                ("DoubleSerializer", 42.0, b"\x40\x45\x00\x00\x00\x00\x00\x00"),
                ("ByteArraySerializer", b"test", b"test"),
            ]:
                sink = kafka_connection.create_topic_producer(
                    "id",
                    config["streamingCluster"],
                    {
                        "topic": OUTPUT_TOPIC,
                        "key.serializer": "org.apache.kafka.common.serialization."
                        + serializer,
                        "value.serializer": "org.apache.kafka.common.serialization."
                        + serializer,
                    },
                )
                sink.start()
                # Close the producer even when an assertion fails, so earlier
                # iterations don't leak producers for the rest of the test run.
                try:
                    sink.write([SimpleRecord(record_value, key=record_value)])

                    message = consumer.poll(5)
                    # poll() returns None on timeout; fail with a clear message
                    # instead of an AttributeError on None below.
                    assert message is not None, f"no message received for {serializer}"
                    assert message.value() == message_value
                    assert message.key() == message_value
                finally:
                    sink.close()
        finally:
            consumer.close()
349+
296350
class TestSuccessProcessor(SingleRecordProcessor):
297351
def process_record(self, record: Record) -> List[Record]:
298352
headers = record.headers().copy()

0 commit comments

Comments
 (0)