290 changes: 290 additions & 0 deletions contrib/cmd/runkperf/commands/bench/cilium_cr_list.go
@@ -0,0 +1,290 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
"context"
"fmt"
"sync/atomic"
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"

"github.com/urfave/cli"
)

const (
	numCRApplyWorkers      = 50
	maxNumCRApplyAttempts  = 5
	kubectlApplyTimeout    = 30 * time.Second
	progressReportInterval = 10 * time.Second
	installCiliumCRDsFlag  = "install-cilium-crds"
	numCEPFlag             = "num-cilium-endpoints"
	numCIDFlag             = "num-cilium-identities"
)

var benchCiliumCustomResourceListCase = cli.Command{
Name: "cilium_cr_list",
Usage: `

Simulate workload with stale list requests for Cilium custom resources.

This benchmark MUST be run in a cluster *without* Cilium installed, so Cilium doesn't
delete or modify the synthetic CiliumEndpoint and CiliumIdentity resources created in this test.
`,
Flags: append(
[]cli.Flag{
cli.BoolTFlag{
Name: installCiliumCRDsFlag,
Usage: "Install Cilium CRDs if they don't already exist (default: true)",
},
cli.IntFlag{
Name: numCIDFlag,
Usage: "Number of CiliumIdentities to generate (default: 1000)",
Value: 1000,
},
cli.IntFlag{
Name: numCEPFlag,
Usage: "Number of CiliumEndpoints to generate (default: 1000)",
Value: 1000,
},
},
commonFlags...,
),
Action: func(cliCtx *cli.Context) error {
_, err := renderBenchmarkReportInterceptor(ciliumCustomResourceListRun)(cliCtx)
return err
},
}

// ciliumCustomResourceListRun runs a benchmark that:
// (1) creates many Cilium custom resources (CiliumIdentity and CiliumEndpoint).
// (2) executes stale list requests against those resources.
// This simulates a "worst case" scenario in which Cilium performs many expensive list requests.
func ciliumCustomResourceListRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
	ctx := context.Background()

	rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx, "loadprofile/cilium_cr_list.yaml")
	if err != nil {
		return nil, err
	}
	defer func() { _ = rgCfgFileDone() }()

	kubeCfgPath := cliCtx.GlobalString("kubeconfig")
	kr := utils.NewKubectlRunner(kubeCfgPath, "")

	if cliCtx.BoolT(installCiliumCRDsFlag) {
		if err := installCiliumCRDs(ctx, kr); err != nil {
			return nil, fmt.Errorf("failed to install Cilium CRDs: %w", err)
		}
	}

	numCID := cliCtx.Int(numCIDFlag)
	numCEP := cliCtx.Int(numCEPFlag)
	if err := loadCiliumData(ctx, kr, numCID, numCEP); err != nil {
		return nil, fmt.Errorf("failed to load Cilium data: %w", err)
	}

	rgResult, err := utils.DeployRunnerGroup(ctx,
		cliCtx.GlobalString("kubeconfig"),
		cliCtx.GlobalString("runner-image"),
		rgCfgFile,
		cliCtx.GlobalString("runner-flowcontrol"),
		cliCtx.GlobalString("rg-affinity"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to deploy runner group: %w", err)
	}

	return &internaltypes.BenchmarkReport{
		Description: fmt.Sprintf(`Deploy %d CiliumIdentities and %d CiliumEndpoints, then run stale list requests against them`, numCID, numCEP),
		LoadSpec:    *rgSpec,
		Result:      *rgResult,
		Info: map[string]interface{}{
			"numCiliumIdentities": numCID,
			"numCiliumEndpoints":  numCEP,
		},
	}, nil
}

var ciliumCRDs = []string{
"https://raw.githubusercontent.com/cilium/cilium/refs/tags/v1.16.6/pkg/k8s/apis/cilium.io/client/crds/v2/ciliumendpoints.yaml",
"https://raw.githubusercontent.com/cilium/cilium/refs/tags/v1.16.6/pkg/k8s/apis/cilium.io/client/crds/v2/ciliumidentities.yaml",
}

func installCiliumCRDs(ctx context.Context, kr *utils.KubectlRunner) error {
	klog.V(0).Info("Installing Cilium CRDs...")
	for _, crdURL := range ciliumCRDs {
		err := kr.Apply(ctx, kubectlApplyTimeout, crdURL)
		if err != nil {
			return fmt.Errorf("failed to apply CRD %s: %v", crdURL, err)
		}
	}
	return nil
}

func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
	totalNumResources := numCID + numCEP
	klog.V(0).Infof("Loading Cilium data (%d CiliumIdentities and %d CiliumEndpoints)...", numCID, numCEP)

	// Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.
	taskChan := make(chan string, numCRApplyWorkers*2)
	var appliedCount atomic.Uint64
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numCRApplyWorkers; i++ {
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case ciliumResourceData, ok := <-taskChan:
					if !ok {
						return nil // taskChan closed
					}
					var err error
					for i := 0; i < maxNumCRApplyAttempts; i++ {
						err = kr.ServerSideApplyWithData(ctx, kubectlApplyTimeout, ciliumResourceData)
						if err == nil {
							appliedCount.Add(1)
							break
						} else if i < maxNumCRApplyAttempts-1 {
							klog.Warningf("Failed to apply cilium resource data, will retry: %s", err)
						}
					}
					if err != nil { // last retry failed, so give up.
						return fmt.Errorf("failed to apply cilium resource data: %w", err)
					}
				}
			}
		})
	}

	// Report progress periodically.
	reporterDoneChan := make(chan struct{})
	g.Go(func() error {
		timer := time.NewTicker(progressReportInterval)
		defer timer.Stop()
		for {
			select {
			case <-reporterDoneChan:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			case <-timer.C:
				c := appliedCount.Load()
				percent := int(float64(c) / float64(totalNumResources) * 100)
				klog.V(0).Infof("Applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent)
			}
		}
	})

	// Generate CiliumIdentity and CiliumEndpoint CRs to be applied by the worker goroutines.
	g.Go(func() error {
		for i := 0; i < numCID; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumIdentity():
			}
		}

		for i := 0; i < numCEP; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case taskChan <- generateCiliumEndpoint():
			}
		}

		close(taskChan) // signal to consumer goroutines that we're done.
		close(reporterDoneChan)
		return nil
	})

	if err := g.Wait(); err != nil {
		return err
	}

	klog.V(0).Infof("Loaded %d CiliumIdentities and %d CiliumEndpoints\n", numCID, numCEP)

	return nil
}

func generateCiliumIdentity() string {
	identityName := uuid.New().String()
	return fmt.Sprintf(`
apiVersion: cilium.io/v2
kind: CiliumIdentity
metadata:
  name: "%s"
security-labels:
  k8s:io.cilium.k8s.namespace.labels.control-plane: "true"
  k8s:io.cilium.k8s.namespace.labels.kubernetes.azure.com/managedby: aks
  k8s:io.cilium.k8s.namespace.labels.kubernetes.io/cluster-service: "true"
  k8s:io.cilium.k8s.namespace.labels.kubernetes.io/metadata.name: kube-system
  k8s:io.cilium.k8s.policy.cluster: default
  k8s:io.cilium.k8s.policy.serviceaccount: coredns
  k8s:io.kubernetes.pod.namespace: kube-system
  k8s:k8s-app: kube-dns
  k8s:kubernetes.azure.com/managedby: aks
  k8s:version: v20`, identityName)
}

func generateCiliumEndpoint() string {
	cepName := uuid.New().String()
	return fmt.Sprintf(`
apiVersion: cilium.io/v2
kind: CiliumEndpoint
metadata:
  name: "%s"
status:
  encryption: {}
  external-identifiers:
    container-id: 790d85075c394a8384f8b1a0fec62e2316c2556d175dab0c1fe676e5a6d92f33
    k8s-namespace: kube-system
    k8s-pod-name: coredns-54b69f46b8-dbcdl
    pod-name: kube-system/coredns-54b69f46b8-dbcdl
  id: 1453
  identity:
    id: 0000001
    labels:
    - k8s:io.cilium.k8s.namespace.labels.control-plane=true
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.azure.com/managedby=aks
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.io/cluster-service=true
    - k8s:io.cilium.k8s.namespace.labels.kubernetes.io/metadata.name=kube-system
    - k8s:io.cilium.k8s.policy.cluster=default
    - k8s:io.cilium.k8s.policy.serviceaccount=coredns
    - k8s:io.kubernetes.pod.namespace=kube-system
    - k8s:k8s-app=kube-dns
    - k8s:kubernetes.azure.com/managedby=aks
    - k8s:version=v20
  named-ports:
  - name: dns
    port: 53
    protocol: UDP
  - name: dns-tcp
    port: 53
    protocol: TCP
  - name: metrics
    port: 9153
    protocol: TCP
  networking:
    addressing:
    - ipv4: 10.244.1.38
    node: 10.224.0.4
  policy:
    egress:
      enforcing: false
      state: <status disabled>
    ingress:
      enforcing: false
      state: <status disabled>
  state: ready
  visibility-policy-status: <status disabled>
`, cepName)
}
1 change: 1 addition & 0 deletions contrib/cmd/runkperf/commands/bench/root.go
@@ -58,6 +58,7 @@ var Command = cli.Command{
		benchNode10Job1Pod100Case,
		benchNode100Job1Pod3KCase,
		benchNode100DeploymentNPod10KCase,
		benchCiliumCustomResourceListCase,
	},
}

37 changes: 37 additions & 0 deletions contrib/internal/manifests/loadprofile/cilium_cr_list.yaml
@@ -0,0 +1,37 @@
# count defines how many runners in the group.
count: 10

# This simulates worst-case behavior for Cilium:
# every agent hammering apiserver with stale LIST requests for CiliumIdentity and CiliumEndpoint.
# The request rate is much, much higher than what we'd expect to see in production with
# client-go exponential backoff configured, so if apiserver can survive this onslaught it can
# survive anything Cilium throws at it.
loadProfile:
  version: 1
  description: cilium list profile
  spec:
    rate: 20 # 20 req/sec * 10 runners = 200 req/sec
    total: 12000 # run for ~10 minutes, 600 seconds * 20/sec = 12000
    # 5k node cluster, one cilium agent per node,
    # divided by the number of runners.
    conns: 500
    client: 500

    contentType: json
    disableHTTP2: false

    # 50/50 mix of ciliumidentity and ciliumendpoint queries.
    # We're simulating with CiliumEndpointSlice disabled here, on the assumption that CES will always
    # have a lower count than CEP, so if we can survive with CEP only then we're in good shape.
    requests:
      - staleList:
          group: cilium.io
          version: v2
          resource: ciliumidentities
        shares: 1000 # Has 50% chance = 1000 / (1000 + 1000)
      - staleList:
          group: cilium.io
          version: v2
          resource: ciliumendpoints
          namespace: "default"
        shares: 1000 # Has 50% chance = 1000 / (1000 + 1000)
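
The two shares values above weight the request mix. As an illustration only, here is a minimal Go sketch of how "shares" can translate into a selection probability; this is not kperf's actual scheduler, just a stand-in to make the 50/50 comment concrete:

// Illustrative only: picks request i with probability shares[i] / sum(shares).
package main

import (
	"fmt"
	"math/rand"
)

func pickByShares(r *rand.Rand, shares []int) int {
	total := 0
	for _, s := range shares {
		total += s
	}
	n := r.Intn(total)
	for i, s := range shares {
		if n < s {
			return i
		}
		n -= s
	}
	return len(shares) - 1
}

func main() {
	r := rand.New(rand.NewSource(1))
	counts := make([]int, 2)
	for i := 0; i < 10000; i++ {
		counts[pickByShares(r, []int{1000, 1000})]++ // the two shares in the profile above
	}
	fmt.Println(counts) // roughly a 50/50 split
}
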
15 changes: 15 additions & 0 deletions contrib/internal/utils/kubectl_cmd.go
@@ -161,6 +161,21 @@ func (kr *KubectlRunner) Apply(ctx context.Context, timeout time.Duration, fileP
	return err
}

// ServerSideApplyWithData runs kubectl apply with --server-side=true, with input data piped through stdin.
func (kr *KubectlRunner) ServerSideApplyWithData(ctx context.Context, timeout time.Duration, data string) error {
	args := []string{}
	if kr.kubeCfgPath != "" {
		args = append(args, "--kubeconfig", kr.kubeCfgPath)
	}
	if kr.namespace != "" {
		args = append(args, "-n", kr.namespace)
	}
	args = append(args, "apply", "--server-side=true", "--validate=ignore", "-f", "-")

	_, err := runCommandWithInput(ctx, timeout, "kubectl", args, data)
	return err
}
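
A minimal usage sketch for the helper above (function body only, assuming the same imports used elsewhere in this PR and a reachable cluster; the kubeconfig path and the ConfigMap manifest are purely illustrative):

	// Sketch: pipe an inline manifest through stdin to kubectl apply --server-side.
	kr := utils.NewKubectlRunner("/path/to/kubeconfig", "default")
	manifest := `
apiVersion: v1
kind: ConfigMap
metadata:
  name: ssa-demo
data:
  hello: world
`
	if err := kr.ServerSideApplyWithData(context.Background(), 30*time.Second, manifest); err != nil {
		klog.Errorf("server-side apply failed: %v", err)
	}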

// Delete runs delete subcommand.
func (kr *KubectlRunner) Delete(ctx context.Context, timeout time.Duration, filePath string) error {
	args := []string{}
21 changes: 21 additions & 0 deletions contrib/internal/utils/utils.go
@@ -402,6 +402,27 @@ func runCommand(ctx context.Context, timeout time.Duration, cmd string, args []s
	return output, nil
}

// runCommandWithInput executes a command with `input` piped through stdin.
func runCommandWithInput(ctx context.Context, timeout time.Duration, cmd string, args []string, input string) ([]byte, error) {
	var cancel context.CancelFunc
	if timeout != 0 {
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	c := exec.CommandContext(ctx, cmd, args...)
	c.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}
	c.Stdin = strings.NewReader(input)

	klog.V(2).Infof("[CMD] %s", c.String())
	output, err := c.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("failed to invoke %s:\n (output: %s): %w",
			c.String(), strings.TrimSpace(string(output)), err)
	}
	return output, nil
}
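
A tiny sketch of how this package-private helper could be exercised from within the utils package (hypothetical call; grep is just a stand-in command fed via stdin):

	out, err := runCommandWithInput(context.Background(), 5*time.Second, "grep", []string{"foo"}, "foo\nbar\n")
	if err != nil {
		klog.Errorf("command failed: %v", err)
	} else {
		klog.V(2).Infof("grep output: %q", out)
	}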

// CreateTempFileWithContent creates temporary file with data.
func CreateTempFileWithContent(data []byte) (_name string, _cleanup func() error, retErr error) {
	f, err := os.CreateTemp("", "temp*")