Skip to content

Commit 2264103

Browse files
committed
Fixed pq rd source types
1 parent 851dc32 commit 2264103

File tree

4 files changed

+50
-23
lines changed

4 files changed

+50
-23
lines changed

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_pq_provider_impl.h"
2+
#include "yql_pq_helpers.h"
23

34
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
45
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
@@ -7,6 +8,7 @@
78
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
89
#include <ydb/library/yql/providers/common/pushdown/type_ann.h>
910
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
11+
#include <ydb/library/yql/providers/pq/common/yql_names.h>
1012
#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
1113

1214
#include <yql/essentials/utils/log/log.h>
@@ -156,6 +158,13 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
156158
return TStatus::Error;
157159
}
158160

161+
if (const auto maybeSharedReadingSetting = FindSetting(topicSource.Settings().Ptr(), SharedReading)) {
162+
const TExprNode& value = maybeSharedReadingSetting.Cast().Ref();
163+
if (value.IsAtom() && FromString<bool>(value.Content())) {
164+
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(topicSource.RowType().Ref().GetTypeAnn()));
165+
return TStatus::Ok;
166+
}
167+
}
159168

160169
if (topic.Metadata().Empty()) {
161170
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));

ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,20 @@ void FillSettingsWithResolvedYdsIds(
101101
}
102102
}
103103

104+
TMaybeNode<TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name) {
105+
const auto maybeSettingsList = TMaybeNode<TCoNameValueTupleList>(settings);
106+
if (!maybeSettingsList) {
107+
return nullptr;
108+
}
109+
const auto settingsList = maybeSettingsList.Cast();
110+
111+
for (size_t i = 0; i < settingsList.Size(); ++i) {
112+
TCoNameValueTuple setting = settingsList.Item(i);
113+
if (setting.Name().Value() == name) {
114+
return setting.Value();
115+
}
116+
}
117+
return nullptr;
118+
}
104119

105120
} // namespace NYql

ydb/library/yql/providers/pq/provider/yql_pq_helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ void FillSettingsWithResolvedYdsIds(
2121
const TPqState::TPtr& state,
2222
const TDatabaseResolverResponse::TDatabaseDescriptionMap& fullResolvedIds);
2323

24+
NNodes::TMaybeNode<NNodes::TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name);
25+
2426
} // namespace NYql

ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_pq_mkql_compiler.h"
2+
#include "yql_pq_helpers.h"
23

34
#include <ydb/library/yql/providers/pq/common/yql_names.h>
45
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
@@ -12,22 +13,6 @@ using namespace NNodes;
1213

1314
namespace {
1415

15-
TMaybeNode<TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name) {
16-
const auto maybeSettingsList = TMaybeNode<TCoNameValueTupleList>(settings);
17-
if (!maybeSettingsList) {
18-
return nullptr;
19-
}
20-
const auto settingsList = maybeSettingsList.Cast();
21-
22-
for (size_t i = 0; i < settingsList.Size(); ++i) {
23-
TCoNameValueTuple setting = settingsList.Item(i);
24-
if (setting.Name().Value() == name) {
25-
return setting.Value();
26-
}
27-
}
28-
return nullptr;
29-
}
30-
3116
bool UseSharedReading(TExprNode::TPtr settings) {
3217
const auto maybeInnerSettings = FindSetting(settings, "settings");
3318
if (!maybeInnerSettings) {
@@ -43,26 +28,42 @@ bool UseSharedReading(TExprNode::TPtr settings) {
4328
return value.IsAtom() && FromString<bool>(value.Content());
4429
}
4530

31+
TRuntimeNode WrapSharedReading(const TDqSourceWrapBase &wrapper, NCommon::TMkqlBuildContext& ctx) {
32+
const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
33+
const auto flow = ctx.ProgramBuilder.ToFlow(input);
34+
35+
const TStructExprType* rowType = wrapper.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
36+
const auto* finalItemStructType = static_cast<TStructType*>(NCommon::BuildType(wrapper.RowType().Ref(), *rowType, ctx.ProgramBuilder));
37+
38+
return ctx.ProgramBuilder.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
39+
TRuntimeNode::TList fields;
40+
fields.reserve(finalItemStructType->GetMembersCount());
41+
for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
42+
fields.push_back(ctx.ProgramBuilder.Member(item, finalItemStructType->GetMemberName(i)));
43+
}
44+
return fields;
45+
});
46+
}
47+
4648
}
4749

4850
void RegisterDqPqMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
4951
compiler.ChainCallable(TDqSourceWideWrap::CallableName(),
5052
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
5153
if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == PqProviderName) {
54+
if (const auto maybeSettings = wrapper.Settings()) {
55+
if (UseSharedReading(maybeSettings.Cast().Ptr())) {
56+
return WrapSharedReading(wrapper, ctx);
57+
}
58+
}
59+
5260
const auto wrapped = TryWrapWithParser(wrapper, ctx);
5361
if (wrapped) {
5462
return *wrapped;
5563
}
5664

5765
const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
5866
auto flow = ctx.ProgramBuilder.ToFlow(input);
59-
60-
if (const auto maybeSettings = wrapper.Settings()) {
61-
if (UseSharedReading(maybeSettings.Cast().Ptr())) {
62-
return flow;
63-
}
64-
}
65-
6667
return ctx.ProgramBuilder.ExpandMap(flow,
6768
[&](TRuntimeNode item) -> TRuntimeNode::TList {
6869
return {item};

0 commit comments

Comments
 (0)