Skip to content

Commit d6c7e47

Browse files
authored
YQ RD mod pushdown and bug fixes (#11552)
1 parent 0295ff2 commit d6c7e47

21 files changed

+190
-47
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>
44

5-
#include <ydb/library/yql/public/purecalc/common/interface.h>
6-
75
namespace NFq::NRowDispatcher {
86

97

@@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory {
1917
ui32 partitionId,
2018
NYdb::TDriver driver,
2119
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
22-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
20+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2321
const ::NMonitoring::TDynamicCounterPtr& counters,
2422
const NYql::IPqGateway::TPtr& pqGateway) const override {
2523

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
46
#include <util/generic/ptr.h>
57
#include <ydb/library/actors/core/actor.h>
68
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
79
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
8-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
910

1011
namespace NFq::NRowDispatcher {
1112

@@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase {
2122
ui32 partitionId,
2223
NYdb::TDriver driver,
2324
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
24-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
25+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2526
const ::NMonitoring::TDynamicCounterPtr& counters,
2627
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
2728
};
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "common.h"
2+
3+
#include <util/system/mutex.h>
4+
5+
#include <ydb/library/yql/public/purecalc/common/interface.h>
6+
7+
namespace NFq {
8+
9+
namespace {
10+
11+
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
12+
public:
13+
TPureCalcProgramFactory() {
14+
CreateFactory({.EnabledLLVM = false});
15+
CreateFactory({.EnabledLLVM = true});
16+
}
17+
18+
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
19+
const auto it = ProgramFactories.find(settings);
20+
Y_ENSURE(it != ProgramFactories.end());
21+
return it->second;
22+
}
23+
24+
private:
25+
void CreateFactory(const TSettings& settings) {
26+
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
27+
NYql::NPureCalc::TProgramFactoryOptions()
28+
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
29+
)});
30+
}
31+
32+
private:
33+
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
34+
};
35+
36+
} // anonymous namespace
37+
38+
// Creates a TPureCalcProgramFactory and returns it through the
// IPureCalcProgramFactory interface pointer.
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
    IPureCalcProgramFactory::TPtr factory = MakeIntrusive<TPureCalcProgramFactory>();
    return factory;
}
41+
42+
} // namespace NFq
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <util/generic/ptr.h>
4+
5+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
6+
7+
namespace NFq {
8+
9+
class IPureCalcProgramFactory : public TThrRefBase {
10+
public:
11+
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
12+
13+
struct TSettings {
14+
bool EnabledLLVM = false;
15+
16+
std::strong_ordering operator<=>(const TSettings& other) const = default;
17+
};
18+
19+
public:
20+
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
21+
};
22+
23+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
24+
25+
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,15 @@ class TJsonFilter::TImpl {
264264
const TVector<TString>& types,
265265
const TString& whereFilter,
266266
TCallback callback,
267-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
268-
: Sql(GenerateSql(whereFilter)) {
267+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
268+
const IPureCalcProgramFactory::TSettings& factorySettings)
269+
: Sql(GenerateSql(whereFilter, factorySettings)) {
269270
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
270271

271272
// Program should be stateless because input values
272273
// allocated on another allocator and should be released
273274
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
274-
Program = pureCalcProgramFactory->MakePushStreamProgram(
275+
Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram(
275276
TFilterInputSpec(MakeInputSchema(columns, types)),
276277
TFilterOutputSpec(MakeOutputSchema()),
277278
Sql,
@@ -291,8 +292,9 @@ class TJsonFilter::TImpl {
291292
}
292293

293294
private:
294-
TString GenerateSql(const TString& whereFilter) {
295+
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
295296
TStringStream str;
297+
str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
296298
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";
297299

298300
str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
@@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter(
312314
const TVector<TString>& types,
313315
const TString& whereFilter,
314316
TCallback callback,
315-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
316-
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
317+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
318+
const IPureCalcProgramFactory::TSettings& factorySettings)
319+
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) {
317320
}
318321

319322
TJsonFilter::~TJsonFilter() {
@@ -332,8 +335,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
332335
const TVector<TString>& types,
333336
const TString& whereFilter,
334337
TCallback callback,
335-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
336-
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
338+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
339+
const IPureCalcProgramFactory::TSettings& factorySettings) {
340+
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings));
337341
}
338342

339343
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
4-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
56

67
namespace NFq {
78

@@ -15,7 +16,8 @@ class TJsonFilter {
1516
const TVector<TString>& types,
1617
const TString& whereFilter,
1718
TCallback callback,
18-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
19+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
20+
const IPureCalcProgramFactory::TSettings& factorySettings);
1921

2022
~TJsonFilter();
2123

@@ -32,6 +34,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
3234
const TVector<TString>& types,
3335
const TString& whereFilter,
3436
TJsonFilter::TCallback callback,
35-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
37+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
38+
const IPureCalcProgramFactory::TSettings& factorySettings);
3639

3740
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "row_dispatcher.h"
2+
#include "common.h"
23
#include "coordinator.h"
34

45
#include <ydb/library/actors/core/actorid.h>
@@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
214215

215216
NConfig::TRowDispatcherConfig Config;
216217
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
217-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
218+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
218219
TYqSharedResources::TPtr YqSharedResources;
219220
TMaybe<TActorId> CoordinatorActorId;
220221
TSet<TActorId> CoordinatorChangedSubscribers;
@@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher(
362363
const NYql::IPqGateway::TPtr& pqGateway)
363364
: Config(config)
364365
, CredentialsProviderFactory(credentialsProviderFactory)
365-
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
366+
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
366367
, YqSharedResources(yqSharedResources)
367368
, CredentialsFactory(credentialsFactory)
368369
, LogPrefix("RowDispatcher: ")

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
144144
TParserInputType InputType;
145145
};
146146

147+
struct TFieldDescription {
148+
ui64 IndexInParserSchema = 0;
149+
TString Type;
150+
};
151+
147152
bool InflightReconnect = false;
148153
TDuration ReconnectPeriod;
149154
const TString TopicPath;
@@ -153,7 +158,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
153158
ui32 PartitionId;
154159
NYdb::TDriver Driver;
155160
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
156-
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
161+
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
157162
NYql::ITopicClient::TPtr TopicClient;
158163
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
159164
const i64 BufferSize;
@@ -170,7 +175,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
170175
const ::NMonitoring::TDynamicCounterPtr Counters;
171176
TTopicSessionMetrics Metrics;
172177
TParserSchema ParserSchema;
173-
THashMap<TString, ui64> FieldsIndexes;
178+
THashMap<TString, TFieldDescription> FieldsIndexes;
174179
NYql::IPqGateway::TPtr PqGateway;
175180
TMaybe<TString> ConsumerName;
176181
ui64 RestartSessionByOffsets = 0;
@@ -185,7 +190,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
185190
ui32 partitionId,
186191
NYdb::TDriver driver,
187192
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
188-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
193+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
189194
const ::NMonitoring::TDynamicCounterPtr& counters,
190195
const NYql::IPqGateway::TPtr& pqGateway);
191196

@@ -276,7 +281,7 @@ TTopicSession::TTopicSession(
276281
ui32 partitionId,
277282
NYdb::TDriver driver,
278283
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
279-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
284+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
280285
const ::NMonitoring::TDynamicCounterPtr& counters,
281286
const NYql::IPqGateway::TPtr& pqGateway)
282287
: TopicPath(topicPath)
@@ -686,14 +691,16 @@ void TTopicSession::SendData(TClientsInfo& info) {
686691
}
687692

688693
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
689-
for (auto name : info.Settings.GetSource().GetColumns()) {
694+
const auto& source = info.Settings.GetSource();
695+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
696+
const auto& name = source.GetColumns().Get(i);
690697
auto it = FieldsIndexes.find(name);
691698
if (it == FieldsIndexes.end()) {
692699
auto nextIndex = FieldsIndexes.size();
693700
info.FieldsIds.push_back(nextIndex);
694-
FieldsIndexes[name] = nextIndex;
701+
FieldsIndexes[name] = {nextIndex, source.GetColumnTypes().Get(i)};
695702
} else {
696-
info.FieldsIds.push_back(it->second);
703+
info.FieldsIds.push_back(it->second.IndexInParserSchema);
697704
}
698705
}
699706
}
@@ -730,10 +737,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
730737
std::forward_as_tuple(ev)).first->second;
731738
UpdateFieldsIds(clientInfo);
732739

733-
TString predicate = clientInfo.Settings.GetSource().GetPredicate();
740+
const auto& source = clientInfo.Settings.GetSource();
741+
TString predicate = source.GetPredicate();
734742

735743
// TODO: remove this when the re-parsing is removed from pq read actor
736-
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
744+
if (predicate.empty() && HasJsonColumns(source)) {
737745
predicate = "WHERE TRUE";
738746
}
739747

@@ -745,7 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
745753
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
746754
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
747755
},
748-
PureCalcProgramFactory);
756+
PureCalcProgramFactory,
757+
{.EnabledLLVM = source.GetEnabledLLVM()}
758+
);
749759
} else {
750760
ClientsWithoutPredicate.insert(ev->Sender);
751761
}
@@ -821,7 +831,7 @@ void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) {
821831
ui64 offset = 0;
822832
for (const auto& [name, type]: inputType) {
823833
Y_ENSURE(FieldsIndexes.contains(name));
824-
ui64 index = FieldsIndexes[name];
834+
ui64 index = FieldsIndexes[name].IndexInParserSchema;
825835
ParserSchema.FieldsMap[index] = offset++;
826836
}
827837
ParserSchema.InputType = inputType;
@@ -950,13 +960,26 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
950960
SendSessionError(ev->Sender, "Internal error: such a client already exists");
951961
return false;
952962
}
953-
if (!Config.GetWithoutConsumer()
954-
&& ConsumerName
955-
&& ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) {
956-
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error");
963+
964+
const auto& source = ev->Get()->Record.GetSource();
965+
if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) {
966+
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error");
957967
SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")");
958968
return false;
959969
}
970+
971+
Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize());
972+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
973+
const auto& name = source.GetColumns().Get(i);
974+
const auto& type = source.GetColumnTypes().Get(i);
975+
const auto it = FieldsIndexes.find(name);
976+
if (it != FieldsIndexes.end() && it->second.Type != type) {
977+
LOG_ROW_DISPATCHER_INFO("Different column `" << name << "` type, expected " << it->second.Type << ", actual " << type << ", send error");
978+
SendSessionError(ev->Sender, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << name << "` is " << it->second.Type << " (requested type is " << type <<")");
979+
return false;
980+
}
981+
}
982+
960983
return true;
961984
}
962985

@@ -973,7 +996,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
973996
ui32 partitionId,
974997
NYdb::TDriver driver,
975998
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
976-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
999+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
9771000
const ::NMonitoring::TDynamicCounterPtr& counters,
9781001
const NYql::IPqGateway::TPtr& pqGateway) {
9791002
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));

ydb/core/fq/libs/row_dispatcher/topic_session.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
46
#include <ydb/core/fq/libs/config/protos/common.pb.h>
57
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
@@ -8,7 +10,6 @@
810

911
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
1012
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
11-
#include <ydb/library/yql/public/purecalc/common/fwd.h>
1213

1314
#include <ydb/library/actors/core/actor.h>
1415

@@ -25,7 +26,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
2526
ui32 partitionId,
2627
NYdb::TDriver driver,
2728
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
28-
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
29+
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
2930
const ::NMonitoring::TDynamicCounterPtr& counters,
3031
const NYql::IPqGateway::TPtr& pqGateway);
3132

0 commit comments

Comments
 (0)