@@ -355,6 +355,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
355
355
};
356
356
357
357
EBucketState BucketState = EBucketState::InMemory;
358
+ ui64 LineCount = 0 ;
358
359
};
359
360
360
361
enum class EOperatingMode {
@@ -417,7 +418,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
417
418
case EOperatingMode::Spilling: {
418
419
UpdateSpillingBuckets ();
419
420
420
- if (!HasMemoryForProcessing () && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait ()) return EUpdateResult::Yield;
421
+ if (!HasMemoryForProcessing () && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait ()) {
422
+ return EUpdateResult::Yield;
423
+ }
421
424
422
425
if (BufferForUsedInputItems.size ()) {
423
426
auto & bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
@@ -456,13 +459,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
456
459
457
460
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) {
458
461
std::copy_n (ViewForKeyAndState.data (), KeyWidth, static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Tongue ));
459
-
462
+
460
463
bool isNew = bucket.InMemoryProcessingState ->TasteIt ();
461
464
Throat = bucket.InMemoryProcessingState ->Throat ;
465
+ bucket.LineCount += isNew;
466
+
462
467
return isNew ? ETasteResult::Init : ETasteResult::Update;
463
468
}
464
-
465
- // Prepare space for raw data
469
+ bucket.LineCount ++;
470
+
471
+ // Prepare space for raw data
466
472
MKQL_ENSURE (BufferForUsedInputItems.size () == 0 , " Internal logic error" );
467
473
BufferForUsedInputItems.resize (ItemNodesSize);
468
474
BufferForUsedInputItemsBucketId = bucketId;
@@ -485,6 +491,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
485
491
486
492
value = static_cast <NUdf::TUnboxedValue*>(SpilledBuckets.front ().InMemoryProcessingState ->Extract ());
487
493
if (!value) {
494
+ SpilledBuckets.front ().InMemoryProcessingState ->ReadMore <false >();
488
495
SpilledBuckets.pop_front ();
489
496
if (SpilledBuckets.empty ()) IsEverythingExtracted = true ;
490
497
}
@@ -521,6 +528,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
521
528
auto bucketId = hash % SpilledBucketCount;
522
529
auto & bucket = SpilledBuckets[bucketId];
523
530
531
+ bucket.LineCount ++;
524
532
auto & processingState = *bucket.InMemoryProcessingState ;
525
533
526
534
for (size_t i = 0 ; i < KeyWidth; ++i) {
@@ -566,6 +574,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
566
574
567
575
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) {
568
576
bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
577
+ SpillingBucketsCount++;
578
+ InMemoryBucketsCount--;
569
579
}
570
580
571
581
while (const auto keyAndState = static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Extract ())) {
@@ -583,10 +593,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
583
593
bucket.InMemoryProcessingState ->ReadMore <false >();
584
594
585
595
bucket.BucketState = TSpilledBucket::EBucketState::SpillingData;
596
+ SpillingBucketsCount--;
586
597
}
587
598
588
599
void UpdateSpillingBuckets () {
589
- for (ui64 i = 0 ; i < NextBucketToSpill ; ++i) {
600
+ for (ui64 i = 0 ; i < SpilledBucketCount ; ++i) {
590
601
auto & bucket = SpilledBuckets[i];
591
602
if (bucket.AsyncWriteOperation .has_value () && bucket.AsyncWriteOperation ->HasValue ()) {
592
603
if (bucket.BucketState == TSpilledBucket::EBucketState::SpillingState) {
@@ -604,16 +615,27 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
604
615
}
605
616
606
617
// Tries to free memory by starting to spill one more bucket.
//
// Returns true when at least one bucket is in SpillingState (either it already
// was, or this call just put one there); the visible caller in the
// EOperatingMode::Spilling branch then returns EUpdateResult::Yield.
// Returns false when no bucket is left in SpillingState and no in-memory
// bucket remains to be spilled.
//
// This span is a diff view: lines starting with '-' are the previous
// implementation (index-ordered spilling driven by NextBucketToSpill), lines
// starting with '+' are the replacement (spill the in-memory bucket with the
// largest LineCount first). Bare numbers are old/new gutter line numbers.
bool TryToReduceMemoryAndWait () {
607
- for (ui64 i = 0 ; i < NextBucketToSpill; ++i ) {
608
- if (SpilledBuckets[i]. BucketState == TSpilledBucket::EBucketState::SpillingState) return true ;
618
// New fast path: a counter replaces the scan over already-visited buckets.
+ if (SpillingBucketsCount > 0 ) {
619
+ return true ;
609
620
}
621
// Keep picking buckets until one actually enters SpillingState or none is left in memory.
// NOTE(review): this relies on InMemoryBucketsCount matching the number of buckets whose
// BucketState is InMemory; if the counter is ever higher, the MKQL_ENSURE below fires.
+ while (InMemoryBucketsCount > 0 ) {
622
+ ui64 maxLineCount = 0 ;
623
// (ui32)-1 is the "nothing selected yet" sentinel.
+ ui32 maxLineBucketInd = (ui32)-1 ;
624
+ for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
625
+ const auto & bucket = SpilledBuckets[i];
626
// The sentinel check lets the first InMemory bucket be selected even when its LineCount is 0;
// the strict '>' means ties keep the lowest index.
+ if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory && (maxLineBucketInd == (ui32)-1 || bucket.LineCount > maxLineCount)) {
627
+ maxLineCount = bucket.LineCount ;
628
// NOTE(review): ui64 index stored into ui32; values stay below SpilledBucketCount (128).
+ maxLineBucketInd = i;
629
+ }
630
+ }
631
+ MKQL_ENSURE (maxLineBucketInd != (ui32)-1 , " Internal logic error" );
610
632
611
- while (NextBucketToSpill < SpilledBucketCount) {
612
- auto & bucket = SpilledBuckets[NextBucketToSpill++];
613
- SpillMoreStateFromBucket (bucket);
614
- if (bucket.BucketState == TSpilledBucket::EBucketState::SpillingState) return true ;
633
+ auto & bucketToSpill = SpilledBuckets[maxLineBucketInd];
634
// Presumably this moves the bucket out of InMemory and updates both counters, which is
// what lets the loop terminate — TODO confirm against SpillMoreStateFromBucket.
+ SpillMoreStateFromBucket (bucketToSpill);
635
// Still in SpillingState after the call: report it so the caller can yield.
+ if (bucketToSpill.BucketState == TSpilledBucket::EBucketState::SpillingState) {
636
+ return true ;
637
+ }
615
638
}
616
-
617
639
// No bucket is in SpillingState and there is nothing left in memory to spill.
return false ;
618
640
}
619
641
@@ -661,7 +683,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
661
683
662
684
Throat = BufferForUsedInputItems.data ();
663
685
Tongue = bucket.InMemoryProcessingState ->Tongue ;
664
-
686
+
665
687
return EUpdateResult::ExtractRawData;
666
688
}
667
689
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
@@ -719,8 +741,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
719
741
NUdf::TUnboxedValuePod* Throat = nullptr ;
720
742
721
743
private:
722
- ui64 NextBucketToSpill = 0 ;
723
-
724
744
bool IsEverythingExtracted = false ;
725
745
726
746
TState InMemoryProcessingState;
@@ -735,6 +755,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
735
755
TAsyncReadOperation AsyncReadOperation = std::nullopt;
736
756
static constexpr size_t SpilledBucketCount = 128 ;
737
757
std::deque<TSpilledBucket> SpilledBuckets;
758
+ ui32 SpillingBucketsCount = 0 ;
759
+ ui32 InMemoryBucketsCount = SpilledBucketCount;
738
760
ui64 BufferForUsedInputItemsBucketId;
739
761
TUnboxedValueVector BufferForUsedInputItems;
740
762
std::vector<NUdf::TUnboxedValuePod, TMKQLAllocator<NUdf::TUnboxedValuePod>> ViewForKeyAndState;
@@ -1237,7 +1259,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
1237
1259
EFetchResult DoCalculate (NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const * output) const {
1238
1260
if (state.IsInvalid ()) {
1239
1261
MakeState (ctx, state);
1240
- }
1262
+ }
1241
1263
1242
1264
if (const auto ptr = static_cast <TSpillingSupportState*>(state.AsBoxed ().Get ())) {
1243
1265
auto **fields = ctx.WideFields .data () + WideFieldsIndex;
0 commit comments