Skip to content

Commit 7cd8a2c

Browse files
writing to a topic in transaction using ydb-cli (#7872)
1 parent 27fc439 commit 7cd8a2c

11 files changed

+218
-34
lines changed

ydb/apps/ydb/ut/run_ydb.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ TString RunYdb(const TList<TString>& args1, const TList<TString>& args2, bool ch
5151
return command.GetOutput();
5252
}
5353

54-
ui64 GetFullTimeValue(const TString& output)
54+
ui64 GetMostRecentValue(const TString& output)
5555
{
5656
TVector<TString> lines, columns;
5757

@@ -61,6 +61,16 @@ ui64 GetFullTimeValue(const TString& output)
6161
return FromString<ui64>(columns.back());
6262
}
6363

64+
ui64 GetFullTimeValue(const TString& output)
65+
{
66+
return GetMostRecentValue(output);
67+
}
68+
69+
ui64 GetCommitTimeValue(const TString& output)
70+
{
71+
return GetMostRecentValue(output);
72+
}
73+
6474
THashSet<TString> GetCodecsList(const TString& output)
6575
{
6676
THashSet<TString> result;

ydb/apps/ydb/ut/run_ydb.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ TString RunYdb(const TList<TString>& args1, const TList<TString>& args2, bool ch
1111
ui64 GetFullTimeValue(const TString& output);
1212
THashSet<TString> GetCodecsList(const TString& output);
1313

14+
ui64 GetCommitTimeValue(const TString& output);
15+
1416
void EnsureStatisticsColumns(const TList<TString>& args,
1517
const TVector<TString>& columns1,
1618
const TVector<TString>& columns2);

ydb/apps/ydb/ut/workload-topic.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,47 @@ Y_UNIT_TEST(ReadWrite_Statistics)
126126
{"#", "msg/s", "MB/s", "percentile,ms", "percentile,msg", "percentile,msg", "percentile,ms", "msg/s", "MB/s", "percentile,ms"});
127127
}
128128

129+
Y_UNIT_TEST(Write_Statistics_UseTx)
130+
{
131+
EnsureStatisticsColumns({"run", "write", "-s", "1", "--warmup", "0", "--use-tx"},
132+
{"Window", "Write speed", "Write time", "Inflight", "Select time", "Upsert time", "Commit time"},
133+
{"#", "msg/s", "MB/s", "percentile,ms", "percentile,msg", "percentile,ms", "percentile,ms", "percentile,ms"});
134+
}
135+
136+
Y_UNIT_TEST(WriteInTx)
137+
{
138+
// In the test, 6 writers write messages within 10 seconds.
139+
// Then the number of recorded messages is checked. Commit transactions every second.
140+
// It is expected that at least 60 messages will be written.
141+
142+
ExecYdb({"init",
143+
"--partitions", "3"});
144+
145+
auto output = ExecYdb({"run", "write",
146+
"--threads", "6",
147+
"--byte-rate", "102400",
148+
"--message-size", "10240",
149+
"--use-tx",
150+
"--commit-messages", "10",
151+
"--warmup", "2",
152+
"--seconds", "10"});
153+
ui64 commitTimeValue = GetCommitTimeValue(output);
154+
155+
output = RunYdb({},
156+
{"topic", "read", "workload-topic",
157+
"--consumer", "workload-consumer-0",
158+
"--commit", "0",
159+
"--format", "newline-delimited",
160+
"--limit", "200"});
161+
TVector<TString> lines;
162+
Split(output, "\n", lines);
163+
164+
ExecYdb({"clean"});
165+
166+
// The value in the 'Commit time` column is greater than 0
167+
UNIT_ASSERT_GT(commitTimeValue, 0);
168+
169+
UNIT_ASSERT_GE(lines.size(), 60);
170+
}
171+
129172
}

ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,12 @@ void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void
274274
.Direct = Direct,
275275
.Codec = Codec,
276276
.UseTransactions = UseTransactions,
277-
.UseAutoPartitioning = useAutoPartitioning
277+
.UseAutoPartitioning = useAutoPartitioning,
278+
.CommitPeriod = CommitPeriod,
279+
.CommitMessages = CommitMessages
278280
};
279281

280-
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); }));
282+
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::RetryableWriterLoop(writerParams); }));
281283
}
282284

283285
while (*count != ProducerThreadCount) {

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,9 @@ void TTopicWorkloadReader::TryCommitTableChanges(TTopicWorkloadReaderParams& par
195195

196196
auto execTimes = txSupport->CommitTx(params.UseTableSelect, params.UseTableUpsert);
197197

198-
params.StatsCollector->AddSelectEvent(params.ReaderIdx, {execTimes.SelectTime.MilliSeconds()});
199-
params.StatsCollector->AddUpsertEvent(params.ReaderIdx, {execTimes.UpsertTime.MilliSeconds()});
200-
params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {execTimes.CommitTime.MilliSeconds()});
198+
params.StatsCollector->AddReaderSelectEvent(params.ReaderIdx, {execTimes.SelectTime.MilliSeconds()});
199+
params.StatsCollector->AddReaderUpsertEvent(params.ReaderIdx, {execTimes.UpsertTime.MilliSeconds()});
200+
params.StatsCollector->AddReaderCommitTxEvent(params.ReaderIdx, {execTimes.CommitTime.MilliSeconds()});
201201
}
202202

203203
void TTopicWorkloadReader::GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents)

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ void TTransactionSupport::CreateSession()
8181
TDuration TTransactionSupport::SelectFromTable()
8282
{
8383
Y_ABORT_UNLESS(Transaction);
84+
Y_ABORT_UNLESS(ReadOnlyTableName);
8485

8586
ui64 left = RandomNumber<ui64>();
8687
ui64 right = RandomNumber<ui64>();
@@ -125,6 +126,7 @@ TDuration TTransactionSupport::SelectFromTable()
125126
TDuration TTransactionSupport::UpsertIntoTable()
126127
{
127128
Y_ABORT_UNLESS(Transaction);
129+
Y_ABORT_UNLESS(WriteOnlyTableName);
128130

129131
TString query = " \
130132
DECLARE $rows AS List<Struct< \

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ void TCommandWorkloadTopicRunWrite::Config(TConfig& config)
6161

6262
config.Opts->MutuallyExclusive("message-rate", "byte-rate");
6363

64+
config.Opts->AddLongOption("use-tx", "Use transactions.")
65+
.Optional()
66+
.DefaultValue(false)
67+
.StoreTrue(&Scenario.UseTransactions);
68+
config.Opts->AddLongOption("commit-period", "Waiting time between commit.")
69+
.DefaultValue(10)
70+
.StoreResult(&Scenario.CommitPeriod);
71+
config.Opts->AddLongOption("commit-messages", "Number of messages per transaction")
72+
.DefaultValue(1'000'000)
73+
.StoreResult(&Scenario.CommitMessages);
74+
6475
config.IsNetworkIntensive = true;
6576
}
6677

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector(
2727
{
2828
for (size_t writerIdx = 0; writerIdx < writerCount; writerIdx++) {
2929
AddQueue(WriterEventQueues);
30+
AddQueue(WriterSelectEventQueues);
31+
AddQueue(WriterUpsertEventQueues);
32+
AddQueue(WriterCommitTxEventQueues);
3033
}
3134

3235
for (size_t readerIdx = 0; readerIdx < readerCount; readerIdx++) {
3336
AddQueue(ReaderEventQueues);
3437
AddQueue(LagEventQueues);
35-
AddQueue(SelectEventQueues);
36-
AddQueue(UpsertEventQueues);
37-
AddQueue(CommitTxEventQueues);
38+
AddQueue(ReaderSelectEventQueues);
39+
AddQueue(ReaderUpsertEventQueues);
40+
AddQueue(ReaderCommitTxEventQueues);
3841
}
3942
}
4043

@@ -158,9 +161,12 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents()
158161
CollectThreadEvents(WriterEventQueues);
159162
CollectThreadEvents(ReaderEventQueues);
160163
CollectThreadEvents(LagEventQueues);
161-
CollectThreadEvents(SelectEventQueues);
162-
CollectThreadEvents(UpsertEventQueues);
163-
CollectThreadEvents(CommitTxEventQueues);
164+
CollectThreadEvents(ReaderSelectEventQueues);
165+
CollectThreadEvents(ReaderUpsertEventQueues);
166+
CollectThreadEvents(ReaderCommitTxEventQueues);
167+
CollectThreadEvents(WriterSelectEventQueues);
168+
CollectThreadEvents(WriterUpsertEventQueues);
169+
CollectThreadEvents(WriterCommitTxEventQueues);
164170
}
165171

166172
template<class T>
@@ -205,19 +211,34 @@ void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWor
205211
AddEvent(readerIdx, LagEventQueues, event);
206212
}
207213

208-
void TTopicWorkloadStatsCollector::AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event)
214+
void TTopicWorkloadStatsCollector::AddReaderSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event)
209215
{
210-
AddEvent(readerIdx, SelectEventQueues, event);
216+
AddEvent(readerIdx, ReaderSelectEventQueues, event);
211217
}
212218

213-
void TTopicWorkloadStatsCollector::AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event)
219+
void TTopicWorkloadStatsCollector::AddReaderUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event)
214220
{
215-
AddEvent(readerIdx, UpsertEventQueues, event);
221+
AddEvent(readerIdx, ReaderUpsertEventQueues, event);
216222
}
217223

218-
void TTopicWorkloadStatsCollector::AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event)
224+
void TTopicWorkloadStatsCollector::AddReaderCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event)
219225
{
220-
AddEvent(readerIdx, CommitTxEventQueues, event);
226+
AddEvent(readerIdx, ReaderCommitTxEventQueues, event);
227+
}
228+
229+
void TTopicWorkloadStatsCollector::AddWriterSelectEvent(size_t writerIdx, const TTopicWorkloadStats::SelectEvent& event)
230+
{
231+
AddEvent(writerIdx, WriterSelectEventQueues, event);
232+
}
233+
234+
void TTopicWorkloadStatsCollector::AddWriterUpsertEvent(size_t writerIdx, const TTopicWorkloadStats::UpsertEvent& event)
235+
{
236+
AddEvent(writerIdx, WriterUpsertEventQueues, event);
237+
}
238+
239+
void TTopicWorkloadStatsCollector::AddWriterCommitTxEvent(size_t writerIdx, const TTopicWorkloadStats::CommitTxEvent& event)
240+
{
241+
AddEvent(writerIdx, WriterCommitTxEventQueues, event);
221242
}
222243

223244
template<class T>

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ namespace NYdb {
2626
void AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event);
2727
void AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event);
2828
void AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event);
29-
void AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event);
30-
void AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event);
31-
void AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event);
29+
void AddReaderSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event);
30+
void AddReaderUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event);
31+
void AddReaderCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event);
32+
void AddWriterSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event);
33+
void AddWriterUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event);
34+
void AddWriterCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event);
3235

3336
ui64 GetTotalReadMessages() const;
3437
ui64 GetTotalWriteMessages() const;
@@ -57,9 +60,12 @@ namespace NYdb {
5760
TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues;
5861
TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues;
5962
TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues;
60-
TEventQueues<TTopicWorkloadStats::SelectEvent> SelectEventQueues;
61-
TEventQueues<TTopicWorkloadStats::UpsertEvent> UpsertEventQueues;
62-
TEventQueues<TTopicWorkloadStats::CommitTxEvent> CommitTxEventQueues;
63+
TEventQueues<TTopicWorkloadStats::SelectEvent> ReaderSelectEventQueues;
64+
TEventQueues<TTopicWorkloadStats::UpsertEvent> ReaderUpsertEventQueues;
65+
TEventQueues<TTopicWorkloadStats::CommitTxEvent> ReaderCommitTxEventQueues;
66+
TEventQueues<TTopicWorkloadStats::SelectEvent> WriterSelectEventQueues;
67+
TEventQueues<TTopicWorkloadStats::UpsertEvent> WriterUpsertEventQueues;
68+
TEventQueues<TTopicWorkloadStats::CommitTxEvent> WriterCommitTxEventQueues;
6369

6470
bool Quiet;
6571
bool PrintTimestamp;

ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ bool TTopicWorkloadWriterWorker::WaitForInitSeqNo()
5454
NThreading::TFuture<ui64> InitSeqNo = WriteSession->GetInitSeqNo();
5555
while (!*Params.ErrorFlag) {
5656
if (!InitSeqNo.HasValue() && !InitSeqNo.Wait(TDuration::Seconds(1))) {
57-
WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, "No initial sequence number.");
57+
WRITE_LOG(Params.Log, ELogPriority::TLOG_WARNING, TStringBuilder() << "No initial sequence number for ProducerId " << Params.ProducerId << " PartitionId " << Params.PartitionId);
5858
Sleep(TDuration::Seconds(1));
5959
continue;
6060
}
@@ -80,10 +80,10 @@ bool TTopicWorkloadWriterWorker::WaitForInitSeqNo()
8080
return false;
8181
}
8282

83-
void TTopicWorkloadWriterWorker::Process() {
83+
void TTopicWorkloadWriterWorker::Process(TInstant endTime) {
8484
Sleep(TDuration::Seconds((float)Params.WarmupSec * Params.WriterIdx / Params.ProducerThreadCount));
85-
86-
const TInstant endTime = TInstant::Now() + TDuration::Seconds(Params.TotalSec);
85+
86+
TInstant commitTime = TInstant::Now() + TDuration::Seconds(Params.CommitPeriod);
8787

8888
StartTimestamp = Now();
8989
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "StartTimestamp " << StartTimestamp);
@@ -127,7 +127,7 @@ void TTopicWorkloadWriterWorker::Process() {
127127
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Inflight size " << InflightMessages.size() << " writingAllowed " << writingAllowed);
128128
}
129129

130-
if (writingAllowed)
130+
if (writingAllowed && !WaitForCommitTx)
131131
{
132132
TString data = GetGeneratedMessage();
133133

@@ -137,14 +137,39 @@ void TTopicWorkloadWriterWorker::Process() {
137137

138138
BytesWritten += Params.MessageSize;
139139

140-
WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId, createTimestamp);
140+
if (TxSupport && !TxSupport->Transaction) {
141+
TxSupport->BeginTx();
142+
}
143+
144+
NTopic::TWriteMessage writeMessage(data);
145+
writeMessage.SeqNo(MessageId);
146+
writeMessage.CreateTimestamp(createTimestamp);
147+
if (TxSupport) {
148+
writeMessage.Tx(*TxSupport->Transaction);
149+
}
150+
151+
WriteSession->Write(std::move(ContinuationToken.GetRef()), std::move(writeMessage));
152+
153+
if (TxSupport) {
154+
TxSupport->AppendRow("");
155+
}
141156

142157
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Written message " << MessageId << " CreateTimestamp " << createTimestamp << " delta from now " << (Params.ByteRate == 0 ? TDuration() : now - *createTimestamp.Get()));
143158
ContinuationToken.Clear();
144159
MessageId++;
160+
161+
if (TxSupport) {
162+
TryCommitTx(Params, commitTime);
163+
}
145164
}
146165
else
166+
{
167+
if (TxSupport) {
168+
TryCommitTx(Params, commitTime);
169+
}
170+
147171
Sleep(TDuration::MilliSeconds(1));
172+
}
148173

149174
if (events.empty())
150175
break;
@@ -236,17 +261,67 @@ void TTopicWorkloadWriterWorker::CreateWorker() {
236261
WriteSession = NYdb::NTopic::TTopicClient(Params.Driver).CreateWriteSession(settings);
237262
}
238263

239-
void TTopicWorkloadWriterWorker::WriterLoop(TTopicWorkloadWriterParams& params) {
264+
void TTopicWorkloadWriterWorker::RetryableWriterLoop(TTopicWorkloadWriterParams& params) {
265+
const TInstant endTime = Now() + TDuration::Seconds(params.TotalSec + 3);
266+
267+
while (!*params.ErrorFlag && Now() < endTime) {
268+
try {
269+
WriterLoop(params, endTime);
270+
} catch (const yexception& ex) {
271+
WRITE_LOG(params.Log, ELogPriority::TLOG_WARNING, TStringBuilder() << ex);
272+
}
273+
}
274+
}
275+
276+
void TTopicWorkloadWriterWorker::WriterLoop(TTopicWorkloadWriterParams& params, TInstant endTime) {
240277
TTopicWorkloadWriterWorker writer(std::move(params));
241278

279+
if (params.UseTransactions) {
280+
writer.TxSupport.emplace(params.Driver, "", "");
281+
}
282+
242283
(*params.StartedCount)++;
243284

244285
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "Writer started " << Now().ToStringUpToSeconds());
245286

246287
if (!writer.WaitForInitSeqNo())
247288
return;
248289

249-
writer.Process();
290+
writer.Process(endTime);
250291

251292
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, TStringBuilder() << "Writer finished " << Now().ToStringUpToSeconds());
252293
}
294+
295+
void TTopicWorkloadWriterWorker::TryCommitTx(TTopicWorkloadWriterParams& params,
296+
TInstant& commitTime)
297+
{
298+
Y_ABORT_UNLESS(TxSupport);
299+
300+
if ((commitTime > Now()) && (params.CommitMessages > TxSupport->Rows.size())) {
301+
return;
302+
}
303+
304+
if (!InflightMessages.empty()) {
305+
WaitForCommitTx = true;
306+
return;
307+
}
308+
309+
TryCommitTableChanges(params);
310+
311+
commitTime += TDuration::Seconds(params.CommitPeriod);
312+
313+
WaitForCommitTx = false;
314+
}
315+
316+
void TTopicWorkloadWriterWorker::TryCommitTableChanges(TTopicWorkloadWriterParams& params)
317+
{
318+
if (TxSupport->Rows.empty()) {
319+
return;
320+
}
321+
322+
auto execTimes = TxSupport->CommitTx(params.UseTableSelect, params.UseTableUpsert);
323+
324+
params.StatsCollector->AddWriterSelectEvent(params.WriterIdx, {execTimes.SelectTime.MilliSeconds()});
325+
params.StatsCollector->AddWriterUpsertEvent(params.WriterIdx, {execTimes.UpsertTime.MilliSeconds()});
326+
params.StatsCollector->AddWriterCommitTxEvent(params.WriterIdx, {execTimes.CommitTime.MilliSeconds()});
327+
}

0 commit comments

Comments
 (0)