From 601848336c31b8240cb5228ed06fd6e25081a6ab Mon Sep 17 00:00:00 2001 From: Eduardas Kazakas Date: Fri, 13 Dec 2024 18:00:32 +0200 Subject: [PATCH] feat: add option to specify a version number if schema ID is not available --- go.mod | 2 +- schemaregistry/serde/avrov2/avro.go | 1 + schemaregistry/serde/config.go | 3 +++ schemaregistry/serde/jsonschema/json_schema.go | 1 + schemaregistry/serde/protobuf/protobuf.go | 1 + schemaregistry/serde/serde.go | 8 ++++++++ 6 files changed, 15 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b785919ee..bf67b9dbf 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.10 github.com/aws/aws-sdk-go-v2/credentials v1.17.10 github.com/aws/aws-sdk-go-v2/service/kms v1.30.1 + github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 github.com/golang/protobuf v1.5.4 github.com/google/cel-go v0.20.1 github.com/google/uuid v1.6.0 @@ -57,7 +58,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/aws/smithy-go v1.20.2 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/schemaregistry/serde/avrov2/avro.go b/schemaregistry/serde/avrov2/avro.go index 0ff8d4574..c5b89e4e0 100644 --- a/schemaregistry/serde/avrov2/avro.go +++ b/schemaregistry/serde/avrov2/avro.go @@ -93,6 +93,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { var err error // Don't derive the schema if it is being looked up in the following ways if s.Conf.UseSchemaID == -1 && + s.Conf.UseSpecificVersion == -1 && !s.Conf.UseLatestVersion && len(s.Conf.UseLatestWithMetadata) == 0 { msgType := reflect.TypeOf(msg) diff --git a/schemaregistry/serde/config.go b/schemaregistry/serde/config.go index 234ecda6b..70a4e1521 100644 --- a/schemaregistry/serde/config.go +++ b/schemaregistry/serde/config.go @@ -22,6 +22,8 @@ type SerializerConfig struct { AutoRegisterSchemas bool // UseSchemaID specifies a schema ID to use during serialization UseSchemaID int + // UseVersion specifies a specific schema version to use during serialization + UseSpecificVersion int // UseLatestVersion specifies whether to use the latest schema version during serialization UseLatestVersion bool // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization @@ -38,6 +40,7 @@ func NewSerializerConfig() *SerializerConfig { c.AutoRegisterSchemas = true c.UseSchemaID = -1 + c.UseSpecificVersion = -1 c.UseLatestVersion = false c.NormalizeSchemas = false diff --git a/schemaregistry/serde/jsonschema/json_schema.go b/schemaregistry/serde/jsonschema/json_schema.go index 8b637974e..047c1b73d 100644 --- a/schemaregistry/serde/jsonschema/json_schema.go +++ b/schemaregistry/serde/jsonschema/json_schema.go @@ -95,6 +95,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { var err error // Don't derive the schema if it is being looked up in the following ways if s.Conf.UseSchemaID == -1 && + s.Conf.UseSpecificVersion == -1 && !s.Conf.UseLatestVersion && len(s.Conf.UseLatestWithMetadata) == 0 { jschema := jsonschema.Reflect(msg) diff --git a/schemaregistry/serde/protobuf/protobuf.go b/schemaregistry/serde/protobuf/protobuf.go index a021e61d3..41800ead6 100644 --- a/schemaregistry/serde/protobuf/protobuf.go +++ b/schemaregistry/serde/protobuf/protobuf.go @@ -208,6 +208,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { var err error // Don't derive the schema if it is being looked up in the following ways if s.Conf.UseSchemaID == -1 && + s.Conf.UseSpecificVersion == -1 && !s.Conf.UseLatestVersion && len(s.Conf.UseLatestWithMetadata) == 0 { schemaInfo, err := s.getSchemaInfo(protoMsg) diff --git a/schemaregistry/serde/serde.go b/schemaregistry/serde/serde.go index ddccefe72..2a84a9a1d 100644 --- a/schemaregistry/serde/serde.go +++ b/schemaregistry/serde/serde.go @@ -408,6 +408,7 @@ func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.Schem func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error) { autoRegister := s.Conf.AutoRegisterSchemas useSchemaID := s.Conf.UseSchemaID + useSpecificVersion := s.Conf.UseSpecificVersion useLatestWithMetadata := s.Conf.UseLatestWithMetadata useLatest := s.Conf.UseLatestVersion normalizeSchema := s.Conf.NormalizeSchemas @@ -428,6 +429,13 @@ func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregist return -1, err } id = useSchemaID + } else if useSpecificVersion >= 0 { + metadata, err := s.Client.GetSchemaMetadata(subject, useSpecificVersion) + if err != nil { + return -1, err + } + *info = metadata.SchemaInfo + id = metadata.ID } else if len(useLatestWithMetadata) != 0 { metadata, err := s.Client.GetLatestWithMetadata(subject, useLatestWithMetadata, true) if err != nil {