84 changes: 45 additions & 39 deletions contrib/cmd/runkperf/commands/bench/read_update.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/utils"
@@ -44,8 +45,8 @@ It creates ConfigMaps, establishes watch connections, and then issues concurrent
},
cli.IntFlag{
Name: "read-update-configmap-size",
Usage: "Size of each ConfigMap. ConfigMap must not exceed 3 MiB.",
Value: 1024, // 1 MiB
Usage: "Size of each ConfigMap, unit: Byte. ConfigMap must not exceed 1 MiB.",
Value: 1024, // 1 KiB
},
cli.StringFlag{
Name: "read-update-name-pattern",
Expand Down Expand Up @@ -86,7 +87,7 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
size := cliCtx.Int("read-update-configmap-size")
namespace := cliCtx.String("read-update-namespace")
namePattern := cliCtx.String("read-update-name-pattern")
if total <= 0 || size <= 0 || total*size > 2*1024*1024 || size > 1024 {
if total <= 0 || size <= 0 || total*size > 2*1024*1024*1024 || size > 1024*1024 {
return nil, fmt.Errorf("invalid total (%d) or size (%d) for configmaps: total must be > 0, size must be > 0, and total*size must not exceed 2 GiB, size must not exceed 1 MiB", total, size)
}

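Aside for reviewers: with `--read-update-configmap-size` now in bytes, the guard above enforces 1 MiB per ConfigMap and 2 GiB of aggregate payload. Here is a sketch, not code from this PR, that restates the same check with named constants; the `int64` cast keeps the product well-defined even where `int` is 32 bits:

```go
package main

import "fmt"

// Hypothetical constants restating the bounds checked above (not in this PR).
const (
	maxSize      = 1 << 20 // 1 MiB per ConfigMap; the API server rejects objects much beyond ~1 MiB anyway
	maxAggregate = 2 << 30 // 2 GiB of total payload (total * size)
)

// validate mirrors the guard above. Converting to int64 before multiplying
// avoids overflow on 32-bit builds, where int is 32 bits.
func validate(total, size int) error {
	if total <= 0 || size <= 0 || size > maxSize || int64(total)*int64(size) > maxAggregate {
		return fmt.Errorf("invalid total (%d) or size (%d) for configmaps", total, size)
	}
	return nil
}
```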
@@ -96,7 +97,7 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
return nil, fmt.Errorf("failed to build clientset: %w", err)
}

err = utils.CreateConfigmaps(ctx, kubeCfgPath, namespace, namePattern, total, size, 2, 0)
err = utils.CreateConfigmaps(ctx, kubeCfgPath, namespace, namePattern, total, size, 30, 0)

if err != nil {
return nil, fmt.Errorf("failed to create configmaps: %w", err)
@@ -111,11 +112,6 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
}()

// Stop all the watches when the function returns
watches := make([]watch.Interface, 0)
defer func() {
stopWatches(watches)
}()

var wg sync.WaitGroup
defer wg.Wait()

@@ -125,37 +121,57 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
// Start to watch the configmaps
for i := 0; i < total; i++ {
wg.Add(1)
go func() {
go func(ii int) {
defer wg.Done()
watchReq, err := client.CoreV1().ConfigMaps(namespace).
Watch(context.TODO(), metav1.ListOptions{
Watch: true,
FieldSelector: fmt.Sprintf("metadata.name=%s-cm-%s-%d", appLabel, namePattern, i),
})

watches = append(watches, watchReq)
if err != nil {
fmt.Printf("Error starting watch for configmap %s: %v\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), err)
return
}
klog.V(5).Infof("Starting watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))
timeoutSeconds := int64(100000)

var watchReq watch.Interface
var err error
defer func() {
if watchReq != nil {
watchReq.Stop()
}
}()

// Process watch events proactively to prevent cache oversizing.
for {
if watchReq == nil {
watchReq, err = client.CoreV1().ConfigMaps(namespace).
Watch(context.TODO(), metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeoutSeconds,
FieldSelector: fmt.Sprintf("metadata.name=%s-cm-%s-%d", appLabel, namePattern, ii),
})

if err != nil {
fmt.Printf("Error starting watch for configmap %s: %v\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), err)
time.Sleep(5 * time.Second)
continue
}
klog.V(5).Infof("Starting watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
}

// Drain the next event, exit on shutdown, or wake after 5s to re-check the watch.
select {
case <-dpCtx.Done():
klog.V(5).Infof("Stopping watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))
klog.V(5).Infof("Stopping watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
return
case event := <-watchReq.ResultChan():
case event, ok := <-watchReq.ResultChan():
if !ok {
klog.V(2).Infof("Watch channel closed for configmap: %s", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
watchReq.Stop()
watchReq = nil
continue
}
if event.Type == watch.Error {
klog.Errorf("Error event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Object)
return
klog.Errorf("Error event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), event.Object)
watchReq.Stop()
watchReq = nil
continue
}
klog.V(5).Infof("Event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Type)
klog.V(5).Infof("Event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), event.Type)
case <-time.After(5 * time.Second):
}
}

}()
}(i)
}

// Deploy the runner group
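Review note: the re-establish-on-close loop above is close to what client-go already provides in `k8s.io/client-go/tools/watch`. A `RetryWatcher`-based variant might look like the sketch below (not part of this PR; it assumes the caller knows a valid starting `resourceVersion`, which `RetryWatcher` requires to be non-empty and not "0"):

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// watchConfigMap is a sketch of the same per-ConfigMap watch built on
// RetryWatcher, which re-establishes the underlying watch when it times out.
func watchConfigMap(ctx context.Context, client kubernetes.Interface, namespace, name, startRV string) error {
	lw := &cache.ListWatch{
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
			return client.CoreV1().ConfigMaps(namespace).Watch(ctx, opts)
		},
	}
	rw, err := watchtools.NewRetryWatcher(startRV, lw) // startRV must be a real resourceVersion
	if err != nil {
		return err
	}
	defer rw.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event, ok := <-rw.ResultChan():
			if !ok {
				return fmt.Errorf("retry watcher for %s stopped", name)
			}
			_ = event // drain; the benchmark only needs to keep the stream moving
		}
	}
}
```

`RetryWatcher` cannot resume across a compacted `resourceVersion`, so the hand-rolled loop, which simply restarts from "now", is arguably the simpler fit for a benchmark that only needs to keep watch streams open.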
@@ -180,13 +196,3 @@ Environment: Combine %d%% read requests and %d%% update requests during benchmar
Info: map[string]interface{}{},
}, nil
}

// StopWatches stops all the watches
func stopWatches(watches []watch.Interface) {
for _, w := range watches {
if w != nil {
klog.V(5).Infof("Stopping watch: %v", w)
w.Stop()
}
}
}
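One more point in favor of this deletion: the old code appended to the shared `watches` slice from every watcher goroutine without synchronization, which is a data race. Keeping the slice would have required locking along these lines (hypothetical sketch); moving `Stop()` into each goroutine's `defer`, as this PR does, removes the shared state entirely:

```go
package main

import (
	"sync"

	"k8s.io/apimachinery/pkg/watch"
)

// Hypothetical: the locking the old shared watches slice would have needed.
var (
	mu      sync.Mutex
	watches []watch.Interface
)

// track appends under the lock so concurrent watcher goroutines stay race-free.
func track(w watch.Interface) {
	mu.Lock()
	defer mu.Unlock()
	watches = append(watches, w)
}
```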