Skip to content

Commit 727613b

Browse files
authored
Allow sinks for data query (#8242)
1 parent d98c32b commit 727613b

File tree

3 files changed

+94
-2
lines changed

3 files changed

+94
-2
lines changed

ydb/core/kqp/opt/kqp_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ bool IsKqpEffectsStage(const TDqStageBase& stage) {
8383
}
8484

8585
bool NeedSinks(const TKikimrTableDescription& table, const TKqpOptimizeContext& kqpCtx) {
86-
return kqpCtx.IsGenericQuery()
86+
return (kqpCtx.IsGenericQuery() || kqpCtx.IsDataQuery())
8787
&& (table.Metadata->Kind != EKikimrTableKind::Olap || kqpCtx.Config->EnableOlapSink)
8888
&& (table.Metadata->Kind != EKikimrTableKind::Datashard || kqpCtx.Config->EnableOltpSink);
8989
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1290,7 +1290,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12901290

12911291
const bool useEvWrite = ((HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!HasOlapTable && Settings.TableService.GetEnableOltpSink()))
12921292
&& (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
1293-
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY);
1293+
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
1294+
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML
1295+
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML);
12941296
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
12951297
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
12961298
RequestCounters, Settings.TableService,

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,6 +1689,96 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
16891689
CompareYson(output, R"([[1u;[1];["test1"]];[100u;[100];["test2"]]])");
16901690
}
16911691
}
1692+
1693+
Y_UNIT_TEST_TWIN(TableSink_ReplaceDataShardDataQuery, UseSink) {
1694+
NKikimrConfig::TAppConfig appConfig;
1695+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(UseSink);
1696+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
1697+
auto settings = TKikimrSettings()
1698+
.SetAppConfig(appConfig)
1699+
.SetWithSampleTables(false);
1700+
TKikimrRunner kikimr(settings);
1701+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
1702+
1703+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
1704+
1705+
const TString query = R"(
1706+
CREATE TABLE `/Root/DataShard` (
1707+
Col1 Uint32 NOT NULL,
1708+
Col2 String,
1709+
Col3 Int32 NOT NULL,
1710+
PRIMARY KEY (Col1)
1711+
)
1712+
WITH (
1713+
AUTO_PARTITIONING_BY_SIZE = DISABLED,
1714+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16,
1715+
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 16,
1716+
UNIFORM_PARTITIONS = 16);
1717+
1718+
CREATE TABLE `/Root/DataShard2` (
1719+
Col1 Uint32 NOT NULL,
1720+
Col2 String,
1721+
Col3 Int32 NOT NULL,
1722+
PRIMARY KEY (Col1)
1723+
)
1724+
WITH (
1725+
AUTO_PARTITIONING_BY_SIZE = DISABLED,
1726+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 17,
1727+
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 17,
1728+
UNIFORM_PARTITIONS = 17);
1729+
)";
1730+
1731+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
1732+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1733+
1734+
{
1735+
auto prepareResult = session.ExecuteDataQuery(R"(
1736+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
1737+
(10u, "test1", 10), (20u, "test2", 11), (2147483647u, "test3", 12), (2147483640u, NULL, 13);
1738+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1739+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
1740+
}
1741+
1742+
{
1743+
auto it = session.ExecuteDataQuery(R"(
1744+
SELECT COUNT(*) FROM `/Root/DataShard`;
1745+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1746+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
1747+
CompareYson(R"([[4u]])", FormatResultSetYson(it.GetResultSet(0)));
1748+
}
1749+
1750+
{
1751+
auto prepareResult = session.ExecuteDataQuery(R"(
1752+
REPLACE INTO `/Root/DataShard2` SELECT * FROM `/Root/DataShard`;
1753+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1754+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
1755+
}
1756+
1757+
{
1758+
auto it = session.ExecuteDataQuery(R"(
1759+
SELECT COUNT(*) FROM `/Root/DataShard2`;
1760+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1761+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
1762+
CompareYson(R"([[4u]])", FormatResultSetYson(it.GetResultSet(0)));
1763+
}
1764+
1765+
{
1766+
auto prepareResult = session.ExecuteDataQuery(R"(
1767+
REPLACE INTO `/Root/DataShard2` (Col1, Col2, Col3) VALUES
1768+
(11u, "test1", 10), (21u, "test2", 11), (2147483646u, "test3", 12), (2147483641u, NULL, 13);
1769+
SELECT COUNT(*) FROM `/Root/DataShard`;
1770+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1771+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
1772+
}
1773+
1774+
{
1775+
auto it = session.ExecuteDataQuery(R"(
1776+
SELECT COUNT(*) FROM `/Root/DataShard2`;
1777+
)", NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1778+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
1779+
CompareYson(R"([[8u]])", FormatResultSetYson(it.GetResultSet(0)));
1780+
}
1781+
}
16921782
}
16931783

16941784
} // namespace NKqp

0 commit comments

Comments
 (0)