Skip to content

Commit 98a74be

Browse files
authored
grace_join: fix batch mode performance (#6705)
1 parent c02bfea commit 98a74be

File tree

3 files changed

+294
-76
lines changed

3 files changed

+294
-76
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
321321
if( hasMoreRightTuples )
322322
RightTableBatch_ = true;
323323

324+
auto table1Batch = LeftTableBatch_;
325+
auto table2Batch = RightTableBatch_;
326+
324327
JoinTable1 = &t1;
325328
JoinTable2 = &t2;
326329

@@ -333,13 +336,11 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
333336

334337
if ( JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi ) {
335338
std::swap(JoinTable1, JoinTable2);
339+
std::swap(table1Batch, table2Batch);
336340
}
337341

338342
ui64 tuplesFound = 0;
339343

340-
std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots;
341-
ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64);
342-
joinSlots.reserve( reservedSize );
343344
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds, EMemorySubPool::Temporary>> joinResults;
344345

345346

@@ -361,9 +362,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
361362
bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
362363
bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
363364
bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
365+
bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
364366

365367

366-
if (tuplesNum2 > tuplesNum1) {
368+
if (swapTables) {
367369
std::swap(bucket1, bucket2);
368370
std::swap(headerSize1, headerSize2);
369371
std::swap(nullsSize1, nullsSize2);
@@ -373,7 +375,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
373375
std::swap(tuplesNum1, tuplesNum2);
374376
}
375377

376-
if (tuplesNum2 == 0)
378+
if (tuplesNum2 == 0 || tuplesNum1 == 0)
377379
continue;
378380

379381
ui64 slotSize = headerSize2 + 1;
@@ -384,10 +386,15 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
384386
slotSize = slotSize + avgStringsSize;
385387
}
386388

387-
388-
ui64 nSlots = (3 * tuplesNum2 + 1) | 1;
389-
joinSlots.clear();
390-
joinSlots.resize(nSlots*slotSize, 0);
389+
ui64 &nSlots = bucket2->NSlots;
390+
auto &joinSlots = bucket2->JoinSlots;
391+
bool initHashTable = false;
392+
393+
if (!nSlots) {
394+
nSlots = (3 * tuplesNum2 + 1) | 1;
395+
joinSlots.resize(nSlots*slotSize, 0);
396+
initHashTable = true;
397+
}
391398

392399
auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) {
393400
ui64 slotNum = hash % nSlots;
@@ -401,35 +408,37 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
401408
return it;
402409
};
403410

404-
ui32 tuple2Idx = 0;
405-
auto it2 = bucket2->KeyIntVals.begin();
406-
for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
407-
if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
408-
keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
409-
}
411+
if (initHashTable) {
412+
ui32 tuple2Idx = 0;
413+
auto it2 = bucket2->KeyIntVals.begin();
414+
for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
415+
if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
416+
keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
417+
}
410418

411-
ui64 hash = *it2;
412-
ui64 * nullsPtr = it2+1;
413-
if (HasBitSet(nullsPtr, 1))
414-
continue;
419+
ui64 hash = *it2;
420+
ui64 * nullsPtr = it2+1;
421+
if (HasBitSet(nullsPtr, 1))
422+
continue;
415423

416-
auto slotIt = firstSlot(hash);
424+
auto slotIt = firstSlot(hash);
417425

418-
for (; *slotIt != 0; slotIt = nextSlot(slotIt))
419-
{
420-
}
426+
for (; *slotIt != 0; slotIt = nextSlot(slotIt))
427+
{
428+
}
421429

422-
if (keysValSize <= slotSize - 1)
423-
{
424-
std::copy_n(it2, keysValSize, slotIt);
425-
}
426-
else
427-
{
428-
std::copy_n(it2, headerSize2, slotIt);
430+
if (keysValSize <= slotSize - 1)
431+
{
432+
std::copy_n(it2, keysValSize, slotIt);
433+
}
434+
else
435+
{
436+
std::copy_n(it2, headerSize2, slotIt);
429437

430-
*(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
438+
*(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
439+
}
440+
slotIt[slotSize - 1] = tuple2Idx;
431441
}
432-
slotIt[slotSize - 1] = tuple2Idx;
433442
}
434443

435444

@@ -462,7 +471,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
462471
if (*slotIt != hash)
463472
continue;
464473

465-
tuple2Idx = slotIt[slotSize - 1];
474+
auto tuple2Idx = slotIt[slotSize - 1];
466475

467476
if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) {
468477
// 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present
@@ -505,16 +514,18 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
505514

506515
tuplesFound++;
507516
JoinTuplesIds joinIds;
508-
joinIds.id1 = tuple1Idx;
509-
joinIds.id2 = tuple2Idx;
510-
if (JoinTable2->TableBucketsStats[bucket].TuplesNum > JoinTable1->TableBucketsStats[bucket].TuplesNum)
511-
{
512-
std::swap(joinIds.id1, joinIds.id2);
513-
}
517+
joinIds.id1 = swapTables ? tuple2Idx : tuple1Idx;
518+
joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx;
514519
joinResults.emplace_back(joinIds);
515520
}
516521
}
517522

523+
if (!hasMoreLeftTuples && !hasMoreRightTuples) {
524+
joinSlots.clear();
525+
joinSlots.shrink_to_fit();
526+
nSlots = 0;
527+
}
528+
518529
std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b)
519530
{
520531
if (a.id1 < b.id1) return true;
@@ -560,7 +571,6 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
560571
}
561572

562573
}
563-
564574
}
565575

566576
HasMoreLeftTuples_ = hasMoreLeftTuples;
@@ -1111,6 +1121,8 @@ void TTable::ClearBucket(ui64 bucket) {
11111121
tb.InterfaceOffsets.clear();
11121122
tb.JoinIds.clear();
11131123
tb.RightIds.clear();
1124+
tb.JoinSlots.clear();
1125+
tb.NSlots = 0;
11141126

11151127
TTableBucketStats & tbs = TableBucketsStats[bucket];
11161128
tbs.TuplesNum = 0;
@@ -1128,6 +1140,7 @@ void TTable::ShrinkBucket(ui64 bucket) {
11281140
tb.InterfaceOffsets.shrink_to_fit();
11291141
tb.JoinIds.shrink_to_fit();
11301142
tb.RightIds.shrink_to_fit();
1143+
tb.JoinSlots.shrink_to_fit();
11311144
}
11321145

11331146
void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
@@ -1138,6 +1151,7 @@ void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
11381151

11391152
ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
11401153
return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64)
1154+
+ TableBuckets[bucket].JoinSlots.size() * sizeof(ui64)
11411155
+ TableBuckets[bucket].DataIntVals.size() * sizeof(ui64)
11421156
+ TableBuckets[bucket].StringsValues.size()
11431157
+ TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32)

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ struct TTableBucket {
7171
std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode.
7272
std::set<ui32> AllRightMatchedIds; // All row ids of right join table which matching rows in left table. To process streaming join mode.
7373

74+
std::vector<ui64, TMKQLAllocator<ui64>> JoinSlots; // Hashtable
75+
ui64 NSlots = 0; // Hashtable
7476
};
7577

7678
struct TTableBucketStats {

0 commit comments

Comments
 (0)