Skip to content

Commit 3d5d50c

Browse files
committed
[ENH] Leader election for sysdb
1 parent: 6b40d2c · commit: 3d5d50c

File tree

5 files changed: +100 −59 lines changed

go/cmd/logservice/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"context"
55
"net"
66

7+
"github.com/chroma-core/chroma/go/pkg/leader"
78
"github.com/chroma-core/chroma/go/pkg/log/configuration"
8-
"github.com/chroma-core/chroma/go/pkg/log/leader"
99
"github.com/chroma-core/chroma/go/pkg/log/metrics"
1010
"github.com/chroma-core/chroma/go/pkg/log/purging"
1111
"github.com/chroma-core/chroma/go/pkg/log/repository"

go/pkg/log/leader/main.go renamed to go/pkg/leader/election.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package leader
33
import (
44
"context"
55
"os"
6+
"strings"
67
"time"
78

89
"github.com/pingcap/log"
9-
1010
"go.uber.org/zap"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1212
"k8s.io/client-go/kubernetes"
@@ -15,6 +15,21 @@ import (
1515
"k8s.io/client-go/tools/leaderelection/resourcelock"
1616
)
1717

18+
// extractServiceName extracts the service name from a pod name.
19+
// The service name is expected to be the prefix before the last hyphen in the pod name.
20+
// For example, from pod name "chroma-query-abc123", it will return "chroma-query".
21+
func extractServiceName(podName string) string {
22+
parts := strings.Split(podName, "-")
23+
if len(parts) > 1 {
24+
return strings.Join(parts[:len(parts)-1], "-")
25+
}
26+
return podName
27+
}
28+
29+
// AcquireLeaderLock starts leader election and runs the given function when leadership is acquired.
30+
// The context passed to onStartedLeading will be cancelled when leadership is lost.
31+
// The service name is automatically determined from the pod name by extracting the prefix before the last hyphen.
32+
// The lock name will be formatted as "{service-name}-leader".
1833
func AcquireLeaderLock(ctx context.Context, onStartedLeading func(context.Context)) {
1934
podName, _ := os.LookupEnv("POD_NAME")
2035
if podName == "" {
@@ -32,7 +47,11 @@ func AcquireLeaderLock(ctx context.Context, onStartedLeading func(context.Contex
3247
return
3348
}
3449

35-
elector, err := setupLeaderElection(client, namespace, podName, onStartedLeading)
50+
// Format the lock name with the service name
51+
serviceName := extractServiceName(podName)
52+
lockName := serviceName + "-leader"
53+
54+
elector, err := setupLeaderElection(client, namespace, podName, lockName, onStartedLeading)
3655
if err != nil {
3756
log.Error("failed to setup leader election", zap.Error(err))
3857
return
@@ -49,10 +68,10 @@ func createKubernetesClient() (*kubernetes.Clientset, error) {
4968
return kubernetes.NewForConfig(config)
5069
}
5170

52-
func setupLeaderElection(client *kubernetes.Clientset, namespace, podName string, onStartedLeading func(context.Context)) (lr *leaderelection.LeaderElector, err error) {
71+
func setupLeaderElection(client *kubernetes.Clientset, namespace, podName, lockName string, onStartedLeading func(context.Context)) (*leaderelection.LeaderElector, error) {
5372
lock := &resourcelock.LeaseLock{
5473
LeaseMeta: metav1.ObjectMeta{
55-
Name: "log-leader-lock",
74+
Name: lockName,
5675
Namespace: namespace,
5776
},
5877
Client: client.CoordinationV1(),
@@ -61,21 +80,20 @@ func setupLeaderElection(client *kubernetes.Clientset, namespace, podName string
6180
},
6281
}
6382

64-
lr, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
83+
return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
6584
Lock: lock,
6685
ReleaseOnCancel: true,
6786
LeaseDuration: 15 * time.Second,
6887
RenewDeadline: 10 * time.Second,
6988
RetryPeriod: 2 * time.Second,
7089
Callbacks: leaderelection.LeaderCallbacks{
7190
OnStartedLeading: func(ctx context.Context) {
72-
log.Info("started leading")
91+
log.Info("started leading", zap.String("lock", lockName))
7392
onStartedLeading(ctx)
7493
},
7594
OnStoppedLeading: func() {
76-
log.Info("stopped leading")
95+
log.Info("stopped leading", zap.String("lock", lockName))
7796
},
7897
},
7998
})
80-
return
8199
}

go/pkg/memberlist_manager/memberlist_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package memberlist_manager
22

33
import (
44
"context"
5-
"time"
65
"sort"
6+
"time"
77

88
"github.com/chroma-core/chroma/go/pkg/common"
99
"github.com/pingcap/log"

go/pkg/sysdb/grpc/server.go

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"time"
77

88
"github.com/chroma-core/chroma/go/pkg/grpcutils"
9-
9+
"github.com/chroma-core/chroma/go/pkg/leader"
1010
"github.com/chroma-core/chroma/go/pkg/memberlist_manager"
1111
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
1212
"github.com/chroma-core/chroma/go/pkg/sysdb/coordinator"
@@ -92,6 +92,48 @@ func New(config Config) (*Server, error) {
9292
}
9393
}
9494

95+
func StartMemberListManagers(leaderCtx context.Context, config Config) error {
96+
namespace := config.KubernetesNamespace
97+
98+
// Store managers for cleanup
99+
managers := []struct {
100+
serviceType string
101+
manager *memberlist_manager.MemberlistManager
102+
memberlistName string
103+
podLabel string
104+
}{
105+
{"query", nil, config.QueryServiceMemberlistName, config.QueryServicePodLabel},
106+
{"compaction", nil, config.CompactionServiceMemberlistName, config.CompactionServicePodLabel},
107+
{"garbage_collection", nil, config.GarbageCollectionServiceMemberlistName, config.GarbageCollectionServicePodLabel},
108+
{"log", nil, config.LogServiceMemberlistName, config.LogServicePodLabel},
109+
}
110+
111+
for i, m := range managers {
112+
manager, err := createMemberlistManager(namespace, m.memberlistName, m.podLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)
113+
if err != nil {
114+
log.Error("Failed to create memberlist manager for service", zap.String("service", m.serviceType), zap.Error(err))
115+
return err
116+
}
117+
managers[i].manager = manager
118+
}
119+
120+
// Start all memberlist managers
121+
for _, m := range managers {
122+
if err := m.manager.Start(); err != nil {
123+
log.Error("Failed to start memberlist manager for service", zap.String("service", m.serviceType), zap.Error(err))
124+
}
125+
}
126+
127+
// Wait for context cancellation (leadership lost)
128+
<-leaderCtx.Done()
129+
130+
// Stop all memberlist managers
131+
for _, m := range managers {
132+
m.manager.Stop()
133+
}
134+
return nil
135+
}
136+
95137
func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider) (*Server, error) {
96138
log.Info("Creating new GRPC server with config", zap.Any("config", config))
97139
ctx := context.Background()
@@ -110,54 +152,14 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider) (*Serve
110152
}
111153
s.coordinator = *coordinator
112154
if !config.Testing {
113-
namespace := config.KubernetesNamespace
114-
// Create memberlist manager for query service
115-
queryMemberlistManager, err := createMemberlistManager(namespace, config.QueryServiceMemberlistName, config.QueryServicePodLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)
116-
if err != nil {
117-
return nil, err
118-
}
119-
120-
// Create memberlist manager for compaction service
121-
compactionMemberlistManager, err := createMemberlistManager(namespace, config.CompactionServiceMemberlistName, config.CompactionServicePodLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)
122-
if err != nil {
123-
return nil, err
124-
}
125-
126-
// Create memberlist manager for garbage collection service
127-
garbageCollectionMemberlistManager, err := createMemberlistManager(namespace, config.GarbageCollectionServiceMemberlistName, config.GarbageCollectionServicePodLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)
128-
if err != nil {
129-
return nil, err
130-
}
131-
132-
// Create memberlist manager for log service
133-
logServiceMemberlistManager, err := createMemberlistManager(namespace, config.LogServiceMemberlistName, config.LogServicePodLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)
134-
if err != nil {
135-
return nil, err
136-
}
137-
138-
// Start the memberlist manager for query service
139-
err = queryMemberlistManager.Start()
140-
if err != nil {
141-
return nil, err
142-
}
143-
// Start the memberlist manager for compaction service
144-
err = compactionMemberlistManager.Start()
145-
if err != nil {
146-
return nil, err
147-
}
148-
149-
// Start the memberlist manager for garbage collection service
150-
err = garbageCollectionMemberlistManager.Start()
151-
if err != nil {
152-
return nil, err
153-
}
154-
155-
// Start the memberlist manager for log service
156-
err = logServiceMemberlistManager.Start()
157-
if err != nil {
158-
return nil, err
159-
}
160-
155+
// Start leader election for memberlist management
156+
go leader.AcquireLeaderLock(context.Background(), func(leaderCtx context.Context) {
157+
log.Info("Acquired leadership for memberlist management")
158+
if err := StartMemberListManagers(leaderCtx, config); err != nil {
159+
log.Error("Failed to start memberlist manager", zap.Error(err))
160+
}
161+
log.Info("Released leadership for memberlist management")
162+
})
161163
log.Info("Starting GRPC server")
162164
s.grpcServer, err = provider.StartGrpcServer("coordinator", config.GrpcConfig, func(registrar grpc.ServiceRegistrar) {
163165
coordinatorpb.RegisterSysDBServer(registrar, s)

k8s/distributed-chroma/templates/sysdb-service.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ spec:
2626
# TODO properly use flow control here to check which type of value we need.
2727
{{ .value | nindent 14 }}
2828
{{ end }}
29+
- name: POD_NAME
30+
valueFrom:
31+
fieldRef:
32+
fieldPath: metadata.name
33+
- name: POD_NAMESPACE
34+
valueFrom:
35+
fieldRef:
36+
fieldPath: metadata.namespace
2937
image: "{{ .Values.sysdb.image.repository }}:{{ .Values.sysdb.image.tag }}"
3038
imagePullPolicy: IfNotPresent
3139
readinessProbe:
@@ -84,3 +92,16 @@ subjects:
8492
namespace: {{ .Values.namespace }}
8593

8694
---
95+
apiVersion: rbac.authorization.k8s.io/v1
96+
kind: RoleBinding
97+
metadata:
98+
name: sysdb-serviceaccount-lease-watcher-binding
99+
namespace: {{ .Values.namespace }}
100+
roleRef:
101+
apiGroup: rbac.authorization.k8s.io
102+
kind: Role
103+
name: lease-watcher
104+
subjects:
105+
- kind: ServiceAccount
106+
name: sysdb-serviceaccount
107+
namespace: {{ .Values.namespace }}

0 commit comments

Comments (0)