Skip to content

Commit d14474e

Browse files
authored
Fix tpch generator (#10423)
1 parent fad0666 commit d14474e

File tree

2 files changed

+61
-53
lines changed

2 files changed

+61
-53
lines changed

ydb/library/workload/tpch/data_generator.cpp

Lines changed: 59 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -128,28 +128,28 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
128128
if (tdefs[TableNum].child != NONE) {
129129
ctxs.emplace_back(*this, tdefs[TableNum].child, nullptr);
130130
}
131-
with_lock(NumbersLock) {
132-
if (!Generated) {
133-
if (Owner.GetProcessCount() > 1) {
134-
DSS_HUGE e;
135-
set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e);
136-
}
137-
if (!!Owner.StateProcessor) {
138-
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
139-
Generated = state->Position;
140-
GenSeed(TableNum, Generated);
141-
}
142-
}
131+
132+
auto g = Guard(Lock);
133+
if (!Generated) {
134+
if (Owner.GetProcessCount() > 1) {
135+
DSS_HUGE e;
136+
set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e);
143137
}
144-
const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0;
145-
if (!count) {
146-
return result;
138+
if (!!Owner.StateProcessor) {
139+
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
140+
Generated = state->Position;
141+
GenSeed(TableNum, Generated);
142+
}
147143
}
148-
ctxs.front().SetCount(count);
149-
ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
150-
Generated += count;
151144
}
152-
GenerateRows(ctxs);
145+
const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0;
146+
if (!count) {
147+
return result;
148+
}
149+
ctxs.front().SetCount(count);
150+
ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
151+
Generated += count;
152+
GenerateRows(ctxs, std::move(g));
153153
for(auto& ctx: ctxs) {
154154
ctx.AppendPortions(result);
155155
}
@@ -158,7 +158,7 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
158158

159159
#define CSV_WRITER_REGISTER_FIELD_DATE(writer, column_name, record_field) \
160160
writer.RegisterField(column_name, [](const decltype(writer)::TItem& item, IOutputStream& out) { \
161-
out << TInstant::ParseIso8601(item.record_field).Days(); \
161+
out << item.record_field; \
162162
});
163163

164164
#define CSV_WRITER_REGISTER_FIELD_COMMENT(writer, prefix) \
@@ -185,13 +185,14 @@ class TBulkDataGeneratorOrderLine : public TTpchWorkloadDataInitializerGenerator
185185
{}
186186

187187
protected:
188-
virtual void GenerateRows(TContexts& ctxs) override {
188+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
189189
TVector<order_t> ordersList(ctxs.front().GetCount());
190-
with_lock(DriverLock) {
191-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
192-
mk_order(ctxs.front().GetStart() + i, &ordersList[i], 0);
193-
}
190+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
191+
row_start(TableNum);
192+
mk_order(ctxs.front().GetStart() + i, &ordersList[i], 0);
193+
row_stop(TableNum);
194194
}
195+
g.Release();
195196

196197
TCsvItemWriter<order_t> ordersWriter(ctxs[0].GetCsv().Out);
197198
CSV_WRITER_REGISTER_FIELD(ordersWriter, "o_orderkey", okey);
@@ -245,13 +246,15 @@ class TBulkDataGeneratorPartPSupp : public TTpchWorkloadDataInitializerGenerator
245246
{}
246247

247248
protected:
248-
virtual void GenerateRows(TContexts& ctxs) override {
249+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
249250
TVector<part_t> partsList(ctxs.front().GetCount());
250-
with_lock(DriverLock) {
251-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
252-
mk_part(ctxs.front().GetStart() + i, &partsList[i]);
253-
}
251+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
252+
row_start(TableNum);
253+
mk_part(ctxs.front().GetStart() + i, &partsList[i]);
254+
row_stop(TableNum);
254255
}
256+
g.Release();
257+
255258
TCsvItemWriter<part_t> partsWriter(ctxs[0].GetCsv().Out);
256259
CSV_WRITER_REGISTER_FIELD(partsWriter, "p_partkey", partkey);
257260
CSV_WRITER_REGISTER_FIELD(partsWriter, "p_name", name);
@@ -286,13 +289,14 @@ class TBulkDataGeneratorSupplier : public TTpchWorkloadDataInitializerGenerator:
286289
{}
287290

288291
protected:
289-
virtual void GenerateRows(TContexts& ctxs) override {
292+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
290293
TVector<supplier_t> suppList(ctxs.front().GetCount());
291-
with_lock(DriverLock) {
292-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
293-
mk_supp(ctxs.front().GetStart() + i, &suppList[i]);
294-
}
294+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
295+
row_start(TableNum);
296+
mk_supp(ctxs.front().GetStart() + i, &suppList[i]);
297+
row_stop(TableNum);
295298
}
299+
g.Release();
296300

297301
TCsvItemWriter<supplier_t> writer(ctxs[0].GetCsv().Out);
298302
CSV_WRITER_REGISTER_FIELD(writer, "s_suppkey", suppkey);
@@ -316,13 +320,14 @@ class TBulkDataGeneratorCustomer : public TTpchWorkloadDataInitializerGenerator:
316320
{}
317321

318322
protected:
319-
virtual void GenerateRows(TContexts& ctxs) override {
323+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
320324
TVector<customer_t> custList(ctxs.front().GetCount());
321-
with_lock(DriverLock) {
322-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
323-
mk_cust(ctxs.front().GetStart() + i, &custList[i]);
324-
}
325+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
326+
row_start(TableNum);
327+
mk_cust(ctxs.front().GetStart() + i, &custList[i]);
328+
row_stop(TableNum);
325329
}
330+
g.Release();
326331

327332
TCsvItemWriter<customer_t> writer(ctxs[0].GetCsv().Out);
328333
CSV_WRITER_REGISTER_FIELD(writer, "c_custkey", custkey);
@@ -347,13 +352,15 @@ class TBulkDataGeneratorNation : public TTpchWorkloadDataInitializerGenerator::T
347352
{}
348353

349354
protected:
350-
virtual void GenerateRows(TContexts& ctxs) override {
355+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
351356
TVector<code_t> nationList(ctxs.front().GetCount());
352-
with_lock(DriverLock) {
353-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
354-
mk_nation(ctxs.front().GetStart() + i, &nationList[i]);
355-
}
357+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
358+
row_start(TableNum);
359+
mk_nation(ctxs.front().GetStart() + i, &nationList[i]);
360+
row_stop(TableNum);
356361
}
362+
g.Release();
363+
357364
TCsvItemWriter<code_t> writer(ctxs[0].GetCsv().Out);
358365
CSV_WRITER_REGISTER_FIELD(writer, "n_nationkey", code);
359366
CSV_WRITER_REGISTER_FIELD(writer, "n_name", text);
@@ -373,13 +380,15 @@ class TBulkDataGeneratorRegion : public TTpchWorkloadDataInitializerGenerator::T
373380
{}
374381

375382
protected:
376-
virtual void GenerateRows(TContexts& ctxs) override {
383+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) override {
377384
TVector<code_t> regionList(ctxs.front().GetCount());
378-
with_lock(DriverLock) {
379-
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
380-
mk_region(ctxs.front().GetStart() + i, &regionList[i]);
381-
}
385+
for (ui64 i = 0; i < ctxs.front().GetCount(); ++i) {
386+
row_start(TableNum);
387+
mk_region(ctxs.front().GetStart() + i, &regionList[i]);
388+
row_stop(TableNum);
382389
}
390+
g.Release();
391+
383392
TCsvItemWriter<code_t> writer(ctxs[0].GetCsv().Out);
384393
CSV_WRITER_REGISTER_FIELD(writer, "r_regionkey", code);
385394
CSV_WRITER_REGISTER_FIELD(writer, "r_name", text);

ydb/library/workload/tpch/data_generator.h

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -48,12 +48,11 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase
4848

4949
using TContexts = TVector<TContext>;
5050

51-
virtual void GenerateRows(TContexts& ctxs) = 0;
51+
virtual void GenerateRows(TContexts& ctxs, TGuard<TAdaptiveLock>&& g) = 0;
5252

5353
int TableNum;
5454
ui64 Generated = 0;
55-
TAdaptiveLock NumbersLock;
56-
TAdaptiveLock DriverLock;
55+
TAdaptiveLock Lock;
5756

5857
private:
5958
TString GetFullTableName(const char* table) const;

0 commit comments

Comments
 (0)