Skip to content

Commit 9d4cea3

Browse files
committed
chore: clusterinfo pkg manages consumers cluster info
1 parent b84263b commit 9d4cea3

File tree

5 files changed

+31
-35
lines changed

5 files changed

+31
-35
lines changed

collector/indices.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -461,17 +461,6 @@ func NewIndices(logger *slog.Logger, client *http.Client, url *url.URL, shards b
461461
},
462462
}
463463

464-
// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
465-
go func() {
466-
logger.Debug("starting cluster info receive loop")
467-
for ci := range indices.clusterInfoCh {
468-
if ci != nil {
469-
logger.Debug("received cluster info update", "cluster", ci.ClusterName)
470-
indices.lastClusterInfo = ci
471-
}
472-
}
473-
logger.Debug("exiting cluster info receive loop")
474-
}()
475464
return indices
476465
}
477466

@@ -481,6 +470,10 @@ func (i *Indices) ClusterLabelUpdates() *chan *clusterinfo.Response {
481470
return &i.clusterInfoCh
482471
}
483472

473+
func (i *Indices) SetClusterInfo(r *clusterinfo.Response) {
474+
i.lastClusterInfo = r
475+
}
476+
484477
// String implements the stringer interface. It is part of the clusterinfo.consumer interface
485478
func (i *Indices) String() string {
486479
return namespace + "indices"

collector/shards.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response {
5252
return &s.clusterInfoCh
5353
}
5454

55+
func (s *Shards) SetClusterInfo(r *clusterinfo.Response) {
56+
s.lastClusterInfo = r
57+
}
58+
5559
// String implements the stringer interface. It is part of the clusterinfo.consumer interface
5660
func (s *Shards) String() string {
5761
return namespace + "shards"
@@ -111,18 +115,6 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards {
111115
}),
112116
}
113117

114-
// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
115-
go func() {
116-
logger.Debug("starting cluster info receive loop")
117-
for ci := range shards.clusterInfoCh {
118-
if ci != nil {
119-
logger.Debug("received cluster info update", "cluster", ci.ClusterName)
120-
shards.lastClusterInfo = ci
121-
}
122-
}
123-
logger.Debug("exiting cluster info receive loop")
124-
}()
125-
126118
return shards
127119
}
128120

collector/slm.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ func NewSLM(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Re
139139
return slm, err
140140
}
141141

142-
// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
143-
go func() {
144-
logger.Debug("starting cluster info receive loop")
145-
for ci := range slm.clusterInfoCh {
146-
if ci != nil {
147-
logger.Debug("received cluster info update", "cluster", ci.ClusterName)
148-
slm.lastClusterInfo = ci
149-
}
150-
}
151-
logger.Debug("exiting cluster info receive loop")
152-
}()
153-
154142
return slm, nil
155143
}
156144

@@ -174,6 +162,10 @@ func (s *SLM) ClusterLabelUpdates() *chan *clusterinfo.Response {
174162
return &s.clusterInfoCh
175163
}
176164

165+
func (s *SLM) SetClusterInfo(r *clusterinfo.Response) {
166+
s.lastClusterInfo = r
167+
}
168+
177169
func (s *SLM) String() string {
178170
return namespace + "slm"
179171
}

pkg/clusterinfo/clusterinfo.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type consumer interface {
4646
ClusterLabelUpdates() *chan *Response
4747
// String implements the stringer interface
4848
String() string
49+
SetClusterInfo(*Response)
4950
}
5051

5152
// Retriever periodically gets the cluster info from the / endpoint and
@@ -160,6 +161,18 @@ func (r *Retriever) RegisterConsumer(c consumer) error {
160161
return ErrConsumerAlreadyRegistered
161162
}
162163
r.consumerChannels[c.String()] = c.ClusterLabelUpdates()
164+
165+
// start go routine to fetch clusterinfo updates and save them to the consumer
166+
go func() {
167+
r.logger.Debug("starting cluster info receive loop", "consumer", c.String())
168+
for ci := range *c.ClusterLabelUpdates() {
169+
if ci != nil {
170+
r.logger.Debug("received cluster info update", "consumer", c.String(), "cluster", ci.ClusterName)
171+
c.SetClusterInfo(ci)
172+
}
173+
}
174+
r.logger.Debug("exiting cluster info receive loop", "consumer", c.String())
175+
}()
163176
return nil
164177
}
165178

pkg/clusterinfo/clusterinfo_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ func (mc *mockConsumer) ClusterLabelUpdates() *chan *Response {
113113
return &mc.ch
114114
}
115115

116+
func (mc *mockConsumer) SetClusterInfo(r *Response) {
117+
mc.mu.Lock()
118+
mc.data = r
119+
mc.mu.Unlock()
120+
}
121+
116122
func TestNew(t *testing.T) {
117123
u, err := url.Parse("http://localhost:9200")
118124
if err != nil {

0 commit comments

Comments
 (0)