Skip to content

Commit 609c24a

Browse files
authored
Protobuf SerDes support (#402)
1 parent fa9fd56 commit 609c24a

File tree

7 files changed

+248
-3
lines changed

7 files changed

+248
-3
lines changed

docs/advanced/serialization.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Quix Streams supports multiple serialization formats to exchange data between Ka
88
- `double`
99
- `json`
1010
- `avro`
11+
- `protobuf`
1112

1213
The serialization settings are defined per-topic using these parameters of `Application.topic()` function:
1314

@@ -103,3 +104,29 @@ app = Application(broker_address='localhost:9092', consumer_group='consumer')
103104
input_topic = app.topic('input', value_deserializer=AvroDeserializer(schema=MY_SCHEMA))
104105
output_topic = app.topic('output', value_serializer=AvroSerializer(schema=MY_SCHEMA))
105106
```
107+
108+
## Protobuf
109+
Protocol Buffers are language-neutral, platform-neutral extensible mechanisms for serializing structured data.
110+
111+
You can learn more about the Protocol buffers format [here](https://protobuf.dev/)
112+
The Protobuf serializer and deserializer need to be passed explicitly.
113+
114+
In the current version, the schema must be provided manually.
115+
116+
> ***WARNING***: The protobuf serializer and deserializer requires the protobuf library.
117+
> You can install quixstreams with the necessary dependencies using
118+
> `pip install quixstreams[protobuf]`
119+
120+
```python
121+
from quixstreams import Application
122+
from quixstreams.models.serialize.protobuf import ProtobufSerializer, ProtobufDeserializer
123+
124+
from my_input_models_pb2 import InputProto
125+
from my_output_models_pb2 import OutputProto
126+
127+
app = Application(broker_address='localhost:9092', consumer_group='consumer')
128+
input_topic = app.topic('input', value_deserializer=ProtobufDeserializer(msg_type=InputProto))
129+
output_topic = app.topic('output', value_serializer=ProtobufSerializer(msg_type=OutputProto))
130+
```
131+
132+
By default the protobuf deserializer will deserialize the message to a python dictionary. Doing it has a big performance impact. You can disable this behavior by initializing the deserializer with `to_dict` set to `False`. The protobuf message object will then be used as the message value limiting the available StreamingDataframe API.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ Homepage = "https://github.com/quixio/quix-streams"
2828

2929
[project.optional-dependencies]
3030
all = [
31-
"fastavro>=1.8,<2.0"
31+
"fastavro>=1.8,<2.0",
32+
"protobuf>=5.27.2,<6.0"
3233
]
3334

3435
avro = ["fastavro>=1.8,<2.0"]
36+
protobuf = ["protobuf>=5.27.2,<6.0"]
3537

3638
[tool.setuptools.packages.find]
3739
include = ["quixstreams*"]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from typing import Union, Mapping, Iterable, Dict
2+
3+
from .base import Serializer, Deserializer, SerializationContext
4+
from .exceptions import SerializationError
5+
6+
from google.protobuf.message import Message, DecodeError, EncodeError
7+
from google.protobuf.json_format import MessageToDict, ParseDict, ParseError
8+
9+
__all__ = ("ProtobufSerializer", "ProtobufDeserializer")
10+
11+
12+
class ProtobufSerializer(Serializer):
13+
def __init__(
14+
self,
15+
msg_type: Message,
16+
deterministic: bool = False,
17+
ignore_unknown_fields: bool = False,
18+
):
19+
"""
20+
Serializer that returns data in protobuf format.
21+
22+
Serialisation from a python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the `msg_type` class.
23+
24+
:param msg_type: protobuf message class.
25+
:param deterministic: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys
26+
Default - `False`
27+
:param ignore_unknown_fields: If True, do not raise errors for unknown fields.
28+
Default - `False`
29+
"""
30+
super().__init__()
31+
self._msg_type = msg_type
32+
33+
self._deterministic = deterministic
34+
self._ignore_unknown_fields = ignore_unknown_fields
35+
36+
def __call__(
37+
self, value: Union[Dict, Message], ctx: SerializationContext
38+
) -> Union[str, bytes]:
39+
40+
try:
41+
if isinstance(value, self._msg_type):
42+
return value.SerializeToString(deterministic=self._deterministic)
43+
44+
msg = self._msg_type()
45+
return ParseDict(
46+
value, msg, ignore_unknown_fields=self._ignore_unknown_fields
47+
).SerializeToString(deterministic=self._deterministic)
48+
except (EncodeError, ParseError) as exc:
49+
raise SerializationError(str(exc)) from exc
50+
51+
52+
class ProtobufDeserializer(Deserializer):
53+
def __init__(
54+
self,
55+
msg_type: Message,
56+
use_integers_for_enums: bool = False,
57+
preserving_proto_field_name: bool = False,
58+
to_dict: bool = True,
59+
):
60+
"""
61+
Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.
62+
63+
Deserialisation to a python dictionary can have a significant performance impact. You can disable this behavior using `to_dict`, in that case the protobuf message will be used as the StreamingDataframe row value.
64+
65+
:param msg_type: protobuf message class.
66+
:param use_integers_for_enums: If true, use integers instead of enum names.
67+
Default - `False`
68+
:param preserving_proto_field_name: If True, use the original proto field names as
69+
defined in the .proto file. If False, convert the field names to
70+
lowerCamelCase.
71+
Default - `False`
72+
:param to_dict: If false, return the protobuf message instead of a dict.
73+
Default - `True`
74+
"""
75+
super().__init__()
76+
self._msg_type = msg_type
77+
self._to_dict = to_dict
78+
79+
self._use_integers_for_enums = use_integers_for_enums
80+
self._preserving_proto_field_name = preserving_proto_field_name
81+
82+
def __call__(
83+
self, value: bytes, ctx: SerializationContext
84+
) -> Union[Iterable[Mapping], Mapping, Message]:
85+
msg = self._msg_type()
86+
87+
try:
88+
msg.ParseFromString(value)
89+
except DecodeError as exc:
90+
raise SerializationError(str(exc)) from exc
91+
92+
if not self._to_dict:
93+
return msg
94+
95+
return MessageToDict(
96+
msg,
97+
always_print_fields_with_no_presence=True,
98+
use_integers_for_enums=self._use_integers_for_enums,
99+
preserving_proto_field_name=self._preserving_proto_field_name,
100+
)

tests/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ testcontainers==4.5.1; python_version >= '3.9'
33
pytest
44
requests>=2.32
55
docker>=7.1.0 # Required to use requests>=2.32
6-
fastavro>=1.8,<2.0
6+
fastavro>=1.8,<2.0
7+
protobuf>=5.27.2
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
syntax = "proto3";
2+
3+
package test;
4+
5+
enum TestEnum {
6+
A = 0;
7+
B = 1;
8+
}
9+
10+
message Test {
11+
string name = 1;
12+
int32 id = 2;
13+
TestEnum enum = 3;
14+
}

tests/test_quixstreams/test_models/protobuf/test_pb2.py

Lines changed: 46 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/test_quixstreams/test_models/test_serializers.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@
1818
DoubleDeserializer,
1919
StringDeserializer,
2020
)
21-
from .utils import int_to_bytes, float_to_bytes
21+
from quixstreams.models.serializers.protobuf import (
22+
ProtobufSerializer,
23+
ProtobufDeserializer,
24+
)
2225

2326
from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer
2427

28+
from .utils import int_to_bytes, float_to_bytes
29+
from .protobuf.test_pb2 import Test
30+
2531
AVRO_TEST_SCHEMA = {
2632
"type": "record",
2733
"name": "testschema",
@@ -31,6 +37,7 @@
3137
],
3238
}
3339

40+
3441
dummy_context = SerializationContext(topic="topic")
3542

3643
JSONSCHEMA_TEST_SCHEMA = {
@@ -74,6 +81,22 @@ class TestSerializers:
7481
b"\x06foo\xf6\x01",
7582
),
7683
(AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"),
84+
(ProtobufSerializer(Test), {}, b""),
85+
(ProtobufSerializer(Test), {"id": 3}, b"\x10\x03"),
86+
(ProtobufSerializer(Test), {"name": "foo", "id": 2}, b"\n\x03foo\x10\x02"),
87+
(ProtobufSerializer(Test), Test(name="foo", id=2), b"\n\x03foo\x10\x02"),
88+
# Both values are supported for enum
89+
(
90+
ProtobufSerializer(Test),
91+
{"name": "foo", "id": 2, "enum": "B"},
92+
b"\n\x03foo\x10\x02\x18\x01",
93+
),
94+
(
95+
ProtobufSerializer(Test),
96+
{"name": "foo", "id": 2, "enum": 1},
97+
b"\n\x03foo\x10\x02\x18\x01",
98+
),
99+
(ProtobufSerializer(Test), {"name": "foo"}, b"\n\x03foo"),
77100
],
78101
)
79102
def test_serialize_success(self, serializer: Serializer, value, expected):
@@ -103,6 +126,7 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
103126
(AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}),
104127
(AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}),
105128
(AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}),
129+
(ProtobufSerializer(Test), {"bar": 3}),
106130
],
107131
)
108132
def test_serialize_error(self, serializer: Serializer, value):
@@ -151,6 +175,36 @@ class TestDeserializers:
151175
b"\x06foo\x00",
152176
{"name": "foo", "id": 0},
153177
),
178+
(
179+
ProtobufDeserializer(Test),
180+
b"\n\x03foo\x10\x02",
181+
{"enum": "A", "name": "foo", "id": 2},
182+
),
183+
(
184+
ProtobufDeserializer(Test, to_dict=False),
185+
b"\n\x03foo\x10\x02",
186+
Test(name="foo", id=2),
187+
),
188+
(
189+
ProtobufDeserializer(Test, use_integers_for_enums=True),
190+
b"\n\x03foo\x10\x02",
191+
{"enum": 0, "name": "foo", "id": 2},
192+
),
193+
(
194+
ProtobufDeserializer(Test),
195+
b"\n\x03foo",
196+
{
197+
"enum": "A",
198+
"name": "foo",
199+
"id": 0,
200+
},
201+
),
202+
(
203+
ProtobufDeserializer(Test),
204+
b"\x10\x03",
205+
{"enum": "A", "name": "", "id": 3},
206+
),
207+
(ProtobufDeserializer(Test), b"", {"enum": "A", "name": "", "id": 0}),
154208
],
155209
)
156210
def test_deserialize_no_column_name_success(
@@ -176,6 +230,7 @@ def test_deserialize_no_column_name_success(
176230
b'{"id":10}',
177231
),
178232
(AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"),
233+
(ProtobufDeserializer(Test), b"\n\x03foo\x10\x02\x13"),
179234
],
180235
)
181236
def test_deserialize_error(self, deserializer: Deserializer, value):

0 commit comments

Comments
 (0)