File tree Expand file tree Collapse file tree 2 files changed +29
-12
lines changed Expand file tree Collapse file tree 2 files changed +29
-12
lines changed Original file line number Diff line number Diff line change 6
6
import ray
7
7
8
8
from benchmark import Benchmark
9
+ from python .ray .data import DataContext
10
+ from python .ray .data .context import ShuffleStrategy
9
11
10
12
11
13
def parse_args () -> argparse .Namespace :
@@ -24,6 +26,14 @@ def parse_args() -> argparse.Namespace:
24
26
type = str ,
25
27
help = "Which columns to group by" ,
26
28
)
29
+ parser .add_argument (
30
+ "--shuffle-strategy" ,
31
+ required = False ,
32
+ default = ShuffleStrategy .SORT_SHUFFLE_PULL_BASED ,
33
+ nargs = "?" ,
34
+ type = str ,
35
+ help = "Strategy to use when shuffling data (see ShuffleStrategy for accepted values)" ,
36
+ )
27
37
28
38
consume_group = parser .add_mutually_exclusive_group ()
29
39
consume_group .add_argument ("--aggregate" , action = "store_true" )
@@ -39,6 +49,11 @@ def main(args):
39
49
def benchmark_fn ():
40
50
path = f"s3://ray-benchmark-data/tpch/parquet/sf{ args .sf } /lineitem"
41
51
52
+ # Configure appropriate shuffle-strategy
53
+ DataContext .get_current ().shuffle_strategy = ShuffleStrategy (
54
+ args .shuffle_strategy
55
+ )
56
+
42
57
grouped_ds = ray .data .read_parquet (path ).groupby (args .group_by )
43
58
consume_fn (grouped_ds )
44
59
Original file line number Diff line number Diff line change 135
135
timeout : 3600
136
136
137
137
variations :
138
- - __suffix__ : few_groups
138
+ - __suffix__ : few_groups (sort_shuffle_pull_based)
139
139
run :
140
140
script : >
141
- python groupby_benchmark.py --sf 10 --aggregate
142
- --group-by column08 column13 column14
143
- - __suffix__ : many_groups
141
+ python groupby_benchmark.py --sf 10 --aggregate --group-by column08 column13 column14
142
+ --shuffle-strategy sort_shuffle_pull_based
143
+
144
+ - __suffix__ : many_groups (sort_shuffle_pull_based)
144
145
run :
145
146
script : >
146
- python groupby_benchmark.py --sf 10 --aggregate
147
- --group-by column02 column14
147
+ python groupby_benchmark.py --sf 10 --aggregate --group-by column02 column14
148
+ --shuffle-strategy sort_shuffle_pull_based
148
149
149
150
- name : map_groups
150
151
155
156
timeout : 3600
156
157
157
158
variations :
158
- - __suffix__ : few_groups
159
+ - __suffix__ : few_groups (sort_shuffle_pull_based)
159
160
run :
160
161
script : >
161
- python groupby_benchmark.py --sf 10 --map-groups
162
- --group-by column08 column13 column14
163
- - __suffix__ : many_groups
162
+ python groupby_benchmark.py --sf 10 --map-groups --group-by column08 column13 column14
163
+ --shuffle-strategy sort_shuffle_pull_based
164
+
165
+ - __suffix__ : many_groups (sort_shuffle_pull_based)
164
166
run :
165
167
script : >
166
- python groupby_benchmark.py --sf 10 --map-groups
167
- --group-by column02 column14
168
+ python groupby_benchmark.py --sf 10 --map-groups --group-by column02 column14
169
+ --shuffle-strategy sort_shuffle_pull_based
168
170
169
171
# ######################
170
172
# Streaming split tests
You can’t perform that action at this time.
0 commit comments