4
4
5
5
import click
6
6
7
- from misc .utils import DBGymConfig , get_scale_factor_string , link_result , workload_name_fn
8
- from util .shell import subprocess_run
7
+ from misc .utils import (
8
+ DBGymConfig ,
9
+ get_scale_factor_string ,
10
+ link_result ,
11
+ workload_name_fn ,
12
+ )
9
13
from util .pg import *
14
+ from util .shell import subprocess_run
10
15
11
16
benchmark_tpch_logger = logging .getLogger ("benchmark/tpch" )
12
17
benchmark_tpch_logger .setLevel (logging .INFO )
@@ -29,8 +34,18 @@ def tpch_data(dbgym_cfg: DBGymConfig, scale_factor: float):
29
34
30
35
31
36
@tpch_group .command (name = "workload" )
32
- @click .option ("--seed-start" , type = int , default = 15721 , help = "A workload consists of queries from multiple seeds. This is the starting seed (inclusive)." )
33
- @click .option ("--seed-end" , type = int , default = 15721 , help = "A workload consists of queries from multiple seeds. This is the ending seed (inclusive)." )
37
+ @click .option (
38
+ "--seed-start" ,
39
+ type = int ,
40
+ default = 15721 ,
41
+ help = "A workload consists of queries from multiple seeds. This is the starting seed (inclusive)." ,
42
+ )
43
+ @click .option (
44
+ "--seed-end" ,
45
+ type = int ,
46
+ default = 15721 ,
47
+ help = "A workload consists of queries from multiple seeds. This is the ending seed (inclusive)." ,
48
+ )
34
49
@click .option (
35
50
"--query-subset" ,
36
51
type = click .Choice (["all" , "even" , "odd" ]),
@@ -45,7 +60,9 @@ def tpch_workload(
45
60
query_subset : str ,
46
61
scale_factor : float ,
47
62
):
48
- assert seed_start <= seed_end , f'seed_start ({ seed_start } ) must be <= seed_end ({ seed_end } )'
63
+ assert (
64
+ seed_start <= seed_end
65
+ ), f"seed_start ({ seed_start } ) must be <= seed_end ({ seed_end } )"
49
66
_clone (dbgym_cfg )
50
67
_generate_queries (dbgym_cfg , seed_start , seed_end , scale_factor )
51
68
_generate_workload (dbgym_cfg , seed_start , seed_end , query_subset , scale_factor )
@@ -56,7 +73,9 @@ def _get_queries_dname(seed: int, scale_factor: float) -> str:
56
73
57
74
58
75
def _clone (dbgym_cfg : DBGymConfig ):
59
- expected_symlink_dpath = dbgym_cfg .cur_symlinks_build_path (mkdir = True ) / "tpch-kit.link"
76
+ expected_symlink_dpath = (
77
+ dbgym_cfg .cur_symlinks_build_path (mkdir = True ) / "tpch-kit.link"
78
+ )
60
79
if expected_symlink_dpath .exists ():
61
80
benchmark_tpch_logger .info (f"Skipping clone: { expected_symlink_dpath } " )
62
81
return
@@ -73,22 +92,32 @@ def _clone(dbgym_cfg: DBGymConfig):
73
92
74
93
def _get_tpch_kit_dpath (dbgym_cfg : DBGymConfig ) -> Path :
75
94
tpch_kit_dpath = (dbgym_cfg .cur_symlinks_build_path () / "tpch-kit.link" ).resolve ()
76
- assert tpch_kit_dpath .exists () and tpch_kit_dpath .is_absolute () and not tpch_kit_dpath .is_symlink ()
95
+ assert (
96
+ tpch_kit_dpath .exists ()
97
+ and tpch_kit_dpath .is_absolute ()
98
+ and not tpch_kit_dpath .is_symlink ()
99
+ )
77
100
return tpch_kit_dpath
78
101
79
102
80
- def _generate_queries (dbgym_cfg : DBGymConfig , seed_start : int , seed_end : int , scale_factor : float ):
103
+ def _generate_queries (
104
+ dbgym_cfg : DBGymConfig , seed_start : int , seed_end : int , scale_factor : float
105
+ ):
81
106
tpch_kit_dpath = _get_tpch_kit_dpath (dbgym_cfg )
82
107
data_path = dbgym_cfg .cur_symlinks_data_path (mkdir = True )
83
108
benchmark_tpch_logger .info (
84
109
f"Generating queries: { data_path } [{ seed_start } , { seed_end } ]"
85
110
)
86
111
for seed in range (seed_start , seed_end + 1 ):
87
- expected_queries_symlink_dpath = data_path / (_get_queries_dname (seed , scale_factor ) + ".link" )
112
+ expected_queries_symlink_dpath = data_path / (
113
+ _get_queries_dname (seed , scale_factor ) + ".link"
114
+ )
88
115
if expected_queries_symlink_dpath .exists ():
89
116
continue
90
117
91
- real_dir = dbgym_cfg .cur_task_runs_data_path (_get_queries_dname (seed , scale_factor ), mkdir = True )
118
+ real_dir = dbgym_cfg .cur_task_runs_data_path (
119
+ _get_queries_dname (seed , scale_factor ), mkdir = True
120
+ )
92
121
for i in range (1 , 22 + 1 ):
93
122
target_sql = (real_dir / f"{ i } .sql" ).resolve ()
94
123
subprocess_run (
@@ -106,16 +135,20 @@ def _generate_queries(dbgym_cfg: DBGymConfig, seed_start: int, seed_end: int, sc
106
135
def _generate_data (dbgym_cfg : DBGymConfig , scale_factor : float ):
107
136
tpch_kit_dpath = _get_tpch_kit_dpath (dbgym_cfg )
108
137
data_path = dbgym_cfg .cur_symlinks_data_path (mkdir = True )
109
- expected_tables_symlink_dpath = data_path / f"tables_sf{ get_scale_factor_string (scale_factor )} .link"
138
+ expected_tables_symlink_dpath = (
139
+ data_path / f"tables_sf{ get_scale_factor_string (scale_factor )} .link"
140
+ )
110
141
if expected_tables_symlink_dpath .exists ():
111
- benchmark_tpch_logger .info (f"Skipping generation: { expected_tables_symlink_dpath } " )
142
+ benchmark_tpch_logger .info (
143
+ f"Skipping generation: { expected_tables_symlink_dpath } "
144
+ )
112
145
return
113
146
114
147
benchmark_tpch_logger .info (f"Generating: { expected_tables_symlink_dpath } " )
115
- subprocess_run (
116
- f"./dbgen -vf -s { scale_factor } " , cwd = tpch_kit_dpath / "dbgen"
148
+ subprocess_run (f"./dbgen -vf -s { scale_factor } " , cwd = tpch_kit_dpath / "dbgen" )
149
+ real_dir = dbgym_cfg .cur_task_runs_data_path (
150
+ f"tables_sf{ get_scale_factor_string (scale_factor )} " , mkdir = True
117
151
)
118
- real_dir = dbgym_cfg .cur_task_runs_data_path (f"tables_sf{ get_scale_factor_string (scale_factor )} " , mkdir = True )
119
152
subprocess_run (f"mv ./*.tbl { real_dir } " , cwd = tpch_kit_dpath / "dbgen" )
120
153
121
154
tables_symlink_dpath = link_result (dbgym_cfg , real_dir )
@@ -135,9 +168,7 @@ def _generate_workload(
135
168
expected_workload_symlink_dpath = symlink_data_dpath / (workload_name + ".link" )
136
169
137
170
benchmark_tpch_logger .info (f"Generating: { expected_workload_symlink_dpath } " )
138
- real_dpath = dbgym_cfg .cur_task_runs_data_path (
139
- workload_name , mkdir = True
140
- )
171
+ real_dpath = dbgym_cfg .cur_task_runs_data_path (workload_name , mkdir = True )
141
172
142
173
queries = None
143
174
if query_subset == "all" :
@@ -150,12 +181,19 @@ def _generate_workload(
150
181
with open (real_dpath / "order.txt" , "w" ) as f :
151
182
for seed in range (seed_start , seed_end + 1 ):
152
183
for qnum in queries :
153
- sql_fpath = (symlink_data_dpath / (_get_queries_dname (seed , scale_factor ) + ".link" )).resolve () / f"{ qnum } .sql"
154
- assert sql_fpath .exists () and not sql_fpath .is_symlink () and sql_fpath .is_absolute (), "We should only write existent real absolute paths to a file"
184
+ sql_fpath = (
185
+ symlink_data_dpath
186
+ / (_get_queries_dname (seed , scale_factor ) + ".link" )
187
+ ).resolve () / f"{ qnum } .sql"
188
+ assert (
189
+ sql_fpath .exists ()
190
+ and not sql_fpath .is_symlink ()
191
+ and sql_fpath .is_absolute ()
192
+ ), "We should only write existent real absolute paths to a file"
155
193
output = "," .join ([f"S{ seed } -Q{ qnum } " , str (sql_fpath )])
156
194
print (output , file = f )
157
195
# TODO(WAN): add option to deep-copy the workload.
158
-
196
+
159
197
workload_symlink_dpath = link_result (dbgym_cfg , real_dpath )
160
198
assert workload_symlink_dpath == expected_workload_symlink_dpath
161
199
benchmark_tpch_logger .info (f"Generated: { expected_workload_symlink_dpath } " )
0 commit comments