Skip to content

Commit e488c7b

Browse files
authored
Support jsonschema validation in json serdes (#414)
1 parent 6181921 commit e488c7b

File tree

5 files changed

+160
-1
lines changed

5 files changed

+160
-1
lines changed

LICENSES/LICENSE.jsonschema

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2013 Julian Berman
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is
8+
furnished to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in
11+
all copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

docs/advanced/serialization.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,27 @@ output_topic = app.topic('output', value_serializer=JSONSerializer())
4848
```
4949

5050
You can find all available serializers in `quixstreams.models.serializers` module.
51+
52+
## Jsonschema support
53+
54+
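The JSON serializer and deserializer can validate data against a JSON Schema: pass either a `schema` (checked with `jsonschema.Draft202012Validator`) or a pre-built `validator`, which takes precedence over `schema`. Data that fails validation raises a `SerializationError`.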
55+
56+
```python
57+
from jsonschema import Draft202012Validator
58+
59+
from quixstreams import Application
60+
from quixstreams.models import JSONDeserializer, JSONSerializer
61+
62+
MY_SCHEMA = {
63+
"type": "object",
64+
"properties": {
65+
"name": {"type": "string"},
66+
"id": {"type": "number"},
67+
},
68+
"required": ["id"],
69+
}
70+
71+
app = Application(broker_address='localhost:9092', consumer_group='consumer')
72+
input_topic = app.topic('input', value_deserializer=JSONDeserializer(schema=MY_SCHEMA))
73+
output_topic = app.topic('output', value_serializer=JSONSerializer(schema=MY_SCHEMA))
74+
```

quixstreams/models/serializers/json.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Callable, Union, Mapping, Optional, Any, Iterable
22

3+
from jsonschema import ValidationError, SchemaError, Validator, Draft202012Validator
4+
35
from quixstreams.utils.json import (
46
dumps as default_dumps,
57
loads as default_loads,
@@ -14,18 +16,39 @@ class JSONSerializer(Serializer):
1416
def __init__(
1517
self,
1618
dumps: Callable[[Any], Union[str, bytes]] = default_dumps,
19+
schema: Optional[Mapping] = None,
20+
validator: Optional[Validator] = None,
1721
):
1822
"""
1923
Serializer that returns data in json format.
2024
:param dumps: a function to serialize objects to json.
2125
Default - :py:func:`quixstreams.utils.json.dumps`
26+
:param schema: A schema used to validate the data using [`jsonschema.Draft202012Validator`](https://python-jsonschema.readthedocs.io/en/stable/api/jsonschema/validators/#jsonschema.validators.Draft202012Validator).
27+
Default - `None`
28+
:param validator: A jsonschema validator used to validate the data. Takes precedence over the schema.
29+
Default - `None`
2230
"""
31+
32+
if schema and not validator:
33+
validator = Draft202012Validator(schema)
34+
35+
super().__init__()
2336
self._dumps = dumps
37+
self._validator = validator
38+
39+
if self._validator:
40+
self._validator.check_schema(self._validator.schema)
2441

2542
def __call__(self, value: Any, ctx: SerializationContext) -> Union[str, bytes]:
2643
return self._to_json(value)
2744

2845
def _to_json(self, value: Any):
46+
if self._validator:
47+
try:
48+
self._validator.validate(value)
49+
except ValidationError as exc:
50+
raise SerializationError(str(exc)) from exc
51+
2952
try:
3053
return self._dumps(value)
3154
except (ValueError, TypeError) as exc:
@@ -36,20 +59,42 @@ class JSONDeserializer(Deserializer):
3659
def __init__(
3760
self,
3861
loads: Callable[[Union[bytes, bytearray]], Any] = default_loads,
62+
schema: Optional[Mapping] = None,
63+
validator: Optional[Validator] = None,
3964
):
4065
"""
4166
Deserializer that parses data from JSON
4267
4368
:param loads: function to parse json from bytes.
4469
Default - :py:func:`quixstreams.utils.json.loads`.
70+
:param schema: A schema used to validate the data using [`jsonschema.Draft202012Validator`](https://python-jsonschema.readthedocs.io/en/stable/api/jsonschema/validators/#jsonschema.validators.Draft202012Validator).
71+
Default - `None`
72+
:param validator: A jsonschema validator used to validate the data. Takes precedence over the schema.
73+
Default - `None`
4574
"""
75+
76+
if schema and not validator:
77+
validator = Draft202012Validator(schema)
78+
4679
super().__init__()
4780
self._loads = loads
81+
self._validator = validator
82+
83+
if self._validator:
84+
self._validator.check_schema(self._validator.schema)
4885

4986
def __call__(
5087
self, value: bytes, ctx: SerializationContext
5188
) -> Union[Iterable[Mapping], Mapping]:
5289
try:
53-
return self._loads(value)
90+
data = self._loads(value)
5491
except (ValueError, TypeError) as exc:
5592
raise SerializationError(str(exc)) from exc
93+
94+
if self._validator:
95+
try:
96+
self._validator.validate(data)
97+
except ValidationError as exc:
98+
raise SerializationError(str(exc)) from exc
99+
100+
return data

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ typing_extensions>=4.8
55
orjson>=3.9,<4
66
pydantic>=2.7,<2.8
77
pydantic-settings>=2.3,<2.4
8+
jsonschema>=4.3.0

tests/test_quixstreams/test_models/test_serializers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import pytest
22

3+
import jsonschema
4+
35
from quixstreams.models import (
46
IntegerSerializer,
57
SerializationContext,
@@ -20,6 +22,15 @@
2022

2123
dummy_context = SerializationContext(topic="topic")
2224

25+
JSONSCHEMA_TEST_SCHEMA = {
26+
"type": "object",
27+
"properties": {
28+
"name": {"type": "string"},
29+
"id": {"type": "number"},
30+
},
31+
"required": ["name"],
32+
}
33+
2334

2435
class TestSerializers:
2536
@pytest.mark.parametrize(
@@ -34,6 +45,18 @@ class TestSerializers:
3445
(BytesSerializer(), b"abc", b"abc"),
3546
(JSONSerializer(), {"a": 123}, b'{"a":123}'),
3647
(JSONSerializer(), [1, 2, 3], b"[1,2,3]"),
48+
(
49+
JSONSerializer(schema=JSONSCHEMA_TEST_SCHEMA),
50+
{"id": 10, "name": "foo"},
51+
b'{"id":10,"name":"foo"}',
52+
),
53+
(
54+
JSONSerializer(
55+
validator=jsonschema.Draft202012Validator(JSONSCHEMA_TEST_SCHEMA)
56+
),
57+
{"id": 10, "name": "foo"},
58+
b'{"id":10,"name":"foo"}',
59+
),
3760
],
3861
)
3962
def test_serialize_success(self, serializer: Serializer, value, expected):
@@ -50,12 +73,28 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
5073
(StringSerializer(), {"a": 123}),
5174
(JSONSerializer(), object()),
5275
(JSONSerializer(), complex(1, 2)),
76+
(
77+
JSONSerializer(schema=JSONSCHEMA_TEST_SCHEMA),
78+
{"id": 10},
79+
),
80+
(
81+
JSONSerializer(
82+
validator=jsonschema.Draft202012Validator(JSONSCHEMA_TEST_SCHEMA)
83+
),
84+
{"id": 10},
85+
),
5386
],
5487
)
5588
def test_serialize_error(self, serializer: Serializer, value):
5689
with pytest.raises(SerializationError):
5790
serializer(value, ctx=dummy_context)
5891

92+
def test_invalid_jsonschema(self):
93+
with pytest.raises(jsonschema.SchemaError):
94+
JSONSerializer(
95+
validator=jsonschema.Draft202012Validator({"type": "invalid"})
96+
)
97+
5998

6099
class TestDeserializers:
61100
@pytest.mark.parametrize(
@@ -70,6 +109,18 @@ class TestDeserializers:
70109
(BytesDeserializer(), b"123123", b"123123"),
71110
(JSONDeserializer(), b"123123", 123123),
72111
(JSONDeserializer(), b'{"a":"b"}', {"a": "b"}),
112+
(
113+
JSONDeserializer(schema=JSONSCHEMA_TEST_SCHEMA),
114+
b'{"id":10,"name":"foo"}',
115+
{"id": 10, "name": "foo"},
116+
),
117+
(
118+
JSONDeserializer(
119+
validator=jsonschema.Draft202012Validator(JSONSCHEMA_TEST_SCHEMA)
120+
),
121+
b'{"id":10,"name":"foo"}',
122+
{"id": 10, "name": "foo"},
123+
),
73124
],
74125
)
75126
def test_deserialize_no_column_name_success(
@@ -84,8 +135,27 @@ def test_deserialize_no_column_name_success(
84135
(IntegerDeserializer(), b'{"abc": "abc"}'),
85136
(DoubleDeserializer(), b"abc"),
86137
(JSONDeserializer(), b"{"),
138+
(
139+
JSONDeserializer(schema=JSONSCHEMA_TEST_SCHEMA),
140+
b'{"id":10}',
141+
),
142+
(
143+
JSONDeserializer(
144+
validator=jsonschema.Draft202012Validator(JSONSCHEMA_TEST_SCHEMA)
145+
),
146+
b'{"id":10}',
147+
),
87148
],
88149
)
89150
def test_deserialize_error(self, deserializer: Deserializer, value):
90151
with pytest.raises(SerializationError):
91152
deserializer(value, ctx=dummy_context)
153+
154+
def test_invalid_jsonschema(self):
155+
with pytest.raises(jsonschema.SchemaError):
156+
JSONDeserializer(
157+
validator=jsonschema.Draft202012Validator({"type": "invalid"})
158+
)
159+
160+
with pytest.raises(jsonschema.SchemaError):
161+
JSONDeserializer(schema={"type": "invalid"})

0 commit comments

Comments
 (0)