Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions api/types/load_traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type WeightedRequest struct {
QuorumGet *RequestGet `json:"quorumGet,omitempty" yaml:"quorumGet,omitempty"`
// Put means this is mutating request.
Put *RequestPut `json:"put,omitempty" yaml:"put,omitempty"`
// Patch means this is mutating request to update resource.
Patch *RequestPatch `json:"patch,omitempty" yaml:"patch,omitempty"`
// GetPodLog means this is to get log from target pod.
GetPodLog *RequestGetPodLog `json:"getPodLog,omitempty" yaml:"getPodLog,omitempty"`
}
Expand Down Expand Up @@ -146,6 +148,19 @@ type RequestPut struct {
ValueSize int `json:"valueSize" yaml:"valueSize"`
}

// RequestPatch defines PATCH request for target resource type.
type RequestPatch struct {
KubeGroupVersionResource `yaml:",inline"`
// Namespace is object's namespace.
Namespace string `json:"namespace" yaml:"namespace"`
// Name is object's prefix name.
Name string `json:"name" yaml:"name"`
// PatchType is the type of patch, e.g. "json", "merge", "strategic-merge".
PatchType string `json:"patchType" yaml:"patchType"`
// Body is the request body, for fields to be changed.
Body string `json:"body" yaml:"body"`
}

// RequestGetPodLog defines GetLog request for target pod.
type RequestGetPodLog struct {
// Namespace is pod's namespace.
Expand Down Expand Up @@ -221,6 +236,8 @@ func (r WeightedRequest) Validate() error {
return r.QuorumGet.Validate()
case r.Put != nil:
return r.Put.Validate()
case r.Patch != nil:
return r.Patch.Validate()
case r.GetPodLog != nil:
return r.GetPodLog.Validate()
default:
Expand Down Expand Up @@ -304,3 +321,20 @@ func (m *KubeGroupVersionResource) Validate() error {
}
return nil
}

// Validate validates RequestPatch type.
func (r *RequestPatch) Validate() error {
if err := r.KubeGroupVersionResource.Validate(); err != nil {
return fmt.Errorf("kube metadata: %v", err)
}
if r.Namespace == "" {
return fmt.Errorf("namespace is required")
}
if r.Name == "" {
return fmt.Errorf("name is required")
}
if r.Body == "" {
return fmt.Errorf("body is required")
}
return nil
}
84 changes: 84 additions & 0 deletions request/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ package request
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"strings"
"sync"

"github.com/Azure/kperf/api/types"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
Expand Down Expand Up @@ -43,6 +46,7 @@ func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequ
shares = append(shares, r.Shares)

var builder RESTRequestBuilder
var err error
switch {
case r.StaleList != nil:
builder = newRequestListBuilder(r.StaleList, "0", spec.MaxRetries)
Expand All @@ -56,6 +60,11 @@ func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequ
builder = newRequestGetBuilder(r.QuorumGet, "", spec.MaxRetries)
case r.GetPodLog != nil:
builder = newRequestGetPodLogBuilder(r.GetPodLog, spec.MaxRetries)
case r.Patch != nil:
builder, err = newRequestPatchBuilder(r.Patch, "", spec.MaxRetries)
if err != nil {
return nil, fmt.Errorf("failed to create patch request builder: %v", err)
}
default:
return nil, fmt.Errorf("not implement for PUT yet")
}
Expand Down Expand Up @@ -354,6 +363,81 @@ func (b *requestGetPodLogBuilder) Build(cli rest.Interface) Requester {
}
}

type requestPatchBuilder struct {
version schema.GroupVersion
resource string
resourceVersion string
namespace string
name string
patchType apitypes.PatchType
body interface{}
maxRetries int
}

var patchTypes = map[string]apitypes.PatchType{
// json: Array of operations [{"op": "replace", "path": "/spec/replicas", "value": 3}]
"json": apitypes.JSONPatchType,
// merge: Simple object merge {"spec": {"replicas": 3}}
"merge": apitypes.MergePatchType,
// strategic-merge: Smart merge for Kubernetes resources that preserves arrays
"strategic-merge": apitypes.StrategicMergePatchType,
}

func newRequestPatchBuilder(src *types.RequestPatch, resourceVersion string, maxRetries int) (*requestPatchBuilder, error) {

var body interface{}

trimmed := strings.TrimSpace(src.Body)
// validate that the patch body contains valid json
if !json.Valid([]byte(trimmed)) {
return nil, fmt.Errorf("invalid JSON in patch body: %q", src.Body)
}
body = []byte(trimmed)
// validate patch type
patchType, ok := patchTypes[src.PatchType]
if !ok {
return nil, fmt.Errorf("unknown patch type: %s", src.PatchType)
}

return &requestPatchBuilder{
version: schema.GroupVersion{
Group: src.Group,
Version: src.Version,
},
resource: src.Resource,
resourceVersion: resourceVersion,
namespace: src.Namespace,
name: src.Name,
patchType: patchType,
body: body,
maxRetries: maxRetries,
}, nil
}

// Build implements RequestBuilder.Build.
func (b *requestPatchBuilder) Build(cli rest.Interface) Requester {
// https://kubernetes.io/docs/reference/using-api/#api-groups
comps := make([]string, 0, 5)
if b.version.Group == "" {
comps = append(comps, "api", b.version.Version)
} else {
comps = append(comps, "apis", b.version.Group, b.version.Version)
}
if b.namespace != "" {
comps = append(comps, "namespaces", b.namespace)
}
comps = append(comps, b.resource, b.name)

return &DiscardRequester{
BaseRequester: BaseRequester{
method: "PATCH",
req: cli.Patch(b.patchType).AbsPath(comps...).
Body(b.body).
MaxRetries(b.maxRetries),
},
}
}

func toPtr[T any](v T) *T {
return &v
}
Loading