Skip to content

Commit 0f0cb4c

Browse files
authored
enhance: [2.5] Close component in topological order when unsub channel (#40796) (#40819)
Cherry-pick from master pr: #40796 Related to #40795 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
1 parent 7c716d6 commit 0f0cb4c

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

internal/querynodev2/services.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,11 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
366366
defer node.unsubscribingChannels.Remove(req.GetChannelName())
367367
delegator, ok := node.delegators.GetAndRemove(req.GetChannelName())
368368
if ok {
369+
node.pipelineManager.Remove(req.GetChannelName())
370+
369371
// close the delegator first to block all coming query/search requests
370372
delegator.Close()
371373

372-
node.pipelineManager.Remove(req.GetChannelName())
373374
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
374375
_, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
375376
// node.tSafeManager.Remove(ctx, req.GetChannelName())

internal/util/pipeline/stream_pipeline.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ func (p *streamPipeline) work() {
7171
case <-p.closeCh:
7272
log.Ctx(context.TODO()).Debug("stream pipeline input closed")
7373
return
74-
case msg := <-p.input:
74+
case msg, ok := <-p.input:
75+
if !ok {
76+
log.Ctx(context.TODO()).Debug("stream pipeline input closed")
77+
return
78+
}
7579
p.lastAccessTime.Store(time.Now())
7680
log.Ctx(context.TODO()).RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
7781
p.pipeline.inputChannel <- msg
@@ -169,12 +173,15 @@ func (p *streamPipeline) Start() error {
169173

170174
func (p *streamPipeline) Close() {
171175
p.closeOnce.Do(func() {
176+
// close datasource first
177+
p.dispatcher.Deregister(p.vChannel)
178+
// close stream input
172179
close(p.closeCh)
173180
p.closeWg.Wait()
174181
if p.scanner != nil {
175182
p.scanner.Close()
176183
}
177-
p.dispatcher.Deregister(p.vChannel)
184+
// close the underline pipeline
178185
p.pipeline.Close()
179186
})
180187
}

0 commit comments

Comments
 (0)