Skip to content

Commit e95799a

Browse files
committed
Added unit test on PRAGMA ydb.MaxTasksPerStage
1 parent a9a28dd commit e95799a

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,69 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) {
159159
UNIT_ASSERT_VALUES_EQUAL(sourceOp["ReadColumns"].GetArraySafe()[0].GetStringSafe(), "key");
160160
UNIT_ASSERT_VALUES_EQUAL(sourceOp["ReadColumns"].GetArraySafe()[1].GetStringSafe(), "value");
161161
}
162+
163+
Y_UNIT_TEST(S3CreateTableAsSelect) {
164+
{
165+
Aws::S3::S3Client s3Client = MakeS3Client();
166+
CreateBucketWithObject("test_ctas_read", "test_ctas_read1", TEST_CONTENT, s3Client);
167+
UploadObject("test_ctas_read", "test_ctas_read2", TEST_CONTENT, s3Client);
168+
}
169+
170+
auto kikimr = NTestUtils::MakeKikimrRunner();
171+
172+
auto tc = kikimr->GetTableClient();
173+
auto session = tc.CreateSession().GetValueSync().GetSession();
174+
const TString query = fmt::format(R"sql(
175+
CREATE EXTERNAL DATA SOURCE read_data_source WITH (
176+
SOURCE_TYPE="ObjectStorage",
177+
LOCATION="{read_location}",
178+
AUTH_METHOD="NONE"
179+
);
180+
CREATE EXTERNAL TABLE read_table (
181+
key Utf8 NOT NULL,
182+
value Utf8 NOT NULL
183+
) WITH (
184+
DATA_SOURCE="read_data_source",
185+
LOCATION="/",
186+
FORMAT="json_each_row"
187+
);
188+
)sql",
189+
"read_location"_a = GetBucketLocation("test_ctas_read")
190+
);
191+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
192+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
193+
194+
const TString sql = R"sql(
195+
PRAGMA ydb.MaxTasksPerStage = "1";
196+
197+
CREATE TABLE result_table (
198+
PRIMARY KEY (key)
199+
)
200+
WITH (STORE = COLUMN)
201+
AS SELECT * FROM read_table
202+
)sql";
203+
204+
auto queryClient = kikimr->GetQueryClient();
205+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
206+
sql,
207+
TTxControl::NoTx(),
208+
TExecuteQuerySettings().StatsMode(EStatsMode::Full)).GetValueSync();
209+
210+
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), NYdb::EStatus::SUCCESS, queryResult.GetIssues().ToString());
211+
UNIT_ASSERT(queryResult.GetStats());
212+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
213+
Cerr << "Plan: " << *queryResult.GetStats()->GetPlan() << Endl;
214+
NJson::TJsonValue plan;
215+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
216+
217+
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
218+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink");
219+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Stats"]["Tasks"], 1);
220+
221+
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
222+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "Stage");
223+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 1);
224+
}
162225
}
163226

164227
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)