Skip to content

Commit c36071e

Browse files
authored
fix Topic.deserialize not using the correct value deserializer (#413)
1 parent 099e39f commit c36071e

File tree

3 files changed

+70
-15
lines changed

3 files changed

+70
-15
lines changed

quixstreams/models/topics/topic.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -274,19 +274,23 @@ def serialize(
274274

275275
def deserialize(self, message: ConfluentKafkaMessageProto):
276276
ctx = SerializationContext(topic=message.topic(), headers=message.headers())
277-
key_bytes = message.key()
278-
value_bytes = message.value()
277+
if (key := message.key()) is not None:
278+
if self._key_deserializer:
279+
key = self._key_deserializer(key, ctx=ctx)
280+
else:
281+
raise DeserializerIsNotProvidedError(
282+
f'Key deserializer is not provided for topic "{self.name}"'
283+
)
284+
if (value := message.value()) is not None:
285+
if self._value_deserializer:
286+
value = self._value_deserializer(value, ctx=ctx)
287+
else:
288+
raise DeserializerIsNotProvidedError(
289+
f'Value deserializer is not provided for topic "{self.name}"'
290+
)
279291
return KafkaMessage(
280-
key=(
281-
None
282-
if key_bytes is None
283-
else self._key_deserializer(key_bytes, ctx=ctx)
284-
),
285-
value=(
286-
None
287-
if value_bytes is None
288-
else self._value_serializer(value_bytes, ctx=ctx)
289-
),
292+
key=key,
293+
value=value,
290294
headers=message.headers(),
291295
timestamp=message.timestamp()[1],
292296
)

tests/test_quixstreams/fixtures.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ def factory(
580580
name: Optional[str] = None,
581581
partitions: int = 1,
582582
create_topic: bool = False,
583+
use_serdes_nones: bool = False,
583584
key_serializer: Optional[Union[Serializer, str]] = None,
584585
value_serializer: Optional[Union[Serializer, str]] = None,
585586
key_deserializer: Optional[Union[Deserializer, str]] = None,
@@ -596,9 +597,10 @@ def factory(
596597
"config": topic_manager.topic_config(num_partitions=partitions),
597598
"timestamp_extractor": timestamp_extractor,
598599
}
599-
topic = topic_manager.topic(
600-
name, **{k: v for k, v in topic_args.items() if v is not None}
601-
)
600+
if not use_serdes_nones:
601+
# will use the topic manager serdes defaults rather than "Nones"
602+
topic_args = {k: v for k, v in topic_args.items() if v is not None}
603+
topic = topic_manager.topic(name, **topic_args)
602604
if create_topic:
603605
topic_manager.create_all_topics()
604606
return topic

tests/test_quixstreams/test_models/test_topics/test_topics.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,55 @@ def test_serialize_serializer_missing(
502502
timestamp_ms=1234567890,
503503
)
504504

505+
@pytest.mark.parametrize(
506+
"deserializers, input_kv, expected_kv",
507+
[
508+
(
509+
{"key_deserializer": "str", "value_deserializer": "json"},
510+
{"key": b"woo", "value": b'{"a":["cool","json"]}'},
511+
{"key": "woo", "value": {"a": ["cool", "json"]}},
512+
),
513+
(
514+
{"key_deserializer": None, "value_deserializer": "int"},
515+
{"key": None, "value": int_to_bytes(12345)},
516+
{"key": None, "value": 12345},
517+
),
518+
(
519+
{"key_deserializer": "str", "value_deserializer": "json"},
520+
{"key": None, "value": None},
521+
{"key": None, "value": None},
522+
),
523+
],
524+
)
525+
def test_deserialize(
526+
self, topic_manager_topic_factory, deserializers, input_kv, expected_kv
527+
):
528+
topic = topic_manager_topic_factory(**deserializers, use_serdes_nones=True)
529+
message = topic.deserialize(
530+
ConfluentKafkaMessageStub(
531+
**input_kv,
532+
headers=[("header", b"value")],
533+
timestamp=(1000, 2000),
534+
)
535+
)
536+
assert {"key": message.key, "value": message.value} == expected_kv
537+
assert message.headers == [("header", b"value")]
538+
assert message.timestamp == 2000
539+
540+
def test_deserializer_missing_deserializer(self, topic_manager_topic_factory):
541+
topic = topic_manager_topic_factory(
542+
key_deserializer=None, value_deserializer="str", use_serdes_nones=True
543+
)
544+
with pytest.raises(DeserializerIsNotProvidedError):
545+
topic.deserialize(
546+
ConfluentKafkaMessageStub(
547+
key=b"key",
548+
value=b"value",
549+
headers=[("header", b"value")],
550+
timestamp=(1000, 2000),
551+
)
552+
)
553+
505554
@pytest.mark.parametrize(
506555
"key_serializer, value_serializer, key, value",
507556
[

0 commit comments

Comments
 (0)