Skip to content

Commit c4ddca9

Browse files
authored
Support for schema id in header (#1431)
* DGS-20366 Support schema ID in header * Use id serdes * Minor cleanup * Minor cleanup * Fix comments * Add test * Minor renaming * Add test * Fix get id with response
1 parent e71d5ac commit c4ddca9

16 files changed

+886
-166
lines changed

schemaregistry/internal/rest_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
Contexts = "/contexts"
4949
SchemasBySubject = "/schemas/ids/%d?subject=%s"
5050
SubjectsAndVersionsByID = "/schemas/ids/%d/versions"
51+
SchemasByGUID = "/schemas/guids/%s"
5152
Subject = "/subjects"
5253
Subjects = Subject + "/%s"
5354
SubjectsNormalize = Subject + "/%s?normalize=%t"

schemaregistry/mock_schemaregistry_client.go

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package schemaregistry
1919
import (
2020
"errors"
2121
"fmt"
22+
"github.com/google/uuid"
2223
"net/url"
2324
"reflect"
2425
"sort"
@@ -67,6 +68,8 @@ type mockclient struct {
6768
infoToSchemaCacheLock sync.RWMutex
6869
idToSchemaCache map[subjectID]infoCacheEntry
6970
idToSchemaCacheLock sync.RWMutex
71+
guidToSchemaCache map[string]infoCacheEntry
72+
guidToSchemaCacheLock sync.RWMutex
7073
schemaToVersionCache map[subjectJSON]versionCacheEntry
7174
schemaToVersionCacheLock sync.RWMutex
7275
configCache map[string]ServerConfig
@@ -118,7 +121,7 @@ func (c *mockclient) RegisterFullResponse(subject string, schema SchemaInfo, nor
118121
return *cacheEntryVal.metadata, nil
119122
}
120123

121-
id, err := c.getIDFromRegistry(subject, schema)
124+
id, guid, err := c.getIDFromRegistry(subject, schema)
122125
if err != nil {
123126
return SchemaMetadata{
124127
ID: -1,
@@ -127,14 +130,15 @@ func (c *mockclient) RegisterFullResponse(subject string, schema SchemaInfo, nor
127130
result = SchemaMetadata{
128131
SchemaInfo: schema,
129132
ID: id,
133+
GUID: guid,
130134
}
131135
c.infoToSchemaCacheLock.Lock()
132136
c.infoToSchemaCache[cacheKey] = metadataCacheEntry{&result, false}
133137
c.infoToSchemaCacheLock.Unlock()
134138
return result, nil
135139
}
136140

137-
func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, error) {
141+
func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int, string, error) {
138142
var id = -1
139143
c.idToSchemaCacheLock.RLock()
140144
for key, value := range c.idToSchemaCache {
@@ -144,9 +148,18 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
144148
}
145149
}
146150
c.idToSchemaCacheLock.RUnlock()
151+
var guid string
152+
c.guidToSchemaCacheLock.RLock()
153+
for key, value := range c.guidToSchemaCache {
154+
if schemasEqual(*value.info, schema) {
155+
guid = key
156+
break
157+
}
158+
}
159+
c.guidToSchemaCacheLock.RUnlock()
147160
err := c.generateVersion(subject, schema)
148161
if err != nil {
149-
return -1, err
162+
return -1, "", err
150163
}
151164
if id < 0 {
152165
id = c.counter.increment()
@@ -157,8 +170,13 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
157170
c.idToSchemaCacheLock.Lock()
158171
c.idToSchemaCache[idCacheKey] = infoCacheEntry{&schema, false}
159172
c.idToSchemaCacheLock.Unlock()
173+
174+
guid = uuid.New().String()
175+
c.guidToSchemaCacheLock.Lock()
176+
c.guidToSchemaCache[guid] = infoCacheEntry{&schema, false}
177+
c.guidToSchemaCacheLock.Unlock()
160178
}
161-
return id, nil
179+
return id, guid, nil
162180
}
163181

164182
func (c *mockclient) generateVersion(subject string, schema SchemaInfo) error {
@@ -204,6 +222,23 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf
204222
return SchemaInfo{}, &posErr
205223
}
206224

225+
// GetByGUID returns the schema identified by guid
226+
// Returns Schema object on success
227+
func (c *mockclient) GetByGUID(guid string) (schema SchemaInfo, err error) {
228+
c.guidToSchemaCacheLock.RLock()
229+
cacheEntryValue, ok := c.guidToSchemaCache[guid]
230+
c.guidToSchemaCacheLock.RUnlock()
231+
if ok {
232+
return *cacheEntryValue.info, nil
233+
}
234+
posErr := url.Error{
235+
Op: "GET",
236+
URL: c.url.String() + fmt.Sprintf(internal.SchemasByGUID, guid),
237+
Err: errors.New("Schema Not Found"),
238+
}
239+
return SchemaInfo{}, &posErr
240+
}
241+
207242
func (c *mockclient) GetSubjectsAndVersionsByID(id int) (subjectsAndVersions []SubjectAndVersion, err error) {
208243
subjectsAndVersions = make([]SubjectAndVersion, 0)
209244

@@ -255,10 +290,21 @@ func (c *mockclient) GetSubjectsAndVersionsByID(id int) (subjectsAndVersions []S
255290

256291
// GetID checks if a schema has been registered with the subject. Returns ID if the registration can be found
257292
func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (id int, err error) {
258-
schemaJSON, err := schema.MarshalJSON()
293+
metadata, err := c.GetIDFullResponse(subject, schema, normalize)
259294
if err != nil {
260295
return -1, err
261296
}
297+
return metadata.ID, err
298+
}
299+
300+
// GetIDFullResponse checks if a schema has been registered with the subject. Returns ID if the registration can be found
301+
func (c *mockclient) GetIDFullResponse(subject string, schema SchemaInfo, normalize bool) (result SchemaMetadata, err error) {
302+
schemaJSON, err := schema.MarshalJSON()
303+
if err != nil {
304+
return SchemaMetadata{
305+
ID: -1,
306+
}, err
307+
}
262308
cacheKey := subjectJSON{
263309
subject: subject,
264310
json: string(schemaJSON),
@@ -270,15 +316,17 @@ func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (i
270316
}
271317
c.infoToSchemaCacheLock.RUnlock()
272318
if ok {
273-
return cacheEntryVal.metadata.ID, nil
319+
return *cacheEntryVal.metadata, nil
274320
}
275321

276322
posErr := url.Error{
277323
Op: "GET",
278324
URL: c.url.String() + fmt.Sprintf(internal.Subjects, url.PathEscape(subject)),
279325
Err: errors.New("Subject Not found"),
280326
}
281-
return -1, &posErr
327+
return SchemaMetadata{
328+
ID: -1,
329+
}, &posErr
282330
}
283331

284332
// GetLatestSchemaMetadata fetches latest version registered with the provided subject
@@ -345,10 +393,28 @@ func (c *mockclient) GetSchemaMetadataIncludeDeleted(subject string, version int
345393
}
346394
return SchemaMetadata{}, &posErr
347395
}
396+
var guid string
397+
c.guidToSchemaCacheLock.RLock()
398+
for key, value := range c.guidToSchemaCache {
399+
if schemasEqual(*value.info, info) && (!value.softDeleted || deleted) {
400+
guid = key
401+
break
402+
}
403+
}
404+
c.guidToSchemaCacheLock.RUnlock()
405+
if guid == "" {
406+
posErr := url.Error{
407+
Op: "GET",
408+
URL: c.url.String() + fmt.Sprintf(internal.Versions, url.PathEscape(subject), version),
409+
Err: errors.New("Subject Not found"),
410+
}
411+
return SchemaMetadata{}, &posErr
412+
}
348413
return SchemaMetadata{
349414
SchemaInfo: info,
350415

351416
ID: id,
417+
GUID: guid,
352418
Subject: subject,
353419
Version: version,
354420
}, nil

schemaregistry/rules/encryption/deks/dekregistry_client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ func (c *client) GetDekVersion(kekName string, subject string, version int, algo
397397
return dek, err
398398
}
399399

400-
// GetEncryptedKeyMaterialBytes returns the EncryptedKeyMaterialBytes
400+
// GetDekEncryptedKeyMaterialBytes returns the EncryptedKeyMaterialBytes
401401
func (c *client) GetDekEncryptedKeyMaterialBytes(dek *Dek) ([]byte, error) {
402402
if dek.EncryptedKeyMaterial == "" {
403403
return nil, nil
@@ -416,7 +416,7 @@ func (c *client) GetDekEncryptedKeyMaterialBytes(dek *Dek) ([]byte, error) {
416416
return dek.EncryptedKeyMaterialBytes, nil
417417
}
418418

419-
// GetKeyMaterialBytes returns the KeyMaterialBytes
419+
// GetDekKeyMaterialBytes returns the KeyMaterialBytes
420420
func (c *client) GetDekKeyMaterialBytes(dek *Dek) ([]byte, error) {
421421
if dek.KeyMaterial == "" {
422422
return nil, nil
@@ -435,7 +435,7 @@ func (c *client) GetDekKeyMaterialBytes(dek *Dek) ([]byte, error) {
435435
return dek.KeyMaterialBytes, nil
436436
}
437437

438-
// SetKeyMaterial sets the KeyMaterial using the given bytes
438+
// SetDekKeyMaterial sets the KeyMaterial using the given bytes
439439
func (c *client) SetDekKeyMaterial(dek *Dek, keyMaterialBytes []byte) {
440440
c.dekCacheLock.Lock()
441441
defer c.dekCacheLock.Unlock()

schemaregistry/rules/encryption/deks/mock_dekregistry_client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (c *mockclient) GetDekVersion(kekName string, subject string, version int,
181181
return Dek{}, &posErr
182182
}
183183

184-
// GetEncryptedKeyMaterialBytes returns the EncryptedKeyMaterialBytes
184+
// GetDekEncryptedKeyMaterialBytes returns the EncryptedKeyMaterialBytes
185185
func (c *mockclient) GetDekEncryptedKeyMaterialBytes(dek *Dek) ([]byte, error) {
186186
if dek.EncryptedKeyMaterial == "" {
187187
return nil, nil
@@ -200,7 +200,7 @@ func (c *mockclient) GetDekEncryptedKeyMaterialBytes(dek *Dek) ([]byte, error) {
200200
return dek.EncryptedKeyMaterialBytes, nil
201201
}
202202

203-
// GetKeyMaterialBytes returns the KeyMaterialBytes
203+
// GetDekKeyMaterialBytes returns the KeyMaterialBytes
204204
func (c *mockclient) GetDekKeyMaterialBytes(dek *Dek) ([]byte, error) {
205205
if dek.KeyMaterial == "" {
206206
return nil, nil
@@ -219,7 +219,7 @@ func (c *mockclient) GetDekKeyMaterialBytes(dek *Dek) ([]byte, error) {
219219
return dek.KeyMaterialBytes, nil
220220
}
221221

222-
// SetKeyMaterial sets the KeyMaterial using the given bytes
222+
// SetDekKeyMaterial sets the KeyMaterial using the given bytes
223223
func (c *mockclient) SetDekKeyMaterial(dek *Dek, keyMaterialBytes []byte) {
224224
dekCacheLock.Lock()
225225
defer dekCacheLock.Unlock()

schemaregistry/rules/encryption/encrypt_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ func (f *FieldEncryptionExecutorTransform) Transform(ctx serde.RuleContext, fiel
597597

598598
func prefixVersion(version int, ciphertext []byte) ([]byte, error) {
599599
var buf bytes.Buffer
600-
err := buf.WriteByte(serde.MagicByte)
600+
err := buf.WriteByte(serde.MagicByteV0)
601601
if err != nil {
602602
return nil, err
603603
}
@@ -615,7 +615,7 @@ func prefixVersion(version int, ciphertext []byte) ([]byte, error) {
615615
}
616616

617617
func extractVersion(ciphertext []byte) (int, error) {
618-
if ciphertext[0] != serde.MagicByte {
618+
if ciphertext[0] != serde.MagicByteV0 {
619619
return -1, fmt.Errorf("unknown magic byte")
620620
}
621621
version := binary.BigEndian.Uint32(ciphertext[1:5])

0 commit comments

Comments
 (0)