Skip to content

Commit 043e333

Browse files
MrPresent-HanMrPresent-Han
and
MrPresent-Han
authored
enhance: support strict expiry compaction for milvus(#41855) (#41856)
related: #41855 Signed-off-by: MrPresent-Han <chun.han@gmail.com> Co-authored-by: MrPresent-Han <chun.han@gmail.com>
1 parent 7c8370c commit 043e333

File tree

4 files changed

+180
-55
lines changed

4 files changed

+180
-55
lines changed

configs/milvus.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ dataCoord:
596596
maxSize: 67108864 # The maximum size in bytes to force trigger a LevelZero Compaction, default as 64MB
597597
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
598598
deltalogMaxNum: 30 # The maximum number of deltalog files to force trigger a LevelZero Compaction, default as 30
599+
forceExpiryInterval: -1 # interval in hours to do force expiry compaction, -1 means disable force expiry compaction
599600
single:
600601
ratio:
601602
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2

internal/datacoord/compaction_trigger.go

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,16 @@ type trigger interface {
5353
}
5454

5555
type compactionSignal struct {
56-
id UniqueID
57-
isForce bool
58-
collectionID UniqueID
59-
partitionID UniqueID
60-
channel string
61-
segmentIDs []UniqueID
62-
pos *msgpb.MsgPosition
63-
resultCh chan error
64-
waitResult bool
56+
id UniqueID
57+
isForce bool
58+
collectionID UniqueID
59+
partitionID UniqueID
60+
channel string
61+
segmentIDs []UniqueID
62+
pos *msgpb.MsgPosition
63+
resultCh chan error
64+
waitResult bool
65+
doStrictExpiryCompaction bool
6566
}
6667

6768
func NewCompactionSignal() *compactionSignal {
@@ -133,6 +134,8 @@ type compactionTrigger struct {
133134
// A sloppy hack, so we can test with different segment row count without worrying that
134135
// they are re-calculated in every compaction.
135136
testingOnly bool
137+
// no need to use mutex for this map, as all operations on it are executed serially
138+
lastStrictExpiryCompactionTsMap map[CompactionGroupLabel]time.Time
136139
}
137140

138141
func newCompactionTrigger(
@@ -143,16 +146,17 @@ func newCompactionTrigger(
143146
indexVersionManager IndexEngineVersionManager,
144147
) *compactionTrigger {
145148
return &compactionTrigger{
146-
meta: meta,
147-
allocator: allocator,
148-
signals: make(chan *compactionSignal, 100),
149-
manualSignals: make(chan *compactionSignal, 100),
150-
compactionHandler: compactionHandler,
151-
indexEngineVersionManager: indexVersionManager,
152-
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
153-
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
154-
handler: handler,
155-
closeCh: lifetime.NewSafeChan(),
149+
meta: meta,
150+
allocator: allocator,
151+
signals: make(chan *compactionSignal, 100),
152+
manualSignals: make(chan *compactionSignal, 100),
153+
compactionHandler: compactionHandler,
154+
indexEngineVersionManager: indexVersionManager,
155+
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
156+
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
157+
handler: handler,
158+
closeCh: lifetime.NewSafeChan(),
159+
lastStrictExpiryCompactionTsMap: make(map[CompactionGroupLabel]time.Time, 0),
156160
}
157161
}
158162

@@ -321,6 +325,24 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error)
321325
return t.allocator.AllocID(ctx)
322326
}
323327

328+
func (t *compactionTrigger) shouldDoStrictExpiryCompaction(group *chanPartSegments) bool {
329+
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 {
330+
cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName}
331+
lastExpiryCompactionTime, ok := t.lastStrictExpiryCompactionTsMap[cate]
332+
if !ok || time.Since(lastExpiryCompactionTime) >= paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsDuration(time.Hour) {
333+
return true
334+
}
335+
}
336+
return false
337+
}
338+
339+
func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool) {
340+
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 && signal.doStrictExpiryCompaction && plansSubmitted {
341+
cate := CompactionGroupLabel{signal.collectionID, signal.partitionID, signal.channel}
342+
t.lastStrictExpiryCompactionTsMap[cate] = time.Now()
343+
}
344+
}
345+
324346
// handleSignal is the internal logic to convert compactionSignal into compaction tasks.
325347
func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
326348
log := log.With(zap.Int64("compactionID", signal.id),
@@ -375,10 +397,13 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
375397
}
376398

377399
expectedSize := getExpectedSegmentSize(t.meta, coll)
400+
signal.doStrictExpiryCompaction = t.shouldDoStrictExpiryCompaction(&group)
378401
plans := t.generatePlans(group.segments, signal, ct, expectedSize)
402+
plansSubmitted := true
379403
for _, plan := range plans {
380404
if !signal.isForce && t.compactionHandler.isFull() {
381405
log.Warn("compaction plan skipped due to handler full")
406+
plansSubmitted = false
382407
break
383408
}
384409
totalRows, inputSegmentIDs := plan.A, plan.B
@@ -422,13 +447,15 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
422447
zap.Int64("planID", task.GetPlanID()),
423448
zap.Int64s("inputSegments", inputSegmentIDs),
424449
zap.Error(err))
450+
plansSubmitted = false
425451
continue
426452
}
427453

428454
log.Info("time cost of generating global compaction",
429455
zap.Int64("time cost", time.Since(start).Milliseconds()),
430456
zap.Int64s("segmentIDs", inputSegmentIDs))
431457
}
458+
t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted)
432459
}
433460
return nil
434461
}
@@ -449,7 +476,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
449476
for _, segment := range segments {
450477
segment := segment.ShadowClone()
451478
// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
452-
if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime) {
479+
if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime, signal) {
453480
prioritizedCandidates = append(prioritizedCandidates, segment)
454481
} else if t.isSmallSegment(segment, expectedSize) {
455482
smallCandidates = append(smallCandidates, segment)
@@ -587,25 +614,19 @@ func (t *compactionTrigger) getCandidates(signal *compactionSignal) ([]chanPartS
587614
if len(signal.segmentIDs) > 0 && len(segments) != len(signal.segmentIDs) {
588615
return nil, merr.WrapErrServiceInternal("not all segment ids provided could be compacted")
589616
}
590-
591-
type category struct {
592-
collectionID int64
593-
partitionID int64
594-
channelName string
595-
}
596-
groups := lo.GroupBy(segments, func(segment *SegmentInfo) category {
597-
return category{
598-
collectionID: segment.CollectionID,
599-
partitionID: segment.PartitionID,
600-
channelName: segment.InsertChannel,
617+
groups := lo.GroupBy(segments, func(segment *SegmentInfo) CompactionGroupLabel {
618+
return CompactionGroupLabel{
619+
CollectionID: segment.CollectionID,
620+
PartitionID: segment.PartitionID,
621+
Channel: segment.InsertChannel,
601622
}
602623
})
603624

604-
return lo.MapToSlice(groups, func(c category, segments []*SegmentInfo) chanPartSegments {
625+
return lo.MapToSlice(groups, func(c CompactionGroupLabel, segments []*SegmentInfo) chanPartSegments {
605626
return chanPartSegments{
606-
collectionID: c.collectionID,
607-
partitionID: c.partitionID,
608-
channelName: c.channelName,
627+
collectionID: c.CollectionID,
628+
partitionID: c.PartitionID,
629+
channelName: c.Channel,
609630
segments: segments,
610631
}
611632
}), nil
@@ -659,7 +680,20 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
659680
return is
660681
}
661682

662-
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
683+
func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segID int64) bool {
684+
if signal != nil && signal.doStrictExpiryCompaction && fromTs <= compactTime.expireTime {
685+
log.Info("Trigger strict expiry compaction for segment",
686+
zap.Int64("segmentID", segID),
687+
zap.Int64("collectionID", signal.collectionID),
688+
zap.Int64("partition", signal.partitionID),
689+
zap.String("channel", signal.channel),
690+
)
691+
return true
692+
}
693+
return false
694+
}
695+
696+
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime, signal *compactionSignal) bool {
663697
// no longer restricted binlog numbers because this is now related to field numbers
664698

665699
log := log.Ctx(context.TODO())
@@ -673,6 +707,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
673707
// if expire time is enabled, put segment into compaction candidate
674708
totalExpiredSize := int64(0)
675709
totalExpiredRows := 0
710+
var earliestFromTs uint64 = math.MaxUint64
676711
for _, binlogs := range segment.GetBinlogs() {
677712
for _, l := range binlogs.GetBinlogs() {
678713
// TODO, we should probably estimate expired log entries by total rows in binlog and the relationship of timeTo, timeFrom and expire time
@@ -685,9 +720,14 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
685720
totalExpiredRows += int(l.GetEntriesNum())
686721
totalExpiredSize += l.GetMemorySize()
687722
}
723+
earliestFromTs = min(earliestFromTs, l.TimestampFrom)
688724
}
689725
}
690726

727+
if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment.GetID()) {
728+
return true
729+
}
730+
691731
if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
692732
totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
693733
log.Info("total expired entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),

0 commit comments

Comments
 (0)