Skip to content

DGS-21268 Add support for full payload encryption #1452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions schemaregistry/rules/encryption/encrypt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@
// Register registers the encryption rule executor
func Register() {
serde.RegisterRuleExecutor(NewExecutor())
serde.RegisterRuleExecutor(NewFieldExecutor())
}

// RegisterWithClock registers the encryption rule executor with a given clock
func RegisterWithClock(c Clock) *FieldEncryptionExecutor {
// RegisterExecutorWithClock registers the encryption rule executor with a given clock
func RegisterExecutorWithClock(c Clock) *Executor {
f := NewExecutorWithClock(c)
serde.RegisterRuleExecutor(f)
return f
Expand All @@ -60,10 +61,8 @@
}

// NewExecutorWithClock creates a new encryption rule executor with a given clock
func NewExecutorWithClock(c Clock) *FieldEncryptionExecutor {
a := &serde.AbstractFieldRuleExecutor{}
f := &FieldEncryptionExecutor{*a, nil, nil, c}
f.FieldRuleExecutor = f
func NewExecutorWithClock(c Clock) *Executor {
f := &Executor{nil, nil, c}
return f
}

Expand Down Expand Up @@ -101,16 +100,15 @@
return time.Now().UnixMilli()
}

// FieldEncryptionExecutor is a field encryption executor
type FieldEncryptionExecutor struct {
serde.AbstractFieldRuleExecutor
// Executor is an encryption executor
type Executor struct {
Config map[string]string
Client deks.Client
Clock Clock
}

// Configure configures the executor
func (f *FieldEncryptionExecutor) Configure(clientConfig *schemaregistry.Config, config map[string]string) error {
func (f *Executor) Configure(clientConfig *schemaregistry.Config, config map[string]string) error {

Check failure on line 111 in schemaregistry/rules/encryption/encrypt_executor.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

schemaregistry/rules/encryption/encrypt_executor.go#L111

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.
if f.Client != nil {
if !schemaregistry.ConfigsEqual(f.Client.Config(), clientConfig) {
return errors.New("executor already configured")
Expand Down Expand Up @@ -143,12 +141,21 @@
}

// Type returns the type of the executor
func (f *FieldEncryptionExecutor) Type() string {
return "ENCRYPT"
func (f *Executor) Type() string {
return "ENCRYPT_PAYLOAD"
}

// Transform transforms the message using the rule
func (f *Executor) Transform(ctx serde.RuleContext, msg interface{}) (interface{}, error) {
transform, err := f.NewTransform(ctx)
if err != nil {
return nil, err
}
return transform.Transform(ctx, serde.TypeBytes, msg)
}

// NewTransform creates a new transform
func (f *FieldEncryptionExecutor) NewTransform(ctx serde.RuleContext) (serde.FieldTransform, error) {
func (f *Executor) NewTransform(ctx serde.RuleContext) (*ExecutorTransform, error) {
kekName, err := getKekName(ctx)
if err != nil {
return nil, err
Expand All @@ -157,7 +164,7 @@
if err != nil {
return nil, err
}
transform := FieldEncryptionExecutorTransform{
transform := ExecutorTransform{
Executor: *f,
Cryptor: getCryptor(ctx),
KekName: kekName,
Expand All @@ -172,13 +179,13 @@
}

// Close closes the executor
func (f *FieldEncryptionExecutor) Close() error {
func (f *Executor) Close() error {
return f.Client.Close()
}

// FieldEncryptionExecutorTransform is a field encryption executor transform
type FieldEncryptionExecutorTransform struct {
Executor FieldEncryptionExecutor
// ExecutorTransform is a field encryption executor transform
type ExecutorTransform struct {
Executor Executor
Cryptor Cryptor
KekName string
Kek deks.Kek
Expand Down Expand Up @@ -290,11 +297,11 @@
return i, nil
}

func (f *FieldEncryptionExecutorTransform) isDekRotated() bool {
func (f *ExecutorTransform) isDekRotated() bool {
return f.DekExpiryDays > 0
}

func (f *FieldEncryptionExecutorTransform) getOrCreateKek(ctx serde.RuleContext) (*deks.Kek, error) {
func (f *ExecutorTransform) getOrCreateKek(ctx serde.RuleContext) (*deks.Kek, error) {

Check failure on line 304 in schemaregistry/rules/encryption/encrypt_executor.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

schemaregistry/rules/encryption/encrypt_executor.go#L304

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.
isRead := ctx.RuleMode == schemaregistry.Read
kekID := deks.KekID{
Name: f.KekName,
Expand Down Expand Up @@ -334,7 +341,7 @@
return kek, nil
}

func (f *FieldEncryptionExecutorTransform) retrieveKekFromRegistry(key deks.KekID) (*deks.Kek, error) {
func (f *ExecutorTransform) retrieveKekFromRegistry(key deks.KekID) (*deks.Kek, error) {
kek, err := f.Executor.Client.GetKek(key.Name, key.Deleted)
if err != nil {
var restErr *rest.Error
Expand All @@ -348,7 +355,7 @@
return &kek, nil
}

func (f *FieldEncryptionExecutorTransform) storeKekToRegistry(key deks.KekID, kmsType string, kmsKeyID string, shared bool) (*deks.Kek, error) {
func (f *ExecutorTransform) storeKekToRegistry(key deks.KekID, kmsType string, kmsKeyID string, shared bool) (*deks.Kek, error) {
kek, err := f.Executor.Client.RegisterKek(key.Name, kmsType, kmsKeyID, nil, "", shared)
if err != nil {
var restErr *rest.Error
Expand All @@ -362,7 +369,7 @@
return &kek, nil
}

func (f *FieldEncryptionExecutorTransform) getOrCreateDek(ctx serde.RuleContext, version *int) (*deks.Dek, error) {
func (f *ExecutorTransform) getOrCreateDek(ctx serde.RuleContext, version *int) (*deks.Dek, error) {

Check failure on line 372 in schemaregistry/rules/encryption/encrypt_executor.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

schemaregistry/rules/encryption/encrypt_executor.go#L372

Refactor this method to reduce its Cognitive Complexity from 36 to the 15 allowed.
isRead := ctx.RuleMode == schemaregistry.Read
ver := 1
if version != nil {
Expand Down Expand Up @@ -442,7 +449,7 @@
return dek, nil
}

func (f *FieldEncryptionExecutorTransform) createDek(dekID deks.DekID, newVersion int, encryptedDek []byte) (*deks.Dek, error) {
func (f *ExecutorTransform) createDek(dekID deks.DekID, newVersion int, encryptedDek []byte) (*deks.Dek, error) {
newDekID := deks.DekID{
KekName: dekID.KekName,
Subject: dekID.Subject,
Expand All @@ -466,7 +473,7 @@
return dek, nil
}

func (f *FieldEncryptionExecutorTransform) retrieveDekFromRegistry(key deks.DekID) (*deks.Dek, error) {
func (f *ExecutorTransform) retrieveDekFromRegistry(key deks.DekID) (*deks.Dek, error) {
var dek deks.Dek
var err error
if key.Version != 0 {
Expand All @@ -486,7 +493,7 @@
return &dek, nil
}

func (f *FieldEncryptionExecutorTransform) storeDekToRegistry(key deks.DekID, encryptedDek []byte) (*deks.Dek, error) {
func (f *ExecutorTransform) storeDekToRegistry(key deks.DekID, encryptedDek []byte) (*deks.Dek, error) {
var encryptedDekStr string
if encryptedDek != nil {
encryptedDekStr = base64.StdEncoding.EncodeToString(encryptedDek)
Expand All @@ -510,7 +517,7 @@
return &dek, nil
}

func (f *FieldEncryptionExecutorTransform) isExpired(ctx serde.RuleContext, dek *deks.Dek) bool {
func (f *ExecutorTransform) isExpired(ctx serde.RuleContext, dek *deks.Dek) bool {
now := f.Executor.Clock.NowUnixMilli()
return ctx.RuleMode != schemaregistry.Read &&
f.DekExpiryDays > 0 &&
Expand All @@ -519,15 +526,15 @@
}

// Transform transforms the field value using the rule
func (f *FieldEncryptionExecutorTransform) Transform(ctx serde.RuleContext, fieldCtx serde.FieldContext, fieldValue interface{}) (interface{}, error) {
func (f *ExecutorTransform) Transform(ctx serde.RuleContext, fieldType serde.FieldType, fieldValue interface{}) (interface{}, error) {

Check failure on line 529 in schemaregistry/rules/encryption/encrypt_executor.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

schemaregistry/rules/encryption/encrypt_executor.go#L529

Refactor this method to reduce its Cognitive Complexity from 37 to the 15 allowed.
if fieldValue == nil {
return nil, nil
}
switch ctx.RuleMode {
case schemaregistry.Write:
plaintext := toBytes(fieldCtx.Type, fieldValue)
plaintext := toBytes(fieldType, fieldValue)
if plaintext == nil {
return nil, fmt.Errorf("type '%v' not supported for encryption", fieldCtx.Type)
return nil, fmt.Errorf("type '%v' not supported for encryption", fieldType)
}
var version *int
if f.isDekRotated() {
Expand All @@ -552,16 +559,16 @@
return nil, err
}
}
if fieldCtx.Type == serde.TypeString {
if fieldType == serde.TypeString {
return base64.StdEncoding.EncodeToString(ciphertext), nil
}
return ciphertext, nil
case schemaregistry.Read:
ciphertext := toBytes(fieldCtx.Type, fieldValue)
ciphertext := toBytes(fieldType, fieldValue)
if ciphertext == nil {
return fieldValue, nil
}
if fieldCtx.Type == serde.TypeString {
if fieldType == serde.TypeString {
var err error
ciphertext, err = base64.StdEncoding.DecodeString(string(ciphertext))
if err != nil {
Expand Down Expand Up @@ -589,7 +596,7 @@
if err != nil {
return nil, err
}
return toObject(fieldCtx.Type, plaintext), nil
return toObject(fieldType, plaintext), nil
default:
return nil, fmt.Errorf("unsupported rule mode %v", ctx.RuleMode)
}
Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/rules/encryption/encrypt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
)

func TestFieldEncryptionExecutor_Configure(t *testing.T) {
func TestEncryptionExecutor_Configure(t *testing.T) {
maybeFail = initFailFunc(t)

executor := NewExecutor()
Expand Down
86 changes: 86 additions & 0 deletions schemaregistry/rules/encryption/field_encrypt_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2024 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package encryption

import (
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
)

// RegisterFieldExecutorWithClock registers the encryption rule executor with a given clock
func RegisterFieldExecutorWithClock(c Clock) *FieldEncryptionExecutor {
f := NewFieldExecutorWithClock(c)
serde.RegisterRuleExecutor(f)
return f
}

// NewFieldExecutor creates a new encryption rule executor
func NewFieldExecutor() serde.RuleExecutor {
c := clock{}
return NewFieldExecutorWithClock(&c)
}

// NewFieldExecutorWithClock creates a new encryption rule executor with a given clock
func NewFieldExecutorWithClock(c Clock) *FieldEncryptionExecutor {
a := &serde.AbstractFieldRuleExecutor{}
f := &FieldEncryptionExecutor{*a, *NewExecutorWithClock(c)}
f.FieldRuleExecutor = f
return f
}

// FieldEncryptionExecutor is a field encryption executor
type FieldEncryptionExecutor struct {
serde.AbstractFieldRuleExecutor
Executor Executor
}

// Configure configures the executor
func (f *FieldEncryptionExecutor) Configure(clientConfig *schemaregistry.Config, config map[string]string) error {
return f.Executor.Configure(clientConfig, config)
}

// Type returns the type of the executor
func (f *FieldEncryptionExecutor) Type() string {
return "ENCRYPT"
}

// NewTransform creates a new transform
func (f *FieldEncryptionExecutor) NewTransform(ctx serde.RuleContext) (serde.FieldTransform, error) {
executorTransform, err := f.Executor.NewTransform(ctx)
if err != nil {
return nil, err
}
transform := FieldEncryptionExecutorTransform{
ExecutorTransform: *executorTransform,
}
return &transform, nil
}

// Close closes the executor
func (f *FieldEncryptionExecutor) Close() error {
return f.Executor.Close()
}

// FieldEncryptionExecutorTransform is a field encryption executor transform
type FieldEncryptionExecutorTransform struct {
ExecutorTransform ExecutorTransform
}

// Transform transforms the field value using the rule
func (f *FieldEncryptionExecutorTransform) Transform(ctx serde.RuleContext, fieldCtx serde.FieldContext, fieldValue interface{}) (interface{}, error) {
return f.ExecutorTransform.Transform(ctx, fieldCtx.Type, fieldValue)
}
32 changes: 27 additions & 5 deletions schemaregistry/schemaregistry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@
Disabled bool `json:"disabled,omitempty"`
}

// RulePhase represents the rule phase
type RulePhase = int

const (
// MigrationPhase denotes migration phase
MigrationPhase = 1
// DomainPhase denotes domain phase
DomainPhase = 2
// EncodingPhase denotes encoding phase
EncodingPhase = 3
)

// RuleMode represents the rule mode
type RuleMode = int

Expand Down Expand Up @@ -138,25 +150,35 @@
type RuleSet struct {
MigrationRules []Rule `json:"migrationRules,omitempty"`
DomainRules []Rule `json:"domainRules,omitempty"`
EncodingRules []Rule `json:"encodingRules,omitempty"`
}

// HasRules checks if the ruleset has rules for the given mode
func (r *RuleSet) HasRules(mode RuleMode) bool {
func (r *RuleSet) HasRules(phase RulePhase, mode RuleMode) bool {
var rules []Rule
switch phase {
case MigrationPhase:
rules = r.MigrationRules
case DomainPhase:
rules = r.DomainRules
case EncodingPhase:
rules = r.EncodingRules
}
switch mode {
case Upgrade, Downgrade:
return r.hasRules(r.MigrationRules, func(ruleMode RuleMode) bool {
return r.hasRules(rules, func(ruleMode RuleMode) bool {
return ruleMode == mode || ruleMode == UpDown
})
case UpDown:
return r.hasRules(r.MigrationRules, func(ruleMode RuleMode) bool {
return r.hasRules(rules, func(ruleMode RuleMode) bool {
return ruleMode == mode
})
case Write, Read:
return r.hasRules(r.DomainRules, func(ruleMode RuleMode) bool {
return r.hasRules(rules, func(ruleMode RuleMode) bool {
return ruleMode == mode || ruleMode == WriteRead
})
case WriteRead:
return r.hasRules(r.DomainRules, func(ruleMode RuleMode) bool {
return r.hasRules(rules, func(ruleMode RuleMode) bool {

Check warning on line 181 in schemaregistry/schemaregistry_client.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

schemaregistry/schemaregistry_client.go#L181

This branch's code block is the same as the block for the branch on line 173.
return ruleMode == mode
})
}
Expand Down
Loading