1
- from typing import Union , Mapping , Iterable , Dict
1
+ from typing import Dict , Iterable , Mapping , Optional , Union
2
+
3
+ from confluent_kafka .schema_registry import SchemaRegistryClient , SchemaRegistryError
4
+ from confluent_kafka .schema_registry .protobuf import (
5
+ ProtobufDeserializer as _ProtobufDeserializer ,
6
+ ProtobufSerializer as _ProtobufSerializer ,
7
+ )
8
+ from confluent_kafka .serialization import SerializationError as _SerializationError
9
+ from google .protobuf .json_format import MessageToDict , ParseDict , ParseError
10
+ from google .protobuf .message import DecodeError , EncodeError , Message
2
11
3
- from .base import Serializer , Deserializer , SerializationContext
12
+ from .base import Deserializer , SerializationContext , Serializer
4
13
from .exceptions import SerializationError
14
+ from .schema_registry import (
15
+ SchemaRegistryClientConfig ,
16
+ SchemaRegistrySerializationConfig ,
17
+ )
5
18
6
- from google .protobuf .message import Message , DecodeError , EncodeError
7
- from google .protobuf .json_format import MessageToDict , ParseDict , ParseError
8
19
9
20
__all__ = ("ProtobufSerializer" , "ProtobufDeserializer" )
10
21
@@ -15,6 +26,10 @@ def __init__(
15
26
msg_type : Message ,
16
27
deterministic : bool = False ,
17
28
ignore_unknown_fields : bool = False ,
29
+ schema_registry_client_config : Optional [SchemaRegistryClientConfig ] = None ,
30
+ schema_registry_serialization_config : Optional [
31
+ SchemaRegistrySerializationConfig
32
+ ] = None ,
18
33
):
19
34
"""
20
35
Serializer that returns data in protobuf format.
@@ -26,26 +41,68 @@ def __init__(
26
41
Default - `False`
27
42
:param ignore_unknown_fields: If True, do not raise errors for unknown fields.
28
43
Default - `False`
44
+ :param schema_registry_client_config: If provided, serialization is offloaded to Confluent's ProtobufSerializer.
45
+ Default - `None`
46
+ :param schema_registry_serialization_config: Additional configuration for Confluent's ProtobufSerializer.
47
+ Default - `None`
48
+ >***NOTE:*** `schema_registry_client_config` must also be set.
29
49
"""
30
50
super ().__init__ ()
31
51
self ._msg_type = msg_type
32
52
33
53
self ._deterministic = deterministic
34
54
self ._ignore_unknown_fields = ignore_unknown_fields
35
55
56
+ self ._schema_registry_serializer = None
57
+ if schema_registry_client_config :
58
+ client_config = schema_registry_client_config .as_dict (
59
+ plaintext_secrets = True ,
60
+ )
61
+
62
+ if schema_registry_serialization_config :
63
+ serialization_config = schema_registry_serialization_config .as_dict ()
64
+ else :
65
+ # The use.deprecated.format has been mandatory since Confluent Kafka version 1.8.2.
66
+ # https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.8.2
67
+ serialization_config = SchemaRegistrySerializationConfig ().as_dict (
68
+ include = {"use_deprecated_format" },
69
+ )
70
+
71
+ self ._schema_registry_serializer = _ProtobufSerializer (
72
+ msg_type = msg_type ,
73
+ schema_registry_client = SchemaRegistryClient (client_config ),
74
+ conf = serialization_config ,
75
+ )
76
+
36
77
def __call__(
    self, value: Union[Dict, Message], ctx: SerializationContext
) -> Union[str, bytes]:
    """
    Serialize a protobuf message, or a dict describing one, to bytes.

    :param value: an instance of the configured message type, or an object
        that `ParseDict` can load into a new instance of that type.
    :param ctx: serialization context; it is only forwarded to the
        Schema Registry serializer when one is configured.
    :return: the serialized payload.
    :raises SerializationError: if the value cannot be converted into the
        configured message type, or if encoding fails.
    """
    message = value
    if not isinstance(value, self._msg_type):
        # Anything that is not already the target message type is treated
        # as a dict-like payload and parsed into a fresh message instance.
        try:
            message = ParseDict(
                value,
                self._msg_type(),
                ignore_unknown_fields=self._ignore_unknown_fields,
            )
        except TypeError as err:
            raise SerializationError(
                "Value to serialize must be of type "
                f"`{self._msg_type}` or dict, not `{type(value)}`."
            ) from err
        except ParseError as err:
            raise SerializationError(str(err)) from err

    registry_serializer = self._schema_registry_serializer
    if registry_serializer is None:
        # No Schema Registry configured: encode with protobuf directly.
        try:
            return message.SerializeToString(deterministic=self._deterministic)
        except EncodeError as err:
            raise SerializationError(str(err)) from err

    # Schema Registry configured: delegate encoding to the wrapped
    # serializer and translate its errors into this package's exception.
    try:
        return registry_serializer(message, ctx)
    except (SchemaRegistryError, _SerializationError) as err:
        raise SerializationError(str(err)) from err
50
107
51
108
@@ -56,6 +113,10 @@ def __init__(
56
113
use_integers_for_enums : bool = False ,
57
114
preserving_proto_field_name : bool = False ,
58
115
to_dict : bool = True ,
116
+ schema_registry_client_config : Optional [SchemaRegistryClientConfig ] = None ,
117
+ schema_registry_serialization_config : Optional [
118
+ SchemaRegistrySerializationConfig
119
+ ] = None ,
59
120
):
60
121
"""
61
122
Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.
@@ -71,6 +132,11 @@ def __init__(
71
132
Default - `False`
72
133
:param to_dict: If false, return the protobuf message instead of a dict.
73
134
Default - `True`
135
+ :param schema_registry_client_config: If provided, deserialization is offloaded to Confluent's ProtobufDeserializer.
136
+ Default - `None`
137
+ :param schema_registry_serialization_config: Additional configuration for Confluent's ProtobufDeserializer.
138
+ Default - `None`
139
+ >***NOTE:*** `schema_registry_client_config` must also be set.
74
140
"""
75
141
super ().__init__ ()
76
142
self ._msg_type = msg_type
@@ -79,15 +145,42 @@ def __init__(
79
145
self ._use_integers_for_enums = use_integers_for_enums
80
146
self ._preserving_proto_field_name = preserving_proto_field_name
81
147
148
+ # Confluent's ProtobufDeserializer is not utilizing the
149
+ # Schema Registry. However, we still accept a fully qualified
150
+ # SchemaRegistryClientConfig to maintain a unified API and ensure
151
+ # future compatibility in case we choose to bypass Confluent
152
+ # and interact with the Schema Registry directly.
153
+ # On the other hand, ProtobufDeserializer requires
154
+ # conf dict with a single key: `use.deprecated.format`.
155
+ self ._schema_registry_deserializer = None
156
+ if schema_registry_client_config :
157
+
158
+ # The use.deprecated.format has been mandatory since Confluent Kafka version 1.8.2.
159
+ # https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.8.2
160
+ serialization_config = (
161
+ schema_registry_serialization_config
162
+ or SchemaRegistrySerializationConfig ()
163
+ ).as_dict (include = {"use_deprecated_format" })
164
+
165
+ self ._schema_registry_deserializer = _ProtobufDeserializer (
166
+ message_type = msg_type ,
167
+ conf = serialization_config ,
168
+ )
169
+
82
170
def __call__ (
83
171
self , value : bytes , ctx : SerializationContext
84
172
) -> Union [Iterable [Mapping ], Mapping , Message ]:
85
- msg = self ._msg_type ()
86
-
87
- try :
88
- msg .ParseFromString (value )
89
- except DecodeError as exc :
90
- raise SerializationError (str (exc )) from exc
173
+ if self ._schema_registry_deserializer is not None :
174
+ try :
175
+ msg = self ._schema_registry_deserializer (value , ctx )
176
+ except (_SerializationError , DecodeError ) as exc :
177
+ raise SerializationError (str (exc )) from exc
178
+ else :
179
+ msg = self ._msg_type ()
180
+ try :
181
+ msg .ParseFromString (value )
182
+ except DecodeError as exc :
183
+ raise SerializationError (str (exc )) from exc
91
184
92
185
if not self ._to_dict :
93
186
return msg
0 commit comments