@@ -72,7 +72,6 @@
 #include "mongo/db/query/index_bounds.h"
 #include "mongo/db/query/internal_plans.h"
 #include "mongo/db/query/plan_yield_policy.h"
-#include "mongo/db/query/record_id_bound.h"
 #include "mongo/db/query/write_ops/insert.h"
 #include "mongo/db/record_id_helpers.h"
 #include "mongo/db/repl/member_state.h"
@@ -84,6 +83,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/db/shard_role.h"
 #include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
 #include "mongo/db/timeseries/timeseries_gen.h"
 #include "mongo/db/transaction_resources.h"
 #include "mongo/db/ttl/ttl_collection_cache.h"
@@ -159,6 +159,7 @@ std::unique_ptr<BatchedDeleteStageParams> getBatchedDeleteStageParams(bool batch
 // 'safe' handling for time-series collections.
 Date_t safeExpirationDate(OperationContext* opCtx,
                           const CollectionPtr& coll,
+                          Date_t at,
                           std::int64_t expireAfterSeconds) {
     if (auto timeseries = coll->getTimeseriesOptions()) {
         const auto bucketMaxSpan = Seconds(*timeseries->getBucketMaxSpanSeconds());
@@ -168,10 +169,10 @@ Date_t safeExpirationDate(OperationContext* opCtx,
         // time value of a bucket. A bucket may have newer data, so we cannot safely delete
         // the entire bucket yet until the maximum bucket range has passed, even if the
         // minimum value can be expired.
-        return Date_t::now() - Seconds(expireAfterSeconds) - bucketMaxSpan;
+        return at - Seconds(expireAfterSeconds) - bucketMaxSpan;
     }

-    return Date_t::now() - Seconds(expireAfterSeconds);
+    return at - Seconds(expireAfterSeconds);
 }

 // Computes and returns the start 'RecordIdBound' with the correct type for a bounded, clustered
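Aside (not part of the patch): a minimal standalone sketch of the cutoff arithmetic above, with hypothetical values for expireAfterSeconds and bucketMaxSpanSeconds. A time-series bucket keyed at its minimum time may hold measurements up to bucketMaxSpan newer, which is why the cutoff backs off by that span as well.

// Sketch only: mirrors the time-series branch of safeExpirationDate().
#include <cstdint>
#include <iostream>

int64_t cutoffMillis(int64_t atMillis, int64_t expireAfterSeconds, int64_t bucketMaxSpanSeconds) {
    // at - expireAfterSeconds - bucketMaxSpan: only buckets whose minimum time is
    // older than this can be deleted whole, since their newest measurement may be
    // up to bucketMaxSpan after the minimum.
    return atMillis - (expireAfterSeconds + bucketMaxSpanSeconds) * 1000;
}

int main() {
    // Hypothetical values: pass anchored at 2024-01-02T00:00:00Z, 1-day TTL, 1-hour bucket span.
    std::cout << cutoffMillis(1704153600000, 86400, 3600) << "\n";
    // Prints 1704063600000, i.e. 2023-12-31T23:00:00Z.
}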
@@ -328,7 +329,9 @@ void TTLMonitor::run() {
         try {
             const auto opCtxPtr = cc().makeOperationContext();
             writeConflictRetry(opCtxPtr.get(), "TTL pass", NamespaceString::kEmpty, [&] {
-                _doTTLPass(opCtxPtr.get());
+                hangTTLMonitorBetweenPasses.pauseWhileSet(opCtxPtr.get());
+
+                _doTTLPass(opCtxPtr.get(), Date_t::now());
             });
         } catch (const DBException& ex) {
             LOGV2_WARNING(22537,
@@ -350,16 +353,14 @@ void TTLMonitor::shutdown() {
     LOGV2(3684101, "Finished shutting down TTL collection monitor thread");
 }

-void TTLMonitor::_doTTLPass(OperationContext* opCtx) {
+void TTLMonitor::_doTTLPass(OperationContext* opCtx, Date_t at) {
     // Don't do work if we are a secondary (TTL will be handled by primary)
     auto replCoordinator = repl::ReplicationCoordinator::get(opCtx);
     if (replCoordinator && replCoordinator->getSettings().isReplSet() &&
         !replCoordinator->getMemberState().primary()) {
         return;
     }

-    hangTTLMonitorBetweenPasses.pauseWhileSet(opCtx);
-
     // Increment the metric after the TTL work has been finished.
     ON_BLOCK_EXIT([&] { ttlPasses.increment(); });
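Aside (not part of the patch): the effect of the signature change is that one wall-clock anchor is captured per pass (Date_t::now() in run()) and shared by every sub-pass, so documents that become eligible mid-pass wait for the next pass instead of extending the current one. A simplified sketch of that control flow follows; doPass and doSubPass are hypothetical stand-ins for the real methods.

// Sketch: one TTL pass with a fixed time anchor (simplified control flow).
#include <chrono>

using Date = std::chrono::system_clock::time_point;

// Stand-in for _doTTLSubPass(): deletes up to a work cap and returns true
// if more expired documents remain.
bool doSubPass(Date at) {
    (void)at;
    return false;  // stub: pretend one sub-pass finished the work
}

void doPass() {
    const Date at = std::chrono::system_clock::now();  // captured once per pass
    bool moreToDelete = true;
    while (moreToDelete) {
        moreToDelete = doSubPass(at);  // every sub-pass shares the same cutoff anchor
    }
}

int main() {
    doPass();
}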
@@ -369,11 +370,11 @@ void TTLMonitor::_doTTLPass(OperationContext* opCtx) {
         // indicates that it did not delete everything possible, we continue performing sub-passes.
         // This maintains the semantic that a full TTL pass deletes everything it possibly can
         // before sleeping periodically.
-        moreToDelete = _doTTLSubPass(opCtx);
+        moreToDelete = _doTTLSubPass(opCtx, at);
     }
 }

-bool TTLMonitor::_doTTLSubPass(OperationContext* opCtx) {
+bool TTLMonitor::_doTTLSubPass(OperationContext* opCtx, Date_t at) {
     // If part of replSet but not in a readable state (e.g. during initial sync), skip.
     if (repl::ReplicationCoordinator::get(opCtx)->getSettings().isReplSet() &&
         !repl::ReplicationCoordinator::get(opCtx)->getMemberState().readable())
@@ -400,7 +401,7 @@ bool TTLMonitor::_doTTLSubPass(OperationContext* opCtx) {
     TTLCollectionCache::InfoMap moreWork;
     for (const auto& [uuid, infos] : work) {
         for (const auto& info : infos) {
-            bool moreToDelete = _doTTLIndexDelete(opCtx, &ttlCollectionCache, uuid, info);
+            bool moreToDelete = _doTTLIndexDelete(opCtx, at, &ttlCollectionCache, uuid, info);
             if (moreToDelete) {
                 moreWork[uuid].push_back(info);
             }
@@ -417,6 +418,7 @@ bool TTLMonitor::_doTTLSubPass(OperationContext* opCtx) {
 }

 bool TTLMonitor::_doTTLIndexDelete(OperationContext* opCtx,
+                                   Date_t at,
                                    TTLCollectionCache* ttlCollectionCache,
                                    const UUID& uuid,
                                    const TTLCollectionCache::Info& info) {
@@ -478,16 +480,31 @@ bool TTLMonitor::_doTTLIndexDelete(OperationContext* opCtx,
             return false;
         }

-        if (collectionPtr->getRequiresTimeseriesExtendedRangeSupport()) {
-            return false;
-        }
-
         ResourceConsumption::ScopedMetricsCollector scopedMetrics(opCtx, nss->dbName());

         if (info.isClustered()) {
-            return _deleteExpiredWithCollscan(opCtx, ttlCollectionCache, coll);
+            const auto& collOptions = collectionPtr->getCollectionOptions();
+            uassert(5400701,
+                    "collection is not clustered but is described as being TTL",
+                    collOptions.clusteredIndex);
+            invariant(collectionPtr->isClustered());
+
+            auto expireAfterSeconds = collOptions.expireAfterSeconds;
+            if (!expireAfterSeconds) {
+                ttlCollectionCache->deregisterTTLClusteredIndex(coll.uuid());
+                return false;
+            }
+
+            if (collectionPtr->getRequiresTimeseriesExtendedRangeSupport()) {
+                return _deleteExpiredWithCollscanForTimeseriesExtendedRange(
+                    opCtx, at, ttlCollectionCache, coll, *expireAfterSeconds);
+            } else {
+                return _deleteExpiredWithCollscan(
+                    opCtx, at, ttlCollectionCache, coll, *expireAfterSeconds);
+            }
         } else {
-            return _deleteExpiredWithIndex(opCtx, ttlCollectionCache, coll, info.getIndexName());
+            return _deleteExpiredWithIndex(
+                opCtx, at, ttlCollectionCache, coll, info.getIndexName());
         }
     } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
         // The TTL index tried to delete some information from a sharded collection
@@ -548,6 +565,7 @@ bool TTLMonitor::_doTTLIndexDelete(OperationContext* opCtx,
 }

 bool TTLMonitor::_deleteExpiredWithIndex(OperationContext* opCtx,
+                                         Date_t at,
                                          TTLCollectionCache* ttlCollectionCache,
                                          const CollectionAcquisition& collection,
                                          std::string indexName) {
@@ -574,7 +592,7 @@ bool TTLMonitor::_deleteExpiredWithIndex(OperationContext* opCtx,

     auto expireAfterSeconds = spec[IndexDescriptor::kExpireAfterSecondsFieldName].safeNumberLong();
     const Date_t kDawnOfTime = Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
-    const auto expirationDate = safeExpirationDate(opCtx, collectionPtr, expireAfterSeconds);
+    const auto expirationDate = safeExpirationDate(opCtx, collectionPtr, at, expireAfterSeconds);
     const BSONObj startKey = BSON("" << kDawnOfTime);
     const BSONObj endKey = BSON("" << expirationDate);
@@ -648,28 +666,91 @@ bool TTLMonitor::_deleteExpiredWithIndex(OperationContext* opCtx,
 }

 bool TTLMonitor::_deleteExpiredWithCollscan(OperationContext* opCtx,
+                                            Date_t at,
                                             TTLCollectionCache* ttlCollectionCache,
-                                            const CollectionAcquisition& collection) {
-    const auto& collectionPtr = collection.getCollectionPtr();
-    const auto& collOptions = collectionPtr->getCollectionOptions();
-    uassert(5400701,
-            "collection is not clustered but is described as being TTL",
-            collOptions.clusteredIndex);
-    invariant(collectionPtr->isClustered());
-
-    auto expireAfterSeconds = collOptions.expireAfterSeconds;
-    if (!expireAfterSeconds) {
-        ttlCollectionCache->deregisterTTLClusteredIndex(collection.uuid());
-        return false;
-    }
-
+                                            const CollectionAcquisition& collection,
+                                            int64_t expireAfterSeconds) {
     LOGV2_DEBUG(5400704, 1, "running TTL job for clustered collection", logAttrs(collection.nss()));
+    const auto& collectionPtr = collection.getCollectionPtr();

     const auto startId = makeCollScanStartBound(collectionPtr, Date_t{});

-    const auto expirationDate = safeExpirationDate(opCtx, collectionPtr, *expireAfterSeconds);
+    const auto expirationDate = safeExpirationDate(opCtx, collectionPtr, at, expireAfterSeconds);
     const auto endId = makeCollScanEndBound(collectionPtr, expirationDate);

+    return _performDeleteExpiredWithCollscan(
+        opCtx, collection, startId, endId, /*forward*/ true, /*filter*/ nullptr);
+}
+
+bool TTLMonitor::_deleteExpiredWithCollscanForTimeseriesExtendedRange(
+    OperationContext* opCtx,
+    Date_t at,
+    TTLCollectionCache* ttlCollectionCache,
+    const CollectionAcquisition& collection,
+    int64_t expireAfterSeconds) {
+    // We cannot rely on the _id index for time-series data with extended time ranges. In theory,
+    // data eligible for deletion could be located anywhere in the collection, but it would not be
+    // performant to consider every bucket document. We instead run the deletion in two separate
+    // batches: [epoch, at-expiry] and [2038, 2106]. The second range will include data prior to
+    // the epoch, unless it is so far from the epoch that it gets truncated into the
+    // [at-expiry, 2038] range that we don't consider for deletion. This is an acceptable tradeoff
+    // until we have a new _id format for time-series.
+    LOGV2_DEBUG(9736801,
+                1,
+                "running TTL job for timeseries collection with extended range",
+                logAttrs(collection.nss()));
+
+    const auto& collectionPtr = collection.getCollectionPtr();
+    bool passTargetMet = false;
+
+    auto timeSeriesOptions = collectionPtr->getTimeseriesOptions();
+    std::string timeField =
+        timeseries::kControlMaxFieldNamePrefix.toString() + timeSeriesOptions->getTimeField();
+    LTEMatchExpression filter(boost::optional<StringData>{timeField},
+                              Value{at - Seconds(expireAfterSeconds)});
+
+    // Delete from the beginning of the clustered _id index. In the typical case we consider
+    // anything from the epoch to at-expiry eligible for deletion. We add a filter to ensure we
+    // don't delete any data after 2038 that is not eligible for deletion.
+    {
+        const auto startId = makeCollScanStartBound(collectionPtr, Date_t{});
+        const auto expirationDate =
+            safeExpirationDate(opCtx, collectionPtr, at, expireAfterSeconds);
+        const auto endId = makeCollScanEndBound(collectionPtr, expirationDate);
+
+        passTargetMet |= _performDeleteExpiredWithCollscan(
+            opCtx, collection, startId, endId, /*forward*/ true, &filter);
+    }
+
+    // Delete from the end of the clustered _id index. In the typical case nothing should be
+    // deleted, but data prior to 1970 is sorted at the end and is eligible for deletion. We add a
+    // filter to ensure we only delete such data.
+    {
+        // 0x80000000 (in seconds) is the first value that no longer fits in a signed 32-bit
+        // integer. We subtract the bucket span to get the beginning of the range we should
+        // consider deleting.
+        const auto startId = makeCollScanStartBound(
+            collectionPtr,
+            Date_t::fromMillisSinceEpoch((static_cast<long long>(0x80000000) -
+                                          *timeSeriesOptions->getBucketMaxSpanSeconds()) *
+                                         1000));
+
+        const auto endId = makeCollScanEndBound(
+            collectionPtr, Date_t::fromMillisSinceEpoch(static_cast<long long>(0xFFFFFFFF) * 1000));
+
+        passTargetMet |= _performDeleteExpiredWithCollscan(
+            opCtx, collection, startId, endId, /*forward*/ false, &filter);
+    }
+    return passTargetMet;
+}
+
+bool TTLMonitor::_performDeleteExpiredWithCollscan(OperationContext* opCtx,
+                                                   const CollectionAcquisition& collection,
+                                                   const RecordIdBound& startBound,
+                                                   const RecordIdBound& endBound,
+                                                   bool forward,
+                                                   const MatchExpression* filter) {
     auto params = std::make_unique<DeleteStageParams>();
     params->isMulti = true;
@@ -686,11 +767,12 @@ bool TTLMonitor::_deleteExpiredWithCollscan(OperationContext* opCtx,
             collection,
             std::move(params),
             PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
-            InternalPlanner::Direction::FORWARD,
-            startId,
-            endId,
+            forward ? InternalPlanner::Direction::FORWARD : InternalPlanner::Direction::BACKWARD,
+            startBound,
+            endBound,
             CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
-            getBatchedDeleteStageParams(batchingEnabled));
+            getBatchedDeleteStageParams(batchingEnabled),
+            filter);

     try {
         const auto numDeleted = exec->executeDelete();
@@ -706,7 +788,9 @@ bool TTLMonitor::_deleteExpiredWithCollscan(OperationContext* opCtx,
                   "Deleted expired documents using clustered index scan",
                   logAttrs(collection.nss()),
                   "numDeleted"_attr = numDeleted,
-                  "duration"_attr = duration);
+                  "duration"_attr = duration,
+                  "extendedRange"_attr =
+                      collection.getCollectionPtr()->getRequiresTimeseriesExtendedRangeSupport());
         }
         if (batchingEnabled) {
             auto batchedDeleteStats = exec->getBatchedDeleteStats();
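Aside (not part of the patch): a self-contained sketch of the boundary arithmetic behind the two extended-range scan windows above. Bucket _ids embed the bucket's minimum time as unsigned 32-bit seconds, so pre-1970 values wrap to the top of the clustered index. The helper name extendedRangeWindows and the plain-integer types are hypothetical stand-ins for Date_t and RecordIdBound.

// Sketch: the two clustered-scan windows used for extended-range time-series TTL.
#include <cstdint>
#include <iostream>
#include <utility>

struct ScanWindow {
    int64_t startMillis;  // inclusive clustered-index lower bound
    int64_t endMillis;    // inclusive clustered-index upper bound
    bool forward;         // scan direction
};

std::pair<ScanWindow, ScanWindow> extendedRangeWindows(int64_t atMillis,
                                                       int64_t expireAfterSeconds,
                                                       int64_t bucketMaxSpanSeconds) {
    // Window 1: epoch .. (at - expiry - bucketMaxSpan), scanned forward. The
    // control.max.<timeField> filter still guards against newer buckets whose
    // truncated _ids sort into this range.
    ScanWindow low{0, atMillis - (expireAfterSeconds + bucketMaxSpanSeconds) * 1000, true};

    // Window 2: (2^31 - bucketMaxSpan) s .. 0xFFFFFFFF s, scanned backward.
    // Pre-1970 minimum times wrap to large unsigned 32-bit values and land here.
    ScanWindow high{(static_cast<int64_t>(0x80000000) - bucketMaxSpanSeconds) * 1000,
                    static_cast<int64_t>(0xFFFFFFFF) * 1000,
                    false};
    return {low, high};
}

int main() {
    // Hypothetical: pass anchored at 2024-01-02T00:00:00Z, 1-day TTL, 1-hour bucket span.
    auto [low, high] = extendedRangeWindows(1704153600000, 86400, 3600);
    std::cout << low.endMillis << " " << high.startMillis << "\n";
    // Prints 1704063600000 2147480048000.
}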