Skip to content

YQ CTAS fix for script executions #9228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
break;

case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings);
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr);
break;

default:
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,10 +1142,10 @@ class TKqpHost : public IKqpHost {
});
}

IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) override {
return CheckedProcessQuery(*ExprCtx,
[this, &query, settings] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, nullptr, EKikimrQueryType::Script, settings, ctx);
[this, &query, settings, expr] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, expr, EKikimrQueryType::Script, settings, ctx);
});
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class IKqpHost : public TThrRefBase {
virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;

/* Federated queries */
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;

/* Scripting */
virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0;
Expand Down
66 changes: 46 additions & 20 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1804,13 +1804,13 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
}

std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName, bool enableOltp) {
const TString bucket = "test_bucket3";
const TString object = "test_object";

NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(enableOltp);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
Expand Down Expand Up @@ -1863,8 +1863,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {

}

void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
{
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable, bool enableOltp) {
if (enableOltp) {
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}
Expand All @@ -1875,15 +1875,15 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
}
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
void DoCreateTableAsSelectFromExternalDataSource(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
if (enableOltp) {
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
Expand All @@ -1900,8 +1900,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = oltpTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

const TString olapTable = "DestinationOlap";
Expand All @@ -1923,22 +1922,43 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = olapTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

ValidateTables(client, oltpTable, olapTable);
ValidateTables(client, oltpTable, olapTable, enableOltp);
}

void RunGenericQuery(const TString& query, TQueryClient& client, const TDriver&) {
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void RunGenericScript(const TString& script, TQueryClient& client, const TDriver& driver) {
auto scriptExecutionOperation = client.ExecuteScript(script).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), driver);
UNIT_ASSERT_VALUES_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToOneLineString());
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericQuery) {
DoCreateTableAsSelectFromExternalDataSource(&RunGenericQuery, true);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericScript) {
DoCreateTableAsSelectFromExternalDataSource(&RunGenericScript, false);
}

void DoCreateTableAsSelectFromExternalTable(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
if (enableOltp) {
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
Expand All @@ -1949,8 +1969,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = oltpTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

const TString olapTable = "DestinationOlap";
Expand All @@ -1966,11 +1985,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = olapTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

ValidateTables(client, oltpTable, olapTable);
ValidateTables(client, oltpTable, olapTable, enableOltp);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericQuery) {
DoCreateTableAsSelectFromExternalTable(&RunGenericQuery, true);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericScript) {
DoCreateTableAsSelectFromExternalTable(&RunGenericScript, false);
}

Y_UNIT_TEST(OverridePlannerDefaults) {
Expand Down
Loading