Skip to content

Commit 23208c8

Browse files
Fix issue #9461 with altering CDC streams (#11077) (#11184)
1 parent 66baaeb commit 23208c8

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3126,7 +3126,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
31263126
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
31273127
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
31283128
}
3129-
3129+
31303130
{
31313131
auto result = client.ExecuteQuery(R"(
31323132
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1;
@@ -4560,6 +4560,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
45604560
CheckDirEntry(kikimr, entriesToCheck);
45614561
}
45624562
}
4563+
45634564
Y_UNIT_TEST(CreateOrDropTopicOverTable) {
45644565
NKikimrConfig::TAppConfig appConfig;
45654566
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
@@ -4631,6 +4632,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
46314632
}
46324633
}
46334634

4635+
Y_UNIT_TEST(AlterCdcTopic) {
4636+
NKikimrConfig::TAppConfig appConfig;
4637+
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
4638+
auto setting = NKikimrKqp::TKqpSetting();
4639+
auto serverSettings = TKikimrSettings()
4640+
.SetAppConfig(appConfig)
4641+
.SetKqpSettings({setting});
4642+
TKikimrRunner kikimr{serverSettings};
4643+
auto tableClient = kikimr.GetTableClient();
4644+
4645+
{
4646+
auto tcSession = tableClient.CreateSession().GetValueSync().GetSession();
4647+
UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
4648+
CREATE TABLE `/Root/TmpTable` (
4649+
Key Uint64,
4650+
Value String,
4651+
PRIMARY KEY (Key)
4652+
);
4653+
)").GetValueSync().IsSuccess());
4654+
4655+
UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
4656+
ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH (
4657+
MODE = 'KEYS_ONLY', FORMAT = 'JSON'
4658+
);
4659+
)").GetValueSync().IsSuccess());
4660+
tcSession.Close();
4661+
}
4662+
4663+
auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(),
4664+
NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin"));
4665+
4666+
auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin"));
4667+
auto session = client.GetSession().GetValueSync().GetSession();
4668+
{
4669+
4670+
const auto query = Q_(R"(
4671+
--!syntax_v1
4672+
ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21;
4673+
)");
4674+
4675+
RunQuery(query, session);
4676+
auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
4677+
const auto& consumers = desc.GetTopicDescription().GetConsumers();
4678+
UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1);
4679+
UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21");
4680+
4681+
}
4682+
{
4683+
const auto query = Q_(R"(
4684+
--!syntax_v1
4685+
ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10);
4686+
)");
4687+
RunQuery(query, session, false);
4688+
auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
4689+
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1);
4690+
}
4691+
4692+
}
4693+
46344694
Y_UNIT_TEST(TableSink_OlapRWQueries) {
46354695
NKikimrConfig::TAppConfig appConfig;
46364696
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
@@ -4737,7 +4797,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
47374797
auto it = client.StreamExecuteQuery(R"sql(
47384798
SELECT r.Col3
47394799
FROM `/Root/DataShard` AS r
4740-
JOIN `/Root/ColumnShard` AS c
4800+
JOIN `/Root/ColumnShard` AS c
47414801
ON r.Col1 = c.Col1;
47424802
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
47434803
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

ydb/services/lib/actors/pq_schema_actor.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,10 @@ namespace NKikimr::NGRpcProxy::V1 {
544544
return path;
545545
}
546546

547+
const TMaybe<TString>& GetCdcStreamName() const {
548+
return CdcStreamName;
549+
}
550+
547551
void SendDescribeProposeRequest(bool showPrivate = false) {
548552
return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate);
549553
}
@@ -599,6 +603,10 @@ namespace NKikimr::NGRpcProxy::V1 {
599603
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
600604
Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1);
601605
PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
606+
607+
if (response.Self) {
608+
CdcStreamName = response.Self->Info.GetName();
609+
}
602610
SendDescribeProposeRequest(true);
603611
return true;
604612
}
@@ -616,6 +624,8 @@ namespace NKikimr::NGRpcProxy::V1 {
616624
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
617625
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
618626
TMaybe<TString> PrivateTopicName;
627+
TMaybe<TString> CdcStreamName;
628+
619629
};
620630

621631
}

ydb/services/persqueue_v1/actors/schema_actors.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache
15871587
}
15881588
TUpdateSchemeBase::HandleCacheNavigateResponse(ev);
15891589
auto& schemeTx = Response->Response.ModifyScheme;
1590-
FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name);
1590+
std::pair <TString, TString> pathPair;
1591+
try {
1592+
pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath());
1593+
} catch (const std::exception &ex) {
1594+
Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex));
1595+
RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
1596+
return;
1597+
}
1598+
1599+
const auto& workingDir = pathPair.first;
1600+
const auto& name = pathPair.second;
1601+
FillModifyScheme(schemeTx, ActorContext(), workingDir, name);
15911602
}
15921603

15931604
void TAlterTopicActorInternal::ModifyPersqueueConfig(
@@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig(
16011612
TString error;
16021613
Y_UNUSED(selfInfo);
16031614

1604-
auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false);
1615+
auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined());
16051616
if (!error.empty()) {
16061617
Response->Response.Issues.AddIssue(error);
16071618
}

0 commit comments

Comments
 (0)