Skip to content

Commit 618f550

Browse files
authored
fixed CDC write to the autopartitioning topic (#20243)
1 parent 26e438e commit 618f550

File tree

2 files changed

+48
-22
lines changed

2 files changed

+48
-22
lines changed

ydb/core/persqueue/ut/slow/autopartitioning_ut.cpp

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,20 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
2424
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2525
}
2626

27-
void ExecuteDataQuery(NYdb::NTable::TSession& session, const TString& query ) {
28-
const auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).GetValueSync();
29-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
27+
void ExecuteDataQuery(NYdb::NTable::TSession& session, const TString& query) {
28+
TString error;
29+
for (size_t i = 0; i < 20; ++i) {
30+
const auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).GetValueSync();
31+
if (NYdb::EStatus::SUCCESS != result.GetStatus()) {
32+
error = result.GetIssues().ToString();
33+
Sleep(TDuration::MilliSeconds(500));
34+
continue;
35+
}
36+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
37+
return;
38+
}
39+
40+
UNIT_ASSERT_C(false, "ExecuteDataQuery error: " << error);
3041
}
3142

3243
ui64 GetBalancerTabletId(TTopicSdkTestSetup& setup, const TString& topicPath) {
@@ -68,6 +79,8 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
6879

6980
size_t count = 0;
7081

82+
std::set<ui32> partitions;
83+
7184
auto reader = client.CreateReadSession(
7285
TReadSessionSettings()
7386
.AutoPartitioningSupport(true)
@@ -78,6 +91,7 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
7891
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
7992
count += x->GetMessages().size();
8093
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
94+
partitions.insert(x->GetPartitionSession()->GetPartitionId());
8195
x->Confirm();
8296
Cerr << ">>>>> " << x->DebugString() << Endl << Flush;
8397
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
@@ -105,6 +119,8 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
105119
}
106120

107121
UNIT_ASSERT_VALUES_EQUAL(expected, count);
122+
UNIT_ASSERT_C(partitions.size() > 1, "Split must be happened");
123+
Cerr << ">>>>> Partition count: " << partitions.size() << Endl << Flush;
108124
}
109125

110126
Y_UNIT_TEST(CDC_Write) {
@@ -121,9 +137,12 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
121137
value Text,
122138
PRIMARY KEY (id, order)
123139
) WITH (
124-
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64,
140+
AUTO_PARTITIONING_BY_SIZE = ENABLED,
141+
AUTO_PARTITIONING_PARTITION_SIZE_MB = 5,
142+
AUTO_PARTITIONING_BY_LOAD = ENABLED,
143+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
125144
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 64,
126-
UNIFORM_PARTITIONS = 64
145+
UNIFORM_PARTITIONS = 1
127146
);
128147
)");
129148

@@ -134,7 +153,8 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
134153
MODE = 'UPDATES',
135154
FORMAT = 'JSON',
136155
TOPIC_AUTO_PARTITIONING = 'ENABLED',
137-
TOPIC_MIN_ACTIVE_PARTITIONS = 2
156+
TOPIC_MIN_ACTIVE_PARTITIONS = 1,
157+
TOPIC_MAX_ACTIVE_PARTITIONS = 100
138158
);
139159
)");
140160

@@ -143,10 +163,10 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
143163
alterSettings
144164
.BeginAlterPartitioningSettings()
145165
.MinActivePartitions(1)
146-
.MaxActivePartitions(10000)
166+
.MaxActivePartitions(100)
147167
.BeginAlterAutoPartitioningSettings()
148168
.Strategy(EAutoPartitioningStrategy::ScaleUp)
149-
.StabilizationWindow(TDuration::Seconds(1))
169+
.StabilizationWindow(TDuration::Seconds(15))
150170
.DownUtilizationPercent(1)
151171
.UpUtilizationPercent(2)
152172
.EndAlterAutoPartitioningSettings()
@@ -162,23 +182,25 @@ Y_UNIT_TEST_SUITE(SlowTopicAutopartitioning) {
162182
}
163183

164184
Cerr << ">>>>> " << TInstant::Now() << " Start table insert" << Endl << Flush;
165-
ExecuteDataQuery(session, R"(
166-
--!syntax_v1
167-
$sample = AsList(
168-
AsStruct(ListFromRange(0, 150000) AS v)
169-
);
185+
for (size_t i = 0; i < 50; ++i) {
186+
ExecuteDataQuery(session, R"(
187+
--!syntax_v1
188+
$sample = AsList(
189+
AsStruct(ListFromRange(0, 5000) AS v)
190+
);
170191
171-
UPSERT INTO `/Root/origin` (id, order, value)
172-
SELECT
173-
RandomNumber(v) AS id,
174-
v AS order,
175-
CAST('0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF' AS Utf8?) AS value
176-
FROM as_table($sample)
177-
FLATTEN BY (v);
178-
)");
192+
UPSERT INTO `/Root/origin` (id, order, value)
193+
SELECT
194+
RandomNumber(v) AS id,
195+
CAST(v AS Uint64) AS order,
196+
CAST('0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF' AS Utf8?) AS value
197+
FROM as_table($sample)
198+
FLATTEN BY (v);
199+
)");
200+
}
179201

180202
Cerr << ">>>>> " << TInstant::Now() << " Start read topic" << Endl << Flush;
181-
AssertMessageCountInTopic(client, "/Root/origin/feed/streamImpl", 150000);
203+
AssertMessageCountInTopic(client, "/Root/origin/feed/streamImpl", 250000);
182204
Cerr << ">>>>> " << TInstant::Now() << " End" << Endl << Flush;
183205
}
184206
}

ydb/core/tx/datashard/change_exchange_split.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ class TCdcWorker
360360
const auto partitionId = partition.GetPartitionId();
361361
const auto tabletId = partition.GetTabletId();
362362

363+
if (NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus()) {
364+
continue;
365+
}
366+
363367
auto it = Workers.find(partitionId);
364368
if (it != Workers.end()) {
365369
workers.emplace(partitionId, it->second);

0 commit comments

Comments
 (0)