Skip to content

Commit 1b8df99

Browse files
committed
After spilling, join in-memory buckets first
теперь при спиллинге сначала будут джоиниться бакеты, которые сейчас есть в памяти, а только потом уже загружаться новые бакеты commit_hash:bb66673affba8f5f65eb7ab79ce6d09b26f77ec2
1 parent cd83d36 commit 1b8df99

File tree

3 files changed

+70
-57
lines changed

3 files changed

+70
-57
lines changed

yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,15 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
656656
break;
657657
}
658658
case EOperatingMode::ProcessSpilled: {
659+
SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
660+
for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
661+
662+
std::sort(SpilledBucketsJoinOrder.begin(), SpilledBucketsJoinOrder.end(), [&](ui32 lhs, ui32 rhs) {
663+
auto lhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(lhs) + RightPacker->TablePtr->IsBucketInMemory(lhs);
664+
auto rhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(rhs) + RightPacker->TablePtr->IsBucketInMemory(rhs);
665+
666+
return lhs_in_memory > rhs_in_memory;
667+
});
659668
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
660669
break;
661670
}
@@ -871,8 +880,22 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
871880
}
872881

873882
bool TryToReduceMemoryAndWait() {
874-
bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait();
875-
bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait();
883+
if (!IsSpillingFinished()) return true;
884+
i32 largestBucketsPairIndex = 0;
885+
ui64 largestBucketsPairSize = 0;
886+
for (ui32 bucket = 0; bucket < GraceJoin::NumberOfBuckets; ++bucket) {
887+
888+
ui64 leftBucketSize = LeftPacker->TablePtr->GetSizeOfBucket(bucket);
889+
ui64 rightBucketSize = RightPacker->TablePtr->GetSizeOfBucket(bucket);
890+
ui64 totalSize = leftBucketSize + rightBucketSize;
891+
if (totalSize > largestBucketsPairSize) {
892+
largestBucketsPairSize = totalSize;
893+
largestBucketsPairIndex = bucket;
894+
}
895+
}
896+
897+
bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
898+
bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
876899

877900
return isWaitingLeftForReduce || isWaitingRightForReduce;
878901
}
@@ -930,54 +953,56 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal
930953
}
931954

932955
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
933-
while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
956+
while (SpilledBucketsJoinOrderCurrentIndex != GraceJoin::NumberOfBuckets) {
934957
UpdateSpilling();
935958
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
936959

937-
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
938-
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
960+
ui32 nextBucketToJoin = SpilledBucketsJoinOrder[SpilledBucketsJoinOrderCurrentIndex];
961+
962+
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
963+
LeftPacker->TablePtr->PrepareBucket(nextBucketToJoin);
939964
}
940965

941-
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
942-
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
966+
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
967+
RightPacker->TablePtr->PrepareBucket(nextBucketToJoin);
943968
}
944969

945-
if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
946-
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
970+
if (!LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
971+
LeftPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
947972
}
948973

949-
if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
950-
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
974+
if (!RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
975+
RightPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
951976
}
952977

953-
if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
978+
if (LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
954979
if (*PartialJoinCompleted) {
955-
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) {
980+
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, nextBucketToJoin + 1)) {
956981
UnpackJoinedData(output);
957982
return EFetchResult::One;
958983
}
959984

960985
LeftPacker->TuplesBatchPacked = 0;
961-
LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
962-
LeftPacker->TablePtr->ShrinkBucket(NextBucketToJoin);
986+
LeftPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
987+
LeftPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
963988

964989
RightPacker->TuplesBatchPacked = 0;
965-
RightPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
966-
RightPacker->TablePtr->ShrinkBucket(NextBucketToJoin);
990+
RightPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
991+
RightPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
967992

968993
JoinedTablePtr->Clear();
969994
JoinedTablePtr->ResetIterator();
970995
*PartialJoinCompleted = false;
971996

972-
NextBucketToJoin++;
997+
SpilledBucketsJoinOrderCurrentIndex++;
973998
} else {
974999
*PartialJoinCompleted = true;
9751000
LeftPacker->StartTime = std::chrono::system_clock::now();
9761001
RightPacker->StartTime = std::chrono::system_clock::now();
9771002
if ( SelfJoinSameKeys_ ) {
978-
JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
1003+
JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
9791004
} else {
980-
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
1005+
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
9811006
}
9821007

9831008
JoinedTablePtr->ResetIterator();
@@ -1016,9 +1041,9 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
10161041

10171042
bool IsSpillingFinalized = false;
10181043

1019-
ui32 NextBucketToJoin = 0;
1020-
10211044
NYql::NUdf::TCounter CounterOutputRows_;
1045+
ui32 SpilledBucketsJoinOrderCurrentIndex = 0;
1046+
std::vector<ui32> SpilledBucketsJoinOrder;
10221047
};
10231048

10241049
class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
489489
// slotSize, slotIdx and strPos is only for hashtable (table2)
490490
ui64 bloomHits = 0;
491491
ui64 bloomLookups = 0;
492-
492+
493493
for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) {
494494

495495
if ( table1HasKeyStringColumns || table1HasKeyIColumns ) {
@@ -630,7 +630,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
630630
HasMoreRightTuples_ = hasMoreRightTuples;
631631

632632
TuplesFound_ += tuplesFound;
633-
633+
634634
}
635635

636636
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
@@ -772,7 +772,7 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf:
772772
continue;
773773

774774
if (NumberOfKeyIColumns > 0) {
775-
if (!CompareIColumns(
775+
if (!CompareIColumns(
776776
(char *) (slotStringsStart),
777777
(char *) (keys + HeaderSize ),
778778
iColumns,
@@ -903,22 +903,10 @@ ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
903903
+ TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32);
904904
}
905905

906-
bool TTable::TryToReduceMemoryAndWait() {
907-
i32 largestBucketIndex = 0;
908-
ui64 largestBucketSize = 0;
909-
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
910-
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
911-
912-
ui64 bucketSize = GetSizeOfBucket(bucket);
913-
if (bucketSize > largestBucketSize) {
914-
largestBucketSize = bucketSize;
915-
largestBucketIndex = bucket;
916-
}
917-
}
918-
919-
if (largestBucketSize < SpillingSizeLimit/NumberOfBuckets) return false;
920-
if (const auto &tbs = TableBucketsStats[largestBucketIndex]; tbs.HashtableMatches) {
921-
auto &tb = TableBuckets[largestBucketIndex];
906+
bool TTable::TryToReduceMemoryAndWait(ui64 bucket) {
907+
if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false;
908+
if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) {
909+
auto &tb = TableBuckets[bucket];
922910

923911
if (tb.JoinSlots.size()) {
924912
const auto slotSize = tbs.SlotSize;
@@ -946,10 +934,10 @@ bool TTable::TryToReduceMemoryAndWait() {
946934
tb.JoinSlots.shrink_to_fit();
947935
}
948936
}
949-
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
950-
TableBuckets[largestBucketIndex] = TTableBucket{};
937+
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
938+
TableBuckets[bucket] = TTableBucket{};
951939

952-
return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling();
940+
return TableBucketsSpillers[bucket].IsProcessingSpilling();
953941
}
954942

955943
void TTable::UpdateSpilling() {
@@ -987,7 +975,7 @@ void TTable::FinalizeSpilling() {
987975
TableBucketsSpillers[bucket].Finalize();
988976
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
989977
TableBuckets[bucket] = TTableBucket{};
990-
978+
991979
}
992980
}
993981
}
@@ -1288,15 +1276,15 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
12881276
case ENextVectorToProcess::InterfaceOffsets:
12891277
if (StateUi32Adapter.IsDataReady()) {
12901278
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
1291-
1279+
12921280
SpilledBucketsCount--;
12931281
if (SpilledBucketsCount == 0) {
12941282
NextVectorToProcess = ENextVectorToProcess::None;
12951283
State = EState::WaitingForExtraction;
12961284
} else {
12971285
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
12981286
}
1299-
1287+
13001288
break;
13011289
}
13021290

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace GraceJoin {
1313
class TTableBucketSpiller;
1414
#define GRACEJOIN_DEBUG DEBUG
1515
#define GRACEJOIN_TRACE TRACE
16-
16+
1717
const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32
1818
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
1919
const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples
@@ -108,7 +108,7 @@ class TBloomfilter {
108108

109109
/*
110110
Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type,
111-
providing IHash, IEquate, IPack and IUnpack functions.
111+
providing IHash, IEquate, IPack and IUnpack functions.
112112
External clients should transform (pack) data into appropriate presentation.
113113
114114
Key columns always first, following int columns, string columns and interface-based columns.
@@ -135,7 +135,7 @@ struct JoinTuplesIds {
135135
// To store keys values when making join only for unique keys (any join attribute)
136136
struct KeysHashTable {
137137
ui64 SlotSize = 0; // Slot size in hash table
138-
ui64 NSlots = 0; // Total number of slots in table
138+
ui64 NSlots = 0; // Total number of slots in table
139139
ui64 FillCount = 0; // Number of ui64 slots which are filled
140140
std::vector<ui64, TMKQLAllocator<ui64>> Table; // Table to store keys data in particular slots
141141
std::vector<ui64, TMKQLAllocator<ui64>> SpillData; // Vector to store long data which cannot be fit in single hash table slot.
@@ -148,7 +148,7 @@ struct TTableBucket {
148148
std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
149149
std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
150150
std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
151-
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
151+
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
152152
// of two tables with the same number
153153
std::vector<ui32, TMKQLAllocator<ui32>> LeftIds; // Left-side ids missing in other table
154154

@@ -181,7 +181,7 @@ struct TColTypeInterface {
181181
NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value
182182
NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values
183183
std::shared_ptr<TValuePacker> Packer; // Class to pack and unpack column values
184-
const THolderFactory& HolderFactory; // To use during unpacking
184+
const THolderFactory& HolderFactory; // To use during unpacking
185185
};
186186

187187
// Class that spills bucket data.
@@ -269,7 +269,7 @@ class TTable {
269269
ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table
270270
ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table
271271
ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns
272-
272+
273273
TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data
274274

275275

@@ -285,7 +285,7 @@ class TTable {
285285
ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
286286

287287
ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
288-
288+
289289
// Table data is partitioned in buckets based on key value
290290
std::vector<TTableBucket> TableBuckets;
291291
// Statistics for buckets. Total number of tuples inside a single bucket and offsets.
@@ -365,7 +365,7 @@ class TTable {
365365
ui64 GetSizeOfBucket(ui64 bucket) const;
366366

367367
// This function finds the largest bucket and spills it to the disk.
368-
bool TryToReduceMemoryAndWait();
368+
bool TryToReduceMemoryAndWait(ui64 bucket);
369369

370370
// Update state of spilling. Must be called during each DoCalculate.
371371
void UpdateSpilling();
@@ -406,15 +406,15 @@ class TTable {
406406
// Creates new table with key columns and data columns
407407
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
408408
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
409-
ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
409+
ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
410410
ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
411411

412412
enum class EAddTupleResult { Added, Unmatched, AnyMatch };
413413
// Adds new tuple to the table. intColumns, stringColumns - data of columns,
414414
// stringsSizes - sizes of strings columns. Indexes of null-value columns
415415
// in the form of bit array should be first values of intColumns.
416416
EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {});
417-
417+
418418
~TTable();
419419

420420
ui64 InitHashTableCount_ = 0;

0 commit comments

Comments
 (0)