Skip to content

Commit 7c90c71

Browse files
authored
Supported autopartition of topics in the workload (#7420)
1 parent 1dc8165 commit 7c90c71

File tree

8 files changed

+94
-21
lines changed

8 files changed

+94
-21
lines changed

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,18 @@ void TTopicOperationsScenario::InitStatsCollector()
9191
void TTopicOperationsScenario::CreateTopic(const TString& database,
9292
const TString& topic,
9393
ui32 partitionCount,
94-
ui32 consumerCount)
94+
ui32 consumerCount,
95+
bool autoscaling,
96+
ui32 maxPartitionCount,
97+
ui32 stabilizationWindowSeconds,
98+
ui32 upUtilizationPercent,
99+
ui32 downUtilizationPercent)
95100
{
96101
auto topicPath =
97102
TCommandWorkloadTopicDescribe::GenerateFullTopicName(database, topic);
98103

99104
EnsureTopicNotExist(topicPath);
100-
CreateTopic(topicPath, partitionCount, consumerCount);
105+
CreateTopic(topicPath, partitionCount, consumerCount, autoscaling, maxPartitionCount, stabilizationWindowSeconds, upUtilizationPercent, downUtilizationPercent);
101106
}
102107

103108
void TTopicOperationsScenario::DropTopic(const TString& database,
@@ -155,14 +160,32 @@ void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic)
155160

156161
void TTopicOperationsScenario::CreateTopic(const TString& topic,
157162
ui32 partitionCount,
158-
ui32 consumerCount)
163+
ui32 consumerCount,
164+
bool autoscaling,
165+
ui32 maxPartitionCount,
166+
ui32 stabilizationWindowSeconds,
167+
ui32 upUtilizationPercent,
168+
ui32 downUtilizationPercent)
159169
{
160170
Y_ABORT_UNLESS(Driver);
161171

162172
NTopic::TTopicClient client(*Driver);
163173

164174
NTopic::TCreateTopicSettings settings;
165-
settings.PartitioningSettings(partitionCount, partitionCount);
175+
if (autoscaling) {
176+
settings.BeginConfigurePartitioningSettings()
177+
.MinActivePartitions(partitionCount)
178+
.MaxActivePartitions(maxPartitionCount)
179+
.BeginConfigureAutoPartitioningSettings()
180+
.Strategy(NTopic::EAutoPartitioningStrategy::ScaleUpAndDown)
181+
.StabilizationWindow(TDuration::Seconds(stabilizationWindowSeconds))
182+
.UpUtilizationPercent(upUtilizationPercent)
183+
.DownUtilizationPercent(downUtilizationPercent)
184+
.EndConfigureAutoPartitioningSettings()
185+
.EndConfigurePartitioningSettings();
186+
} else {
187+
settings.PartitioningSettings(partitionCount, partitionCount);
188+
}
166189

167190
for (unsigned consumerIdx = 0; consumerIdx < consumerCount; ++consumerIdx) {
168191
settings
@@ -223,8 +246,12 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
223246
void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void>>& threads,
224247
ui32 partitionCount,
225248
ui32 partitionSeed,
226-
const std::vector<TString>& generatedMessages)
249+
const std::vector<TString>& generatedMessages,
250+
const TString& database)
227251
{
252+
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(database, TopicName, *Driver);
253+
bool useAutoPartitioning = NYdb::NTopic::EAutoPartitioningStrategy::Disabled != describeTopicResult.GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy();
254+
228255
auto count = std::make_shared<std::atomic_uint>();
229256
for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) {
230257
TTopicWorkloadWriterParams writerParams{
@@ -236,6 +263,7 @@ void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void
236263
.ErrorFlag = ErrorFlag,
237264
.StartedCount = count,
238265
.GeneratedMessages = generatedMessages,
266+
.Database = database,
239267
.TopicName = TopicName,
240268
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
241269
.MessageSize = MessageSize,
@@ -245,7 +273,8 @@ void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void
245273
.PartitionId = (partitionSeed + writerIdx) % partitionCount,
246274
.Direct = Direct,
247275
.Codec = Codec,
248-
.UseTransactions = UseTransactions
276+
.UseTransactions = UseTransactions,
277+
.UseAutoPartitioning = useAutoPartitioning
249278
};
250279

251280
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); }));

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ class TTopicOperationsScenario {
5353
double Percentile = 99.0;
5454
TString TopicName;
5555
ui32 TopicPartitionCount = 1;
56+
bool TopicAutoscaling = false;
57+
ui32 TopicMaxPartitionCount = 100;
58+
ui32 StabilizationWindowSeconds = 15;
59+
ui32 UpUtilizationPercent = 90;
60+
ui32 DownUtilizationPercent = 30;
5661
ui32 ProducerThreadCount = 0;
5762
ui32 ConsumerThreadCount = 0;
5863
ui32 ConsumerCount = 0;
@@ -76,7 +81,12 @@ class TTopicOperationsScenario {
7681
void CreateTopic(const TString& database,
7782
const TString& topic,
7883
ui32 partitionCount,
79-
ui32 consumerCount);
84+
ui32 consumerCount,
85+
bool autoscaling = false,
86+
ui32 maxPartitionCount = 100,
87+
ui32 stabilizationWindowSeconds = 15,
88+
ui32 upUtilizationPercent = 90,
89+
ui32 downUtilizationPercent = 30);
8090
void DropTopic(const TString& database,
8191
const TString& topic);
8292

@@ -90,7 +100,8 @@ class TTopicOperationsScenario {
90100
void StartProducerThreads(std::vector<std::future<void>>& threads,
91101
ui32 partitionCount,
92102
ui32 partitionSeed,
93-
const std::vector<TString>& generatedMessages);
103+
const std::vector<TString>& generatedMessages,
104+
const TString& database);
94105
void JoinThreads(const std::vector<std::future<void>>& threads);
95106

96107
bool AnyErrors() const;
@@ -108,7 +119,12 @@ class TTopicOperationsScenario {
108119
void EnsureTopicNotExist(const TString& topic);
109120
void CreateTopic(const TString& topic,
110121
ui32 partitionCount,
111-
ui32 consumerCount);
122+
ui32 consumerCount,
123+
bool autoscaling,
124+
ui32 maxPartitionCount,
125+
ui32 stabilizationWindowSeconds,
126+
ui32 upUtilizationPercent,
127+
ui32 downUtilizationPercent);
112128

113129
static NTable::TSession GetSession(NTable::TTableClient& client);
114130

ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ int TTopicReadWriteScenario::DoRun(const TClientCommand::TConfig& config)
2121
std::vector<std::future<void>> threads;
2222

2323
StartConsumerThreads(threads, config.Database);
24-
StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages);
24+
StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages, config.Database);
2525

2626
StatsCollector->PrintWindowStatsLoop();
2727

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_init.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ using namespace NYdb::NConsoleClient;
1010

1111
int TCommandWorkloadTopicInit::TScenario::DoRun(const TConfig& config)
1212
{
13-
CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount);
13+
CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount, TopicAutoscaling, TopicMaxPartitionCount, StabilizationWindowSeconds, UpUtilizationPercent, DownUtilizationPercent);
1414

1515
return EXIT_SUCCESS;
1616
}
@@ -39,6 +39,22 @@ void TCommandWorkloadTopicInit::Config(TConfig& config)
3939
config.Opts->AddLongOption('c', "consumers", "Number of consumers in the topic.")
4040
.DefaultValue(1)
4141
.StoreResult(&Scenario.ConsumerCount);
42+
43+
44+
config.Opts->AddLongOption('a', "auto-partitioning", "Enable autopartitioning of topic.")
45+
.DefaultValue(false)
46+
.StoreTrue(&Scenario.TopicAutoscaling);
47+
config.Opts->AddLongOption('m', "auto-partitioning-max-partitions-count", "Max number of partitions in the topic.")
48+
.StoreResult(&Scenario.TopicMaxPartitionCount);
49+
config.Opts->AddLongOption("auto-partitioning-stabilization-window-seconds", "Duration in seconds of high or low load before automatically scale the number of partitions")
50+
.Optional()
51+
.StoreResult(&Scenario.StabilizationWindowSeconds);
52+
config.Opts->AddLongOption("auto-partitioning-up-utilization-percent", "The load percentage at which the number of partitions will increase")
53+
.Optional()
54+
.StoreResult(&Scenario.UpUtilizationPercent);
55+
config.Opts->AddLongOption("auto-partitioning-down-utilization-percent", "The load percentage at which the number of partitions will decrease")
56+
.Optional()
57+
.StoreResult(&Scenario.DownUtilizationPercent);
4258
}
4359

4460
void TCommandWorkloadTopicInit::Parse(TConfig& config)

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
2626

2727
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
2828
NYdb::NTopic::TReadSessionSettings settings;
29+
settings.AutoPartitioningSupport(true);
2930

3031
if (!params.ReadWithoutConsumer) {
3132
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
@@ -45,8 +46,8 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
4546
}
4647
settings.WithoutConsumer().AppendTopics(topic);
4748
}
48-
49-
49+
50+
5051
if (params.UseTransactions) {
5152
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
5253
}
@@ -97,8 +98,8 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
9798
txSupport->AppendRow(message.GetData());
9899
}
99100

100-
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId()
101-
<< " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId()
101+
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId()
102+
<< " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId()
102103
<< " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo()
103104
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
104105
}
@@ -126,6 +127,8 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
126127
WRITE_LOG(params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "Read session closed: " << closeSessionEvent->DebugString());
127128
*params.ErrorFlag = 1;
128129
break;
130+
} else if (auto* endPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
131+
endPartitionStreamEvent->Confirm();
129132
} else if (auto* partitionStreamStatusEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
130133
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << partitionStreamStatusEvent->DebugString())
131134

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "topic_workload_writer.h"
2+
#include "topic_workload_describe.h"
23

34
#include <util/generic/overloaded.h>
45

@@ -81,7 +82,7 @@ bool TTopicWorkloadWriterWorker::WaitForInitSeqNo()
8182

8283
void TTopicWorkloadWriterWorker::Process() {
8384
Sleep(TDuration::Seconds((float)Params.WarmupSec * Params.WriterIdx / Params.ProducerThreadCount));
84-
85+
8586
const TInstant endTime = TInstant::Now() + TDuration::Seconds(Params.TotalSec);
8687

8788
StartTimestamp = Now();
@@ -120,13 +121,13 @@ void TTopicWorkloadWriterWorker::Process() {
120121
writingAllowed &= BytesWritten < bytesMustBeWritten;
121122
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "BytesWritten " << BytesWritten << " bytesMustBeWritten " << bytesMustBeWritten << " writingAllowed " << writingAllowed);
122123
}
123-
else
124+
else
124125
{
125126
writingAllowed &= InflightMessages.size() <= 1_MB / Params.MessageSize;
126127
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Inflight size " << InflightMessages.size() << " writingAllowed " << writingAllowed);
127128
}
128129

129-
if (writingAllowed)
130+
if (writingAllowed)
130131
{
131132
TString data = GetGeneratedMessage();
132133

@@ -142,7 +143,7 @@ void TTopicWorkloadWriterWorker::Process() {
142143
ContinuationToken.Clear();
143144
MessageId++;
144145
}
145-
else
146+
else
146147
Sleep(TDuration::MilliSeconds(1));
147148

148149
if (events.empty())
@@ -220,11 +221,17 @@ bool TTopicWorkloadWriterWorker::ProcessSessionClosedEvent(
220221

221222
void TTopicWorkloadWriterWorker::CreateWorker() {
222223
WRITE_LOG(Params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "Create writer worker for ProducerId " << Params.ProducerId << " PartitionId " << Params.PartitionId);
224+
223225
NYdb::NTopic::TWriteSessionSettings settings;
224226
settings.Codec((NYdb::NTopic::ECodec)Params.Codec);
225227
settings.Path(Params.TopicName);
226228
settings.ProducerId(Params.ProducerId);
227-
settings.PartitionId(Params.PartitionId);
229+
if (Params.UseAutoPartitioning) {
230+
settings.MessageGroupId(Params.ProducerId);
231+
} else {
232+
settings.PartitionId(Params.PartitionId);
233+
}
234+
228235
settings.DirectWriteToPartition(Params.Direct);
229236
WriteSession = NYdb::NTopic::TTopicClient(Params.Driver).CreateWriteSession(settings);
230237
}

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace NYdb {
2020
std::shared_ptr<std::atomic<bool>> ErrorFlag;
2121
std::shared_ptr<std::atomic_uint> StartedCount;
2222
const std::vector<TString>& GeneratedMessages;
23+
TString Database;
2324
TString TopicName;
2425
size_t ByteRate;
2526
size_t MessageSize;
@@ -30,6 +31,7 @@ namespace NYdb {
3031
bool Direct;
3132
ui32 Codec = 0;
3233
bool UseTransactions = false;
34+
bool UseAutoPartitioning = false;
3335
};
3436

3537
class TTopicWorkloadWriterWorker {

ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ int TTopicWriteScenario::DoRun(const TClientCommand::TConfig& config)
2121
std::vector<std::future<void>> threads;
2222

2323
StartConsumerThreads(threads, config.Database);
24-
StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages);
24+
StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages, config.Database);
2525

2626
StatsCollector->PrintWindowStatsLoop();
2727

0 commit comments

Comments
 (0)