Skip to content

Commit 76514f2

Browse files
authored
fix row size estimation in stream lookup join (#19094) (#19114)
2 parents 003c739 + d7c0053 commit 76514f2

File tree

2 files changed

+24
-9
lines changed

2 files changed

+24
-9
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
1010

1111
#include <yql/essentials/minikql/mkql_node_serialization.h>
12+
#include <ydb/library/yql/dq/runtime/dq_transport.h>
1213

1314
namespace NKikimr {
1415
namespace NKqp {
@@ -59,11 +60,6 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
5960
return rangePartition;
6061
}
6162

62-
NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) {
63-
YQL_ENSURE(type);
64-
return NScheme::TypeInfoFromMiniKQLType(type);
65-
}
66-
6763

6864
} // !namespace
6965

@@ -909,10 +905,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
909905

910906
i64 storageReadBytes = 0;
911907

912-
for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
913-
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
914-
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
915-
}
908+
leftRowSize = NYql::NDq::TDqDataSerializer::EstimateSize(leftRowInfo.Row, leftRowType);
916909

917910
if (!rightRow.empty()) {
918911
leftRowInfo.RightRowExist = true;

ydb/core/kqp/ut/join/kqp_join_ut.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,28 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
275275
}
276276
}
277277

278+
Y_UNIT_TEST_TWIN(IndexLoookupJoinStructJoin, StreamLookupJoin) {
279+
NKikimrConfig::TAppConfig appConfig;
280+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
281+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
282+
TKikimrRunner kikimr(settings);
283+
auto db = kikimr.GetTableClient();
284+
auto session = db.CreateSession().GetValueSync().GetSession();
285+
286+
CreateSampleTables(session);
287+
288+
auto result = session.ExecuteDataQuery(Q_(R"(
289+
$a = AsList(AsStruct(AsStruct("Key" as Key) as join_info), AsStruct(AsStruct("Name1" as Key) as join_info));
290+
SELECT a.join_info.Key as Key, b.Value as Value from AS_TABLE($a) as a
291+
LEFT JOIN `/Root/Join1_3` as b
292+
ON a.join_info.Key = b.Key
293+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
294+
UNIT_ASSERT(result.IsSuccess());
295+
296+
CompareYson(R"([["Key";#];["Name1";[1001]]])",
297+
FormatResultSetYson(result.GetResultSet(0)));
298+
}
299+
278300
Y_UNIT_TEST(IdxLookupPartialLeftPredicate) {
279301
TKikimrSettings settings;
280302
TKikimrRunner kikimr(settings);

0 commit comments

Comments
 (0)