Skip to content

YQ RD mod pushdown and bug fixes #11552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {


Expand All @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

Expand All @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};
Expand Down
42 changes: 42 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "common.h"

#include <util/system/mutex.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq {

namespace {

class TPureCalcProgramFactory : public IPureCalcProgramFactory {
public:
TPureCalcProgramFactory() {
CreateFactory({.EnabledLLVM = false});
CreateFactory({.EnabledLLVM = true});
}

NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
const auto it = ProgramFactories.find(settings);
Y_ENSURE(it != ProgramFactories.end());
return it->second;
}

private:
void CreateFactory(const TSettings& settings) {
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions()
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
)});
}

private:
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
};

} // anonymous namespace

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
return MakeIntrusive<TPureCalcProgramFactory>();
}

} // namespace NFq
25 changes: 25 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <util/generic/ptr.h>

#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

class IPureCalcProgramFactory : public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;

struct TSettings {
bool EnabledLLVM = false;

std::strong_ordering operator<=>(const TSettings& other) const = default;
};

public:
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

} // namespace NFq
20 changes: 12 additions & 8 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ class TJsonFilter::TImpl {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Sql(GenerateSql(whereFilter)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Sql(GenerateSql(whereFilter, factorySettings)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

// Program should be stateless because input values
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = pureCalcProgramFactory->MakePushStreamProgram(
Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
Expand All @@ -291,8 +292,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
TStringStream str;
str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
Expand All @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) {
}

TJsonFilter::~TJsonFilter() {
Expand All @@ -332,8 +335,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings));
}

} // namespace NFq
9 changes: 6 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "common.h"

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

Expand All @@ -15,7 +16,8 @@ class TJsonFilter {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

~TJsonFilter();

Expand All @@ -32,6 +34,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

} // namespace NFq
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "row_dispatcher.h"
#include "common.h"
#include "coordinator.h"

#include <ydb/library/actors/core/actorid.h>
Expand Down Expand Up @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
TSet<TActorId> CoordinatorChangedSubscribers;
Expand Down Expand Up @@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher(
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
, LogPrefix("RowDispatcher: ")
Expand Down
55 changes: 39 additions & 16 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TParserInputType InputType;
};

struct TFieldDescription {
ui64 IndexInParserSchema = 0;
TString Type;
};

bool InflightReconnect = false;
TDuration ReconnectPeriod;
const TString TopicPath;
Expand All @@ -151,7 +156,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
Expand All @@ -168,7 +173,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
const ::NMonitoring::TDynamicCounterPtr Counters;
TTopicSessionMetrics Metrics;
TParserSchema ParserSchema;
THashMap<TString, ui64> FieldsIndexes;
THashMap<TString, TFieldDescription> FieldsIndexes;
NYql::IPqGateway::TPtr PqGateway;
TMaybe<TString> ConsumerName;

Expand All @@ -182,7 +187,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down Expand Up @@ -273,7 +278,7 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
Expand Down Expand Up @@ -683,14 +688,16 @@ void TTopicSession::SendData(TClientsInfo& info) {
}

void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
for (auto name : info.Settings.GetSource().GetColumns()) {
const auto& source = info.Settings.GetSource();
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
const auto& name = source.GetColumns().Get(i);
auto it = FieldsIndexes.find(name);
if (it == FieldsIndexes.end()) {
auto nextIndex = FieldsIndexes.size();
info.FieldsIds.push_back(nextIndex);
FieldsIndexes[name] = nextIndex;
FieldsIndexes[name] = {nextIndex, source.GetColumnTypes().Get(i)};
} else {
info.FieldsIds.push_back(it->second);
info.FieldsIds.push_back(it->second.IndexInParserSchema);
}
}
}
Expand Down Expand Up @@ -727,10 +734,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
std::forward_as_tuple(ev)).first->second;
UpdateFieldsIds(clientInfo);

TString predicate = clientInfo.Settings.GetSource().GetPredicate();
const auto& source = clientInfo.Settings.GetSource();
TString predicate = source.GetPredicate();

// TODO: remove this when the re-parsing is removed from pq read actor
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
if (predicate.empty() && HasJsonColumns(source)) {
predicate = "WHERE TRUE";
}

Expand All @@ -742,7 +750,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
},
PureCalcProgramFactory);
PureCalcProgramFactory,
{.EnabledLLVM = source.GetEnabledLLVM()}
);
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}
Expand Down Expand Up @@ -816,7 +826,7 @@ void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) {
ui64 offset = 0;
for (const auto& [name, type]: inputType) {
Y_ENSURE(FieldsIndexes.contains(name));
ui64 index = FieldsIndexes[name];
ui64 index = FieldsIndexes[name].IndexInParserSchema;
ParserSchema.FieldsMap[index] = offset++;
}
ParserSchema.InputType = inputType;
Expand Down Expand Up @@ -944,13 +954,26 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
SendSessionError(ev->Sender, "Internal error: such a client already exists");
return false;
}
if (!Config.GetWithoutConsumer()
&& ConsumerName
&& ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) {
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error");

const auto& source = ev->Get()->Record.GetSource();
if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) {
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error");
SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")");
return false;
}

Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize());
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
const auto& name = source.GetColumns().Get(i);
const auto& type = source.GetColumnTypes().Get(i);
const auto it = FieldsIndexes.find(name);
if (it != FieldsIndexes.end() && it->second.Type != type) {
LOG_ROW_DISPATCHER_INFO("Different column `" << name << "` type, expected " << it->second.Type << ", actual " << type << ", send error");
SendSessionError(ev->Sender, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << name << "` is " << it->second.Type << " (requested type is " << type <<")");
return false;
}
}

return true;
}

Expand All @@ -967,7 +990,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
Expand All @@ -8,7 +10,6 @@

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -25,7 +26,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down
Loading
Loading