Skip to content

Commit 38572a8

Browse files
authored
Fix and simplify logic with multiprocess TPC-* data generation (#11913)
1 parent 8f553e4 commit 38572a8

File tree

9 files changed

+198
-88
lines changed

9 files changed

+198
-88
lines changed

ydb/library/workload/abstract/workload_query_generator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class IBulkDataGenerator {
102102

103103
virtual TDataPortions GenerateDataPortion() = 0;
104104
YDB_READONLY_DEF(std::string, Name);
105-
YDB_READONLY(ui64, Size, 0);
105+
YDB_READONLY_PROTECT(ui64, Size, 0);
106106
};
107107

108108
using TBulkDataGeneratorList = std::vector<std::shared_ptr<IBulkDataGenerator>>;

ydb/library/workload/tpcds/data_generator.cpp

Lines changed: 46 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,30 +53,6 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD
5353
return TBulkDataGeneratorList(gens.begin(), gens.end());
5454
}
5555

56-
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
57-
static const TSet<ui32> allowedModules{1, 2, 4};
58-
TPositions result;
59-
const auto* tdef = getTdefsByNumber(tableNum);
60-
if (!tdef) {
61-
return result;
62-
}
63-
split_work(tableNum, &result.FirstRow, &result.Count);
64-
if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) {
65-
result.Position = owner.StateProcessor->GetState().at(tdef->name).Position;
66-
67-
//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
68-
while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
69-
--result.Position;
70-
}
71-
}
72-
//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
73-
while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
74-
--result.FirstRow;
75-
++result.Count;
76-
}
77-
return result;
78-
}
79-
8056
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum)
8157
: Owner(owner)
8258
, TableNum(tableNum)
@@ -95,25 +71,27 @@ TStringBuilder& TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TCon
9571
}
9672

9773
void TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::AppendPortions(TDataPortions& result) {
98-
const auto name = getTdefsByNumber(TableNum)->name;
74+
const auto* tdef = getTdefsByNumber(TableNum);
75+
const auto name = tdef->name;
9976
const auto path = Owner.GetFullTableName(name);
77+
auto* stateProcessor = (tdef->flags & FL_CHILD) ? nullptr : Owner.Owner.StateProcessor.Get();
10078
if (Builder) {
10179
Builder->EndList();
10280
result.push_back(MakeIntrusive<TDataPortionWithState>(
103-
Owner.Owner.StateProcessor.Get(),
81+
stateProcessor,
10482
path,
10583
name,
10684
Builder->Build(),
107-
Start - 1,
85+
Start - Owner.FirstRow,
10886
Count
10987
));
11088
} else if (Csv) {
11189
result.push_back(MakeIntrusive<TDataPortionWithState>(
112-
Owner.Owner.StateProcessor.Get(),
90+
stateProcessor,
11391
path,
11492
name,
11593
TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString),
116-
Start - 1,
94+
Start - Owner.FirstRow,
11795
Count
11896
));
11997
}
@@ -124,10 +102,41 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable
124102
}
125103

126104
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum)
127-
: IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count)
105+
: IBulkDataGenerator(getTdefsByNumber(tableNum)->name, 0)
128106
, TableNum(tableNum)
129107
, Owner(owner)
130-
{}
108+
{
109+
static const TSet<ui32> allowedModules{1, 2, 4};
110+
const auto* tdef = getTdefsByNumber(TableNum);
111+
if (!tdef) {
112+
return;
113+
}
114+
ds_key_t rowsCount;
115+
split_work(TableNum, &FirstRow, &rowsCount);
116+
//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
117+
while (FirstRow > 1 && !allowedModules.contains(FirstRow % 6)) {
118+
--FirstRow;
119+
++rowsCount;
120+
}
121+
if (owner.StateProcessor) {
122+
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), tdef->name)) {
123+
StartPosition = state->Position;
124+
125+
//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
126+
while (StartPosition && !allowedModules.contains((1 + StartPosition) % 6)) {
127+
--StartPosition;
128+
}
129+
if (StartPosition) {
130+
FirstPortion = MakeIntrusive<TDataPortion>(
131+
GetFullTableName(tdef->name),
132+
TDataPortion::TSkip(),
133+
StartPosition
134+
);
135+
}
136+
}
137+
}
138+
Size = rowsCount;
139+
}
131140

132141
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
133142
TDataPortions result;
@@ -143,30 +152,24 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
143152
}
144153

145154
auto g = Guard(Lock);
146-
auto positions = CalcCountToGenerate(Owner, TableNum, !Generated);
147155
if (!Generated) {
148-
Generated = positions.Position;
149-
result.push_back(MakeIntrusive<TDataPortion>(
150-
GetFullTableName(tdef->name),
151-
TDataPortion::TSkip(),
152-
Generated
153-
));
154-
if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) {
156+
Generated = StartPosition;
157+
if (const auto toSkip = FirstRow + StartPosition - 1) {
155158
row_skip(TableNum, toSkip);
156159
if (tdef->flags & FL_PARENT) {
157160
row_skip(tdef->nParam, toSkip);
158161
}
159162
}
160-
if (tdef->flags & FL_SMALL) {
161-
resetCountCount();
162-
}
163+
}
164+
if (FirstPortion) {
165+
result.emplace_back(std::move(FirstPortion));
163166
}
164167
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
165168
if (!count) {
166169
return result;
167170
}
168171
ctxs.front().SetCount(count);
169-
ctxs.front().SetStart(positions.FirstRow + Generated);
172+
ctxs.front().SetStart(FirstRow + Generated);
170173
Generated += count;
171174
GenerateRows(ctxs, std::move(g));
172175
for(auto& ctx: ctxs) {

ydb/library/workload/tpcds/data_generator.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,9 @@ class TTpcdsWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBas
6666

6767
private:
6868
TString GetFullTableName(const char* table) const;
69-
struct TPositions {
70-
ds_key_t FirstRow = 1;
71-
ui64 Position = 0;
72-
ds_key_t Count = 0;
73-
};
74-
static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
69+
ds_key_t FirstRow = 1;
70+
ui64 StartPosition = 0;
71+
TDataPortionPtr FirstPortion;
7572
const TTpcdsWorkloadDataInitializerGenerator& Owner;
7673
};
7774
};

ydb/library/workload/tpch/data_generator.cpp

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,6 @@ TBulkDataGeneratorList TTpchWorkloadDataInitializerGenerator::DoGetBulkInitialDa
4545
return TBulkDataGeneratorList(gens.begin(), gens.end());
4646
}
4747

48-
49-
ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) {
50-
if (tableNum == NONE) {
51-
return 0;
52-
}
53-
if (tableNum >= NATION) {
54-
return owner.GetProcessIndex() ? 0 : tdefs[tableNum].base;
55-
}
56-
ui64 rowCount = tdefs[tableNum].base * owner.GetScale();
57-
ui64 extraRows = 0;
58-
if (owner.GetProcessIndex() + 1 >= owner.GetProcessCount()) {
59-
extraRows = rowCount % owner.GetProcessCount();
60-
}
61-
return rowCount / owner.GetProcessCount() + extraRows;
62-
}
63-
6448
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state)
6549
: Owner(owner)
6650
, TableNum(tableNum)
@@ -88,7 +72,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
8872
path,
8973
tdefs[TableNum].name,
9074
Builder->Build(),
91-
Start - 1,
75+
Start - Owner.FirstRow,
9276
Count
9377
));
9478
} else if (Csv) {
@@ -97,7 +81,7 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
9781
path,
9882
tdefs[TableNum].name,
9983
TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::PsvFormatString),
100-
Start - 1,
84+
Start - Owner.FirstRow,
10185
Count
10286
));
10387
}
@@ -108,10 +92,35 @@ TString TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTableN
10892
}
10993

11094
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum)
111-
: IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum))
95+
: IBulkDataGenerator(tdefs[tableNum].name, 0)
11296
, TableNum(tableNum)
11397
, Owner(owner)
114-
{}
98+
{
99+
if (TableNum == NONE) {
100+
return;
101+
}
102+
if (tableNum >= NATION) {
103+
Size = owner.GetProcessIndex() ? 0 : tdefs[tableNum].base;
104+
} else {
105+
DSS_HUGE extraRows = 0;
106+
Size = set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &extraRows);
107+
FirstRow += Size * Owner.GetProcessIndex();
108+
if (Owner.GetProcessIndex() + 1 == Owner.GetProcessCount()) {
109+
Size += extraRows;
110+
}
111+
}
112+
if (!!Owner.StateProcessor) {
113+
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
114+
Generated = state->Position;
115+
FirstPortion = MakeIntrusive<TDataPortion>(
116+
GetFullTableName(tdefs[TableNum].name),
117+
TDataPortion::TSkip(),
118+
Generated
119+
);
120+
GenSeed(TableNum, Generated);
121+
}
122+
}
123+
}
115124

116125
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
117126
TDataPortions result;
@@ -125,29 +134,15 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
125134
}
126135

127136
auto g = Guard(Lock);
128-
if (!Generated) {
129-
if (Owner.GetProcessCount() > 1) {
130-
DSS_HUGE e;
131-
set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e);
132-
}
133-
if (!!Owner.StateProcessor) {
134-
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
135-
Generated = state->Position;
136-
result.push_back(MakeIntrusive<TDataPortion>(
137-
GetFullTableName(tdefs[TableNum].name),
138-
TDataPortion::TSkip(),
139-
Generated
140-
));
141-
GenSeed(TableNum, Generated);
142-
}
143-
}
137+
if (FirstPortion) {
138+
result.emplace_back(std::move(FirstPortion));
144139
}
145140
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
146141
if (!count) {
147142
return result;
148143
}
149144
ctxs.front().SetCount(count);
150-
ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
145+
ctxs.front().SetStart(FirstRow + Generated);
151146
Generated += count;
152147
GenerateRows(ctxs, std::move(g));
153148
for(auto& ctx: ctxs) {

ydb/library/workload/tpch/data_generator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase
5252

5353
int TableNum;
5454
ui64 Generated = 0;
55+
ui64 FirstRow = 1;
5556
TAdaptiveLock Lock;
5657

5758
private:
5859
TString GetFullTableName(const char* table) const;
59-
static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum);
6060
const TTpchWorkloadDataInitializerGenerator& Owner;
61+
TDataPortionPtr FirstPortion;
6162
};
6263

6364
};

ydb/tests/functional/tpc/canondata/result.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
"test_generator.TestTpcdsGenerator.test_s1_state": {
99
"uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash"
1010
},
11+
"test_generator.TestTpcdsGenerator.test_s1_state_and_parts": {
12+
"uri": "file://test_generator.TestTpcdsGenerator.test_s1_state_and_parts/s1.hash"
13+
},
1114
"test_generator.TestTpchGenerator.test_s1": {
1215
"uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash"
1316
},
@@ -17,6 +20,9 @@
1720
"test_generator.TestTpchGenerator.test_s1_state": {
1821
"uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash"
1922
},
23+
"test_generator.TestTpchGenerator.test_s1_state_and_parts": {
24+
"uri": "file://test_generator.TestTpchGenerator.test_s1_state_and_parts/s1.hash"
25+
},
2026
"test_init.TestClickbenchInit.test_s1_column": {
2127
"uri": "file://test_init.TestClickbenchInit.test_s1_column/s1_column"
2228
},
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
call_center count: 6
2+
call_center md5: 86db117a0bb48668acbe63c473e85d96
3+
catalog_page count: 11718
4+
catalog_page md5: e45ffa0691c04f86e0be055a91264cd6
5+
catalog_returns count: 144067
6+
catalog_returns md5: a2a74a6552e74a4a63dae4dbf65855db
7+
catalog_sales count: 1441548
8+
catalog_sales md5: 07f03d83e8579e9a22b565a41835c090
9+
customer count: 100000
10+
customer md5: 1e03db62671f58e0950acfe2748b2009
11+
customer_address count: 50000
12+
customer_address md5: f5df0212260c1a9078fc175f2f74a1b8
13+
customer_demographics count: 1920800
14+
customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
15+
date_dim count: 73049
16+
date_dim md5: f4ef03663ab568ddeb16309f493896c0
17+
household_demographics count: 7200
18+
household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
19+
income_band count: 20
20+
income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
21+
inventory count: 11745000
22+
inventory md5: b5cb61d5a9b1eb9acf7144623e63315a
23+
item count: 18000
24+
item md5: 0caece8d586d854a0713886012ee6786
25+
promotion count: 300
26+
promotion md5: cf9b3443efc3a5d1c3c983e41779dcaf
27+
reason count: 35
28+
reason md5: 89493ae8b5ab9f63f750c1bdadc57089
29+
ship_mode count: 20
30+
ship_mode md5: 25d7c1abd229862398b88818f81f72fc
31+
store count: 12
32+
store md5: f342258aaec198b0ec4d6bb6e9f7991e
33+
store_returns count: 287514
34+
store_returns md5: bc9f15ce6d773f0af978c22ee084802b
35+
store_sales count: 2880404
36+
store_sales md5: 069e459494d5875a78b5eaeeef5340b0
37+
time_dim count: 86400
38+
time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
39+
warehouse count: 5
40+
warehouse md5: b54252167f4e5dbdb42e163d82ba8c3d
41+
web_page count: 60
42+
web_page md5: db2cf0327ecff09ed59bc5d68ba8aacc
43+
web_returns count: 71763
44+
web_returns md5: 305e332bb00d9590f62e05d5a52519d5
45+
web_sales count: 719384
46+
web_sales md5: ad5c63cc7f6d2830bc3592a5f35cce38
47+
web_site count: 30
48+
web_site md5: 707d556c664272f685ee8d7ddbc46f61
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
customer count: 150000
2+
customer md5: 1808efe529a289183e7ccf8aa1a2d8e9
3+
lineitem count: 6001215
4+
lineitem md5: caa6f4712334af71fd8c1efcdd37d7a7
5+
nation count: 25
6+
nation md5: 0e91944824fb13e44cda58882f0fedbe
7+
orders count: 1500000
8+
orders md5: 01c5ca96aa3149c64427291ebbd792d4
9+
part count: 200000
10+
part md5: d67727d976d8c05b5d145840efaad449
11+
partsupp count: 800000
12+
partsupp md5: d62e99cf993c6de288a905ae2f95eced
13+
region count: 5
14+
region md5: d1c494f597244c77001246888185e3e3
15+
supplier count: 10000
16+
supplier md5: 815d49d8e71c7993531b32113b2da5b5

0 commit comments

Comments
 (0)