diff --git a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
index cc1981d3..c425cd1d 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
@@ -71,7 +71,8 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
 
 	go func() {
 		defer wg.Done()
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml",
+			utils.WithJobIntervalOpt(jobInterval))
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
index 0f5f5033..9f3816be 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -104,7 +104,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
 	// TODO(xinwei): Implement batching support for deploying deployments after decoupling it from rolling update logic.
 	ruCleanupFn, err := utils.DeployDeployments(dpCtx,
-		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes)
+		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, 10*time.Minute)
 	if err != nil {
 		dpCancel()
 		return nil, fmt.Errorf("failed to setup workload: %w", err)
 	}
@@ -130,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
 		//
 		// DeployRunnerGroup should return ready notification.
 		// The rolling update should run after runners.
-		utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval)
+		utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, utils.WithRollingUpdateIntervalTimeoutOpt(restartInterval))
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
index e50b864c..cd38c775 100644
--- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
+++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
@@ -71,7 +71,8 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
 
 	go func() {
 		defer wg.Done()
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml",
+			utils.WithJobIntervalOpt(jobInterval))
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
index 6fd5e184..f2096088 100644
--- a/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
+++ b/contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
@@ -72,7 +72,9 @@ func benchNode10Job1Pod1kCaseRun(cliCtx *cli.Context) (*internaltypes.BenchmarkR
 
 	go func() {
 		defer wg.Done()
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml", jobInterval)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml",
+			utils.WithJobIntervalOpt(jobInterval))
+
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/cmd/runkperf/commands/warmup/command.go b/contrib/cmd/runkperf/commands/warmup/command.go
index b5debe4f..e022e91f 100644
--- a/contrib/cmd/runkperf/commands/warmup/command.go
+++ b/contrib/cmd/runkperf/commands/warmup/command.go
@@ -156,7 +156,8 @@ var Command = cli.Command{
 
 	go func() {
 		defer wg.Done()
-		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second)
+		utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml",
+			utils.WithJobIntervalOpt(5*time.Second))
 	}()
 
 	_, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/utils/utils.go b/contrib/utils/utils.go
index 57497a26..9eb3fdbb 100644
--- a/contrib/utils/utils.go
+++ b/contrib/utils/utils.go
@@ -40,10 +40,21 @@ var (
 )
 
 // RepeatJobWithPod repeats to deploy 3k pods.
-func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
+func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
+	target string, timeoutOpts ...JobTimeoutOpt) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
 
+	var jobsTimeout = &jobsTimeoutOption{
+		jobInterval:   10 * time.Second,
+		applyTimeout:  5 * time.Minute,
+		waitTimeout:   15 * time.Minute,
+		deleteTimeout: 5 * time.Minute,
+	}
+	for _, opt := range timeoutOpts {
+		opt(jobsTimeout)
+	}
+
 	infoLogger.LogKV("msg", "repeat to create job with 3k pods")
 
 	data, err := manifests.FS.ReadFile(target)
@@ -85,22 +96,23 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
 
 		time.Sleep(retryInterval)
 
-		aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
+		aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile)
 		if aerr != nil {
 			warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
 			continue
 		}
 
-		werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
+		timeoutString := fmt.Sprintf("%ds", int(jobsTimeout.waitTimeout.Seconds()))
+		werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", timeoutString, "job/batchjobs")
 		if werr != nil {
 			warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
 		}
 
-		derr := kr.Delete(ctx, 5*time.Minute, jobFile)
+		derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile)
 		if derr != nil {
 			warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
 		}
 
-		time.Sleep(internal)
+		time.Sleep(jobsTimeout.jobInterval)
 	}
 }
@@ -110,6 +122,7 @@ func DeployDeployments(
 	kubeCfgPath string,
 	releaseName string,
 	total, replica, paddingBytes int,
+	deployTimeout time.Duration,
 ) (cleanupFn func(), retErr error) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
@@ -148,7 +161,7 @@ func DeployDeployments(
 		"paddingBytes", paddingBytes,
 	)
 
-	err = releaseCli.Deploy(ctx, 10*time.Minute)
+	err = releaseCli.Deploy(ctx, deployTimeout)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			infoLogger.LogKV("msg", "deploy is canceled")
@@ -171,7 +184,17 @@ func DeployDeployments(
 
 	return cleanupFn, nil
 }
 
-func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, internal time.Duration) {
+func RollingUpdateDeployments(ctx context.Context, total int,
+	namePattern string, kubeCfgPath string, timeoutOpts ...RollingUpdateTimeoutOpt) {
+	var rollingUpdateTimeout = &rollingUpdateTimeoutOption{
+		restartTimeout:  2 * time.Minute,
+		rolloutTimeout:  10 * time.Minute,
+		rolloutInterval: 1 * time.Minute,
+	}
+	for _, opt := range timeoutOpts {
+		opt(rollingUpdateTimeout)
+	}
+
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
 	for {
@@ -179,7 +202,7 @@ func RollingUpdateDeployments(ctx context.Context, total int, namePattern string
 		case <-ctx.Done():
 			infoLogger.LogKV("msg", "stop rolling-updating")
 			return
-		case <-time.After(internal):
+		case <-time.After(rollingUpdateTimeout.rolloutInterval):
 		}
 
 		infoLogger.LogKV("msg", "start to rolling-update deployments")
@@ -191,12 +214,12 @@ func RollingUpdateDeployments(ctx context.Context, total int, namePattern string
 		err := func() error {
 			kr := NewKubectlRunner(kubeCfgPath, ns)
 
-			err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
+			err := kr.DeploymentRestart(ctx, rollingUpdateTimeout.restartTimeout, name)
 			if err != nil {
 				return fmt.Errorf("failed to restart deployment %s: %w", name, err)
 			}
 
-			err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
+			err = kr.DeploymentRolloutStatus(ctx, rollingUpdateTimeout.rolloutTimeout, name)
 			if err != nil {
 				return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
 			}
diff --git a/contrib/utils/utils_comman.go b/contrib/utils/utils_comman.go
new file mode 100644
index 00000000..845d8ea0
--- /dev/null
+++ b/contrib/utils/utils_comman.go
@@ -0,0 +1,65 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package utils
+
+import (
+	"time"
+)
+
+type rollingUpdateTimeoutOption struct {
+	restartTimeout  time.Duration
+	rolloutTimeout  time.Duration
+	rolloutInterval time.Duration
+}
+type jobsTimeoutOption struct {
+	jobInterval   time.Duration
+	applyTimeout  time.Duration
+	waitTimeout   time.Duration
+	deleteTimeout time.Duration
+}
+
+type RollingUpdateTimeoutOpt func(*rollingUpdateTimeoutOption)
+
+func WithRollingUpdateRestartTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.restartTimeout = to
+	}
+}
+
+func WithRollingUpdateRolloutTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.rolloutTimeout = to
+	}
+}
+
+func WithRollingUpdateIntervalTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
+	return func(rto *rollingUpdateTimeoutOption) {
+		rto.rolloutInterval = to
+	}
+}
+
+type JobTimeoutOpt func(*jobsTimeoutOption)
+
+func WithJobIntervalOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.jobInterval = to
+	}
+}
+func WithJobApplyTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.applyTimeout = to
+	}
+}
+
+func WithJobWaitTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.waitTimeout = to
+	}
+}
+
+func WithJobDeleteTimeoutOpt(to time.Duration) JobTimeoutOpt {
+	return func(jto *jobsTimeoutOption) {
+		jto.deleteTimeout = to
+	}
+}
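Usage sketch (illustrative, not part of the diff): callers now pass functional options instead of a bare interval, overriding only the knobs they care about while the remaining timeouts fall back to the defaults resolved inside RepeatJobWithPod and RollingUpdateDeployments. The module import path, kubeconfig path, namespace, deployment name pattern, and duration values below are assumptions for demonstration only.

package main

import (
	"context"
	"time"

	// Assumed import path for the contrib/utils package shown in the diff.
	"github.com/Azure/kperf/contrib/utils"
)

func main() {
	// Bound the demo with a deadline; RollingUpdateDeployments returns once
	// ctx is done.
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	// Jobs: override the repeat interval and wait timeout; the apply and
	// delete timeouts keep their 5-minute defaults.
	go utils.RepeatJobWithPod(ctx, "/path/to/kubeconfig", "job1pod3k",
		"workload/3kpod.job.yaml",
		utils.WithJobIntervalOpt(30*time.Second),
		utils.WithJobWaitTimeoutOpt(20*time.Minute))

	// Rolling updates: stretch only the rollout timeout for slow clusters;
	// the 2-minute restart timeout and 1-minute rollout interval defaults
	// still apply.
	utils.RollingUpdateDeployments(ctx, 10, "benchmark", "/path/to/kubeconfig",
		utils.WithRollingUpdateRolloutTimeoutOpt(30*time.Minute))
}

Because the options are variadic, further timeout knobs can later be added to jobsTimeoutOption and rollingUpdateTimeoutOption without changing these call sites again.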