Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node100_job1_pod3k.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func benchNode100Job1Pod3KCaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod3k", "workload/3kpod.job.yaml", jobInterval, utils.DefaultJobsTimeout)
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node100_pod10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
deploymentNamePattern := "benchmark"

rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval, utils.DefaultDeploymentsTimeout)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/bench/node10_job1_pod100.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func benchNode10Job1Pod100CaseRun(cliCtx *cli.Context) (*internaltypes.Benchmark
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "job1pod100", "workload/100pod.job.yaml", jobInterval, utils.DefaultJobsTimeout)
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
2 changes: 1 addition & 1 deletion contrib/cmd/runkperf/commands/warmup/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ var Command = cli.Command{
go func() {
defer wg.Done()

utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second)
utils.RepeatJobWithPod(jobCtx, kubeCfgPath, "warmupjob", "workload/3kpod.job.yaml", 5*time.Second, utils.DefaultJobsTimeout)
}()

_, derr := utils.DeployRunnerGroup(ctx,
Expand Down
38 changes: 31 additions & 7 deletions contrib/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,31 @@ var (
EKSIdleNodepoolInstanceType = "m4.large"
)

type DeploymentsTimeout struct {
deployTimeout time.Duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field is unvisiable to other package. I think we can handle it like this.

type deploymentTimeoutOption struct {
      deployTimeout time.Duration
      restartTimeout time.Duration
      rolloutTimeout time.Duration
      rolloutInterval time.Duration
}

type DeploymentTimeoutOpt func(*deploymentTimeoutOption)

func WithDeploymentDeployTimeoutOpt(to time.Duration) DeploymentTimeoutOpt {
         return func(dto *deploymentTimeoutOption) {
                dto.deployTimeout = to
        }
}
...

func DeployAndRepeatRollingUpdateDeployments(
	ctx context.Context,
	kubeCfgPath string,
	releaseName string,
	total, replica, paddingBytes int,
	timeoutOps DeploymentTimeoutOpt...,
) (rollingUpdateFn, cleanupFn func(), retErr error) {
           var to = &deploymentTimeout{
                  deployTimeout:  10 * time.Minute,
	         restartTimeout: 2 * time.Minute,
	         rolloutTimeout: 10 * time.Minute,
                 rolloutInterval: 5 * time.Second
           }
          
           for _, opt := range timeoutOpts {
                 opt(to)
           }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change it as suggested and include a new file containing the functions.
All conflicts have been successfully resolved.

restartTimeout time.Duration
rolloutTimeout time.Duration
}
type JobsTimeout struct {
applyTimeout time.Duration
waitTimeout time.Duration
deleteTimeout time.Duration
}

var DefaultDeploymentsTimeout = DeploymentsTimeout{
deployTimeout: 10 * time.Minute,
restartTimeout: 2 * time.Minute,
rolloutTimeout: 10 * time.Minute,
}
var DefaultJobsTimeout = JobsTimeout{
applyTimeout: 5 * time.Minute,
waitTimeout: 15 * time.Minute,
deleteTimeout: 5 * time.Minute,
}

// RepeatJobWithPod repeats to deploy 3k pods.
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
target string, internal time.Duration, jobsTimeout JobsTimeout) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

Expand Down Expand Up @@ -85,18 +108,18 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,

time.Sleep(retryInterval)

aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
aerr := kr.Apply(ctx, jobsTimeout.applyTimeout, jobFile)
if aerr != nil {
warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
continue
}

werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
werr := kr.Wait(ctx, jobsTimeout.waitTimeout, "condition=complete", "15m", "job/batchjobs")
if werr != nil {
warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
}

derr := kr.Delete(ctx, 5*time.Minute, jobFile)
derr := kr.Delete(ctx, jobsTimeout.deleteTimeout, jobFile)
if derr != nil {
warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
}
Expand All @@ -111,6 +134,7 @@ func DeployAndRepeatRollingUpdateDeployments(
releaseName string,
total, replica, paddingBytes int,
internal time.Duration,
deploymentsTimeout DeploymentsTimeout,
) (rollingUpdateFn, cleanupFn func(), retErr error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
Expand Down Expand Up @@ -149,7 +173,7 @@ func DeployAndRepeatRollingUpdateDeployments(
"paddingBytes", paddingBytes,
)

err = releaseCli.Deploy(ctx, 10*time.Minute)
err = releaseCli.Deploy(ctx, deploymentsTimeout.deployTimeout)
if err != nil {
if errors.Is(err, context.Canceled) {
infoLogger.LogKV("msg", "deploy is canceled")
Expand Down Expand Up @@ -187,12 +211,12 @@ func DeployAndRepeatRollingUpdateDeployments(
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)

err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
err := kr.DeploymentRestart(ctx, deploymentsTimeout.restartTimeout, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}

err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
err = kr.DeploymentRolloutStatus(ctx, deploymentsTimeout.rolloutTimeout, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
Expand Down
Loading