Skip to content

Commit f1230c0

Browse files
authored
add optional caching mode to serializer (#1151)
1 parent 356d1cd commit f1230c0

File tree

4 files changed

+205
-21
lines changed

4 files changed

+205
-21
lines changed

schemaregistry/serde/protobuf/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,21 @@ import "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
2121
// SerializerConfig is used to pass multiple configuration options to the serializers.
2222
type SerializerConfig struct {
2323
// SerializerConfig embeds the generic serde serializer options.
serde.SerializerConfig
24+
// CacheSchemas will cache the resolved schema information based on the name of the protobuf file
25+
// corresponding to the message being serialized. This will drastically improve serialization
26+
// performance if you are only ever using a _single_ version of a specific protobuf schema
27+
// during any given run of your application. This should be the case for most applications,
28+
// but might not apply if you're not creating proto messages based on generated files (e.g.
29+
// you are proxying or reading raw protobuf messages from a data source), or if for some reason
30+
// you are including multiple versions of the same schema/protobuf in your application.
31+
CacheSchemas bool
2432
}
2533

2634
// NewSerializerConfig returns a new configuration instance with sane defaults.
2735
func NewSerializerConfig() *SerializerConfig {
2836
c := &SerializerConfig{
2937
SerializerConfig: *serde.NewSerializerConfig(),
38+
CacheSchemas: false,
3039
}
3140

3241
return c

schemaregistry/serde/protobuf/protobuf.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ import (
6666
// Serializer represents a Protobuf serializer
6767
type Serializer struct {
6868
serde.BaseSerializer
69+
// Conf is the protobuf-specific serializer configuration; NewSerializer sets it to the
// config it was given.
Conf *SerializerConfig
70+
// descToSchemaCache maps a proto file name to the *schemaregistry.SchemaInfo resolved for
// it. It is only read and written when Conf.CacheSchemas is true.
descToSchemaCache cache.Cache
71+
// descToSchemaCacheLock is held around every Get and Put on descToSchemaCache.
descToSchemaCacheLock sync.RWMutex
6972
}
7073

7174
// Deserializer represents a Protobuf deserializer
@@ -132,8 +135,16 @@ func init() {
132135

133136
// NewSerializer creates a Protobuf serializer for Protobuf-generated objects
134137
func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *SerializerConfig) (*Serializer, error) {
135-
s := &Serializer{}
136-
err := s.ConfigureSerializer(client, serdeType, &conf.SerializerConfig)
138+
cache, err := cache.NewLRUCache(1000)
139+
if err != nil {
140+
return nil, err
141+
}
142+
s := &Serializer{
143+
descToSchemaCache: cache,
144+
}
145+
err = s.ConfigureSerializer(client, serdeType, &conf.SerializerConfig)
146+
s.Conf = conf
147+
137148
if err != nil {
138149
return nil, err
139150
}
@@ -166,22 +177,11 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) {
166177
default:
167178
return nil, fmt.Errorf("serialization target must be a protobuf message. Got '%v'", t)
168179
}
169-
autoRegister := s.Conf.AutoRegisterSchemas
170-
normalize := s.Conf.NormalizeSchemas
171-
fileDesc, deps, err := s.toProtobufSchema(protoMsg)
180+
info, err := s.getSchemaInfo(protoMsg)
172181
if err != nil {
173182
return nil, err
174183
}
175-
metadata, err := s.resolveDependencies(fileDesc, deps, "", autoRegister, normalize)
176-
if err != nil {
177-
return nil, err
178-
}
179-
info := schemaregistry.SchemaInfo{
180-
Schema: metadata.Schema,
181-
SchemaType: metadata.SchemaType,
182-
References: metadata.References,
183-
}
184-
id, err := s.GetID(topic, protoMsg, &info)
184+
id, err := s.GetID(topic, protoMsg, info)
185185
if err != nil {
186186
return nil, err
187187
}
@@ -197,18 +197,50 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) {
197197
return payload, nil
198198
}
199199

200-
func (s *Serializer) toProtobufSchema(msg proto.Message) (*desc.FileDescriptor, map[string]string, error) {
201-
messageDesc, err := desc.LoadMessageDescriptorForMessage(protoV1.MessageV1(msg))
200+
func (s *Serializer) getSchemaInfo(protoMsg proto.Message) (*schemaregistry.SchemaInfo, error) {
201+
messageDesc, err := desc.LoadMessageDescriptorForMessage(protoV1.MessageV1(protoMsg))
202202
if err != nil {
203-
return nil, nil, err
203+
return nil, err
204204
}
205205
fileDesc := messageDesc.GetFile()
206+
if s.Conf.CacheSchemas {
207+
s.descToSchemaCacheLock.RLock()
208+
value, ok := s.descToSchemaCache.Get(fileDesc.GetName())
209+
s.descToSchemaCacheLock.RUnlock()
210+
if ok {
211+
return value.(*schemaregistry.SchemaInfo), nil
212+
}
213+
}
214+
deps, err := s.toProtobufSchema(fileDesc)
215+
if err != nil {
216+
return nil, err
217+
}
218+
autoRegister := s.Conf.AutoRegisterSchemas
219+
normalize := s.Conf.NormalizeSchemas
220+
metadata, err := s.resolveDependencies(fileDesc, deps, "", autoRegister, normalize)
221+
if err != nil {
222+
return nil, err
223+
}
224+
info := &schemaregistry.SchemaInfo{
225+
Schema: metadata.Schema,
226+
SchemaType: metadata.SchemaType,
227+
References: metadata.References,
228+
}
229+
if s.Conf.CacheSchemas {
230+
s.descToSchemaCacheLock.Lock()
231+
s.descToSchemaCache.Put(fileDesc.GetName(), info)
232+
s.descToSchemaCacheLock.Unlock()
233+
}
234+
return info, nil
235+
}
236+
237+
func (s *Serializer) toProtobufSchema(fileDesc *desc.FileDescriptor) (map[string]string, error) {
206238
deps := make(map[string]string)
207-
err = s.toDependencies(fileDesc, deps)
239+
err := s.toDependencies(fileDesc, deps)
208240
if err != nil {
209-
return nil, nil, err
241+
return nil, err
210242
}
211-
return fileDesc, deps, nil
243+
return deps, nil
212244
}
213245

214246
func (s *Serializer) toDependencies(fileDesc *desc.FileDescriptor, deps map[string]string) error {

schemaregistry/serde/protobuf/protobuf_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,130 @@ func TestProtobufSerdeEmptyMessage(t *testing.T) {
199199
_, err = deser.Deserialize("topic1", []byte{})
200200
serde.MaybeFail("deserialization", err)
201201
}
202+
203+
func BenchmarkProtobufSerWithReference(b *testing.B) {
204+
serde.MaybeFail = serde.InitFailFuncBenchmark(b)
205+
var err error
206+
conf := schemaregistry.NewConfig("mock://")
207+
208+
client, err := schemaregistry.NewClient(conf)
209+
serde.MaybeFail("Schema Registry configuration", err)
210+
211+
ser, err := NewSerializer(client, serde.ValueSerde, NewSerializerConfig())
212+
serde.MaybeFail("Serializer configuration", err)
213+
214+
msg := test.TestMessage{
215+
TestString: "hi",
216+
TestBool: true,
217+
TestBytes: []byte{1, 2},
218+
TestDouble: 1.23,
219+
TestFloat: 3.45,
220+
TestFixed32: 67,
221+
TestFixed64: 89,
222+
TestInt32: 100,
223+
TestInt64: 200,
224+
TestSfixed32: 300,
225+
TestSfixed64: 400,
226+
TestSint32: 500,
227+
TestSint64: 600,
228+
TestUint32: 700,
229+
TestUint64: 800,
230+
}
231+
obj := test.DependencyMessage{
232+
IsActive: true,
233+
TestMesssage: &msg,
234+
}
235+
236+
b.ResetTimer()
237+
for i := 0; i < b.N; i++ {
238+
ser.Serialize("topic1", &obj)
239+
}
240+
}
241+
242+
func BenchmarkProtobufSerWithReferenceCached(b *testing.B) {
243+
serde.MaybeFail = serde.InitFailFuncBenchmark(b)
244+
var err error
245+
conf := schemaregistry.NewConfig("mock://")
246+
247+
client, err := schemaregistry.NewClient(conf)
248+
serde.MaybeFail("Schema Registry configuration", err)
249+
250+
serConf := NewSerializerConfig()
251+
serConf.CacheSchemas = true
252+
ser, err := NewSerializer(client, serde.ValueSerde, serConf)
253+
serde.MaybeFail("Serializer configuration", err)
254+
255+
msg := test.TestMessage{
256+
TestString: "hi",
257+
TestBool: true,
258+
TestBytes: []byte{1, 2},
259+
TestDouble: 1.23,
260+
TestFloat: 3.45,
261+
TestFixed32: 67,
262+
TestFixed64: 89,
263+
TestInt32: 100,
264+
TestInt64: 200,
265+
TestSfixed32: 300,
266+
TestSfixed64: 400,
267+
TestSint32: 500,
268+
TestSint64: 600,
269+
TestUint32: 700,
270+
TestUint64: 800,
271+
}
272+
obj := test.DependencyMessage{
273+
IsActive: true,
274+
TestMesssage: &msg,
275+
}
276+
277+
b.ResetTimer()
278+
for i := 0; i < b.N; i++ {
279+
ser.Serialize("topic1", &obj)
280+
}
281+
}
282+
283+
func BenchmarkProtobufDeserWithReference(b *testing.B) {
284+
serde.MaybeFail = serde.InitFailFuncBenchmark(b)
285+
var err error
286+
conf := schemaregistry.NewConfig("mock://")
287+
288+
client, err := schemaregistry.NewClient(conf)
289+
serde.MaybeFail("Schema Registry configuration", err)
290+
291+
ser, err := NewSerializer(client, serde.ValueSerde, NewSerializerConfig())
292+
serde.MaybeFail("Serializer configuration", err)
293+
294+
msg := test.TestMessage{
295+
TestString: "hi",
296+
TestBool: true,
297+
TestBytes: []byte{1, 2},
298+
TestDouble: 1.23,
299+
TestFloat: 3.45,
300+
TestFixed32: 67,
301+
TestFixed64: 89,
302+
TestInt32: 100,
303+
TestInt64: 200,
304+
TestSfixed32: 300,
305+
TestSfixed64: 400,
306+
TestSint32: 500,
307+
TestSint64: 600,
308+
TestUint32: 700,
309+
TestUint64: 800,
310+
}
311+
obj := test.DependencyMessage{
312+
IsActive: true,
313+
TestMesssage: &msg,
314+
}
315+
bytes, err := ser.Serialize("topic1", &obj)
316+
serde.MaybeFail("serialization", err)
317+
318+
deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
319+
serde.MaybeFail("Deserializer configuration", err)
320+
deser.Client = ser.Client
321+
322+
deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type())
323+
324+
b.ResetTimer()
325+
for i := 0; i < b.N; i++ {
326+
deser.Deserialize("topic1", bytes)
327+
}
328+
}

schemaregistry/serde/testhelpers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ func InitFailFunc(t *testing.T) FailFunc {
4646
}
4747
}
4848

49+
func InitFailFuncBenchmark(b *testing.B) FailFunc {
50+
tester := b
51+
return func(msg string, errors ...error) {
52+
for _, err := range errors {
53+
if err != nil {
54+
pc := make([]uintptr, 1)
55+
runtime.Callers(2, pc)
56+
caller := runtime.FuncForPC(pc[0])
57+
_, line := caller.FileLine(caller.Entry())
58+
59+
tester.Fatalf("%s:%d failed: %s %s", caller.Name(), line, msg, err)
60+
}
61+
}
62+
}
63+
}
64+
4965
// Expect compares the actual and expected values
5066
func Expect(actual, expected interface{}) error {
5167
if !reflect.DeepEqual(actual, expected) {

0 commit comments

Comments
 (0)