Skip to content

Commit 839af4a

Browse files
ovvdaniil-quix
andauthored
Avro serdes support (#407)
Avro serdes support --------- Co-authored-by: Daniil Gusev <daniil@quix.io>
1 parent e488c7b commit 839af4a

File tree

6 files changed

+205
-1
lines changed

6 files changed

+205
-1
lines changed

LICENSES/LICENSE.fastavro

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2011 Miki Tebeka
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

docs/advanced/serialization.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Quix Streams supports multiple serialization formats to exchange data between Ka
77
- `integer`
88
- `double`
99
- `json`
10+
- `avro`
1011

1112
The serialization settings are defined per-topic using these parameters of `Application.topic()` function:
1213

@@ -72,3 +73,33 @@ app = Application(broker_address='localhost:9092', consumer_group='consumer')
7273
input_topic = app.topic('input', value_deserializer=JSONDeserializer(schema=MY_SCHEMA))
7374
output_topic = app.topic('output', value_serializer=JSONSerializer(schema=MY_SCHEMA))
7475
```
76+
77+
## Avro
78+
Apache Avro is a row-based binary serialization format data. Avro stores the schema in JSON format alongside the data, enabling efficient processing and schema evolution.
79+
80+
You can learn more the Apache Avro format [here](https://avro.apache.org/docs/).
81+
The Avro serializer and deserializer need to be passed explicitly.
82+
83+
In the current version, the schema must be provided manually.
84+
85+
> ***WARNING***: Avro serializer and deserializer require the `fastavro` library.
86+
> You can install quixstreams with the necessary dependencies using
87+
> `pip install quixstreams[avro]`
88+
89+
```python
90+
from quixstreams import Application
91+
from quixstreams.models.serialize.avro import AvroSerializer, AvroDeserializer
92+
93+
MY_SCHEMA = {
94+
"type": "record",
95+
"name": "testschema",
96+
"fields": [
97+
{"name": "name", "type": "string"},
98+
{"name": "id", "type": "int", "default": 0},
99+
],
100+
}
101+
102+
app = Application(broker_address='localhost:9092', consumer_group='consumer')
103+
input_topic = app.topic('input', value_deserializer=AvroDeserializer(schema=MY_SCHEMA))
104+
output_topic = app.topic('output', value_serializer=AvroSerializer(schema=MY_SCHEMA))
105+
```

pyproject.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "quixstreams"
7-
dynamic = ["version", "dependencies", "optional-dependencies"]
7+
dynamic = ["version", "dependencies"]
88
description = "Python library for building stream processing applications with Apache Kafka"
99
license = {file = "LICENSE"}
1010
readme = "README.md"
@@ -26,6 +26,13 @@ classifiers = [
2626
[project.urls]
2727
Homepage = "https://github.com/quixio/quix-streams"
2828

29+
[project.optional-dependencies]
30+
all = [
31+
"fastavro>=1.8,<2.0"
32+
]
33+
34+
avro = ["fastavro>=1.8,<2.0"]
35+
2936
[tool.setuptools.packages.find]
3037
include = ["quixstreams*"]
3138
exclude = ["tests*", "docs*", "examples*"]
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from typing import Union, Mapping, Optional, Any, Iterable
2+
3+
from io import BytesIO
4+
5+
from fastavro import schemaless_reader, schemaless_writer, parse_schema
6+
from fastavro.types import Schema
7+
8+
from .base import Serializer, Deserializer, SerializationContext
9+
from .exceptions import SerializationError
10+
11+
__all__ = ("AvroSerializer", "AvroDeserializer")
12+
13+
14+
class AvroSerializer(Serializer):
15+
def __init__(
16+
self,
17+
schema: Schema,
18+
strict: bool = False,
19+
strict_allow_default: bool = False,
20+
disable_tuple_notation: bool = False,
21+
):
22+
"""
23+
Serializer that returns data in Avro format.
24+
25+
For more information see fastavro [schemaless_writer](https://fastavro.readthedocs.io/en/latest/writer.html#fastavro._write_py.schemaless_writer) method.
26+
27+
:param schema: The avro schema.
28+
:param strict: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states.
29+
Default - `False`
30+
:param strict_allow_default: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states unless it is a missing field that has a default value in the schema.
31+
Default - `False`
32+
:param disable_tuple_notation: If set to True, tuples will not be treated as a special case. Therefore, using a tuple to indicate the type of a record will not work.
33+
Default - `False`
34+
"""
35+
self._schema = parse_schema(schema)
36+
self._strict = strict
37+
self._strict_allow_default = strict_allow_default
38+
self._disable_tuple_notation = disable_tuple_notation
39+
40+
def __call__(self, value: Any, ctx: SerializationContext) -> bytes:
41+
data = BytesIO()
42+
43+
with BytesIO() as data:
44+
try:
45+
schemaless_writer(
46+
data,
47+
self._schema,
48+
value,
49+
strict=self._strict,
50+
strict_allow_default=self._strict_allow_default,
51+
disable_tuple_notation=self._disable_tuple_notation,
52+
)
53+
except ValueError as exc:
54+
raise SerializationError(str(exc)) from exc
55+
56+
return data.getvalue()
57+
58+
59+
class AvroDeserializer(Deserializer):
60+
def __init__(
61+
self,
62+
schema: Schema,
63+
reader_schema: Optional[Schema] = None,
64+
return_record_name: bool = False,
65+
return_record_name_override: bool = False,
66+
return_named_type: bool = False,
67+
return_named_type_override: bool = False,
68+
handle_unicode_errors: str = "strict",
69+
):
70+
"""
71+
Deserializer that parses data from Avro.
72+
73+
For more information see fastavro [schemaless_reader](https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader) method.
74+
75+
:param schema: The Avro schema.
76+
:param reader_schema: If the schema has changed since being written then the new schema can be given to allow for schema migration.
77+
Default - `None`
78+
:param return_record_name: If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself.
79+
Default - `False`
80+
:param return_record_name_override: If true, this will modify the behavior of return_record_name so that the record name is only returned for unions where there is more than one record. For unions that only have one record, this option will make it so that the record is returned by itself, not a tuple with the name.
81+
Default - `False`
82+
:param return_named_type: If true, when reading a union of named types, the result will be a tuple where the first value is the name of the type and the second value is the record itself NOTE: Using this option will ignore return_record_name and return_record_name_override.
83+
Default - `False`
84+
:param return_named_type_override: If true, this will modify the behavior of return_named_type so that the named type is only returned for unions where there is more than one named type. For unions that only have one named type, this option will make it so that the named type is returned by itself, not a tuple with the name.
85+
Default - `False`
86+
:param handle_unicode_errors: Should be set to a valid string that can be used in the errors argument of the string decode() function.
87+
Default - `"strict"`
88+
"""
89+
super().__init__()
90+
self._schema = parse_schema(schema)
91+
self._reader_schema = parse_schema(reader_schema) if reader_schema else None
92+
self._return_record_name = return_record_name
93+
self._return_record_name_override = return_record_name_override
94+
self._return_named_type = return_named_type
95+
self._return_named_type_override = return_named_type_override
96+
self._handle_unicode_errors = handle_unicode_errors
97+
98+
def __call__(
99+
self, value: bytes, ctx: SerializationContext
100+
) -> Union[Iterable[Mapping], Mapping]:
101+
try:
102+
return schemaless_reader(
103+
BytesIO(value),
104+
self._schema,
105+
reader_schema=self._reader_schema,
106+
return_record_name=self._return_record_name,
107+
return_record_name_override=self._return_record_name_override,
108+
return_named_type=self._return_named_type,
109+
return_named_type_override=self._return_named_type_override,
110+
handle_unicode_errors=self._handle_unicode_errors,
111+
)
112+
except EOFError as exc:
113+
raise SerializationError(str(exc)) from exc

tests/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ testcontainers==4.5.1; python_version >= '3.9'
33
pytest
44
requests>=2.32
55
docker>=7.1.0 # Required to use requests>=2.32
6+
fastavro>=1.8,<2.0

tests/test_quixstreams/test_models/test_serializers.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@
2020
)
2121
from .utils import int_to_bytes, float_to_bytes
2222

23+
from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer
24+
25+
AVRO_TEST_SCHEMA = {
26+
"type": "record",
27+
"name": "testschema",
28+
"fields": [
29+
{"name": "name", "type": "string"},
30+
{"name": "id", "type": "int", "default": 0},
31+
],
32+
}
33+
2334
dummy_context = SerializationContext(topic="topic")
2435

2536
JSONSCHEMA_TEST_SCHEMA = {
@@ -57,6 +68,12 @@ class TestSerializers:
5768
{"id": 10, "name": "foo"},
5869
b'{"id":10,"name":"foo"}',
5970
),
71+
(
72+
AvroSerializer(AVRO_TEST_SCHEMA),
73+
{"name": "foo", "id": 123},
74+
b"\x06foo\xf6\x01",
75+
),
76+
(AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"),
6077
],
6178
)
6279
def test_serialize_success(self, serializer: Serializer, value, expected):
@@ -83,6 +100,9 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
83100
),
84101
{"id": 10},
85102
),
103+
(AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}),
104+
(AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}),
105+
(AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}),
86106
],
87107
)
88108
def test_serialize_error(self, serializer: Serializer, value):
@@ -121,6 +141,16 @@ class TestDeserializers:
121141
b'{"id":10,"name":"foo"}',
122142
{"id": 10, "name": "foo"},
123143
),
144+
(
145+
AvroDeserializer(AVRO_TEST_SCHEMA),
146+
b"\x06foo\xf6\x01",
147+
{"name": "foo", "id": 123},
148+
),
149+
(
150+
AvroDeserializer(AVRO_TEST_SCHEMA),
151+
b"\x06foo\x00",
152+
{"name": "foo", "id": 0},
153+
),
124154
],
125155
)
126156
def test_deserialize_no_column_name_success(
@@ -145,6 +175,7 @@ def test_deserialize_no_column_name_success(
145175
),
146176
b'{"id":10}',
147177
),
178+
(AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"),
148179
],
149180
)
150181
def test_deserialize_error(self, deserializer: Deserializer, value):

0 commit comments

Comments
 (0)