Skip to content

Commit 4089a29

Browse files
committed
Supported topic autopartitioning for the transfer (#19043)
1 parent 4a6f92d commit 4089a29

File tree

10 files changed

+194
-76
lines changed

10 files changed

+194
-76
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,8 +1802,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
18021802
{
18031803
PQ_LOG_T("Handle TEvPersQueue::TEvStatus");
18041804

1805-
ReadBalancerActorId = ev->Sender;
1806-
18071805
if (!ConfigInited || !AllOriginalPartitionsInited()) {
18081806
PQ_LOG_D("Postpone the request." <<
18091807
" ConfigInited " << static_cast<int>(ConfigInited) <<

ydb/core/persqueue/read_balancer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,8 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
534534
} else {
535535
TActorId pipeClient = GetPipeClient(tabletId, ctx);
536536

537+
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
538+
537539
auto it = AggregatedStats.Cookies.find(tabletId);
538540
if (!pipeReconnected || it != AggregatedStats.Cookies.end()) {
539541
ui64 cookie;
@@ -548,8 +550,6 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
548550
TStringBuilder() << "Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie);
549551
NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus("", true), cookie);
550552
}
551-
552-
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
553553
}
554554
}
555555

ydb/core/transfer/transfer_writer.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,10 @@ class TTransferWriter
818818
Send(Worker, new TEvWorker::TEvPoll());
819819
}
820820

821+
if (LastWriteTime) {
822+
LastWriteTime = TInstant::Now();
823+
}
824+
821825
return StartWork();
822826
}
823827

ydb/core/transfer/ut/common/utils.h

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ struct MainTestCase {
168168
if (user) {
169169
config.SetAuthToken(TStringBuilder() << user.value() << "@builtin");
170170
}
171-
// config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_INFO).Release()))
171+
// config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()));
172172
return config;
173173
}
174174

@@ -184,7 +184,6 @@ struct MainTestCase {
184184
, TransferName(TStringBuilder() << "Transfer_" << Id)
185185
, Driver(CreateDriverConfig(ConnectionString, user))
186186
, TableClient(Driver)
187-
, Session(TableClient.GetSession().GetValueSync().GetSession())
188187
, TopicClient(Driver)
189188
{
190189
}
@@ -193,9 +192,13 @@ struct MainTestCase {
193192
Driver.Stop(true);
194193
}
195194

195+
auto Session() {
196+
return TableClient.GetSession().GetValueSync().GetSession();
197+
}
198+
196199
void ExecuteDDL(const std::string& ddl, bool checkResult = true, const std::optional<std::string> expectedMessage = std::nullopt) {
197200
Cerr << "DDL: " << ddl << Endl << Flush;
198-
auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
201+
auto res = Session().ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
199202
if (checkResult) {
200203
if (expectedMessage) {
201204
UNIT_ASSERT(!res.IsSuccess());
@@ -211,7 +214,7 @@ struct MainTestCase {
211214
auto ExecuteQuery(const std::string& query, bool retry = true) {
212215
for (size_t i = 10; i--;) {
213216
Cerr << ">>>>> Query: " << query << Endl << Flush;
214-
auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
217+
auto res = Session().ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
215218
if (!res.IsSuccess()) {
216219
Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush;
217220
}
@@ -282,13 +285,41 @@ struct MainTestCase {
282285
)", SourceTableName.data(), ChangefeedName.data()));
283286
}
284287

288+
struct CreatTopicSettings {
289+
size_t MinPartitionCount = 1;
290+
size_t MaxPartitionCount = 100;
291+
bool AutoPartitioningEnabled = false;
292+
};
293+
285294
void CreateTopic(size_t partitionCount = 10) {
286-
ExecuteDDL(Sprintf(R"(
287-
CREATE TOPIC `%s`
288-
WITH (
289-
min_active_partitions = %d
290-
);
291-
)", TopicName.data(), partitionCount));
295+
CreateTopic({
296+
.MinPartitionCount = partitionCount
297+
});
298+
}
299+
300+
void CreateTopic(const CreatTopicSettings& settings) {
301+
if (settings.AutoPartitioningEnabled) {
302+
ExecuteDDL(Sprintf(R"(
303+
CREATE TOPIC `%s`
304+
WITH (
305+
MIN_ACTIVE_PARTITIONS = %d,
306+
MAX_ACTIVE_PARTITIONS = %d,
307+
AUTO_PARTITIONING_STRATEGY = 'UP',
308+
auto_partitioning_down_utilization_percent = 1,
309+
auto_partitioning_up_utilization_percent=2,
310+
auto_partitioning_stabilization_window = Interval('PT1S'),
311+
partition_write_speed_bytes_per_second = 3
312+
);
313+
)", TopicName.data(), settings.MinPartitionCount, settings.MaxPartitionCount));
314+
} else {
315+
ExecuteDDL(Sprintf(R"(
316+
CREATE TOPIC `%s`
317+
WITH (
318+
MIN_ACTIVE_PARTITIONS = %d
319+
320+
);
321+
)", TopicName.data(), settings.MinPartitionCount));
322+
}
292323
}
293324

294325
void DropTopic() {
@@ -427,7 +458,7 @@ struct MainTestCase {
427458
setOptions = TStringBuilder() << "SET (" << sb << " )";
428459
}
429460

430-
auto res = Session.ExecuteQuery(Sprintf(R"(
461+
auto res = Session().ExecuteQuery(Sprintf(R"(
431462
%s;
432463
433464
ALTER TRANSFER `%s`
@@ -474,7 +505,7 @@ struct MainTestCase {
474505
settings.IncludeStats(true);
475506

476507
auto c = TopicClient.DescribeConsumer(TopicName, consumerName, settings).GetValueSync();
477-
UNIT_ASSERT(c.IsSuccess());
508+
UNIT_ASSERT_C(c.IsSuccess(), c.GetIssues().ToOneLineString());
478509
return c;
479510
}
480511

@@ -554,7 +585,9 @@ struct MainTestCase {
554585
settings.IncludeLocation(true);
555586
settings.IncludeStats(true);
556587

557-
return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
588+
auto result = TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
589+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
590+
return result;
558591
}
559592

560593
void CreateUser(const std::string& username) {
@@ -704,7 +737,6 @@ struct MainTestCase {
704737

705738
TDriver Driver;
706739
TQueryClient TableClient;
707-
TSession Session;
708740
TTopicClient TopicClient;
709741
};
710742

0 commit comments

Comments
 (0)