Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
// NOTE: The name pattern should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
deploymentNamePattern := "benchmark"

rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
// TODO(xinwei): Implement batching support for deploying deployments after decoupling it from rolling update logic.
ruCleanupFn, err := utils.DeployDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
Expand All @@ -129,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
//
// DeployRunnerGroup should return ready notification.
// The rolling update should run after runners.
rollingUpdateFn()
utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval)
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
77 changes: 39 additions & 38 deletions contrib/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,20 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
}
}

// DeployAndRepeatRollingUpdateDeployments deploys and repeats to rolling-update deployments.
func DeployAndRepeatRollingUpdateDeployments(
// DeployDeployments deploys deployments.
func DeployDeployments(
ctx context.Context,
kubeCfgPath string,
releaseName string,
total, replica, paddingBytes int,
internal time.Duration,
) (rollingUpdateFn, cleanupFn func(), retErr error) {
) (cleanupFn func(), retErr error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

target := "workload/deployments"
ch, err := manifests.LoadChart(target)
if err != nil {
return nil, nil, fmt.Errorf("failed to load %s chart: %w", target, err)
return nil, fmt.Errorf("failed to load %s chart: %w", target, err)
}

namePattern := releaseName
Expand All @@ -139,7 +138,7 @@ func DeployAndRepeatRollingUpdateDeployments(
),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
return nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
}

infoLogger.LogKV(
Expand All @@ -153,9 +152,9 @@ func DeployAndRepeatRollingUpdateDeployments(
if err != nil {
if errors.Is(err, context.Canceled) {
infoLogger.LogKV("msg", "deploy is canceled")
return func() {}, func() {}, nil
return func() {}, nil
}
return nil, nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
return nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
}
infoLogger.LogKV("msg", "deployed deployments")

Expand All @@ -169,45 +168,47 @@ func DeployAndRepeatRollingUpdateDeployments(
}
}

rollingUpdateFn = func() {
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(internal):
}
return cleanupFn, nil
}
func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, internal time.Duration) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocker: you can help it in the follow-up

func RollingUpdateDeployments(ctx context.Context, kubeCfgPath string, namePattern string, total int, internal time.Duration)

infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(internal):
}

infoLogger.LogKV("msg", "start to rolling-update deployments")
for i := 0; i < total; i++ {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name
infoLogger.LogKV("msg", "start to rolling-update deployments")
for i := range total {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name

infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)
infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)

err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}
err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}

err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
return nil
}()
err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
if err != nil {
warnLogger.LogKV("msg", "failed to rolling-update",
"error", err,
"deployment", name,
"namespace", ns)
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
return nil
}()
if err != nil {
warnLogger.LogKV("msg", "failed to rolling-update",
"error", err,
"deployment", name,
"namespace", ns)
}
}
}
return rollingUpdateFn, cleanupFn, nil
}

// NewRunnerGroupSpecFromYAML returns RunnerGroupSpec instance from yaml data.
Expand Down
Loading