Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions contrib/cmd/runkperf/commands/bench/read_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
"context"
"fmt"
"sync"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

"github.com/urfave/cli"
)

var appLabel = "runkperf"

var benchReadUpdateCase = cli.Command{
Name: "read_update",
Usage: `
The test suite sets up a benchmark that simulates a mix of read, watch, and update operations on ConfigMaps.
It creates ConfigMaps, establishes watch connections, and then issues concurrent read and update requests based on a specified ratio to evaluate API server performance under combined load.
`,
Flags: append(
[]cli.Flag{
cli.IntFlag{
Name: "total",
Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 10)",
Value: 3600,
},
cli.StringFlag{
Name: "read-update-namespace",
Usage: "Kubernetes namespace to use. If not specified, it will use the default namespace.",
Value: "default",
},
cli.IntFlag{
Name: "read-update-configmap-total",
Usage: "Total ConfigMaps need to create",
Value: 100,
},
cli.IntFlag{
Name: "read-update-configmap-size",
Usage: "Size of each ConfigMap. ConfigMap must not exceed 3 MiB.",
Value: 1024, // 1 MiB
},
cli.StringFlag{
Name: "read-update-name-pattern",
Usage: "Name pattern for the resources to create",
Value: "kperf-read-update",
},
cli.Float64Flag{
Name: "read-ratio",
Usage: "Proportion of read requests among all requests (range: 0.0 to 1.0). For example, 0.5 indicates 50% of the requests are reads.",
Value: 0.5,
},
},
commonFlags...,
),
Action: func(cliCtx *cli.Context) error {
_, err := renderBenchmarkReportInterceptor(
addAPIServerCoresInfoInterceptor(benchReadUpdateRun),
)(cliCtx)
return err
},
}

// benchReadUpdateRun is for subcommand benchReadUpdateRun.
func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
ctx := context.Background()
kubeCfgPath := cliCtx.GlobalString("kubeconfig")

// Load the load profile
rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx,
"loadprofile/read_update.yaml")

if err != nil {
return nil, err
}
defer func() { _ = rgCfgFileDone() }()

total := cliCtx.Int("read-update-configmap-total")
size := cliCtx.Int("read-update-configmap-size")
namespace := cliCtx.String("read-update-namespace")
namePattern := cliCtx.String("read-update-name-pattern")
if total <= 0 || size <= 0 || total*size > 2*1024*1024 || size > 1024 {
return nil, fmt.Errorf("invalid total (%d) or size (%d) for configmaps: total must be > 0, size must be > 0, and total*size must not exceed 2 GiB, size must not exceed 1 MiB", total, size)
}

// Create configmaps with specified name pattern
client, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
return nil, fmt.Errorf("failed to build clientset: %w", err)
}

err = utils.CreateConfigmaps(ctx, kubeCfgPath, namespace, namePattern, total, size, 2, 0)

if err != nil {
return nil, fmt.Errorf("failed to create configmaps: %w", err)
}

defer func() {
// Delete the configmaps after the benchmark
err = utils.DeleteConfigmaps(ctx, kubeCfgPath, namespace, namePattern, 0)
if err != nil {
klog.Errorf("Failed to delete configmaps: %v", err)
}
}()

// Stop all the watches when the function returns
watches := make([]watch.Interface, 0)
defer func() {
stopWatches(watches)
}()

var wg sync.WaitGroup
defer wg.Wait()

dpCtx, dpCancel := context.WithCancel(ctx)
defer dpCancel()

// Start to watch the configmaps
for i := 0; i < total; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to simulate cilium's use case? In Cilium's use case, each pod is watched by each cilium agent, so it creates multiple watch updates for one mutation of a pod.

wg.Add(1)
go func() {
defer wg.Done()
watchReq, err := client.CoreV1().ConfigMaps(namespace).
Watch(context.TODO(), metav1.ListOptions{
Watch: true,
FieldSelector: fmt.Sprintf("metadata.name=%s-cm-%s-%d", appLabel, namePattern, i),
})

watches = append(watches, watchReq)
if err != nil {
fmt.Printf("Error starting watch for configmap %s: %v\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), err)
return
}
klog.V(5).Infof("Starting watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))

// Process watch events proactively to prevent cache oversizing.
for {
select {
case <-dpCtx.Done():
klog.V(5).Infof("Stopping watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))
return
case event := <-watchReq.ResultChan():
if event.Type == watch.Error {
klog.Errorf("Error event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Object)
return
}
klog.V(5).Infof("Event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Type)
}
}

}()
}

// Deploy the runner group
rgResult, derr := utils.DeployRunnerGroup(ctx,
cliCtx.GlobalString("kubeconfig"),
cliCtx.GlobalString("runner-image"),
rgCfgFile,
cliCtx.GlobalString("runner-flowcontrol"),
cliCtx.GlobalString("rg-affinity"),
)

if derr != nil {
return nil, fmt.Errorf("failed to deploy runner group: %w", derr)
}

return &internaltypes.BenchmarkReport{
Description: fmt.Sprintf(`
Environment: Combine %d%% read requests and %d%% update requests during benchmarking. Workload: Deploy %d configmaps in %d KiB`,
int(100*cliCtx.Float64("read-ratio")), 100-int(100*cliCtx.Float64("read-ratio")), total, size*total),
LoadSpec: *rgSpec,
Result: *rgResult,
Info: map[string]interface{}{},
}, nil
}

// StopWatches stops all the watches
func stopWatches(watches []watch.Interface) {
for _, w := range watches {
if w != nil {
klog.V(5).Infof("Stopping watch: %v", w)
w.Stop()
}
}
}
1 change: 1 addition & 0 deletions contrib/cmd/runkperf/commands/bench/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var Command = cli.Command{
benchListConfigmapsCase,
benchNode10Job1Pod1kCase,
benchNode100Job10Pod10kCase,
benchReadUpdateCase,
},
}

Expand Down
47 changes: 47 additions & 0 deletions contrib/cmd/runkperf/commands/bench/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _s
spec.Profile.Spec.ContentType = types.ContentType(cliCtx.String("content-type"))
data, _ := yaml.Marshal(spec)

// Tweak the load profile for read-update case
if cliCtx.Command.Name == "read_update" {
err = tweakReadUpdateProfile(cliCtx, spec)
if err != nil {
return fmt.Errorf("failed to tweak read-update profile: %w", err)
}
}

log.GetLogger(context.TODO()).
WithKeyValues("level", "info").
LogKV("msg", "dump load profile", "config", string(data))
Expand All @@ -182,3 +190,42 @@ func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _s
}
return rgCfgFile, &rgSpec, rgCfgFileDone, nil
}

func tweakReadUpdateProfile(cliCtx *cli.Context, spec *types.RunnerGroupSpec) error {
namePattern := cliCtx.String("read-update-name-pattern")
ratio := cliCtx.Float64("read-ratio")
if ratio <= 0 || ratio > 1 {
return fmt.Errorf("invalid read-ratio: %.2f, must be between 0 and 1", ratio)
}

namespace := cliCtx.String("read-update-namespace")
configmapTotal := cliCtx.Int("read-update-configmap-total")

if namePattern != "" || ratio != 0 || namespace != "" || configmapTotal > 0 {
for _, r := range spec.Profile.Spec.Requests {
if r.Patch != nil {
if namePattern != "" {
r.Patch.Name = fmt.Sprintf("runkperf-cm-%s", namePattern)
}
if ratio != 0 {
r.Shares = 100 - int(ratio*100)
}
if namespace != "" {
r.Patch.Namespace = namespace
}
if configmapTotal > 0 {
r.Patch.KeySpaceSize = configmapTotal
}
}
if r.StaleList != nil {
if ratio != 0 {
r.Shares = int(ratio * 100)
}
if namespace != "" {
r.StaleList.Namespace = namespace
}
}
}
}
return nil
}
34 changes: 34 additions & 0 deletions contrib/internal/manifests/loadprofile/read_update.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
count: 10
loadProfile:
version: 1
description: "read_update"
spec:
rate: 10
total: 3000
conns: 10
client: 10
contentType: json
disableHTTP2: false
maxRetries: 0
requests:
- staleList:
version: v1
resource: configmaps
namespace: default
shares: 50
- patch:
version: v1
resource: configmaps
namespace: default
patchType: merge
name: runkperf-cm-kperf-read-update
keySpaceSize: 100
body: |
{
"metadata": {
"labels": {
"test-label": "mutation-test"
}
}
}
shares: 50
Loading