Skip to content

Commit f073686

Browse files
author
robot-ydb-importer
committed
YDB Import 765
commit_hash:d9b36270b6cb6510b9a1f144ff3a9777112df439
1 parent 21d64de commit f073686

File tree

7 files changed

+72
-167
lines changed

7 files changed

+72
-167
lines changed

yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,9 @@ struct TGraceJoinPacker {
8080
ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
8181
std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces;
8282
bool IsAny; // Flag to support any join attribute
83-
const NUdf::TLoggerPtr Logger; // Logger instance
84-
const NUdf::TLogComponentId LogComponent; // Id of current component for logging. GracejJoin here
8583
inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
8684
inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
87-
TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns,
88-
const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent);
85+
TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny);
8986
};
9087

9188

@@ -433,13 +430,10 @@ void TGraceJoinPacker::UnPack() {
433430
}
434431

435432

436-
TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns,
437-
const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0) :
433+
TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) :
438434
ColumnTypes(columnTypes)
439435
, HolderFactory(holderFactory)
440-
, IsAny(isAny)
441-
, Logger(logger)
442-
, LogComponent(logComponent) {
436+
, IsAny(isAny) {
443437

444438
ui64 nColumns = ColumnTypes.size();
445439
ui64 nKeyColumns = keyColumns.size();
@@ -556,9 +550,8 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
556550
}
557551

558552
TablePtr = std::make_unique<GraceJoin::TTable>(
559-
Logger, LogComponent,
560553
PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum,
561-
DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny);
554+
DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny );
562555

563556
}
564557

@@ -576,7 +569,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
576569
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
577570
const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
578571
const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, TComputationContext& ctx,
579-
const bool isSelfJoin, bool isSpillingAllowed, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent)
572+
const bool isSelfJoin, bool isSpillingAllowed)
580573
: TBase(memInfo)
581574
, FlowLeft(flowLeft)
582575
, FlowRight(flowRight)
@@ -585,20 +578,18 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
585578
, RightKeyColumns(rightKeyColumns)
586579
, LeftRenames(leftRenames)
587580
, RightRenames(rightRenames)
588-
, LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly), logger, logComponent))
589-
, RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly), logger, logComponent))
590-
, JoinedTablePtr(std::make_unique<GraceJoin::TTable>(logger, logComponent))
581+
, LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly)))
582+
, RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly)))
583+
, JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
591584
, JoinCompleted(std::make_unique<bool>(false))
592585
, PartialJoinCompleted(std::make_unique<bool>(false))
593586
, HaveMoreLeftRows(std::make_unique<bool>(true))
594587
, HaveMoreRightRows(std::make_unique<bool>(true))
595588
, IsSelfJoin_(isSelfJoin)
596589
, SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
597590
, IsSpillingAllowed(isSpillingAllowed)
598-
, Logger(logger)
599-
, LogComponent(logComponent)
600591
{
601-
UDF_LOG(Logger, LogComponent, GRACEJOIN_DEBUG, TStringBuilder() << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind);
592+
YQL_LOG(GRACEJOIN_DEBUG) << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind;
602593
if (IsSelfJoin_) {
603594
LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
604595
RightPacker->BatchSize = std::numeric_limits<ui64>::max();
@@ -655,23 +646,20 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
655646
LogMemoryUsage();
656647
switch(mode) {
657648
case EOperatingMode::InMemory: {
658-
UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
659-
<< (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory");
649+
YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory";
660650
MKQL_ENSURE(false, "Internal logic error");
661651
break;
662652
}
663653
case EOperatingMode::Spilling: {
664-
UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
665-
<< (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling");
654+
YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
666655
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
667656
auto spiller = ctx.SpillerFactory->CreateSpiller();
668657
RightPacker->TablePtr->InitializeBucketSpillers(spiller);
669658
LeftPacker->TablePtr->InitializeBucketSpillers(spiller);
670659
break;
671660
}
672661
case EOperatingMode::ProcessSpilled: {
673-
UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
674-
<< (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled");
662+
YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled";
675663
SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
676664
for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
677665

@@ -808,11 +796,6 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
808796
}
809797

810798
void LogMemoryUsage() const {
811-
const auto memoryUsageLogLevel = NUdf::ELogLevel::Info;
812-
if (!Logger->IsActive(LogComponent, memoryUsageLogLevel)) {
813-
return;
814-
}
815-
816799
const auto used = TlsAllocState->GetUsed();
817800
const auto limit = TlsAllocState->GetLimit();
818801
TStringBuilder logmsg;
@@ -822,7 +805,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
822805
}
823806
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
824807

825-
UDF_LOG(Logger, LogComponent, memoryUsageLogLevel, logmsg);
808+
YQL_LOG(INFO) << logmsg;
826809
}
827810

828811
EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
@@ -874,10 +857,11 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
874857
(!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) ||
875858
(!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) {
876859

877-
UDF_LOG(Logger, LogComponent, GRACEJOIN_TRACE, TStringBuilder()
860+
YQL_LOG(GRACEJOIN_TRACE)
878861
<< (const void *)&*JoinedTablePtr << '#'
879862
<< " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
880-
<< " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize);
863+
<< " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
864+
;
881865

882866
auto& leftTable = *LeftPacker->TablePtr;
883867
auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr;
@@ -1064,9 +1048,6 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
10641048
NYql::NUdf::TCounter CounterOutputRows_;
10651049
ui32 SpilledBucketsJoinOrderCurrentIndex = 0;
10661050
std::vector<ui32> SpilledBucketsJoinOrder;
1067-
1068-
const NUdf::TLoggerPtr Logger;
1069-
const NUdf::TLogComponentId LogComponent;
10701051
};
10711052

10721053
class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {
@@ -1186,14 +1167,10 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
11861167
}
11871168

11881169
void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
1189-
NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger();
1190-
NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("GraceJoin");
1191-
UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");
1192-
11931170
state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
11941171
FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
11951172
LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
1196-
ctx, IsSelfJoin_, IsSpillingAllowed, logger, logComponent);
1173+
ctx, IsSelfJoin_, IsSpillingAllowed);
11971174
}
11981175

11991176
IComputationWideFlowNode *const FlowLeft;

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,6 @@ bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /
368368
}
369369

370370
if (wasException || TlsAllocState->IsMemoryYellowZoneEnabled()) {
371-
UDF_LOG(Logger_, LogComponent_, NUdf::ELogLevel::Debug, TStringBuilder() << "Preallocation failed. WasException: " << wasException);
372371
for (ui64 i = 0; i < bucket; ++i) {
373372
auto& b1 = t1.TableBuckets[i];
374373
b1.JoinSlots.resize(0);
@@ -674,7 +673,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
674673
BloomHits_ += bloomHits;
675674
BloomLookups_ += bloomLookups;
676675

677-
UDF_LOG(Logger_, LogComponent_, GRACEJOIN_TRACE, TStringBuilder()
676+
YQL_LOG(GRACEJOIN_TRACE)
678677
<< (const void *)this << '#'
679678
<< bucket
680679
<< " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum
@@ -686,7 +685,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
686685
<< " joinKind " << (int)JoinKind
687686
<< " swapTables " << swapTables
688687
<< " initHashTable " << initHashTable
689-
);
688+
;
690689
}
691690

692691
HasMoreLeftTuples_ = hasMoreLeftTuples;
@@ -1063,12 +1062,10 @@ void TTable::PrepareBucket(ui64 bucket) {
10631062
}
10641063

10651064
// Creates new table with key columns and data columns
1066-
TTable::TTable( NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent,
1067-
ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
1065+
TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
10681066
ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
10691067
ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns,
1070-
ui64 nullsBitmapSize, TColTypeInterface * colInterfaces,
1071-
bool isAny) :
1068+
ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) :
10721069

10731070
NumberOfKeyIntColumns(numberOfKeyIntColumns),
10741071
NumberOfKeyStringColumns(numberOfKeyStringColumns),
@@ -1078,9 +1075,7 @@ TTable::TTable( NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent,
10781075
NumberOfDataIColumns(numberOfDataIColumns),
10791076
ColInterfaces(colInterfaces),
10801077
NullsBitmapSize_(nullsBitmapSize),
1081-
IsAny_(isAny),
1082-
Logger_(logger),
1083-
LogComponent_(logComponent) {
1078+
IsAny_(isAny) {
10841079

10851080
NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
10861081
NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
@@ -1119,21 +1114,16 @@ TTable::TTable( NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent,
11191114
}
11201115

11211116
TTable::~TTable() {
1122-
UDF_LOG_IF(InitHashTableCount_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
1117+
YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_)
11231118
<< (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_
11241119
<< " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_
11251120
<< " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_
11261121
<< " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_
1127-
);
1128-
1129-
UDF_LOG_IF(JoinTable1 && JoinTable1->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
1130-
<< (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_);
1131-
UDF_LOG_IF(JoinTable1 && JoinTable1->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
1132-
<< (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_);
1133-
UDF_LOG_IF(JoinTable2 && JoinTable2->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
1134-
<< (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_);
1135-
UDF_LOG_IF(JoinTable2 && JoinTable2->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
1136-
<< (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_);
1122+
;
1123+
YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_;
1124+
YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_;
1125+
YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_;
1126+
YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_;
11371127
};
11381128

11391129
TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit)

yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ namespace NMiniKQL {
1111
namespace GraceJoin {
1212

1313
class TTableBucketSpiller;
14-
#define GRACEJOIN_DEBUG NUdf::ELogLevel::Debug
15-
#define GRACEJOIN_TRACE NUdf::ELogLevel::Trace
14+
#define GRACEJOIN_DEBUG DEBUG
15+
#define GRACEJOIN_TRACE TRACE
1616

1717
const ui64 BitsForNumberOfBuckets = 6; // 2^6 = 64
1818
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
@@ -348,9 +348,6 @@ class TTable {
348348

349349
ui64 TuplesFound_ = 0; // Total number of matching keys found during join
350350

351-
const NUdf::TLoggerPtr Logger_ = nullptr; // Logger instance
352-
const NUdf::TLogComponentId LogComponent_ = 0; // Unique component id. GraceJoin here.
353-
354351
public:
355352

356353

@@ -420,12 +417,10 @@ class TTable {
420417
void Clear();
421418

422419
// Creates new table with key columns and data columns
423-
TTable(NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0,
424-
ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
425-
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
426-
ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
427-
ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr,
428-
bool isAny = false);
420+
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
421+
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
422+
ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
423+
ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
429424

430425
enum class EAddTupleResult { Added, Unmatched, AnyMatch };
431426
// Adds new tuple to the table. intColumns, stringColumns - data of columns,

0 commit comments

Comments
 (0)