Skip to content

Commit 994aec1

Browse files
author
Xuye (Chris) Qin
authored
Optimize performance of transfer (#3091)
1 parent b670c47 commit 994aec1

File tree

12 files changed

+246
-164
lines changed

12 files changed

+246
-164
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Copyright 1999-2022 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import itertools
16+
17+
import numpy as np
18+
import pandas as pd
19+
20+
import mars
21+
import mars.remote as mr
22+
from mars.core.context import get_context
23+
from mars.utils import Timer, readable_size
24+
25+
26+
def send_1_to_1(n: int = None):
    """Benchmark point-to-point transfer between every ordered pair of workers.

    Spawns ``_gen_data`` once on each worker, then for every ordered pair
    ``(worker1, worker2)`` fetches worker1's data from worker2 and records
    the observed transfer rate.

    Parameters
    ----------
    n : int, optional
        Number of rows of test data generated on each worker; ``None`` lets
        ``_gen_data`` fall back to its default.

    Returns
    -------
    dict
        Maps ``(source_worker, dest_worker)`` to a tuple of
        ``(readable data size, rate string like "12.3MB/s")``.
    """
    ctx = get_context()
    workers = ctx.get_worker_addresses()

    # One generator task per worker, pinned with expect_worker so each
    # worker holds its own copy of the test data.
    worker_to_gen_data = {
        w: mr.spawn(_gen_data, kwargs=dict(n=n, worker=w), expect_worker=w)
        for w in workers
    }
    all_data = mars.execute(list(worker_to_gen_data.values()))
    progress = 0.1
    ctx.set_progress(progress)
    infos = [d._fetch_infos(fields=["data_key", "store_size"]) for d in all_data]
    # All workers generate identical data, so one size is representative.
    data_size = infos[0]["store_size"][0]
    worker_to_data_keys = dict(zip(workers, [info["data_key"][0] for info in infos]))

    workers_to_durations = dict()
    # Number of ordered pairs; used to spread the remaining 90% of progress.
    size = len(workers) * (len(workers) - 1)
    for worker1, worker2 in itertools.permutations(workers, 2):
        fetch_data = mr.spawn(
            _fetch_data,
            args=(worker_to_data_keys[worker1],),
            kwargs=dict(worker=worker2),
            expect_worker=worker2,
        )
        fetch_time = fetch_data.execute().fetch()
        rate = readable_size(data_size / fetch_time)
        workers_to_durations[worker1, worker2] = (
            readable_size(data_size),
            f"{rate}B/s",
        )
        progress += 0.9 / size
        ctx.set_progress(min(progress, 1.0))
    return workers_to_durations
59+
60+
61+
def _gen_data(
62+
n: int = None, worker: str = None, check_addr: bool = True
63+
) -> pd.DataFrame:
64+
if check_addr:
65+
ctx = get_context()
66+
assert ctx.worker_address == worker
67+
n = n if n is not None else 5_000_000
68+
rs = np.random.RandomState(123)
69+
data = {
70+
"a": rs.rand(n),
71+
"b": rs.randint(n * 10, size=n),
72+
"c": [f"foo{i}" for i in range(n)],
73+
}
74+
return pd.DataFrame(data)
75+
76+
77+
def _fetch_data(data_key: str, worker: str = None):
    """Fetch one chunk onto this worker and return the elapsed seconds.

    The fetched bytes are discarded (``fetch_only=True``); only the
    transfer duration matters for the benchmark.
    """
    ctx = get_context()
    assert ctx.worker_address == worker
    timer = Timer()
    with timer:
        ctx.get_chunks_result([data_key], fetch_only=True)
    return timer.duration
84+
85+
86+
class TransferPackageSuite:
    """Benchmark that times performance of storage transfer."""

    def setup(self):
        # Two workers are required so transfers actually cross workers.
        mars.new_session(n_cpu=8, n_worker=2)

    def time_1_to_1(self):
        # Run the benchmark inside the cluster via a remote function and
        # pull back the per-pair duration mapping.
        return mr.spawn(send_1_to_1).execute().fetch()
96+
97+
98+
if __name__ == "__main__":
99+
suite = TransferPackageSuite()
100+
suite.setup()
101+
print(suite.time_1_to_1())

benchmarks/tpch/run_queries.py

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,71 +25,55 @@
2525
queries: Optional[Union[Set[str], List[str]]] = None
2626

2727

def load_lineitem(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
    """Load the TPC-H lineitem table from parquet and parse its date columns.

    Parameters
    ----------
    data_folder : str
        Directory containing ``lineitem.pq``.
    use_arrow_dtype : bool, optional
        Forwarded to ``md.read_parquet``; ``None`` keeps the reader default.
    """
    data_path = data_folder + "/lineitem.pq"
    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
    df["L_SHIPDATE"] = md.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d")
    df["L_RECEIPTDATE"] = md.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d")
    df["L_COMMITDATE"] = md.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d")
    return df
3735

3836

39-
def load_part(data_folder: str) -> md.DataFrame:
37+
def load_part(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
4038
data_path = data_folder + "/part.pq"
41-
df = md.read_parquet(
42-
data_path,
43-
)
39+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
4440
return df
4541

4642

47-
def load_orders(data_folder: str) -> md.DataFrame:
43+
def load_orders(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
4844
data_path = data_folder + "/orders.pq"
49-
df = md.read_parquet(
50-
data_path,
51-
)
45+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
5246
df["O_ORDERDATE"] = md.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d")
5347
return df
5448

5549

56-
def load_customer(data_folder: str) -> md.DataFrame:
50+
def load_customer(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
5751
data_path = data_folder + "/customer.pq"
58-
df = md.read_parquet(
59-
data_path,
60-
)
52+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
6153
return df
6254

6355

64-
def load_nation(data_folder: str) -> md.DataFrame:
56+
def load_nation(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
6557
data_path = data_folder + "/nation.pq"
66-
df = md.read_parquet(
67-
data_path,
68-
)
58+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
6959
return df
7060

7161

72-
def load_region(data_folder: str) -> md.DataFrame:
62+
def load_region(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
7363
data_path = data_folder + "/region.pq"
74-
df = md.read_parquet(
75-
data_path,
76-
)
64+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
7765
return df
7866

7967

80-
def load_supplier(data_folder: str) -> md.DataFrame:
68+
def load_supplier(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
8169
data_path = data_folder + "/supplier.pq"
82-
df = md.read_parquet(
83-
data_path,
84-
)
70+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
8571
return df
8672

8773

88-
def load_partsupp(data_folder: str) -> md.DataFrame:
74+
def load_partsupp(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
8975
data_path = data_folder + "/partsupp.pq"
90-
df = md.read_parquet(
91-
data_path,
92-
)
76+
df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
9377
return df
9478

9579

@@ -982,21 +966,23 @@ def q22(customer, orders):
982966
print(total.execute())
983967

984968

985-
def run_queries(data_folder: str, select: List[str] = None):
969+
def run_queries(
970+
data_folder: str, select: List[str] = None, use_arrow_dtype: bool = None
971+
):
986972
if select:
987973
global queries
988974
queries = select
989975

990976
# Load the data
991977
t1 = time.time()
992-
lineitem = load_lineitem(data_folder)
993-
orders = load_orders(data_folder)
994-
customer = load_customer(data_folder)
995-
nation = load_nation(data_folder)
996-
region = load_region(data_folder)
997-
supplier = load_supplier(data_folder)
998-
part = load_part(data_folder)
999-
partsupp = load_partsupp(data_folder)
978+
lineitem = load_lineitem(data_folder, use_arrow_dtype=use_arrow_dtype)
979+
orders = load_orders(data_folder, use_arrow_dtype=use_arrow_dtype)
980+
customer = load_customer(data_folder, use_arrow_dtype=use_arrow_dtype)
981+
nation = load_nation(data_folder, use_arrow_dtype=use_arrow_dtype)
982+
region = load_region(data_folder, use_arrow_dtype=use_arrow_dtype)
983+
supplier = load_supplier(data_folder, use_arrow_dtype=use_arrow_dtype)
984+
part = load_part(data_folder, use_arrow_dtype=use_arrow_dtype)
985+
partsupp = load_partsupp(data_folder, use_arrow_dtype=use_arrow_dtype)
1000986
mars.execute([lineitem, orders, customer, nation, region, supplier, part, partsupp])
1001987
print("Reading time (s): ", time.time() - t1)
1002988

@@ -1048,14 +1034,25 @@ def main():
10481034
"all tests will be executed"
10491035
),
10501036
)
1037+
parser.add_argument(
1038+
"--use-arrow-dtype",
1039+
type=str,
1040+
choices=["true", "false"],
1041+
help=("Use arrow dtype to read parquet"),
1042+
)
10511043
args = parser.parse_args()
10521044
folder = args.folder
10531045
endpoint = args.endpoint
1046+
use_arrow_dtype = args.use_arrow_dtype
1047+
if use_arrow_dtype == "true":
1048+
use_arrow_dtype = True
1049+
elif use_arrow_dtype == "false":
1050+
use_arrow_dtype = False
10541051
queries = (
10551052
set(x.lower().strip() for x in args.query.split(",")) if args.query else None
10561053
)
10571054
mars.new_session(endpoint)
1058-
run_queries(folder)
1055+
run_queries(folder, use_arrow_dtype=use_arrow_dtype)
10591056

10601057

10611058
if __name__ == "__main__":

mars/core/context.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,21 @@ def get_slots(self) -> int:
117117
"""
118118

119119
@abstractmethod
120-
def get_chunks_result(self, data_keys: List[str]) -> List:
120+
def get_chunks_result(self, data_keys: List[str], fetch_only: bool = False) -> List:
121121
"""
122122
Get result of chunks.
123123
124124
Parameters
125125
----------
126126
data_keys : list
127127
Data keys.
128+
fetch_only : bool
129+
If fetch_only, only fetch data but not return.
128130
129131
Returns
130132
-------
131133
results : list
132-
Result of chunks
134+
Result of chunks if not fetch_only, else return None
133135
"""
134136

135137
@abstractmethod

mars/core/operand/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class Operand(Base, OperatorLogicKeyGeneratorMixin, metaclass=OperandMetaclass):
176176
tileable_op_key = StringField("tileable_op_key", default=None)
177177
extra_params = DictField("extra_params", key_type=FieldTypes.string)
178178
# scheduling hint
179-
scheduling_hint = ReferenceField("scheduling_hint", default=None)
179+
scheduling_hint = ReferenceField("scheduling_hint", SchedulingHint, default=None)
180180

181181
_inputs = ListField(
182182
"inputs", FieldTypes.reference(EntityData), default_factory=list

mars/deploy/oscar/session.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,6 +1162,7 @@ async def fetch(self, *tileables, **kwargs) -> list:
11621162

11631163
async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
11641164
available_fields = {
1165+
"data_key",
11651166
"object_id",
11661167
"object_refs",
11671168
"level",
@@ -1217,6 +1218,8 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
12171218
bands = chunk_to_bands[fetch_info.chunk]
12181219
# Currently there's only one item in the returned List from storage_api.get_infos()
12191220
data = fetch_info.data[0]
1221+
if "data_key" in fields:
1222+
fetched["data_key"].append(fetch_info.chunk.key)
12201223
if "object_id" in fields:
12211224
fetched["object_id"].append(data.object_id)
12221225
if "level" in fields:
@@ -1245,7 +1248,7 @@ async def _query_meta_service(self, tileables, fields, query_storage):
12451248
get_chunk_metas.append(
12461249
self._meta_api.get_chunk_meta.delay(
12471250
chunk.key,
1248-
fields=["bands"] if query_storage else fields,
1251+
fields=["bands"] if query_storage else fields - {"data_key"},
12491252
)
12501253
)
12511254
fetch_infos.append(
@@ -1259,7 +1262,9 @@ async def _query_meta_service(self, tileables, fields, query_storage):
12591262
for fetch_infos in fetch_infos_list:
12601263
fetched = defaultdict(list)
12611264
for fetch_info in fetch_infos:
1262-
for field in fields:
1265+
if "data_key" in fields:
1266+
fetched["data_key"].append(fetch_info.chunk.key)
1267+
for field in fields - {"data_key"}:
12631268
fetched[field].append(chunk_to_meta[fetch_info.chunk][field])
12641269
result.append(fetched)
12651270
return {}, fetch_infos_list, result

0 commit comments

Comments
 (0)