15 changes: 8 additions & 7 deletions contrib/cmd/runkperf/commands/bench/cilium_cr_list.go
@@ -10,10 +10,10 @@ import (
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/log"
"github.com/Azure/kperf/contrib/utils"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"

"github.com/urfave/cli"
)
@@ -118,7 +118,7 @@ var ciliumCRDs = []string{
}

func installCiliumCRDs(ctx context.Context, kr *utils.KubectlRunner) error {
klog.V(0).Info("Installing Cilium CRDs...")
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Installing Cilium CRDs...")
for _, crdURL := range ciliumCRDs {
err := kr.Apply(ctx, kubectlApplyTimeout, crdURL)
if err != nil {
@@ -130,7 +130,8 @@ func installCiliumCRDs(ctx context.Context, kr *utils.KubectlRunner) error {

func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
totalNumResources := numCID + numCEP
klog.V(0).Infof("Loading Cilium data (%d CiliumIdentities and %d CiliumEndpoints)...", numCID, numCEP)
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", "Loading Cilium data",
"CiliumIdentities", numCID, "CiliumEndpoints", numCEP)

// Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.
taskChan := make(chan string, numCRApplyWorkers*2)
@@ -153,7 +154,7 @@ func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
appliedCount.Add(1)
break
} else if i < maxNumCRApplyAttempts-1 {
klog.Warningf("Failed to apply cilium resource data, will retry: %s", err)
log.GetLogger(ctx).WithKeyValues("level", "warn").LogKV("msg", "failed to apply cilium resource data, will retry", "error", err)
}
}
if err != nil { // last retry failed, so give up.
@@ -178,7 +179,7 @@ func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
case <-timer.C:
c := appliedCount.Load()
percent := int(float64(c) / float64(totalNumResources) * 100)
klog.V(0).Infof("Applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent)
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("applied %d/%d cilium resources (%d%%)", c, totalNumResources, percent))
}
}
})
@@ -210,7 +211,7 @@ func loadCiliumData(ctx context.Context, kr *utils.KubectlRunner, numCID int, numCEP int) error {
return err
}

klog.V(0).Infof("Loaded %d CiliumIdentities and %d CiliumEndpoints\n", numCID, numCEP)
log.GetLogger(ctx).WithKeyValues("level", "info").LogKV("msg", fmt.Sprintf("loaded %d CiliumIdentities and %d CiliumEndpoints\n", numCID, numCEP))

return nil
}
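The comment in the hunk above ("Parallelize kubectl apply to speed it up. Achieves ~80 inserts/sec.") refers to a worker-pool pattern the diff only shows in fragments: a buffered task channel feeding errgroup workers, an atomic completion counter, and a ticker goroutine that reports progress through the logger. A condensed, self-contained sketch of that pattern — the `numWorkers` constant is a hypothetical stand-in for `numCRApplyWorkers`, and plain `fmt` output stands in for the contrib/log logger:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

func applyAll(ctx context.Context, tasks []string) error {
	const numWorkers = 8 // hypothetical stand-in for numCRApplyWorkers

	taskChan := make(chan string, numWorkers*2)
	var appliedCount atomic.Int64

	g, gctx := errgroup.WithContext(ctx)

	// Workers drain the channel; in the real code each task is a
	// kubectl apply with bounded retries.
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			for task := range taskChan {
				_ = task // apply the manifest here
				appliedCount.Add(1)
			}
			return nil
		})
	}

	// Progress reporter: one line per second until the workers finish.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-gctx.Done():
				return
			case <-ticker.C:
				c := appliedCount.Load()
				percent := int(float64(c) / float64(len(tasks)) * 100)
				fmt.Printf("applied %d/%d cilium resources (%d%%)\n", c, len(tasks), percent)
			}
		}
	}()

	for _, t := range tasks {
		taskChan <- t
	}
	close(taskChan)

	err := g.Wait()
	close(done)
	return err
}

func main() {
	_ = applyAll(context.Background(), []string{"cid-0.yaml", "cep-0.yaml"})
}
```

Closing taskChan after all sends lets the workers' range loops exit, so g.Wait() doubles as the completion barrier before the reporter is stopped.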
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -10,7 +10,7 @@ import (
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
)
20 changes: 14 additions & 6 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -11,11 +11,11 @@ import (
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/log"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

var benchNode100DeploymentNPod10KCase = cli.Command{
@@ -90,7 +90,10 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
wg.Add(1)

restartInterval := cliCtx.Duration("interval")
klog.V(0).Infof("The interval is %v for restaring deployments", restartInterval)

log.GetLogger(dpCtx).
WithKeyValues("level", "info").
LogKV("msg", fmt.Sprintf("the interval is %v for restaring deployments", restartInterval))

paddingBytes := cliCtx.Int("padding-bytes")
total := cliCtx.Int("deployments")
@@ -160,7 +163,9 @@ Workload: Deploy %d deployments with %d pods. Rolling-update deployments one by

// dumpDeploymentReplicas dumps deployment's replica.
func dumpDeploymentReplicas(ctx context.Context, kubeCfgPath string, namePattern string, total int) error {
klog.V(0).Info("Dump deployment's replica information")
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")

infoLogger.LogKV("msg", "dump deployment's replica information")

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
@@ -177,7 +182,8 @@ func dumpDeploymentReplicas(ctx context.Context, kubeCfgPath string, namePattern string, total int) error {
name, ns, err)
}

klog.V(0).InfoS("Deployment", "name", name, "ns", ns,
infoLogger.LogKV("msg", "dump Deployment status",
"name", name, "ns", ns,
"replica", *dp.Spec.Replicas, "readyReplicas", dp.Status.ReadyReplicas)
}
return nil
@@ -188,7 +194,9 @@ func getDeploymentPodSize(ctx context.Context, kubeCfgPath string, namePattern s
ns := fmt.Sprintf("%s-0", namePattern)
labelSelector := fmt.Sprintf("app=%s", namePattern)

klog.V(0).InfoS("Get the size of pod", "labelSelector", labelSelector, "namespace", ns)
log.GetLogger(ctx).
WithKeyValues("level", "info").
LogKV("msg", "get the size of pod", "labelSelector", labelSelector, "namespace", ns)

cli, err := utils.BuildClientset(kubeCfgPath)
if err != nil {
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
@@ -10,7 +10,7 @@ import (
"time"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
)
33 changes: 24 additions & 9 deletions contrib/cmd/runkperf/commands/bench/utils.go
@@ -13,11 +13,11 @@ import (
"github.com/Azure/kperf/api/types"
kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils"
internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/log"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
)

// subcmdActionFunc is to unify each subcommand's interface. They should return
@@ -32,7 +32,9 @@ func addAPIServerCoresInfoInterceptor(handler subcmdActionFunc) subcmdActionFunc

beforeCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath)
if ferr != nil {
-			klog.ErrorS(ferr, "failed to fetch apiserver cores")
+			log.GetLogger(ctx).
+				WithKeyValues("level", "warn").
+				LogKV("msg", "failed to fetch apiserver cores", "error", ferr)
}

report, err := handler(cliCtx)
Expand All @@ -42,7 +44,9 @@ func addAPIServerCoresInfoInterceptor(handler subcmdActionFunc) subcmdActionFunc

afterCores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath)
if ferr != nil {
-			klog.ErrorS(ferr, "failed to fetch apiserver cores")
+			log.GetLogger(ctx).
+				WithKeyValues("level", "warn").
+				LogKV("msg", "failed to fetch apiserver cores", "error", ferr)
}

report.Info["apiserver"] = map[string]interface{}{
@@ -94,7 +98,9 @@ func renderBenchmarkReportInterceptor(handler subcmdActionFunc) subcmdActionFunc

// deployVirtualNodepool deploys virtual nodepool.
func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, cpu, memory, maxPods int) (func() error, error) {
klog.V(0).InfoS("Deploying virtual nodepool", "name", target)
log.GetLogger(ctx).
WithKeyValues("level", "info").
LogKV("msg", "deploying virtual nodepool", "name", target)

kubeCfgPath := cliCtx.GlobalString("kubeconfig")
virtualNodeAffinity := cliCtx.GlobalString("vc-affinity")
@@ -112,9 +118,13 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, cpu, memory, maxPods int) (func() error, error) {
}
}

klog.V(0).InfoS("Trying to delete nodepool if necessary", "name", target)
log.GetLogger(ctx).
WithKeyValues("level", "info").
LogKV("msg", "trying to delete nodepool if necessary", "name", target)
if err = kr.DeleteNodepool(ctx, 0, target); err != nil {
klog.V(0).ErrorS(err, "failed to delete nodepool", "name", target)
log.GetLogger(ctx).
WithKeyValues("level", "warn").
LogKV("msg", "failed to delete nodepool", "name", target, "error", err)
}

err = kr.NewNodepool(ctx, 0, target, nodes, cpu, memory, maxPods, virtualNodeAffinity, sharedProviderID)
@@ -127,11 +137,13 @@ func deployVirtualNodepool(ctx context.Context, cliCtx *cli.Context, target string, nodes, cpu, memory, maxPods int) (func() error, error) {
}, nil
}

func NewRunnerGroupSpecFromYamlFile() {}

// newLoadProfileFromEmbed loads load profile from embed and tweaks that load
// profile.
func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _spec *types.RunnerGroupSpec, _cleanup func() error, _err error) {
var rgSpec types.RunnerGroupSpec
-	rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
+	rgCfgFile, rgCfgFileDone, err := utils.NewRunnerGroupSpecFileFromEmbed(
name,
func(spec *types.RunnerGroupSpec) error {
reqs := cliCtx.Int("total")
@@ -151,7 +163,10 @@ func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _spec *types.RunnerGroupSpec, _cleanup func() error, _err error) {
spec.NodeAffinity = affinityLabels
spec.Profile.Spec.ContentType = types.ContentType(cliCtx.String("content-type"))
data, _ := yaml.Marshal(spec)
klog.V(2).InfoS("Load Profile", "config", string(data))

log.GetLogger(context.TODO()).
WithKeyValues("level", "info").
LogKV("msg", "dump load profile", "config", string(data))

rgSpec = *spec
return nil
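Both call sites of the renamed helper (here and in warmup/command.go below) share one shape: materialize an embedded load-profile YAML into a temp file, let the caller mutate the decoded RunnerGroupSpec before it is re-serialized, and hand back the file path plus a cleanup callback. A minimal sketch of that shape — the (path, cleanup func, error) return triple is an assumption read off the call sites, not a confirmed signature:

```go
package main

import (
	"os"

	"github.com/Azure/kperf/api/types"
	"github.com/Azure/kperf/contrib/utils"
)

// writeWarmupSpec mirrors the call shape used in this diff; the exact
// signature lives in contrib/utils.
func writeWarmupSpec() (string, func() error, error) {
	return utils.NewRunnerGroupSpecFileFromEmbed(
		"loadprofile/warmup.yaml",
		func(spec *types.RunnerGroupSpec) error {
			// Real callers tweak request totals, node affinity, and
			// content type here before the spec is written out.
			return nil
		},
	)
}

func main() {
	path, done, err := writeWarmupSpec()
	if err != nil {
		os.Exit(1)
	}
	defer func() { _ = done() }()
	_ = path // pass the generated spec file to the runner group launch
}
```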
30 changes: 19 additions & 11 deletions contrib/cmd/runkperf/commands/warmup/command.go
@@ -11,15 +11,15 @@

"github.com/Azure/kperf/api/types"
kperfcmdutils "github.com/Azure/kperf/cmd/kperf/commands/utils"
"github.com/Azure/kperf/contrib/internal/utils"
"github.com/Azure/kperf/contrib/log"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

// Command represents warmup subcommand.
@@ -80,7 +80,10 @@ var Command = cli.Command{
Action: func(cliCtx *cli.Context) (retErr error) {
ctx := context.Background()

-		rgCfgFile, rgCfgFileDone, err := utils.NewLoadProfileFromEmbed(
+		infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
+		warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
+
+		rgCfgFile, rgCfgFileDone, err := utils.NewRunnerGroupSpecFileFromEmbed(
"loadprofile/warmup.yaml",
func(spec *types.RunnerGroupSpec) error {
reqs := cliCtx.Int("total")
@@ -104,7 +107,8 @@ var Command = cli.Command{
spec.NodeAffinity = affinityLabels

data, _ := yaml.Marshal(spec)
klog.V(2).InfoS("Load Profile", "config", string(data))

infoLogger.LogKV("msg", "dump load profile", "config", string(data))
return nil
},
)
@@ -127,11 +131,11 @@ var Command = cli.Command{
cores, ferr := utils.FetchAPIServerCores(ctx, kubeCfgPath)
if ferr == nil {
if isReady(cliCtx, cores) {
klog.V(0).Infof("apiserver resource is ready: %v", cores)
infoLogger.LogKV("msg", fmt.Sprintf("apiserver resource is ready: %v", cores))
return nil
}
} else {
-				klog.V(0).ErrorS(ferr, "failed to fetch apiserver cores")
+				warnLogger.LogKV("msg", "failed to fetch apiserver cores", "error", ferr)
}

delNP, err := deployWarmupVirtualNodepool(ctx, kubeCfgPath, isEKS, virtualNodeAffinity)
@@ -168,7 +172,7 @@ var Command = cli.Command{
cores, ferr = utils.FetchAPIServerCores(ctx, kubeCfgPath)
if ferr == nil {
if isReady(cliCtx, cores) {
klog.V(0).Infof("apiserver resource is ready: %v", cores)
infoLogger.LogKV("msg", fmt.Sprintf("apiserver resource is ready: %v", cores))
return nil
}
}
@@ -194,7 +198,10 @@ func isReady(cliCtx *cli.Context, cores map[string]int) bool {
func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string, isEKS bool, nodeAffinity string) (func() error, error) {
target := "warmup"

klog.V(0).InfoS("Deploying virtual nodepool", "name", target)
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

infoLogger.LogKV("msg", "deploying virtual nodepool", "name", target)

kr := utils.NewKperfRunner(kubeCfgPath, "")

@@ -208,9 +215,9 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string, isEKS bool, nodeAffinity string) (func() error, error) {
}
}

klog.V(0).InfoS("Trying to delete", "nodepool", target)
infoLogger.LogKV("msg", "trying to delete", "nodepool", target)
if err = kr.DeleteNodepool(ctx, 0, target); err != nil {
klog.V(0).ErrorS(err, "failed to delete", "nodepool", target)
warnLogger.LogKV("msg", "failed to delete", "nodepool", target, "error", err)
}

err = kr.NewNodepool(ctx, 0, target, 100, 32, 96, 110, nodeAffinity, sharedProviderID)
@@ -226,7 +233,8 @@ func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string, isEKS bool, nodeAffinity string) (func() error, error) {
// patchEKSDaemonsetWithoutToleration removes tolerations to avoid pod scheduled
// to virtual nodes.
func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) error {
klog.V(0).Info("Trying to removes EKS Daemonset's tolerations to avoid pod scheduled to virtual nodes")
log.GetLogger(ctx).WithKeyValues("level", "info").
LogKV("msg", "trying to removes EKS Daemonset's tolerations to avoid pod scheduled to virtual nodes")

clientset := mustClientset(kubeCfgPath)
ds := clientset.AppsV1().DaemonSets("kube-system")
52 changes: 52 additions & 0 deletions contrib/log/klogger.go
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package log

import (
"fmt"

"k8s.io/klog/v2"
)

type klogger struct {
level klog.Level
kvs []any
}

// Logf implements Logger.Logf.
func (kl klogger) Logf(msg string, args ...any) {
if len(kl.kvs) > 0 {
klog.V(kl.level).InfoS(fmt.Sprintf(msg, args...), kl.kvs...)
return
}
klog.V(kl.level).Infof(msg, args...)
}

// LogKV implements Logger.LogKV.
func (kl klogger) LogKV(kvs ...any) {
klog.V(kl.level).InfoS("", append(copySlice(kl.kvs), kvs...)...)
}

// WithKeyValues implements Logger.WithKeyValues.
func (kl klogger) WithKeyValues(kvs ...any) Logger {
return klogger{
level: kl.level,
kvs: append(copySlice(kl.kvs), kvs...),
}
}

// NewLogger returns builtin Logger implementation.
func NewLogger(level int32) Logger {
return klogger{level: klog.Level(level)}
}

func copySlice(src []any) []any {
if len(src) == 0 {
return []any{}
}

dst := make([]any, len(src))
copy(dst, src)
return dst
}
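A minimal usage sketch of the new package, assuming the package's Logger interface exposes the three methods implemented above (Logf, LogKV, WithKeyValues); GetLogger(ctx), used throughout the diff, is defined elsewhere in contrib/log:

```go
package main

import (
	"github.com/Azure/kperf/contrib/log"
)

func main() {
	// NewLogger builds the klog-backed Logger at the given verbosity.
	base := log.NewLogger(0)

	// WithKeyValues returns a child logger; the bound pairs are copied,
	// so children never mutate the parent's key/value slice.
	infoLogger := base.WithKeyValues("level", "info")

	// LogKV appends call-site pairs after the bound ones and emits one
	// structured record via klog.InfoS.
	infoLogger.LogKV("msg", "deploying virtual nodepool", "name", "warmup")

	// Logf keeps printf-style logging; with bound pairs present it routes
	// through InfoS using the formatted string as the message.
	base.Logf("applied %d/%d resources", 80, 100)
}
```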