Skip to content

Commit d7c0053

Browse files
committed
fix row size estimation in stream lookup join (#19094)
1 parent b11d285 commit d7c0053

File tree

2 files changed

+24
-9
lines changed

2 files changed

+24
-9
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
1010

1111
#include <yql/essentials/minikql/mkql_node_serialization.h>
12+
#include <ydb/library/yql/dq/runtime/dq_transport.h>
1213

1314
namespace NKikimr {
1415
namespace NKqp {
@@ -57,11 +58,6 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
5758
return rangePartition;
5859
}
5960

60-
NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) {
61-
YQL_ENSURE(type);
62-
return NScheme::TypeInfoFromMiniKQLType(type);
63-
}
64-
6561

6662
} // !namespace
6763

@@ -901,10 +897,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
901897

902898
i64 storageReadBytes = 0;
903899

904-
for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
905-
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
906-
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
907-
}
900+
leftRowSize = NYql::NDq::TDqDataSerializer::EstimateSize(leftRowInfo.Row, leftRowType);
908901

909902
if (!rightRow.empty()) {
910903
leftRowInfo.RightRowExist = true;

ydb/core/kqp/ut/join/kqp_join_ut.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,28 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
275275
}
276276
}
277277

278+
Y_UNIT_TEST_TWIN(IndexLoookupJoinStructJoin, StreamLookupJoin) {
279+
NKikimrConfig::TAppConfig appConfig;
280+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
281+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
282+
TKikimrRunner kikimr(settings);
283+
auto db = kikimr.GetTableClient();
284+
auto session = db.CreateSession().GetValueSync().GetSession();
285+
286+
CreateSampleTables(session);
287+
288+
auto result = session.ExecuteDataQuery(Q_(R"(
289+
$a = AsList(AsStruct(AsStruct("Key" as Key) as join_info), AsStruct(AsStruct("Name1" as Key) as join_info));
290+
SELECT a.join_info.Key as Key, b.Value as Value from AS_TABLE($a) as a
291+
LEFT JOIN `/Root/Join1_3` as b
292+
ON a.join_info.Key = b.Key
293+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
294+
UNIT_ASSERT(result.IsSuccess());
295+
296+
CompareYson(R"([["Key";#];["Name1";[1001]]])",
297+
FormatResultSetYson(result.GetResultSet(0)));
298+
}
299+
278300
Y_UNIT_TEST(IdxLookupPartialLeftPredicate) {
279301
TKikimrSettings settings;
280302
TKikimrRunner kikimr(settings);

0 commit comments

Comments
 (0)