Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/disaggregated/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ type DorisDisaggregatedClusterStatus struct {

//ComputeGroupStatuses reflect a list of computeGroup status.
ComputeGroupStatuses []ComputeGroupStatus `json:"computeGroupStatuses,omitempty"`

//ObservedGeneration is the most recent generation observed for the DorisDisaggregatedCluster.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

type MetaServiceStatus struct {
Expand Down
6 changes: 6 additions & 0 deletions api/disaggregated/v1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const (
DisaggregatedSpecHashValueAnnotation string = "doris.disaggregated.cluster/hash"

ServiceRoleForCluster string = "app.doris.service/role"

//annotation on the statefulset, represents which generation of DorisDisaggregatedCluster updated the statefulset.
UpdateStatefulsetGeneration string = "doris.disaggregated.cluster/generation"

//use uniqueId as identifier of which statefulset was updated. value is the ddc updateVersion
UpdateStatefulsetName = "doris.disaggregated.cluster/%s"
)

type DisaggregatedComponentType string
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/utils/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ const (

DISAGGREGATED_LIVE_PARAM_ALIVE = "alive"
DISAGGREGATED_LIVE_PARAM_READY = "ready"

POD_CONTROLLER_REVISION_HASH_KEY = "controller-revision-hash"
)

type ProbeType string
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/disaggregated_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,24 @@ func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggr
//update component status.
if err := sc.UpdateComponentStatus(ddc); err != nil {
klog.Errorf("DorisClusterReconciler reconcile update component %s status failed.err=%s\n", sc.GetControllerName(), err.Error())
return requeueIfError(err)
// if this fails, the cluster status will not be green and a following step returns a requeue (after 5 seconds), so returning the error here is not needed.
//return requeueIfError(err)
}
}

ddc.Status.ObservedGeneration = ddc.Generation
ddc.Status.ClusterHealth.Health = dv1.Green
if ddc.Status.FEStatus.AvailableStatus != dv1.Available || ddc.Status.ClusterHealth.CGAvailableCount <= (ddc.Status.ClusterHealth.CGCount/2) {
ddc.Status.ClusterHealth.Health = dv1.Red
} else if ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount < ddc.Status.ClusterHealth.CGCount {
ddc.Status.ClusterHealth.Health = dv1.Yellow
}

//if have any component not ready, should reconcile.
if ddc.Status.MetaServiceStatus.Phase != dv1.Ready || ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount != ddc.Status.ClusterHealth.CGCount {
return ctrl.Result{Requeue: true}, nil
}

return ctrl.Result{}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,24 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
}

if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
equal := resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
if !equal {
if len(st.Annotations) == 0 {
st.Annotations = map[string]string{}
}
st_annos := (resource.Annotations)(st.Annotations)
st_annos.Add(dv1.UpdateStatefulsetGeneration, strconv.FormatInt(cluster.Generation, 10))
if len(cluster.Annotations) == 0 {
cluster.Annotations = map[string]string{}
}
ddc_annos := (resource.Annotations)(cluster.Annotations)
msUniqueIdKey := strings.ToLower(fmt.Sprintf(dv1.UpdateStatefulsetName, cluster.GetCGStatefulsetName(cg)))
ddc_annos.Add(msUniqueIdKey, "true")
}
return equal

}); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
Expand Down Expand Up @@ -546,12 +563,34 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie
}

func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisDisaggregatedCluster, cgs *dv1.ComputeGroupStatus) error {
stfName := cgs.StatefulsetName
sts, err := k8s.GetStatefulSet(context.Background(), dcgs.K8sclient, ddc.Namespace, stfName)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController updateCGStatus get statefulset %s failed, err=%s", stfName, err.Error())
return err
}

//check statefulset updated or not, if this reconcile update the sts, should exclude the circumstance that get old sts and the pods not updated.
updateStatefulsetKey := strings.ToLower(fmt.Sprintf(dv1.UpdateStatefulsetName, stfName))
if _, updated := ddc.Annotations[updateStatefulsetKey]; updated {
generation := dcgs.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey)
//if this reconcile not update statefulset will not check the generation equals or not.
if ddc.Generation != generation {
return errors.New("waiting statefulset upd ated")
}
}

selector := dcgs.newCGPodsSelector(ddc.Name, cgs.UniqueId)
var podList corev1.PodList
if err := dcgs.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil {
return err
}


updateRevision := sts.Status.UpdateRevision
//check all pods controlled by new statefulset.
allUpdated := dcgs.DisaggregatedSubDefaultController.StatefulsetControlledPodsAllUseNewUpdateRevision(updateRevision, podList.Items)

var availableReplicas int32
var creatingReplicas int32
var failedReplicas int32
Expand All @@ -567,7 +606,7 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD
}

cgs.AvailableReplicas = availableReplicas
if availableReplicas == cgs.Replicas {
if allUpdated && availableReplicas == cgs.Replicas {
cgs.Phase = dv1.Ready
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func getScaledOutBENode(
return dropNodes, nil
}

//if in decommission, skip apply statefulset.
func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool {
var cgStatus *dv1.ComputeGroupStatus
uniqueId := cg.UniqueId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package disaggregated_fe

import (
"context"
"errors"
"fmt"
"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
Expand Down Expand Up @@ -57,7 +58,7 @@ func New(mgr ctrl.Manager) *DisaggregatedFEController {

func (dfc *DisaggregatedFEController) Sync(ctx context.Context, obj client.Object) error {
ddc := obj.(*v1.DorisDisaggregatedCluster)
//TODO: check ms status
//deploying fe when ms is available.
if !dfc.msAvailable(ddc) {
dfc.K8srecorder.Event(ddc, string(sc.EventNormal), string(sc.WaitMetaServiceAvailable), "meta service have not ready.")
return nil
Expand Down Expand Up @@ -85,6 +86,7 @@ func (dfc *DisaggregatedFEController) Sync(ctx context.Context, obj client.Objec
svc := dfc.newService(ddc, confMap)

st := dfc.NewStatefulset(ddc, confMap)
//initial fe status on start. in resource process step, may be use the status record the process.
dfc.initialFEStatus(ddc)

event, err := dfc.DefaultReconcileService(ctx, svcInternal)
Expand Down Expand Up @@ -195,7 +197,23 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
ddc := obj.(*v1.DorisDisaggregatedCluster)

stfName := ddc.GetFEStatefulsetName()
sts, err := k8s.GetStatefulSet(context.Background(), dfc.K8sclient, ddc.Namespace, stfName)
if err != nil {
klog.Errorf("DisaggregatedFEController UpdateComponentStatus get statefulset %s failed, err=%s", stfName, err.Error())
return err
}

//check statefulset updated or not, if this reconcile update the sts, should exclude the circumstance that get old sts and the pods not updated.
updateStatefulsetKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, ddc.GetFEStatefulsetName()))
if _, updated := ddc.Annotations[updateStatefulsetKey]; updated {
generation := dfc.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey)
//if this reconcile not update statefulset will not check the generation equals or not.
if ddc.Generation != generation {
return errors.New("waiting statefulset upd ated")
}
}

updateRevision := sts.Status.UpdateRevision
// FEStatus
feSpec := ddc.Spec.FeSpec
electionNumber := ddc.GetElectionNumber()
Expand All @@ -204,6 +222,8 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
if err := dfc.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil {
return err
}
//check all pods controlled by new statefulset.
allUpdated := dfc.DisaggregatedSubDefaultController.StatefulsetControlledPodsAllUseNewUpdateRevision(updateRevision, podList.Items)
for _, pod := range podList.Items {

if ready := k8s.PodIsReady(&pod.Status); ready {
Expand All @@ -227,7 +247,7 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
}
// all fe pods are Ready, FEStatus.Phase is Ready,
// for ClusterHealth.Health is green
if masterAliveReplicas == electionNumber && availableReplicas == *(feSpec.Replicas) {
if allUpdated && masterAliveReplicas == electionNumber && availableReplicas == *(feSpec.Replicas) {
ddc.Status.FEStatus.Phase = v1.Ready
}

Expand Down Expand Up @@ -288,7 +308,23 @@ func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context,

// apply fe StatefulSet
if err := k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
equal := resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
if !equal {
if len(st.Annotations) == 0 {
st.Annotations = map[string]string{}
}
st_annos := (resource.Annotations)(st.Annotations)
st_annos.Add(v1.UpdateStatefulsetGeneration, strconv.FormatInt(cluster.Generation, 10))
if len(cluster.Annotations) == 0 {
cluster.Annotations = map[string]string{}
}
ddc_annos := (resource.Annotations)(cluster.Annotations)
msUniqueIdKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, cluster.GetFEStatefulsetName()))
ddc_annos.Add(msUniqueIdKey, "true")
}
return equal
}); err != nil {
klog.Errorf("disaggregatedFEController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.FEApplyResourceFailed, Message: err.Error()}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package metaservice

import (
"context"
"errors"
"fmt"
"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/resource"
Expand All @@ -31,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)

type DisaggregatedMSController struct {
Expand Down Expand Up @@ -83,11 +86,33 @@ func (dms *DisaggregatedMSController) UpdateComponentStatus(obj client.Object) e
token = v.(string)
}
ddc.Status.MetaServiceStatus.MsToken = token

stsName := ddc.GetMSStatefulsetName()
sts, err := k8s.GetStatefulSet(context.Background(), dms.K8sclient, ddc.Namespace, stsName)
if err != nil {
klog.Errorf("DisaggregatedMSController UpdateComponentStatus get statefulset %s failed, err=%s", stsName, err.Error())
return err
}

//check statefulset updated or not, if this reconcile update the sts, so we should exclude the circumstance that get old sts and the pods not updated.
updateStatefulsetKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, ddc.GetMSStatefulsetName()))
if _, updated := ddc.Annotations[updateStatefulsetKey]; updated {
generation := dms.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey)
//if this reconcile not update statefulset will not check the generation equals or not.
if ddc.Generation != generation {
return errors.New("waiting statefulset updated")
}
}

updateRevision := sts.Status.UpdateRevision
selector := dms.newMSPodsSelector(ddc.Name)
var podList corev1.PodList
if err := dms.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil {
return err
}

//check all pods controlled by new statefulset.
allUpdated := dms.DisaggregatedSubDefaultController.StatefulsetControlledPodsAllUseNewUpdateRevision(updateRevision, podList.Items)
for _, pod := range podList.Items {
if ready := k8s.PodIsReady(&pod.Status); ready {
availableReplicas++
Expand All @@ -100,6 +125,14 @@ func (dms *DisaggregatedMSController) UpdateComponentStatus(obj client.Object) e

if availableReplicas > 0 {
ddc.Status.MetaServiceStatus.AvailableStatus = v1.Available
}

//all pods ready and controlled by new update revision.
msReplicas := int32(2)
if ddc.Spec.MetaService.Replicas != nil {
msReplicas = *ddc.Spec.MetaService.Replicas
}
if availableReplicas == msReplicas && allUpdated {
ddc.Status.MetaServiceStatus.Phase = v1.Ready
}

Expand Down Expand Up @@ -142,7 +175,7 @@ func (dms *DisaggregatedMSController) Sync(ctx context.Context, obj client.Objec
return err
}

event, err = dms.reconcileStatefulset(ctx, st)
event, err = dms.reconcileStatefulset(ctx, st, ddc)
if err != nil {
if event != nil {
dms.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
Expand All @@ -154,7 +187,7 @@ func (dms *DisaggregatedMSController) Sync(ctx context.Context, obj client.Objec
return nil
}

func (dms *DisaggregatedMSController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet) (*sc.Event, error) {
func (dms *DisaggregatedMSController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, ddc *v1.DorisDisaggregatedCluster) (*sc.Event, error) {
var est appv1.StatefulSet
if err := dms.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
if err = k8s.CreateClientObject(ctx, dms.K8sclient, st); err != nil {
Expand All @@ -169,7 +202,24 @@ func (dms *DisaggregatedMSController) reconcileStatefulset(ctx context.Context,
}

if err := k8s.ApplyStatefulSet(ctx, dms.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
equal := resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
if !equal {
if len(st.Annotations) == 0 {
st.Annotations = map[string]string{}
}
st_annos := (resource.Annotations)(st.Annotations)
st_annos.Add(v1.UpdateStatefulsetGeneration, strconv.FormatInt(ddc.Generation, 10))
if len(ddc.Annotations) == 0 {
ddc.Annotations = map[string]string{}
}
ddc_annos := (resource.Annotations)(ddc.Annotations)
msUniqueIdKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, ddc.GetMSStatefulsetName()))
ddc_annos.Add(msUniqueIdKey, "true")
}

return equal
}); err != nil {
klog.Errorf("dms controller reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
Expand All @@ -183,6 +233,7 @@ func (dms *DisaggregatedMSController) initMSStatus(ddc *v1.DorisDisaggregatedClu
if ddc.Status.MetaServiceStatus.Phase != "" {
initPhase = ddc.Status.MetaServiceStatus.Phase
}

//re initial status to un available
ddc.Status.MetaServiceStatus.AvailableStatus = v1.UnAvailable
ddc.Status.MetaServiceStatus.Phase = initPhase
Expand Down
40 changes: 40 additions & 0 deletions pkg/controller/sub_controller/disaggregated_subcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,46 @@ func(d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componen

}

// ReturnStatefulsetUpdatedGeneration reads the annotation named
// annoGenerationKey from sts and parses it as a base-10 int64.
//
// It returns 0 when sts is nil or when the annotation is missing, empty or
// not a number. The parse error is discarded on purpose: callers treat 0 as
// "no generation recorded".
//
// NOTE(review): the callers visible in this change pass the per-statefulset
// "updated" key, while the generation appears to be written under
// UpdateStatefulsetGeneration — confirm the intended key at the call sites.
func (d *DisaggregatedSubDefaultController) ReturnStatefulsetUpdatedGeneration(sts *appv1.StatefulSet, annoGenerationKey string) int64 {
	if sts == nil {
		return 0
	}

	// Indexing a nil map is safe in Go, so no separate emptiness check is needed.
	raw, found := sts.Annotations[annoGenerationKey]
	if !found {
		return 0
	}

	generation, _ := strconv.ParseInt(raw, 10, 64)
	return generation
}

// StatefulsetControlledPodsAllUseNewUpdateRevision reports whether every pod
// in pods carries a `controller-revision-hash` label equal to
// stsUpdateRevision (the statefulset's status.updateRevision).
//
// It returns false when stsUpdateRevision is empty, when pods is empty, or
// as soon as one pod's label differs from stsUpdateRevision.
func (d *DisaggregatedSubDefaultController) StatefulsetControlledPodsAllUseNewUpdateRevision(stsUpdateRevision string, pods []corev1.Pod) bool {
	if stsUpdateRevision == "" || len(pods) == 0 {
		return false
	}

	for i := range pods {
		// A pod without the label yields "", which never equals the non-empty revision.
		if pods[i].Labels[resource.POD_CONTROLLER_REVISION_HASH_KEY] != stsUpdateRevision {
			return false
		}
	}

	return true
}

func (d *DisaggregatedSubDefaultController) BuildVolumesVolumeMountsAndPVCs(confMap map[string]interface{}, componentType v1.DisaggregatedComponentType, commonSpec *v1.CommonSpec) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) {
if commonSpec.PersistentVolume == nil && len(commonSpec.PersistentVolumes) == 0 {
vs, vms := d.getEmptyDirVolumesVolumeMounts(confMap, componentType)
Expand Down