Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pkg/common/utils/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type ServiceEqual func(svc1 *corev1.Service, svc2 *corev1.Service) bool
// judge two statefulset equal or not in some fields. develoer can custom the function.
type StatefulSetEqual func(st1 *appv1.StatefulSet, st2 *appv1.StatefulSet) bool

type PreApplyStatefulset func(nst *appv1.StatefulSet, est *appv1.StatefulSet)

func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Service, equal ServiceEqual) error {
// As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped.
var esvc corev1.Service
Expand Down Expand Up @@ -87,17 +89,30 @@ func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, na
}

// ApplyStatefulSet when the object is not exist, create object. if exist and statefulset have been updated, patch the statefulset.
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error {
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual, pasfs ...PreApplyStatefulset) error {
var est appv1.StatefulSet
create := false
err := k8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est)
if err != nil && apierrors.IsNotFound(err) {
return CreateClientObject(ctx, k8sclient, st)
create = true
} else if err != nil && !apierrors.IsNotFound(err) {
return err
}

//check the statefulset need update or not.
ev := equal(st, &est)

// apply pre-processing before create statefulset
for _, pasf := range pasfs {
pasf(st, nil)
}

if create {
return CreateClientObject(ctx, k8sclient, st)
}

//if have restart annotation we should exclude it impacts on hash.
if equal(st, &est) {
if ev {
klog.Infof("ApplyStatefulSet Sync exist statefulset name=%s, namespace=%s, equals to new statefulset.", est.Name, est.Namespace)
return nil
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/common/utils/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const (
DISAGGREGATED_LIVE_PARAM_READY = "ready"

POD_CONTROLLER_REVISION_HASH_KEY = "controller-revision-hash"

DISAGGREGATED_FE_MAIN_CONTAINER_NAME = "fe"
DISAGGREGATED_BE_MAIN_CONTAINER_NAME = "compute"
)

type ProbeType string
Expand Down Expand Up @@ -1212,3 +1215,21 @@ func constructBeDefaultInitContainer(defaultImage string) corev1.Container {
},
)
}

// the default value has updated, when updated statefulset use new default value.
func UseNewDefaultInitContainerImage(pts *corev1.PodTemplateSpec) {
var beImage string
for _, c := range pts.Spec.Containers {
if c.Name == string(v1.Component_BE) || c.Name == string(DISAGGREGATED_BE_MAIN_CONTAINER_NAME){
beImage = c.Image
break
}
}

//use be image as default init image
for i, _ := range pts.Spec.InitContainers {
if pts.Spec.InitContainers[i].Image == DEFAULT_INIT_IMAGE {
pts.Spec.InitContainers[i].Image = beImage
}
}
}
7 changes: 6 additions & 1 deletion pkg/controller/sub_controller/be/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error {
return nil
}

//use new default value before apply new statefulset.
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet){
be.useNewDefaultValuesInStatefulset(st)
}

if err = k8s.ApplyStatefulSet(ctx, be.K8sclient, &st, func(new *appv1.StatefulSet, est *appv1.StatefulSet) bool {
// if have restart annotation, we should exclude the interference for comparison.
be.RestrictConditionsEqual(new, est)
return resource.StatefulSetDeepEqual(new, est, false)
}); err != nil {
}, ndf); err != nil {
klog.Errorf("fe controller sync statefulset name=%s, namespace=%s, clusterName=%s failed. message=%s.",
st.Name, st.Namespace, dcr.Name, err.Error())
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/sub_controller/be/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func (be *Controller) buildBEStatefulSet(dcr *v1.DorisCluster, config map[string
st.Spec.Template = be.buildBEPodTemplateSpec(dcr, config)
return st
}

func (be *Controller) useNewDefaultValuesInStatefulset(st *appv1.StatefulSet) {
resource.UseNewDefaultInitContainerImage(&st.Spec.Template)
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
return nil, nil
}

//use new default value before apply new statefulset
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) {
dcgs.useNewDefaultValuesInStatefulset(st)
}
if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
Expand All @@ -205,7 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
}
return equal

}); err != nil {
}, ndf); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (dcgs *DisaggregatedComputeGroupsController) NewCGContainer(ddc *dv1.DorisD
cmd, args := sub.GetDisaggregatedCommand(dv1.DisaggregatedBE)
c.Command = cmd
c.Args = args
c.Name = sub.BEMainContainerName
c.Name = resource.DISAGGREGATED_BE_MAIN_CONTAINER_NAME

c.Ports = resource.GetDisaggregatedContainerPorts(cvs, dv1.DisaggregatedBE)
c.Env = cg.CommonSpec.EnvVars
Expand Down Expand Up @@ -209,3 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.Doris

return cgEnvs
}

func(dcgs *DisaggregatedComputeGroupsController) useNewDefaultValuesInStatefulset(st *appv1.StatefulSet) {
resource.UseNewDefaultInitContainerImage(&st.Spec.Template)
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (dfc *DisaggregatedFEController) NewFEContainer(ddc *v1.DorisDisaggregatedC
cmd, args := sub.GetDisaggregatedCommand(v1.DisaggregatedFE)
c.Command = cmd
c.Args = args
c.Name = sub.FEMainContainerName
c.Name = resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME

c.Ports = resource.GetDisaggregatedContainerPorts(cvs, v1.DisaggregatedFE)
c.Env = ddc.Spec.FeSpec.CommonSpec.EnvVars
Expand Down
7 changes: 3 additions & 4 deletions pkg/controller/sub_controller/disaggregated_subcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ const (
FileCachePathKey = "file_cache_path"
FileCacheSubConfigPathKey = "path"
FileCacheSubConfigTotalSizeKey = "total_size"
FEMainContainerName = "fe"
BEMainContainerName = "compute"

)

type DisaggregatedSubController interface {
Expand Down Expand Up @@ -300,14 +299,14 @@ func(d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componen
switch componentType {
case v1.DisaggregatedFE:
for i, _ := range pts.Spec.Containers {
if pts.Spec.Containers[i].Name == FEMainContainerName {
if pts.Spec.Containers[i].Name == resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME {
c = &pts.Spec.Containers[i]
break
}
}
case v1.DisaggregatedBE:
for i, _ := range pts.Spec.Containers {
if pts.Spec.Containers[i].Name == BEMainContainerName {
if pts.Spec.Containers[i].Name == resource.DISAGGREGATED_BE_MAIN_CONTAINER_NAME {
c = &pts.Spec.Containers[i]
break
}
Expand Down