Skip to content

Commit 2ce2f5a

Browse files
authored
fix:修复healthcheckLeader偶发卡死问题 (#1289)
1 parent 05a1a37 commit 2ce2f5a

File tree

3 files changed

+30
-46
lines changed

3 files changed

+30
-46
lines changed

plugin/healthchecker/leader/checker_leader.go

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ type LeaderHealthChecker struct {
105105
self Peer
106106
// s store.Store
107107
s store.Store
108-
// putBatchCtrl 批任务执行器
109-
putBatchCtrl *batchjob.BatchController
110-
// getBatchCtrl 批任务执行器
111-
getBatchCtrl *batchjob.BatchController
112108
// subCtx
113109
subCtx *eventhub.SubscribtionContext
114110
}
@@ -146,22 +142,6 @@ func (c *LeaderHealthChecker) Initialize(entry *plugin.ConfigEntry) error {
146142
if err := c.s.StartLeaderElection(electionKey); err != nil {
147143
return err
148144
}
149-
c.getBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{
150-
Label: "RecordGetter",
151-
QueueSize: conf.Batch.QueueSize,
152-
WaitTime: conf.Batch.WaitTime,
153-
MaxBatchCount: conf.Batch.MaxBatchCount,
154-
Concurrency: conf.Batch.Concurrency,
155-
Handler: c.handleSendGetRecords,
156-
})
157-
c.putBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{
158-
Label: "RecordPutter",
159-
QueueSize: conf.Batch.QueueSize,
160-
WaitTime: conf.Batch.WaitTime,
161-
MaxBatchCount: conf.Batch.MaxBatchCount,
162-
Concurrency: conf.Batch.Concurrency,
163-
Handler: c.handleSendPutRecords,
164-
})
165145
registerMetrics()
166146
return nil
167147
}
@@ -235,6 +215,7 @@ func (c *LeaderHealthChecker) becomeFollower(e store.LeaderChangeEvent, leaderVe
235215
remoteLeader := NewRemotePeerFunc()
236216
remoteLeader.Initialize(*c.conf)
237217
if err := remoteLeader.Serve(context.Background(), c, e.LeaderHost, uint32(utils.LocalPort)); err != nil {
218+
_ = remoteLeader.Close()
238219
plog.Error("[HealthCheck][Leader] follower run serve, do retry", zap.Error(err))
239220
go func(e store.LeaderChangeEvent, leaderVersion int64) {
240221
time.Sleep(time.Second)
@@ -267,7 +248,8 @@ func (c *LeaderHealthChecker) Type() plugin.HealthCheckType {
267248

268249
// Report process heartbeat info report
269250
func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.ReportRequest) error {
270-
if isSendFromPeer(ctx) {
251+
if !c.isLeader() && isSendFromPeer(ctx) {
252+
plog.Error("[Health Check][Leader] follower checker receive other follower request")
271253
return ErrorRedirectOnlyOnce
272254
}
273255

plugin/healthchecker/leader/peer.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,6 @@ type RemotePeer struct {
132132
port uint32
133133
// Conn grpc connection
134134
conns []*grpc.ClientConn
135-
// putBatchCtrl 批任务执行器
136-
putBatchCtrl *batchjob.BatchController
137-
// getBatchCtrl 批任务执行器
138-
getBatchCtrl *batchjob.BatchController
139135
// Puters 批量心跳发送, 由于一个 stream 对于 server 是一个 goroutine,为了加快 follower 发往 leader 的效率
140136
// 这里采用多个 Putter Client 创建多个 Stream
141137
puters []*beatSender
@@ -185,12 +181,8 @@ func (p *RemotePeer) Serve(_ context.Context, checker *LeaderHealthChecker,
185181
_ = p.Close()
186182
return err
187183
}
188-
p.puters = append(p.puters, &beatSender{
189-
sender: puter,
190-
})
184+
p.puters = append(p.puters, newBeatSender(ctx, p, puter))
191185
}
192-
p.getBatchCtrl = checker.getBatchCtrl
193-
p.putBatchCtrl = checker.putBatchCtrl
194186
p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc)
195187
return nil
196188
}
@@ -201,26 +193,14 @@ func (p *RemotePeer) Host() string {
201193

202194
// Get get records
203195
func (p *RemotePeer) Get(key string) (*ReadBeatRecord, error) {
204-
future := p.getBatchCtrl.SubmitWithTimeout(&PeerTask{
205-
Key: key,
206-
Peer: p,
207-
}, time.Second)
208-
resp, err := future.DoneTimeout(time.Second)
209-
if err != nil {
210-
return nil, err
211-
}
212-
ret := resp.(map[string]*ReadBeatRecord)
196+
ret := p.Cache.Get(key)
213197
return ret[key], nil
214198
}
215199

216200
// Put put records
217201
func (p *RemotePeer) Put(record WriteBeatRecord) error {
218-
future := p.putBatchCtrl.SubmitWithTimeout(&PeerTask{
219-
Record: &record,
220-
Peer: p,
221-
}, time.Second)
222-
_, err := future.DoneTimeout(time.Second)
223-
return err
202+
p.Cache.Put(record)
203+
return nil
224204
}
225205

226206
// Del del records
@@ -343,6 +323,28 @@ type beatSender struct {
343323
sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient
344324
}
345325

326+
func newBeatSender(ctx context.Context, p *RemotePeer, sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient) *beatSender {
327+
go func(ctx context.Context) {
328+
for {
329+
select {
330+
case <-ctx.Done():
331+
plog.Info("[HealthCheck][Leader] cancel receive put record result", zap.String("host", p.Host()),
332+
zap.Uint32("port", p.port))
333+
return
334+
default:
335+
if _, err := sender.Recv(); err != nil {
336+
plog.Error("[HealthCheck][Leader] receive put record result", zap.String("host", p.Host()),
337+
zap.Uint32("port", p.port), zap.Error(err))
338+
}
339+
}
340+
}
341+
}(ctx)
342+
343+
return &beatSender{
344+
sender: sender,
345+
}
346+
}
347+
346348
func (s *beatSender) Send(req *apiservice.HeartbeatsRequest) error {
347349
s.lock.Lock()
348350
defer s.lock.Unlock()

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.17.7
1+
v1.17.8

0 commit comments

Comments
 (0)