From 0d79a240c45668dd8d4a32d7325870ae267e37bb Mon Sep 17 00:00:00 2001
From: Xinwei Liu
Date: Wed, 21 May 2025 11:02:52 +1000
Subject: [PATCH 1/5] contrib/utils: Add default timeout for deployments and jobs

Fixed timeouts could cause failures in benchmarking for large-scale
deployments due to longer operation times. This change ensures smoother
benchmarking by adapting timeouts to cluster size.
---
 .../commands/bench/node100_job1_pod3k.go | 2 +-
 .../runkperf/commands/bench/node100_pod10k.go | 2 +-
 .../commands/bench/node10_job1_pod100.go | 2 +-
 .../cmd/runkperf/commands/warmup/command.go | 2 +-
 contrib/utils/utils.go | 38 +++++++++++++++----
 5 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
index cc1981d3..29ece146 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -71,7 +71,7 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
 	go func() {
 		defer wg.Done()
 
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval, utils.DefaultJobsTimeout)
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
index 4c4e9359..8798597e 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -103,7 +103,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
 
 	deploymentNamePattern := "benchmark"
 	rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
-		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
+		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval, utils.DefaultDeploymentsTimeout)
 	if err != nil {
 		dpCancel()
 		return nil, fmt.Errorf("failed to setup workload: %w", err)
diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
index 5a3ff7be..3a22987c 100644
--- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
+++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
@@ -71,7 +71,7 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
 	go func() {
 		defer wg.Done()
 
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval, utils.DefaultJobsTimeout)
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/warmup/command.go b/contrib/cmd/runkperf/commands/warmup/command.go
index e4a7591d..7e44a93b 100644
--- a/contrib/cmd/runkperf/commands/warmup/command.go
+++ b/contrib/cmd/runkperf/commands/warmup/command.go
@@ -156,7 +156,7 @@ var Command = cli.Command{
 		go func() {
 			defer wg.Done()
 
-			utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second)
+			utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second, utils.DefaultJobsTimeout)
 		}()
 
 		_, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/utils/utils.go b/contrib/utils/utils.go
index 
e4275b25..da4e0a3c 100644 --- a/contrib/utils/utils.go +++ b/contrib/utils/utils.go @@ -39,8 +39,31 @@ var ( EKSIdleNodepoolInstanceType = "m4.large" ) +type DeploymentsTimeout struct { + deployTimeout time.Duration + restartTimeout time.Duration + rolloutTimeout time.Duration +} +type JobsTimeout struct { + applyTimeout time.Duration + waitTimeout time.Duration + deleteTimeout time.Duration +} + +var DefaultDeploymentsTimeout = DeploymentsTimeout{ + deployTimeout: 10 * time.Minute, + restartTimeout: 2 * time.Minute, + rolloutTimeout: 10 * time.Minute, +} +var DefaultJobsTimeout = JobsTimeout{ + applyTimeout: 5 * time.Minute, + waitTimeout: 15 * time.Minute, + deleteTimeout: 5 * time.Minute, +} + // RepeatJobWithPod repeats to deploy 3k pods. -func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) { +func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, + target string, internal time.Duration, jobsTimeout JobsTimeout) { infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info") warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn") @@ -85,18 +108,18 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, time.Sleep(retryInterval) - aerr := kr.Apply(ctx, 5*time.Minute, jobFile) + aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile) if aerr != nil { warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr) continue } - werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs") + werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", "15m", "job/batchjobs") if werr != nil { warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr) } - derr := kr.Delete(ctx, 5*time.Minute, jobFile) + derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile) if derr != nil { warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr) } @@ -111,6 +134,7 @@ func DeployAndRepeatRollingUpdateDeployments( releaseName string, total, replica, paddingBytes int, internal time.Duration, + deploymentsTimeout DeploymentsTimeout, ) (rollingUpdateFn, cleanupFn func(), retErr error) { infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info") warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn") @@ -149,7 +173,7 @@ func DeployAndRepeatRollingUpdateDeployments( "paddingBytes", paddingBytes, ) - err = releaseCli.Deploy(ctx, 10*time.Minute) + err = releaseCli.Deploy(ctx, deploymentsTimeout.deployTimeout) if err != nil { if errors.Is(err, context.Canceled) { infoLogger.LogKV("msg", "deploy is canceled") @@ -187,12 +211,12 @@ func DeployAndRepeatRollingUpdateDeployments( err := func() error { kr := NewKubectlRunner(kubeCfgPath, ns) - err := kr.DeploymentRestart(ctx, 2*time.Minute, name) + err := kr.DeploymentRestart(ctx, deploymentsTimeout.restartTimeout, name) if err != nil { return fmt.Errorf("failed to restart deployment %s: %w", name, err) } - err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name) + err = kr.DeploymentRolloutStatus(ctx, deploymentsTimeout.rolloutTimeout, name) if err != nil { return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err) } From 02cee1fe8f95d6be64c08337c3cf5f67822fb495 Mon Sep 17 00:00:00 2001 From: Xinwei Liu Date: Mon, 14 Jul 2025 14:19:10 +1000 Subject: [PATCH 2/5] Revert "contrib/utils: Add defaut timeout for deployments and jobs" This reverts commit 
0d79a240c45668dd8d4a32d7325870ae267e37bb. --- .../commands/bench/node100_job1_pod3k.go | 2 +- .../runkperf/commands/bench/node100_pod10k.go | 2 +- .../commands/bench/node10_job1_pod100.go | 2 +- .../cmd/runkperf/commands/warmup/command.go | 2 +- contrib/utils/utils.go | 38 ++++--------------- 5 files changed, 11 insertions(+), 35 deletions(-) diff --git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go index 29ece146..cc1981d3 100644 --- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go +++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go @@ -71,7 +71,7 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval, utils.DefaultJobsTimeout) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval) }() rgResult, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go index 8798597e..4c4e9359 100644 --- a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go +++ b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go @@ -103,7 +103,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench deploymentNamePattern := "benchmark" rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx, - kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval, utils.DefaultDeploymentsTimeout) + kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval) if err != nil { dpCancel() return nil, fmt.Errorf("failed to setup workload: %w", err) diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go index 3a22987c..5a3ff7be 100644 --- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go +++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go @@ -71,7 +71,7 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval, utils.DefaultJobsTimeout) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval) }() rgResult, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/cmd/runkperf/commands/warmup/command.go b/contrib/cmd/runkperf/commands/warmup/command.go index 7e44a93b..e4a7591d 100644 --- a/contrib/cmd/runkperf/commands/warmup/command.go +++ b/contrib/cmd/runkperf/commands/warmup/command.go @@ -156,7 +156,7 @@ var Command = cli.Command{ go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second, utils.DefaultJobsTimeout) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second) }() _, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/utils/utils.go b/contrib/utils/utils.go index da4e0a3c..e4275b25 100644 --- a/contrib/utils/utils.go +++ b/contrib/utils/utils.go @@ -39,31 +39,8 @@ var ( EKSIdleNodepoolInstanceType = "m4.large" ) -type DeploymentsTimeout struct { - deployTimeout time.Duration - restartTimeout time.Duration - rolloutTimeout time.Duration -} -type JobsTimeout struct { - applyTimeout time.Duration - waitTimeout 
time.Duration
-	deleteTimeout time.Duration
-}
-
-var DefaultDeploymentsTimeout = DeploymentsTimeout{
-	deployTimeout:  10 * time.Minute,
-	restartTimeout: 2 * time.Minute,
-	rolloutTimeout: 10 * time.Minute,
-}
-var DefaultJobsTimeout = JobsTimeout{
-	applyTimeout:  5 * time.Minute,
-	waitTimeout:   15 * time.Minute,
-	deleteTimeout: 5 * time.Minute,
-}
-
 // RepeatJobWithPod repeats to deploy 3k pods.
-func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
-	target string, internal time.Duration, jobsTimeout JobsTimeout) {
+func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
@@ -108,18 +85,18 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
 
 		time.Sleep(retryInterval)
 
-		aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile)
+		aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
 		if aerr != nil {
 			warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
 			continue
 		}
 
-		werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", "15m", "job/batchjobs")
+		werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
 		if werr != nil {
 			warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
 		}
 
-		derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile)
+		derr := kr.Delete(ctx, 5*time.Minute, jobFile)
 		if derr != nil {
 			warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
 		}
@@ -134,7 +111,6 @@ func DeployAndRepeatRollingUpdateDeployments(
 	releaseName string,
 	total, replica, paddingBytes int,
 	internal time.Duration,
-	deploymentsTimeout DeploymentsTimeout,
 ) (rollingUpdateFn, cleanupFn func(), retErr error) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
@@ -173,7 +149,7 @@ func DeployAndRepeatRollingUpdateDeployments(
 		"paddingBytes", paddingBytes,
 	)
 
-	err = releaseCli.Deploy(ctx, deploymentsTimeout.deployTimeout)
+	err = releaseCli.Deploy(ctx, 10*time.Minute)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			infoLogger.LogKV("msg", "deploy is canceled")
@@ -211,12 +187,12 @@ func DeployAndRepeatRollingUpdateDeployments(
 			err := func() error {
 				kr := NewKubectlRunner(kubeCfgPath, ns)
 
-				err := kr.DeploymentRestart(ctx, deploymentsTimeout.restartTimeout, name)
+				err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
 				if err != nil {
 					return fmt.Errorf("failed to restart deployment %s: %w", name, err)
 				}
 
-				err = kr.DeploymentRolloutStatus(ctx, deploymentsTimeout.rolloutTimeout, name)
+				err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
 				if err != nil {
 					return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
 				}

From 29652b132b37c11f8129e3a74df7eb3f39d60ef7 Mon Sep 17 00:00:00 2001
From: Xinwei Liu
Date: Tue, 15 Jul 2025 15:20:57 +1000
Subject: [PATCH 3/5] Add customizable solution for setting timeout for deployments and jobs

---
 .../commands/bench/node100_job1_pod3k.go | 3 +-
 .../runkperf/commands/bench/node100_pod10k.go | 4 +-
 .../commands/bench/node10_job1_pod100.go | 3 +-
 .../cmd/runkperf/commands/warmup/command.go | 3 +-
 contrib/utils/utils.go | 43 +++++++++---
 contrib/utils/utils_common.go | 65 +++++++++++++++++++
 6 files changed, 106 insertions(+), 15 deletions(-)
 create mode 100644 contrib/utils/utils_common.go

diff 
--git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go index cc1981d3..c425cd1d 100644 --- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go +++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go @@ -71,7 +71,8 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", + utils.WithJobIntervalOpt(jobInterval)) }() rgResult, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go index 0f5f5033..9f3816be 100644 --- a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go +++ b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go @@ -104,7 +104,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench // TODO(xinwei): Implement batching support for deploying deployments after decoupling it from rolling update logic. ruCleanupFn, err := utils.DeployDeployments(dpCtx, - kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes) + kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, 10*time.Minute) if err != nil { dpCancel() return nil, fmt.Errorf("failed to setup workload: %w", err) @@ -130,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench // // DeployRunnerGroup should return ready notification. // The rolling update should run after runners. - utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval) + utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, utils.WithRollingUpdateIntervalTimeoutOpt(restartInterval)) }() rgResult, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go index e50b864c..cd38c775 100644 --- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go +++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go @@ -71,7 +71,8 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", + utils.WithJobIntervalOpt(jobInterval)) }() rgResult, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/cmd/runkperf/commands/warmup/command.go b/contrib/cmd/runkperf/commands/warmup/command.go index b5debe4f..e022e91f 100644 --- a/contrib/cmd/runkperf/commands/warmup/command.go +++ b/contrib/cmd/runkperf/commands/warmup/command.go @@ -156,7 +156,8 @@ var Command = cli.Command{ go func() { defer wg.Done() - utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second) + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", + utils.WithJobIntervalOpt(5*time.Second)) }() _, derr := utils.DeployRunnerGroup(ctx, diff --git a/contrib/utils/utils.go b/contrib/utils/utils.go index 57497a26..9eb3fdbb 100644 --- a/contrib/utils/utils.go +++ b/contrib/utils/utils.go @@ -40,10 +40,21 @@ var ( ) // RepeatJobWithPod repeats to deploy 3k pods. 
-func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
+func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
+	target string, timeoutOpts ...JobTimeoutOpt) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
 
+	var jobsTimeout = &jobsTimeoutOption{
+		jobInterval:   10 * time.Second,
+		applyTimeout:  5 * time.Minute,
+		waitTimeout:   15 * time.Minute,
+		deleteTimeout: 5 * time.Minute,
+	}
+	for _, opt := range timeoutOpts {
+		opt(jobsTimeout)
+	}
+
 	infoLogger.LogKV("msg", "repeat to create job with 3k pods")
 
 	data, err := manifests.FS.ReadFile(target)
@@ -85,22 +96,23 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
 
 		time.Sleep(retryInterval)
 
-		aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
+		aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile)
 		if aerr != nil {
 			warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
 			continue
 		}
 
-		werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
+		timeoutString := fmt.Sprintf("%ds", int(jobsTimeout.waitTimeout.Seconds()))
+		werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", timeoutString, "job/batchjobs")
 		if werr != nil {
 			warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
 		}
 
-		derr := kr.Delete(ctx, 5*time.Minute, jobFile)
+		derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile)
 		if derr != nil {
 			warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
 		}
-		time.Sleep(internal)
+		time.Sleep(jobsTimeout.jobInterval)
 	}
 }
 
@@ -110,6 +122,7 @@ func DeployDeployments(
 	kubeCfgPath string,
 	releaseName string,
 	total, replica, paddingBytes int,
+	deployTimeout time.Duration,
 ) (cleanupFn func(), retErr error) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
@@ -148,7 +161,7 @@ func DeployDeployments(
 		"paddingBytes", paddingBytes,
 	)
 
-	err = releaseCli.Deploy(ctx, 10*time.Minute)
+	err = releaseCli.Deploy(ctx, deployTimeout)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			infoLogger.LogKV("msg", "deploy is canceled")
@@ -171,7 +184,17 @@ func DeployDeployments(
 	return cleanupFn, nil
 }
 
-func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, internal time.Duration) {
+func RollingUpdateDeployments(ctx context.Context, total int,
+	namePattern string, kubeCfgPath string, timeoutOpts ...RollingUpdateTimeoutOpt) {
+	var rollingUpdateTimeout = &rollingUpdateTimeoutOption{
+		restartTimeout:  2 * time.Minute,
+		rolloutTimeout:  10 * time.Minute,
+		rolloutInterval: 1 * time.Minute,
+	}
+	for _, opt := range timeoutOpts {
+		opt(rollingUpdateTimeout)
+	}
+
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
 	for {
@@ -179,7 +202,7 @@ func RollingUpdateDeployments(ctx context.Context, total int, namePattern string
 		case <-ctx.Done():
 			infoLogger.LogKV("msg", "stop rolling-updating")
 			return
-		case <-time.After(internal):
+		case <-time.After(rollingUpdateTimeout.rolloutInterval):
 		}
 
 		infoLogger.LogKV("msg", "start to rolling-update deployments")
@@ -191,12 +214,12 @@ func RollingUpdateDeployments(ctx context.Context, total int, namePattern string
 			err := func() error {
 				kr := NewKubectlRunner(kubeCfgPath, ns)
 
-				err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
+				err := kr.DeploymentRestart(ctx, rollingUpdateTimeout.restartTimeout, name)
 				if err != nil {
 					return fmt.Errorf("failed to restart deployment %s: %w", name, err)
 				}
 
-				err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
+				err = kr.DeploymentRolloutStatus(ctx, rollingUpdateTimeout.rolloutTimeout, name)
 				if err != nil {
 					return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
 				}
diff --git a/contrib/utils/utils_common.go b/contrib/utils/utils_common.go
new file mode 100644
index 00000000..845d8ea0
--- /dev/null
+++ b/contrib/utils/utils_common.go
@@ -0,0 +1,65 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package utils
+
+import (
+	"time"
+)
+
+type rollingUpdateTimeoutOption struct {
+	restartTimeout  time.Duration
+	rolloutTimeout  time.Duration
+	rolloutInterval time.Duration
+}
+type jobsTimeoutOption struct {
+	jobInterval   time.Duration
+	applyTimeout  time.Duration
+	waitTimeout   time.Duration
+	deleteTimeout time.Duration
+}
+
+type RollingUpdateTimeoutOpt func(*rollingUpdateTimeoutOption)
+
+func WithRollingUpdateRestartTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.restartTimeout = to
+	}
+}
+
+func WithRollingUpdateRolloutTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.rolloutTimeout = to
+	}
+}
+
+func WithRollingUpdateIntervalTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.rolloutInterval = to
+	}
+}
+
+type JobTimeoutOpt func(*jobsTimeoutOption)
+
+func WithJobIntervalOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.jobInterval = to
+	}
+}
+func WithJobApplyTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.applyTimeout = to
+	}
+}
+
+func WithJobWaitTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.waitTimeout = to
+	}
+}
+
+func WithJobDeleteTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.deleteTimeout = to
+	}
+}

From b84354250d9e1017880d94060784e1645df334aa Mon Sep 17 00:00:00 2001
From: Xinwei Liu
Date: Tue, 15 Jul 2025 15:29:29 +1000
Subject: [PATCH 4/5] Fix merge conflict

---
 .../commands/bench/node10_job1_pod1k.go | 101 ++++++
 contrib/cmd/runkperf/commands/bench/root.go | 1 +
 .../commands/data/daemonsets/daemonset.go | 343 ++++++++++++++++++
 contrib/cmd/runkperf/commands/data/root.go | 2 +
 .../loadprofile/node10_job1_pod1k.yaml | 23 ++
 .../manifests/workload/1kpod.job.yaml | 32 ++
 6 files changed, 502 insertions(+)
 create mode 100644 contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
 create mode 100644 contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go
 create mode 100644 contrib/internal/manifests/loadprofile/node10_job1_pod1k.yaml
 create mode 100644 contrib/internal/manifests/workload/1kpod.job.yaml

diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
new file mode 100644
index 00000000..df101eeb
--- /dev/null
+++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
@@ -0,0 +1,101 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+// SLI Read Only Benchmark
+// This benchmark tests the read-only performance of a Kubernetes cluster
+// by deploying a job with 1000 pods on 10 virtual nodes.
+package bench + +import ( + "context" + "fmt" + "sync" + "time" + + internaltypes "github.com/Azure/kperf/contrib/internal/types" + "github.com/Azure/kperf/contrib/utils" + + "github.com/urfave/cli" +) + +var benchNode10Job1Pod1kCase = cli.Command{ + Name: "node10_job1_pod1k", + Usage: ` + The test suite is to setup 10 virtual nodes and deploy one job with 1000 pods on + those nodes. It repeats to create and delete job. The load profile is fixed. + `, + Flags: append( + []cli.Flag{ + cli.IntFlag{ + Name: "total", + Usage: "Total requests per runner (There are 10 runners totally and runner's rate is 1)", + Value: 1000, + }, + }, + commonFlags..., + ), + Action: func(cliCtx *cli.Context) error { + _, err := renderBenchmarkReportInterceptor( + addAPIServerCoresInfoInterceptor(benchNode10Job1Pod1kCaseRun), + )(cliCtx) + return err + }, +} + +func benchNode10Job1Pod1kCaseRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) { + ctx := context.Background() + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + + rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx, + "loadprofile/node10_job1_pod1k.yaml") + if err != nil { + return nil, err + } + defer func() { _ = rgCfgFileDone() }() + + vcDone, err := deployVirtualNodepool(ctx, cliCtx, "node10job1pod1k", + 10, + cliCtx.Int("cpu"), + cliCtx.Int("memory"), + cliCtx.Int("max-pods"), + ) + if err != nil { + return nil, fmt.Errorf("failed to deploy virtual node: %w", err) + } + defer func() { _ = vcDone() }() + + var wg sync.WaitGroup + wg.Add(1) + + jobInterval := 5 * time.Second + jobCtx, jobCancel := context.WithCancel(ctx) + go func() { + defer wg.Done() + + utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml", + utils.WithJobIntervalOpt(jobInterval)) + }() + + rgResult, derr := utils.DeployRunnerGroup(ctx, + cliCtx.GlobalString("kubeconfig"), + cliCtx.GlobalString("runner-image"), + rgCfgFile, + cliCtx.GlobalString("runner-flowcontrol"), + cliCtx.GlobalString("rg-affinity"), + ) + jobCancel() + wg.Wait() + + if derr != nil { + return nil, derr + } + + return &internaltypes.BenchmarkReport{ + Description: fmt.Sprintf(` + Environment: 10 virtual nodes managed by kwok-controller, + Workload: Deploy 1 job with 1,000 pods repeatedly. The parallelism is 100. The interval is %v`, jobInterval), + LoadSpec: *rgSpec, + Result: *rgResult, + Info: make(map[string]interface{}), + }, nil +} diff --git a/contrib/cmd/runkperf/commands/bench/root.go b/contrib/cmd/runkperf/commands/bench/root.go index c2028081..cd8154aa 100644 --- a/contrib/cmd/runkperf/commands/bench/root.go +++ b/contrib/cmd/runkperf/commands/bench/root.go @@ -60,6 +60,7 @@ var Command = cli.Command{ benchNode100DeploymentNPod10KCase, benchCiliumCustomResourceListCase, benchListConfigmapsCase, + benchNode10Job1Pod1kCase, }, } diff --git a/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go b/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go new file mode 100644 index 00000000..9c7b79f1 --- /dev/null +++ b/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go @@ -0,0 +1,343 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +package daemonsets + +import ( + "context" + "fmt" + "os" + "strings" + "text/tabwriter" + + "github.com/Azure/kperf/cmd/kperf/commands/utils" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/flowcontrol" + + "github.com/urfave/cli" + + "regexp" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +var Command = cli.Command{ + Name: "daemonset", + ShortName: "ds", + Usage: "Manage daemonsets", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "kubeconfig", + Usage: "Path to the kubeconfig file", + Value: utils.DefaultKubeConfigPath, + }, + cli.StringFlag{ + Name: "namespace", + Usage: "The namespace to create daemonsets in. If not set, the default namespace will be used.", + Value: "default", + }, + }, + Subcommands: []cli.Command{ + daemonsetAddCommand, + daemonsetDelCommand, + daemonsetListCommand, + }, +} + +var daemonsetAddCommand = cli.Command{ + Name: "add", + Usage: "Add daemonset", + ArgsUsage: "NAME of the daemonset", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "count", + Usage: "The value is the number of daemonsets to deploy. Each daemonset will be deployed on all virtual nodes by default.", + Value: 1, + }, + }, + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one argument as daemonsets name prefix: %v", cliCtx.Args()) + } + dsName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(dsName) == 0 { + return fmt.Errorf("required non-empty daemonset name prefix") + } + + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + namespace := cliCtx.GlobalString("namespace") + count := cliCtx.Int("count") + + if count <= 0 { + return fmt.Errorf("count must be greater than 0") + } + + err := prepareNamespace(kubeCfgPath, namespace) + if err != nil { + return err + } + + clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + if err != nil { + return err + } + + // Create the daemonsets + err = createDaemonsets(clientset, namespace, dsName, count) + if err != nil { + return err + } + fmt.Printf("Created %d daemonsets with prefix %s in %s namespace\n", count, dsName, namespace) + return nil + }, +} + +var daemonsetDelCommand = cli.Command{ + Name: "delete", + ShortName: "del", + ArgsUsage: "NAME", + Usage: "Delete a daemonset", + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one argument as daemonset name prefix: %v", cliCtx.Args()) + } + dsName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(dsName) == 0 { + return fmt.Errorf("required non-empty daemonset name prefix") + } + + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + + clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + if err != nil { + return err + } + + // Delete each daemonset + err = deleteDaemonsets(clientset, namespace, dsName) + if err != nil { + return err + } + + fmt.Printf("Deleted daemonset %s in %s namespace\n", dsName, namespace) + return nil + + }, +} + +var appLabel = "fake-daemonset" + +var daemonsetListCommand = cli.Command{ + Name: "list", + Usage: "List daemonsets generated by Kperf. 
Lists all if no arguments are given; otherwise, provide daemonset group names separated by spaces (e.g., `list dsName1 dsName2`).", + ArgsUsage: "NAME", + Action: func(cliCtx *cli.Context) error { + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + if err != nil { + return err + } + + const ( + minWidth = 1 + tabWidth = 12 + padding = 3 + padChar = ' ' + flags = 0 + ) + tw := tabwriter.NewWriter(os.Stdout, minWidth, tabWidth, padding, padChar, flags) + fmt.Fprintln(tw, "NAME\tnamespace\tcount\t") + + labelSelector := fmt.Sprintf("app=%s", appLabel) + if cliCtx.NArg() != 0 { + args := cliCtx.Args() + namesStr := strings.Join(args, ",") + labelSelector = fmt.Sprintf("app=%s, dsName in (%s)", appLabel, namesStr) + } + + daemonSets, err := listDaemonsets(clientset, labelSelector, namespace) + if err != nil { + return err + } + + if len(daemonSets.Items) == 0 { + return tw.Flush() + } + + dsNames := make(map[string]int) + + for _, ds := range daemonSets.Items { + re := regexp.MustCompile(`^(.*)-\d+$`) // Match the daemonset name pattern like "dsName-0", "dsName-1", etc. + matches := re.FindStringSubmatch(ds.Name) + if len(matches) > 1 { + dsNames[matches[1]]++ + } else { + dsNames[ds.Name]++ + } + } + for name, count := range dsNames { + fmt.Fprintf(tw, "%s\t%s\t%d\t\n", + name, + namespace, + count, + ) + } + return tw.Flush() + }, +} + +func prepareNamespace(kubeCfgPath string, namespace string) error { + if namespace == "" { + return fmt.Errorf("namespace cannot be empty") + } + + if namespace == "default" { + return nil + } + + clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + if err != nil { + return err + } + + _, err = clientset.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + }, metav1.CreateOptions{}) + if err != nil { + // If the namespace already exists, ignore the error + if errors.IsAlreadyExists(err) { + return nil + } + return fmt.Errorf("failed to create namespace %s: %v", namespace, err) + } + return nil +} + +func newClientsetWithRateLimiter(kubeCfgPath string, qps float32, burst int) (*kubernetes.Clientset, error) { + config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) + if err != nil { + return nil, err + } + + config.QPS = qps + config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return clientset, nil +} + +func createDaemonsets(clientset *kubernetes.Clientset, namespace string, dsName string, count int) error { + for i := 0; i < count; i++ { + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", dsName, i), + Labels: map[string]string{ + "app": appLabel, + "dsName": dsName, + "idx": fmt.Sprintf("%d", i), + }, + Namespace: namespace, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": appLabel, + "dsName": dsName, + "idx": fmt.Sprintf("%d", i), + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": appLabel, + "dsName": dsName, + "idx": fmt.Sprintf("%d", i), + }, + }, + // Set node affinity and tolerations to ensure the pods are scheduled on virtual nodes + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyAlways, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + 
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "type", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"kperf-virtualnodes"}, + }, + }, + }, + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + { + Key: "kperf.io/nodepool", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + Containers: []corev1.Container{ + { + Name: "fake-container", + Image: "fake-image", + }, + }, + }, + }, + }, + } + + _, err := clientset.AppsV1().DaemonSets(namespace).Create(context.TODO(), ds, metav1.CreateOptions{}) + if err != nil { + return err + } + fmt.Printf("Created daemonset %s in namespace %s\n", ds.Name, namespace) + } + return nil +} + +func deleteDaemonsets(clientset *kubernetes.Clientset, namespace, dsName string) error { + // List all daemonsets with the label selector + labelSelector := fmt.Sprintf("app=%s, dsName in (%s)", appLabel, dsName) + daemonSets, err := listDaemonsets(clientset, labelSelector, namespace) + if err != nil { + return err + } + + if len(daemonSets.Items) == 0 { + return fmt.Errorf("no daemonsets found in namespace: %s", namespace) + } + + for i := range daemonSets.Items { + err := clientset.AppsV1().DaemonSets(namespace).Delete(context.TODO(), daemonSets.Items[i].Name, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + // Ignore not found errors + return fmt.Errorf("failed to delete daemonset %s: %v", daemonSets.Items[i].Name, err) + } + } + return nil +} + +func listDaemonsets(clientset *kubernetes.Clientset, labelSelector string, namespace string) (*appsv1.DaemonSetList, error) { + daemonSets, err := clientset.AppsV1().DaemonSets(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return nil, fmt.Errorf("failed to list daemonsets: %v", err) + } + + return daemonSets, nil +} diff --git a/contrib/cmd/runkperf/commands/data/root.go b/contrib/cmd/runkperf/commands/data/root.go index 13b642a6..d1cbc5a1 100644 --- a/contrib/cmd/runkperf/commands/data/root.go +++ b/contrib/cmd/runkperf/commands/data/root.go @@ -5,6 +5,7 @@ package data import ( "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/configmaps" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/daemonsets" "github.com/urfave/cli" ) @@ -14,5 +15,6 @@ var Command = cli.Command{ Usage: "Create data for runkperf", Subcommands: []cli.Command{ configmaps.Command, + daemonsets.Command, }, } diff --git a/contrib/internal/manifests/loadprofile/node10_job1_pod1k.yaml b/contrib/internal/manifests/loadprofile/node10_job1_pod1k.yaml new file mode 100644 index 00000000..0e749654 --- /dev/null +++ b/contrib/internal/manifests/loadprofile/node10_job1_pod1k.yaml @@ -0,0 +1,23 @@ +# SLI Read-Only Node10 Job1 Pod1k load profile +count: 10 +loadProfile: + version: 1 + description: "10nodes_1000pods" + spec: + rate: 10 + conns: 10 + client: 10 + contentType: json + requests: + - staleList: + version: v1 + resource: pods + namespace: job1pod1k + shares: 10 + - staleGet: + version: v1 + resource: pods + namespace: virtualnodes-kperf-io + name: node10job1pod1k-1 + shares: 300 + #TODO: Add more resources: jobs, nodes, configmaps ... 
in next pr
diff --git a/contrib/internal/manifests/workload/1kpod.job.yaml b/contrib/internal/manifests/workload/1kpod.job.yaml
new file mode 100644
index 00000000..930af3c5
--- /dev/null
+++ b/contrib/internal/manifests/workload/1kpod.job.yaml
@@ -0,0 +1,32 @@
+# SLI Read-Only Node10 Job1 Pod1k workload manifest
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: batchjobs
+spec:
+  completions: 1000
+  parallelism: 100
+  template:
+    metadata:
+      labels:
+        app: fake-pod
+    spec:
+      restartPolicy: Never
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: type
+                operator: In
+                values:
+                - kperf-virtualnodes
+      # A taint was added to the automatically created Node.
+      # You can remove the taint from the Node or add this toleration.
+      tolerations:
+      - key: "kperf.io/nodepool"
+        operator: "Exists"
+        effect: "NoSchedule"
+      containers:
+      - name: fake-container
+        image: fake-image

From 4bd0c8461cbf01dd9fb4702fb7419ed52046c0b9 Mon Sep 17 00:00:00 2001
From: Xinwei Liu
Date: Tue, 15 Jul 2025 15:34:53 +1000
Subject: [PATCH 5/5] format code

---
 contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
index 4056b719..f2096088 100644
--- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
+++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
@@ -72,8 +72,8 @@ func benchNode10Job1Pod1kCaseRun(cliCtx *cli.Context) (*internaltypes.BenchmarkR
 	go func() {
 		defer wg.Done()
 
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml",
-			utils.WithJobIntervalOpt(jobInterval))
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml",
+			utils.WithJobIntervalOpt(jobInterval))
 	}()
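
As a usage sketch of the customizable timeouts introduced in PATCH 3: the option constructors and the signatures of RepeatJobWithPod and RollingUpdateDeployments below are the ones added under contrib/utils, while the wrapper function, namespace, workload path, and the concrete durations are hypothetical values chosen only to illustrate how a caller could stretch the defaults for a larger cluster.

package example

import (
	"context"
	"time"

	"github.com/Azure/kperf/contrib/utils"
)

// runLargeClusterWorkload is a hypothetical helper showing how the functional
// options override the built-in defaults (5m apply / 15m wait / 5m delete for
// jobs; 2m restart / 10m rollout / 1m interval for rolling updates).
func runLargeClusterWorkload(ctx context.Context, kubeCfgPath string, total int) {
	// Job churn with longer apply/wait/delete windows than the defaults.
	go utils.RepeatJobWithPod(ctx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml",
		utils.WithJobIntervalOpt(10*time.Second),
		utils.WithJobApplyTimeoutOpt(10*time.Minute),
		utils.WithJobWaitTimeoutOpt(30*time.Minute),
		utils.WithJobDeleteTimeoutOpt(10*time.Minute),
	)

	// Rolling updates with relaxed restart/rollout timeouts and a longer
	// pause between update rounds; both loops run until ctx is cancelled.
	utils.RollingUpdateDeployments(ctx, total, "benchmark", kubeCfgPath,
		utils.WithRollingUpdateRestartTimeoutOpt(5*time.Minute),
		utils.WithRollingUpdateRolloutTimeoutOpt(20*time.Minute),
		utils.WithRollingUpdateIntervalTimeoutOpt(2*time.Minute),
	)
}

When no options are passed, both helpers fall back to the defaults baked into contrib/utils, which match the previously hard-coded values, so the existing warmup and bench callers keep their behaviour.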