diff --git a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
index 4c4e9359..0f5f5033 100644
--- a/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
+++ b/contrib/cmd/runkperf/commands/bench/node100_pod10k.go
@@ -102,8 +102,9 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
 	// NOTE: The name pattern should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
 	deploymentNamePattern := "benchmark"
 
-	rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
-		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
+	// TODO(xinwei): Implement batching support for deploying deployments, now that deployment is decoupled from the rolling-update logic.
+	ruCleanupFn, err := utils.DeployDeployments(dpCtx,
+		kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes)
 	if err != nil {
 		dpCancel()
 		return nil, fmt.Errorf("failed to setup workload: %w", err)
@@ -129,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
 		//
 		// DeployRunnerGroup should return ready notification.
 		// The rolling update should run after runners.
-		rollingUpdateFn()
+		utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval)
 	}()
 
 	rgResult, derr := utils.DeployRunnerGroup(ctx,
diff --git a/contrib/utils/utils.go b/contrib/utils/utils.go
index 6e59a667..57497a26 100644
--- a/contrib/utils/utils.go
+++ b/contrib/utils/utils.go
@@ -104,21 +104,20 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
 	}
 }
 
-// DeployAndRepeatRollingUpdateDeployments deploys and repeats to rolling-update deployments.
-func DeployAndRepeatRollingUpdateDeployments(
+// DeployDeployments deploys the given number of deployments and returns a cleanup function.
+func DeployDeployments(
 	ctx context.Context,
 	kubeCfgPath string,
 	releaseName string,
 	total, replica, paddingBytes int,
-	internal time.Duration,
-) (rollingUpdateFn, cleanupFn func(), retErr error) {
+) (cleanupFn func(), retErr error) {
 	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
 	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
 
 	target := "workload/deployments"
 	ch, err := manifests.LoadChart(target)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to load %s chart: %w", target, err)
+		return nil, fmt.Errorf("failed to load %s chart: %w", target, err)
 	}
 
 	namePattern := releaseName
@@ -139,7 +138,7 @@ func DeployAndRepeatRollingUpdateDeployments(
 		),
 	)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
+		return nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
 	}
 
 	infoLogger.LogKV(
@@ -153,9 +152,9 @@
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
 			infoLogger.LogKV("msg", "deploy is canceled")
-			return func() {}, func() {}, nil
+			return func() {}, nil
 		}
-		return nil, nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
+		return nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
 	}
 	infoLogger.LogKV("msg", "deployed deployments")
 
@@ -169,45 +168,49 @@
 		}
 	}
 
-	rollingUpdateFn = func() {
-		for {
-			select {
-			case <-ctx.Done():
-				infoLogger.LogKV("msg", "stop rolling-updating")
-				return
-			case <-time.After(internal):
-			}
+	return cleanupFn, nil
+}
 
-			infoLogger.LogKV("msg", "start to rolling-update deployments")
-			for i := 0; i < total; i++ {
-				name := fmt.Sprintf("%s-%d", namePattern, i)
-				ns := name
+// RollingUpdateDeployments periodically restarts the deployments and waits for each rollout to complete, until ctx is canceled.
+func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, interval time.Duration) {
+	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
+	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
+	for {
+		select {
+		case <-ctx.Done():
+			infoLogger.LogKV("msg", "stop rolling-updating")
+			return
+		case <-time.After(interval):
+		}
 
-				infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
-				err := func() error {
-					kr := NewKubectlRunner(kubeCfgPath, ns)
+		infoLogger.LogKV("msg", "start to rolling-update deployments")
+		for i := range total {
+			name := fmt.Sprintf("%s-%d", namePattern, i)
+			ns := name
 
-					err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
-					if err != nil {
-						return fmt.Errorf("failed to restart deployment %s: %w", name, err)
-					}
+			infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
+			err := func() error {
+				kr := NewKubectlRunner(kubeCfgPath, ns)
 
-					err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
-					if err != nil {
-						return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
-					}
-					return nil
-				}()
+				err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
 				if err != nil {
-					warnLogger.LogKV("msg", "failed to rolling-update",
-						"error", err,
-						"deployment", name,
-						"namespace", ns)
+					return fmt.Errorf("failed to restart deployment %s: %w", name, err)
 				}
+
+				err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
+				if err != nil {
+					return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
+				}
+				return nil
+			}()
+			if err != nil {
+				warnLogger.LogKV("msg", "failed to rolling-update",
+					"error", err,
+					"deployment", name,
+					"namespace", ns)
 			}
 		}
 	}
-	return rollingUpdateFn, cleanupFn, nil
 }
 
 // NewRunnerGroupSpecFromYAML returns RunnerGroupSpec instance from yaml data.
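
Usage note (not part of the patch): the sketch below shows how the two decoupled helpers compose after this change: deploy once, then drive rolling updates separately, so the caller can start them only after the runner group is ready, as node100_pod10k.go does above. It assumes the github.com/Azure/kperf module path for contrib/utils; the kubeconfig path and workload sizes are illustrative placeholders.

// rollingupdate_sketch.go - illustrative only.
package main

import (
	"context"
	"time"

	"github.com/Azure/kperf/contrib/utils" // assumed module path
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Deploy the workload once and obtain a cleanup function.
	// Placeholder arguments: kubeconfig path, release name pattern,
	// total deployments, replicas each, padding bytes.
	cleanupFn, err := utils.DeployDeployments(ctx,
		"/tmp/kubeconfig", "benchmark", 10, 2, 1024)
	if err != nil {
		panic(err)
	}
	defer cleanupFn()

	// Rolling updates now run on the caller's schedule, tied to ctx:
	// restart every deployment every 10 minutes until cancel() is called.
	go utils.RollingUpdateDeployments(ctx, 10, "benchmark", "/tmp/kubeconfig", 10*time.Minute)

	// ... run the benchmark here (e.g. utils.DeployRunnerGroup) ...
	time.Sleep(30 * time.Minute)
}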