From 9c64c9ae918b9dfebb92f6296844315c58d808c7 Mon Sep 17 00:00:00 2001 From: smiletan Date: Fri, 27 Jun 2025 13:49:32 +0800 Subject: [PATCH] use be image as init image --- pkg/common/utils/k8s/client.go | 21 ++++++++++++++++--- pkg/common/utils/resource/pod.go | 21 +++++++++++++++++++ .../sub_controller/be/controller.go | 7 ++++++- .../sub_controller/be/statefulset.go | 4 ++++ .../computegroups/controller.go | 6 +++++- .../computegroups/statefulset.go | 6 +++++- .../disaggregated_fe/statefulset.go | 2 +- .../disaggregated_subcontroller.go | 7 +++---- 8 files changed, 63 insertions(+), 11 deletions(-) diff --git a/pkg/common/utils/k8s/client.go b/pkg/common/utils/k8s/client.go index 8c6c47c8..ecd40b20 100644 --- a/pkg/common/utils/k8s/client.go +++ b/pkg/common/utils/k8s/client.go @@ -42,6 +42,8 @@ type ServiceEqual func(svc1 *corev1.Service, svc2 *corev1.Service) bool // judge two statefulset equal or not in some fields. develoer can custom the function. type StatefulSetEqual func(st1 *appv1.StatefulSet, st2 *appv1.StatefulSet) bool +type PreApplyStatefulset func(nst *appv1.StatefulSet, est *appv1.StatefulSet) + func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Service, equal ServiceEqual) error { // As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped. var esvc corev1.Service @@ -87,17 +89,30 @@ func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, na } // ApplyStatefulSet when the object is not exist, create object. if exist and statefulset have been updated, patch the statefulset. -func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error { +func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual, pasfs ...PreApplyStatefulset) error { var est appv1.StatefulSet + create := false err := k8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est) if err != nil && apierrors.IsNotFound(err) { - return CreateClientObject(ctx, k8sclient, st) + create = true } else if err != nil && !apierrors.IsNotFound(err) { return err } + //check the statefulset need update or not. + ev := equal(st, &est) + + // apply pre-processing before create statefulset + for _, pasf := range pasfs { + pasf(st, nil) + } + + if create { + return CreateClientObject(ctx, k8sclient, st) + } + //if have restart annotation we should exclude it impacts on hash. - if equal(st, &est) { + if ev { klog.Infof("ApplyStatefulSet Sync exist statefulset name=%s, namespace=%s, equals to new statefulset.", est.Name, est.Namespace) return nil } diff --git a/pkg/common/utils/resource/pod.go b/pkg/common/utils/resource/pod.go index 0fcbac50..3055e081 100644 --- a/pkg/common/utils/resource/pod.go +++ b/pkg/common/utils/resource/pod.go @@ -84,6 +84,9 @@ const ( DISAGGREGATED_LIVE_PARAM_READY = "ready" POD_CONTROLLER_REVISION_HASH_KEY = "controller-revision-hash" + + DISAGGREGATED_FE_MAIN_CONTAINER_NAME = "fe" + DISAGGREGATED_BE_MAIN_CONTAINER_NAME = "compute" ) type ProbeType string @@ -1212,3 +1215,21 @@ func constructBeDefaultInitContainer(defaultImage string) corev1.Container { }, ) } + +// the default value has updated, when updated statefulset use new default value. +func UseNewDefaultInitContainerImage(pts *corev1.PodTemplateSpec) { + var beImage string + for _, c := range pts.Spec.Containers { + if c.Name == string(v1.Component_BE) || c.Name == string(DISAGGREGATED_BE_MAIN_CONTAINER_NAME){ + beImage = c.Image + break + } + } + + //use be image as default init image + for i, _ := range pts.Spec.InitContainers { + if pts.Spec.InitContainers[i].Image == DEFAULT_INIT_IMAGE { + pts.Spec.InitContainers[i].Image = beImage + } + } +} diff --git a/pkg/controller/sub_controller/be/controller.go b/pkg/controller/sub_controller/be/controller.go index fbd8066b..557f0a38 100644 --- a/pkg/controller/sub_controller/be/controller.go +++ b/pkg/controller/sub_controller/be/controller.go @@ -105,11 +105,16 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error { return nil } + //use new default value before apply new statefulset. + ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet){ + be.useNewDefaultValuesInStatefulset(st) + } + if err = k8s.ApplyStatefulSet(ctx, be.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool { // if have restart annotation, we should exclude the interference for comparison. be.RestrictConditionsEqual(new, est) return resource.StatefulSetDeepEqual(new, est, false) - }); err != nil { + }, ndf); err != nil { klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.", st.Name, st.Namespace, dcr.Name, err.Error()) return err diff --git a/pkg/controller/sub_controller/be/statefulset.go b/pkg/controller/sub_controller/be/statefulset.go index 16063dbc..bad44c80 100644 --- a/pkg/controller/sub_controller/be/statefulset.go +++ b/pkg/controller/sub_controller/be/statefulset.go @@ -28,3 +28,7 @@ func (be *Controller) buildBEStatefulSet(dcr *v1.DorisCluster, config map[string st.Spec.Template = be.buildBEPodTemplateSpec(dcr, config) return st } + +func (be *Controller) useNewDefaultValuesInStatefulset(st *appv1.StatefulSet) { + resource.UseNewDefaultInitContainerImage(&st.Spec.Template) +} \ No newline at end of file diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 2ca8112b..6f8dbd4a 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -186,6 +186,10 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte return nil, nil } + //use new default value before apply new statefulset + ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) { + dcgs.useNewDefaultValuesInStatefulset(st) + } if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { //store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset //store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster @@ -205,7 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte } return equal - }); err != nil { + }, ndf); err != nil { klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go index 6edc3df2..9bfbaff4 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/statefulset.go @@ -137,7 +137,7 @@ func (dcgs *DisaggregatedComputeGroupsController) NewCGContainer(ddc *dv1.DorisD cmd, args := sub.GetDisaggregatedCommand(dv1.DisaggregatedBE) c.Command = cmd c.Args = args - c.Name = sub.BEMainContainerName + c.Name = resource.DISAGGREGATED_BE_MAIN_CONTAINER_NAME c.Ports = resource.GetDisaggregatedContainerPorts(cvs, dv1.DisaggregatedBE) c.Env = cg.CommonSpec.EnvVars @@ -209,3 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.Doris return cgEnvs } + +func(dcgs *DisaggregatedComputeGroupsController) useNewDefaultValuesInStatefulset(st *appv1.StatefulSet) { + resource.UseNewDefaultInitContainerImage(&st.Spec.Template) +} \ No newline at end of file diff --git a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/statefulset.go b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/statefulset.go index 8a63a5ed..e9b60b45 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/statefulset.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/statefulset.go @@ -127,7 +127,7 @@ func (dfc *DisaggregatedFEController) NewFEContainer(ddc *v1.DorisDisaggregatedC cmd, args := sub.GetDisaggregatedCommand(v1.DisaggregatedFE) c.Command = cmd c.Args = args - c.Name = sub.FEMainContainerName + c.Name = resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME c.Ports = resource.GetDisaggregatedContainerPorts(cvs, v1.DisaggregatedFE) c.Env = ddc.Spec.FeSpec.CommonSpec.EnvVars diff --git a/pkg/controller/sub_controller/disaggregated_subcontroller.go b/pkg/controller/sub_controller/disaggregated_subcontroller.go index b1a0d73b..ca0d355e 100644 --- a/pkg/controller/sub_controller/disaggregated_subcontroller.go +++ b/pkg/controller/sub_controller/disaggregated_subcontroller.go @@ -57,8 +57,7 @@ const ( FileCachePathKey = "file_cache_path" FileCacheSubConfigPathKey = "path" FileCacheSubConfigTotalSizeKey = "total_size" - FEMainContainerName = "fe" - BEMainContainerName = "compute" + ) type DisaggregatedSubController interface { @@ -300,14 +299,14 @@ func(d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componen switch componentType { case v1.DisaggregatedFE: for i, _ := range pts.Spec.Containers { - if pts.Spec.Containers[i].Name == FEMainContainerName { + if pts.Spec.Containers[i].Name == resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME { c = &pts.Spec.Containers[i] break } } case v1.DisaggregatedBE: for i, _ := range pts.Spec.Containers { - if pts.Spec.Containers[i].Name == BEMainContainerName { + if pts.Spec.Containers[i].Name == resource.DISAGGREGATED_BE_MAIN_CONTAINER_NAME { c = &pts.Spec.Containers[i] break }