Skip to content

Commit 13e5fe9

Browse files
authored
feat: use hamba encoder schema function if available (#1440)
1 parent faebfb7 commit 13e5fe9

File tree

2 files changed

+128
-9
lines changed

2 files changed

+128
-9
lines changed

schemaregistry/serde/avrov2/avro.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,17 @@ func (s *Serializer) SerializeWithHeaders(topic string, msg interface{}) ([]kafk
105105
if s.Conf.UseSchemaID == -1 &&
106106
!s.Conf.UseLatestVersion &&
107107
len(s.Conf.UseLatestWithMetadata) == 0 {
108-
msgType := reflect.TypeOf(msg)
109-
if msgType.Kind() != reflect.Pointer {
110-
return nil, nil, errors.New("input message must be a pointer")
111-
}
112-
avroSchema, err = StructToSchema(msgType.Elem())
113-
if err != nil {
114-
return nil, nil, err
108+
if st, ok := msg.(interface{ Schema() avro.Schema }); ok {
109+
avroSchema = st.Schema()
110+
} else {
111+
msgType := reflect.TypeOf(msg)
112+
if msgType.Kind() != reflect.Pointer {
113+
return nil, nil, errors.New("input message must be a pointer")
114+
}
115+
avroSchema, err = StructToSchema(msgType.Elem())
116+
if err != nil {
117+
return nil, nil, err
118+
}
115119
}
116120
info = schemaregistry.SchemaInfo{
117121
Schema: avroSchema.String(),

schemaregistry/serde/avrov2/avro_test.go

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ package avrov2
1818

1919
import (
2020
"errors"
21-
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel"
2221
"reflect"
2322
"testing"
2423
"time"
2524

25+
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel"
26+
"github.com/hamba/avro/v2"
27+
2628
_ "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel"
2729
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption"
2830
_ "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption/awskms"
@@ -286,7 +288,62 @@ const (
286288
"confluent:tags": [ "PII" ]
287289
}
288290
]
289-
}
291+
}
292+
`
293+
demoWithSchemaFuncSchema = `
294+
{
295+
"name": "DemoWithSchemaFunc",
296+
"type": "record",
297+
"fields": [
298+
{
299+
"name": "IntField",
300+
"type": "int"
301+
},
302+
{
303+
"name": "BoolField",
304+
"type": "boolean"
305+
},
306+
{
307+
"name": "ArrayField",
308+
"type": {
309+
"type": "array",
310+
"items": "string"
311+
}
312+
},
313+
{
314+
"name": "MapField",
315+
"type": {
316+
"type": "map",
317+
"values": "string"
318+
}
319+
},
320+
{
321+
"name": "StringField",
322+
"type": ["null", "string"]
323+
},
324+
{
325+
"name": "EnumField",
326+
"type": {
327+
"name": "GreetingsEnum",
328+
"type": "enum",
329+
"symbols": ["hey", "bye"]
330+
}
331+
},
332+
{
333+
"name": "RecordField",
334+
"type": {
335+
"name": "GreetingsObj",
336+
"type": "record",
337+
"fields": [
338+
{
339+
"name": "Hey",
340+
"type": "string"
341+
}
342+
]
343+
}
344+
}
345+
]
346+
}
290347
`
291348
)
292349

@@ -302,6 +359,8 @@ func testMessageFactory(subject string, name string) (interface{}, error) {
302359
return &DemoSchemaSingleTag{}, nil
303360
case "DemoSchemaWithUnion":
304361
return &DemoSchemaWithUnion{}, nil
362+
case "DemoWithSchemaFunc":
363+
return &DemoWithSchemaFunc{}, nil
305364
case "ComplexSchema":
306365
return &ComplexSchema{}, nil
307366
case "SchemaEvolution":
@@ -804,6 +863,46 @@ func TestAvroSchemaEvolution(t *testing.T) {
804863
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj2))
805864
}
806865

866+
func TestAvroSerdeWithEncodingSchemaFunc(t *testing.T) {
867+
serde.MaybeFail = serde.InitFailFunc(t)
868+
var err error
869+
conf := schemaregistry.NewConfig("mock://")
870+
871+
client, err := schemaregistry.NewClient(conf)
872+
serde.MaybeFail("Schema Registry configuration", err)
873+
874+
ser, err := NewSerializer(client, serde.ValueSerde, NewSerializerConfig())
875+
serde.MaybeFail("Serializer configuration", err)
876+
877+
obj := DemoWithSchemaFunc{
878+
IntField: 123,
879+
StringField: nil,
880+
BoolField: true,
881+
ArrayField: []string{"hello", "world"},
882+
MapField: map[string]string{
883+
"hello": "world",
884+
},
885+
EnumField: "hey",
886+
RecordField: struct {
887+
Hey string `json:"Hey"`
888+
}{Hey: "bye"},
889+
}
890+
bytes, err := ser.Serialize("topic1", &obj)
891+
serde.MaybeFail("serialization", err)
892+
893+
deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig())
894+
serde.MaybeFail("Deserializer configuration", err)
895+
deser.Client = ser.Client
896+
deser.MessageFactory = testMessageFactory
897+
898+
var newobj DemoWithSchemaFunc
899+
err = deser.DeserializeInto("topic1", bytes, &newobj)
900+
serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj))
901+
902+
msg, err := deser.Deserialize("topic1", bytes)
903+
serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj))
904+
}
905+
807906
func TestAvroSerdeWithCELCondition(t *testing.T) {
808907
serde.MaybeFail = serde.InitFailFunc(t)
809908
var err error
@@ -2688,3 +2787,19 @@ type SchemaEvolution1 struct {
26882787
type SchemaEvolution2 struct {
26892788
NewOptionalField string `json:"NewOptionalField"`
26902789
}
2790+
2791+
type DemoWithSchemaFunc struct {
2792+
IntField int32 `json:"IntField"`
2793+
BoolField bool `json:"BoolField"`
2794+
StringField *string `json:"StringField"`
2795+
ArrayField []string `json:"ArrayField"`
2796+
MapField map[string]string `json:"MapField"`
2797+
EnumField string `json:"EnumField"`
2798+
RecordField struct {
2799+
Hey string `json:"Hey"`
2800+
} `json:"RecordField"`
2801+
}
2802+
2803+
func (d *DemoWithSchemaFunc) Schema() avro.Schema {
2804+
return avro.MustParse(demoWithSchemaFuncSchema)
2805+
}

0 commit comments

Comments
 (0)