Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions contrib/cmd/runkperf/commands/bench/node100_pod10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
// NOTE: The name pattern should be aligned with ../../../../internal/manifests/loadprofile/node100_pod10k.yaml.
deploymentNamePattern := "benchmark"

rollingUpdateFn, ruCleanupFn, err := utils.DeployAndRepeatRollingUpdateDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes, restartInterval)
// TODO(xinwei): Implement batching support for deploying deployments after decoupling it from rolling update logic.
ruCleanupFn, err := utils.DeployDeployments(dpCtx,
kubeCfgPath, deploymentNamePattern, total, replica, paddingBytes)
if err != nil {
dpCancel()
return nil, fmt.Errorf("failed to setup workload: %w", err)
Expand All @@ -129,7 +130,7 @@ func benchNode100DeploymentNPod10KRun(cliCtx *cli.Context) (*internaltypes.Bench
//
// DeployRunnerGroup should return ready notification.
// The rolling update should run after runners.
rollingUpdateFn()
utils.RollingUpdateDeployments(dpCtx, total, deploymentNamePattern, kubeCfgPath, restartInterval)
}()

rgResult, derr := utils.DeployRunnerGroup(ctx,
Expand Down
78 changes: 40 additions & 38 deletions contrib/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,20 @@ func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string,
}
}

// DeployAndRepeatRollingUpdateDeployments deploys and repeats to rolling-update deployments.
func DeployAndRepeatRollingUpdateDeployments(
// DeployDeployments deploys deployments.
func DeployDeployments(
ctx context.Context,
kubeCfgPath string,
releaseName string,
total, replica, paddingBytes int,
internal time.Duration,
) (rollingUpdateFn, cleanupFn func(), retErr error) {
) (cleanupFn func(), retErr error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

target := "workload/deployments"
ch, err := manifests.LoadChart(target)
if err != nil {
return nil, nil, fmt.Errorf("failed to load %s chart: %w", target, err)
return nil, fmt.Errorf("failed to load %s chart: %w", target, err)
}

namePattern := releaseName
Expand All @@ -139,7 +138,7 @@ func DeployAndRepeatRollingUpdateDeployments(
),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
return nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
}

infoLogger.LogKV(
Expand All @@ -153,9 +152,9 @@ func DeployAndRepeatRollingUpdateDeployments(
if err != nil {
if errors.Is(err, context.Canceled) {
infoLogger.LogKV("msg", "deploy is canceled")
return func() {}, func() {}, nil
return func() {}, nil
}
return nil, nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
return nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
}
infoLogger.LogKV("msg", "deployed deployments")

Expand All @@ -169,45 +168,48 @@ func DeployAndRepeatRollingUpdateDeployments(
}
}

rollingUpdateFn = func() {
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(internal):
}
return cleanupFn, nil
}

infoLogger.LogKV("msg", "start to rolling-update deployments")
for i := 0; i < total; i++ {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name
func RollingUpdateDeployments(ctx context.Context, total int, namePattern string, kubeCfgPath string, internal time.Duration) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocker: you can help it in the follow-up

func RollingUpdateDeployments(ctx context.Context, kubeCfgPath string, namePattern string, total int, internal time.Duration)

infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(internal):
}

infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)
infoLogger.LogKV("msg", "start to rolling-update deployments")
for i := range total {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name

err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}
infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)

err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
return nil
}()
err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
if err != nil {
warnLogger.LogKV("msg", "failed to rolling-update",
"error", err,
"deployment", name,
"namespace", ns)
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}

err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
return nil
}()
if err != nil {
warnLogger.LogKV("msg", "failed to rolling-update",
"error", err,
"deployment", name,
"namespace", ns)
}
}
}
return rollingUpdateFn, cleanupFn, nil
}

// NewRunnerGroupSpecFromYAML returns RunnerGroupSpec instance from yaml data.
Expand Down
Loading