Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml",
utils.WithJobIntervalOpt(jobInterval))
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
4 changes: 2 additions & 2 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench

// TODO(xinwei): Implement batching support for deploying deployments after decoupling it from rolling update logic.
ruCleanupFn, err := utils.DeployDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes)
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, 10*time.Minute)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
Expand All @@ -130,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
//
// DeployRunnerGroup should return ready notification.
// The rolling update should run after runners.
utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval)
utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, utils.WithRollingUpdateIntervalTimeoutOpt(restartInterval))
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
3 changes: 2 additions & 1 deletion contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml",
utils.WithJobIntervalOpt(jobInterval))
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
4 changes: 3 additions & 1 deletion contrib/cmd/runkperf/commands/bench/node10_job1_pod1k.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func benchNode10Job1Pod1kCaseRun(cliCtx *cli.Context) (*internaltypes.BenchmarkR
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml", jobInterval)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod1k", "workload/1kpod.job.yaml",
utils.WithJobIntervalOpt(jobInterval))

}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
3 changes: 2 additions & 1 deletion contrib/cmd/runkperf/commands/warmup/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ var Command = cli.Command{
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml",
utils.WithJobIntervalOpt(5*time.Second))
}()

_, derr := utils.DeployRunnerGroup(ctx,
Expand Down
43 changes: 33 additions & 10 deletions contrib/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,21 @@ var (
)

// RepeatJobWithPod repeats to deploy 3k pods.
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
target string, timeoutOpts ...JobTimeoutOpt) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

var jobsTimeout = &jobsTimeoutOption{
jobInterval: 10 * time.Second,
applyTimeout: 5 * time.Minute,
waitTimeout: 15 * time.Minute,
deleteTimeout: 5 * time.Minute,
}
for _, opt := range timeoutOpts {
opt(jobsTimeout)
}

infoLogger.LogKV("msg", "repeat to create job with 3k pods")

data, err := manifests.FS.ReadFile(target)
Expand Down Expand Up @@ -85,22 +96,23 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,

time.Sleep(retryInterval)

aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile)
if aerr != nil {
warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
continue
}

werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
timoutString := fmt.Sprintf("%ds", int(jobsTimeout.waitTimeout.Seconds()))
werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", timoutString, "job/batchjobs")
if werr != nil {
warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
}

derr := kr.Delete(ctx, 5*time.Minute, jobFile)
derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile)
if derr != nil {
warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
}
time.Sleep(internal)
time.Sleep(jobsTimeout.jobInterval)
}
}

Expand All @@ -110,6 +122,7 @@ func DeployDeployments(
kubeCfgPath string,
releaseName string,
total, replica, paddingBytes int,
deployTimeout time.Duration,
) (cleanupFn func(), retErr error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
Expand Down Expand Up @@ -148,7 +161,7 @@ func DeployDeployments(
"paddingBytes", paddingBytes,
)

err = releaseCli.Deploy(ctx, 10*time.Minute)
err = releaseCli.Deploy(ctx, deployTimeout)
if err != nil {
if errors.Is(err, context.Canceled) {
infoLogger.LogKV("msg", "deploy is canceled")
Expand All @@ -171,15 +184,25 @@ func DeployDeployments(
return cleanupFn, nil
}

func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, internal time.Duration) {
func RollingUpdateDeployments(ctx context.Context, total int,
namePattern string, kubeCfgPath string, timeoutOpts ...RollingUpdateTimeoutOpt) {
var rollingUpdateTimeout = &rollingUpdateTimeoutOption{
restartTimeout: 2 * time.Minute,
rolloutTimeout: 10 * time.Minute,
rolloutInterval: 1 * time.Minute,
}
for _, opt := range timeoutOpts {
opt(rollingUpdateTimeout)
}

infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(internal):
case <-time.After(rollingUpdateTimeout.rolloutInterval):
}

infoLogger.LogKV("msg", "start to rolling-update deployments")
Expand All @@ -191,12 +214,12 @@ func RollingUpdateDeployments(ctx context.Context, total int, namePattern string
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)

err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
err := kr.DeploymentRestart(ctx, rollingUpdateTimeout.restartTimeout, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}

err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
err = kr.DeploymentRolloutStatus(ctx, rollingUpdateTimeout.rolloutTimeout, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
Expand Down
65 changes: 65 additions & 0 deletions contrib/utils/utils_comman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package utils

import (
"time"
)

type rollingUpdateTimeoutOption struct {
restartTimeout time.Duration
rolloutTimeout time.Duration
rolloutInterval time.Duration
}
type jobsTimeoutOption struct {
jobInterval time.Duration
applyTimeout time.Duration
waitTimeout time.Duration
deleteTimeout time.Duration
}

type RollingUpdateTimeoutOpt func(*rollingUpdateTimeoutOption)

func WithRollingUpdateRestartTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
return func(rto *rollingUpdateTimeoutOption) {
rto.restartTimeout = to
}
}

func WithRollingUpdateRolloutTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
return func(rto *rollingUpdateTimeoutOption) {
rto.rolloutTimeout = to
}
}

func WithRollingUpdateIntervalTimeoutOpt(to time.Duration) RollingUpdateTimeoutOpt {
return func(rto *rollingUpdateTimeoutOption) {
rto.rolloutInterval = to
}
}

type JobTimeoutOpt func(*jobsTimeoutOption)

func WithJobIntervalOpt(to time.Duration) JobTimeoutOpt {
return func(jto *jobsTimeoutOption) {
jto.jobInterval = to
}
}
func WithJobApplyTimeoutOpt(to time.Duration) JobTimeoutOpt {
return func(jto *jobsTimeoutOption) {
jto.applyTimeout = to
}
}

func WithJobWaitTimeoutOpt(to time.Duration) JobTimeoutOpt {
return func(jto *jobsTimeoutOption) {
jto.waitTimeout = to
}
}

func WithJobDeleteTimeoutOpt(to time.Duration) JobTimeoutOpt {
return func(jto *jobsTimeoutOption) {
jto.deleteTimeout = to
}
}
Loading