Commit 9b08d4a

Authored by fyrestone, UranusSeven, ChengjieLi28, aresnow1, qianduoduo0904
Merge code (#3330)
* Implements ColumnPruning optimization rule (#30)
  Co-authored-by: ChengjieLi <chengjieli23@outlook.com>
  Co-authored-by: aresnow <aresnow1@gmail.com>
* Fix execution hang on optimization error (#33)
* enh: Optimize groupby nunique implementation (#38)
* enh: Support passing mars objects when calling df.map_chunk (#41)
* enh: Support `read_parquet` for GPU (#45)
* enh: Merge small data on shuffle mapper side (#49)
* Fix: `series.drop_duplicates()` failed (#53)
* Fix: dataframe.isna on scalar args (#55)
* enh: Support df or series type for apply function (#56)
* enh: Support running TPC-H queries on GPU (#60)
* Avoid creating too many thread pools (#62)
* enh: Optimize DataFrame.isin (#67)
* tst: Fix GPU CI (#42)
* enh: Refactor `read_buffers` & `write_buffers` for GPU (#68)
* Fix: ensure dataframe.all acts identically to pandas (#79)
* fix: Fix wrong results of `DataFrame.replace` (#86)
* enh: Add `skip_infer` parameter for all user-defined functions (#76)
* fix: Modify signature of `df.map_chunk` (#87)
* BUG: DataFrame.agg with built-in functions (#91)
  Co-authored-by: UranusSeven <109661872+UranusSeven@users.noreply.github.com>
* Fix dataframe.ewm (#97)
* BUG: Fix dataframe.sample() (#99)
* BUG: df.apply with list input returns wrong answer when axis=1 (#100)
* BUG: Fix md.unique (#102)
* Fix `ordered` implementation in md.cut (#104)
  Co-authored-by: UranusSeven <109661872+UranusSeven@users.noreply.github.com>
* Fix: map_chunk output type inference (#111)
* Enh: Support groupby plot (#113)
* ENH: Clear default context after execution (#115)
* BUG: Fix supervisor start method (#116)
* BUG: Fix `date_range` and pin sphinx<6.0.0 (#118)
* BUG: Handle missing merge key (#124)
* BUG: Fix `read_csv` with specific names and header (#130)
* BUG: Needed cols being pruned (#134)
* BUG: Fix `read_parquet` with latest pyarrow (#135)
* BUG: Suppress `FutureWarning` (#108)
* Fix merge
* Add `_repr_mimebundle_` ... as `_repr_svg_` is deprecated in the `Source` class of `graphviz`.

---------

Co-authored-by: UranusSeven <109661872+UranusSeven@users.noreply.github.com>
Co-authored-by: ChengjieLi <chengjieli23@outlook.com>
Co-authored-by: aresnow <aresnow1@gmail.com>
Co-authored-by: Chengjie Li <109656400+ChengjieLi28@users.noreply.github.com>
Co-authored-by: aresnow1 <109642806+aresnow1@users.noreply.github.com>
Co-authored-by: qianduoduo0904 <109654808+qianduoduo0904@users.noreply.github.com>
Co-authored-by: 黄浩杰 <65081722+hoarjour@users.noreply.github.com>
Co-authored-by: 刘宝 <po.lb@antgroup.com>
Co-authored-by: Wenjun Si <swj0066@gmail.com>
1 parent 1863382 commit 9b08d4a

File tree

133 files changed: +5567 −1203 lines


azure-pipelines.yml

Lines changed: 1 addition & 2 deletions

@@ -94,8 +94,7 @@ jobs:
 
   # do compatibility test for earliest supported pandas release
   if [[ "$(mars.test.module)" == "dataframe" ]]; then
-    pip install numpy\<1.24.0 sqlalchemy\<2.0
-    pip install -i https://pkgs.dev.azure.com/mars-project/mars/_packaging/pandas/pypi/simple/ pandas==1.0.5
+    pip install numpy\<1.24.0
     pytest $PYTEST_CONFIG -m pd_compat mars/dataframe
     mv .coverage build/.coverage.pd_compat.file
   fi

benchmarks/asv_bench/benchmarks/storage.py

Lines changed: 24 additions & 22 deletions

@@ -24,47 +24,49 @@
 from mars.utils import Timer, readable_size
 
 
-def send_1_to_1(n: int = None):
+def send_1_to_1(n: int = None, cpu: bool = True):
     ctx = get_context()
-    workers = ctx.get_worker_addresses()
-
-    worker_to_gen_data = {
-        w: mr.spawn(_gen_data, kwargs=dict(n=n, worker=w), expect_worker=w)
-        for i, w in enumerate(workers)
+    bands = [
+        b
+        for b in ctx.get_worker_bands()
+        if (cpu and b[1].startswith("numa-")) or (not cpu and b[1].startswith("gpu-"))
+    ]
+
+    band_to_gen_data = {
+        b: mr.spawn(_gen_data, kwargs=dict(n=n, band=b), expect_band=b)
+        for i, b in enumerate(bands)
     }
-    all_data = mars.execute(list(worker_to_gen_data.values()))
+    all_data = mars.execute(list(band_to_gen_data.values()))
     progress = 0.1
     ctx.set_progress(progress)
     infos = [d._fetch_infos(fields=["data_key", "store_size"]) for d in all_data]
     data_size = infos[0]["store_size"][0]
-    worker_to_data_keys = dict(zip(workers, [info["data_key"][0] for info in infos]))
+    band_to_data_keys = dict(zip(bands, [info["data_key"][0] for info in infos]))
 
-    workers_to_durations = dict()
-    size = len(workers) * (len(workers) - 1)
-    for worker1, worker2 in itertools.permutations(workers, 2):
+    bands_to_durations = dict()
+    size = len(bands) * (len(bands) - 1)
+    for band1, band2 in itertools.permutations(bands, 2):
         fetch_data = mr.spawn(
             _fetch_data,
-            args=(worker_to_data_keys[worker1],),
-            kwargs=dict(worker=worker2),
-            expect_worker=worker2,
+            args=(band_to_data_keys[band1],),
+            kwargs=dict(band=band2),
+            expect_band=band2,
         )
         fetch_time = fetch_data.execute().fetch()
         rate = readable_size(data_size / fetch_time)
-        workers_to_durations[worker1, worker2] = (
+        bands_to_durations[band1, band2] = (
             readable_size(data_size),
             f"{rate}B/s",
         )
         progress += 0.9 / size
         ctx.set_progress(min(progress, 1.0))
-    return workers_to_durations
+    return bands_to_durations
 
 
-def _gen_data(
-    n: int = None, worker: str = None, check_addr: bool = True
-) -> pd.DataFrame:
+def _gen_data(n: int = None, band: str = None, check_addr: bool = True) -> pd.DataFrame:
     if check_addr:
         ctx = get_context()
-        assert ctx.worker_address == worker
+        assert ctx.band == band
     n = n if n is not None else 5_000_000
     rs = np.random.RandomState(123)
     data = {
@@ -75,10 +77,10 @@ def _gen_data(
     return pd.DataFrame(data)
 
 
-def _fetch_data(data_key: str, worker: str = None):
+def _fetch_data(data_key: str, band: str = None):
     # do nothing actually
     ctx = get_context()
-    assert ctx.worker_address == worker
+    assert ctx.band == band
     with Timer() as timer:
         ctx.get_chunks_result([data_key], fetch_only=True)
     return timer.duration
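For context on the band concept this diff switches to: the comprehension above indexes b[1] and checks "numa-"/"gpu-" prefixes, so a band is a pair whose second element names the execution unit on a worker. A minimal sketch of that filter follows; the addresses and band values are made up for illustration, and filter_bands is a hypothetical helper (the diff inlines the comprehension).

bands = [
    ("127.0.0.1:17234", "numa-0"),  # a CPU band
    ("127.0.0.1:17235", "gpu-0"),   # a GPU band
]

def filter_bands(bands, cpu=True):
    # keep "numa-*" bands for CPU runs, "gpu-*" bands for GPU runs
    prefix = "numa-" if cpu else "gpu-"
    return [b for b in bands if b[1].startswith(prefix)]

assert filter_bands(bands, cpu=True) == [("127.0.0.1:17234", "numa-0")]
assert filter_bands(bands, cpu=False) == [("127.0.0.1:17235", "gpu-0")]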

benchmarks/tpch/run_queries.py

Lines changed: 60 additions & 28 deletions

@@ -25,55 +25,71 @@
 queries: Optional[Union[Set[str], List[str]]] = None
 
 
-def load_lineitem(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_lineitem(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/lineitem.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     df["L_SHIPDATE"] = md.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d")
     df["L_RECEIPTDATE"] = md.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d")
     df["L_COMMITDATE"] = md.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d")
     return df
 
 
-def load_part(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_part(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/part.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
-def load_orders(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_orders(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/orders.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     df["O_ORDERDATE"] = md.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d")
     return df
 
 
-def load_customer(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_customer(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/customer.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
-def load_nation(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_nation(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/nation.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
-def load_region(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_region(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/region.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
-def load_supplier(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_supplier(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/supplier.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
-def load_partsupp(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_partsupp(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/partsupp.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df
 
 
@@ -967,22 +983,25 @@ def q22(customer, orders):
 
 
 def run_queries(
-    data_folder: str, select: List[str] = None, use_arrow_dtype: bool = None
+    data_folder: str,
+    select: List[str] = None,
+    use_arrow_dtype: bool = None,
+    gpu: bool = False,
 ):
     if select:
         global queries
         queries = select
 
     # Load the data
     t1 = time.time()
-    lineitem = load_lineitem(data_folder, use_arrow_dtype=use_arrow_dtype)
-    orders = load_orders(data_folder, use_arrow_dtype=use_arrow_dtype)
-    customer = load_customer(data_folder, use_arrow_dtype=use_arrow_dtype)
-    nation = load_nation(data_folder, use_arrow_dtype=use_arrow_dtype)
-    region = load_region(data_folder, use_arrow_dtype=use_arrow_dtype)
-    supplier = load_supplier(data_folder, use_arrow_dtype=use_arrow_dtype)
-    part = load_part(data_folder, use_arrow_dtype=use_arrow_dtype)
-    partsupp = load_partsupp(data_folder, use_arrow_dtype=use_arrow_dtype)
+    lineitem = load_lineitem(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    orders = load_orders(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    customer = load_customer(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    nation = load_nation(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    region = load_region(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    supplier = load_supplier(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    part = load_part(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    partsupp = load_partsupp(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     mars.execute([lineitem, orders, customer, nation, region, supplier, part, partsupp])
     print("Reading time (s): ", time.time() - t1)
 
@@ -1038,7 +1057,15 @@ def main():
         "--use-arrow-dtype",
         type=str,
         choices=["true", "false"],
-        help=("Use arrow dtype to read parquet"),
+        help="Use arrow dtype to read parquet",
+    )
+    parser.add_argument(
+        "--gpu", "-g", action="store_true", help="Use GPU to read parquet"
+    )
+    parser.add_argument(
+        "--cuda-devices",
+        type=str,
+        help="GPU devices to use, use comma to split, only available when using GPU",
     )
     args = parser.parse_args()
     folder = args.folder
@@ -1051,9 +1078,14 @@ def main():
     queries = (
         set(x.lower().strip() for x in args.query.split(",")) if args.query else None
    )
-    sess = mars.new_session(endpoint)
+    gpu = args.gpu
+    new_session_kwargs = dict()
+    if gpu and args.cuda_devices:
+        cuda_devices = args.cuda_devices.split(",")
+        new_session_kwargs["cuda_devices"] = [int(d) for d in cuda_devices]
+    sess = mars.new_session(endpoint, **new_session_kwargs)
     try:
-        run_queries(folder, use_arrow_dtype=use_arrow_dtype)
+        run_queries(folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     finally:
         if endpoint is None:
             sess.stop_server()
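The new session wiring is a standard argparse pattern: parse --gpu and --cuda-devices, then turn the comma-separated device list into integers before handing it to the session. A runnable sketch of just that parsing step, with the simulated command-line values being illustrative:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--gpu", "-g", action="store_true", help="Use GPU to read parquet")
parser.add_argument(
    "--cuda-devices",
    type=str,
    help="GPU devices to use, comma separated; only effective with --gpu",
)
# simulate a command line such as: run_queries.py --gpu --cuda-devices 0,1
args = parser.parse_args(["--gpu", "--cuda-devices", "0,1"])

new_session_kwargs = dict()
if args.gpu and args.cuda_devices:
    # convert "0,1" into [0, 1], as the diff does before mars.new_session
    new_session_kwargs["cuda_devices"] = [int(d) for d in args.cuda_devices.split(",")]
assert new_session_kwargs == {"cuda_devices": [0, 1]}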

mars/_utils.pyx

Lines changed: 8 additions & 1 deletion

@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import collections
 import importlib
 import itertools
@@ -21,6 +22,7 @@ import pkgutil
 import time
 import types
 import uuid
+import warnings
 from datetime import date, datetime, timedelta, tzinfo
 from enum import Enum
 from functools import lru_cache, partial
@@ -138,7 +140,12 @@ cdef class TypeDispatcher:
     cdef _reload_lazy_handlers(self):
         for k, v in self._lazy_handlers.items():
             mod_name, obj_name = k.rsplit('.', 1)
-            mod = importlib.import_module(mod_name, __name__)
+            with warnings.catch_warnings():
+                # the lazily imported cudf warns that no device is found
+                # when we set the visible device to -1 for CPU processes;
+                # ignore the warning so it does not distract users
+                warnings.simplefilter("ignore")
+                mod = importlib.import_module(mod_name, __name__)
             self.register(getattr(mod, obj_name), v)
         self._lazy_handlers = dict()
 
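The same suppression idiom works in plain Python anywhere an import emits warnings you don't want surfaced to users: warnings raised inside the catch_warnings block are discarded and the filter state is restored on exit. A minimal sketch, where quiet_import is a hypothetical helper and math merely stands in for cudf:

import importlib
import warnings

def quiet_import(mod_name: str):
    # suppress warnings emitted at import time (e.g. a GPU library
    # complaining that no device is visible in a CPU-only process)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        return importlib.import_module(mod_name)

mod = quiet_import("math")  # stand-in target; Mars lazily imports cudf this way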

mars/core/context.py

Lines changed: 19 additions & 7 deletions

@@ -18,6 +18,7 @@
 
 from ..typing import BandType, SessionType
 from ..storage.base import StorageLevel
+from ..utils import classproperty
 
 
 class Context(ABC):
@@ -26,8 +27,7 @@ class Context(ABC):
     used inside `tile` and `execute`.
     """
 
-    prev = None
-    current = None
+    all_contexts = []
 
     def __init__(
         self,
@@ -96,6 +96,16 @@ def get_worker_addresses(self) -> List[str]:
         worker_addresses : list
         """
 
+    @abstractmethod
+    def get_worker_bands(self) -> List[BandType]:
+        """
+        Get worker bands.
+
+        Returns
+        -------
+        worker_bands : list
+        """
+
     @abstractmethod
     def get_total_n_cpu(self) -> int:
         """
@@ -276,16 +286,18 @@ def set_progress(self, progress: float):
         """
 
     def __enter__(self):
-        Context.prev = Context.current
-        Context.current = self
+        Context.all_contexts.append(self)
 
     def __exit__(self, *_):
-        Context.current = Context.prev
-        Context.prev = None
+        Context.all_contexts.pop()
+
+    @classproperty
+    def current(cls):
+        return cls.all_contexts[-1] if cls.all_contexts else None
 
 
 def set_context(context: Context):
-    Context.current = context
+    Context.all_contexts.append(context)
 
 
 def get_context() -> Context:
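The prev/current pair could remember only one level of nesting; a stack plus a class-level property handles arbitrary depth. A self-contained sketch of the pattern, where the classproperty descriptor is a simplified stand-in for mars.utils.classproperty and Ctx is a hypothetical class:

class classproperty:
    # simplified stand-in for mars.utils.classproperty
    def __init__(self, fget):
        self._fget = fget

    def __get__(self, obj, owner):
        return self._fget(owner)


class Ctx:
    all_contexts = []

    def __enter__(self):
        Ctx.all_contexts.append(self)
        return self

    def __exit__(self, *_):
        Ctx.all_contexts.pop()

    @classproperty
    def current(cls):
        return cls.all_contexts[-1] if cls.all_contexts else None


outer, inner = Ctx(), Ctx()
with outer:
    with inner:
        assert Ctx.current is inner
    assert Ctx.current is outer  # nesting unwinds correctly, unlike prev/current
assert Ctx.current is None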

mars/core/entity/chunks.py

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@ class ChunkData(EntityData):
     is_broadcaster = BoolField("is_broadcaster", default=False)
     # If the operand is a shuffle mapper, this flag indicates whether the current chunk is a mapper chunk
     # when the operand produces multiple chunks, such as TensorUnique.
-    is_mapper = BoolField("is_mapper", default=False)
+    is_mapper = BoolField("is_mapper", default=None)
     # optional fields
     _index = TupleField("index", FieldTypes.uint32)

mars/core/entity/executable.py

Lines changed: 13 additions & 0 deletions

@@ -83,6 +83,7 @@ def put(self, key: str, session_ref: ref):
 
 class _TileableSession:
     def __init__(self, tileable: TileableType, session: SessionType):
+        self._sess_id = id(session)
         key = tileable.key
 
         def cb(_, sess=ref(session)):
@@ -106,6 +107,9 @@ def cb(_, sess=ref(session)):
 
         self.tileable = ref(tileable, cb)
 
+    def __eq__(self, other: "_TileableSession"):
+        return self._sess_id == other._sess_id
+
 
 class _TileableDataCleaner:
     def __init__(self):
@@ -189,6 +193,15 @@ def _attach_session(self, session: SessionType):
         _cleaner.register(self, session)
         self._executed_sessions.append(session)
 
+    def _detach_session(self, session: SessionType):
+        if session in self._executed_sessions:
+            sessions = _cleaner._tileable_to_sessions.get(self, [])
+            if sessions:
+                sessions.remove(_TileableSession(self, session))
+            if len(sessions) == 0:
+                del _cleaner._tileable_to_sessions[self]
+            self._executed_sessions.remove(session)
+
 
 class _ExecuteAndFetchMixin:
     __slots__ = ()
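_detach_session depends on the new __eq__: list.remove searches by equality, so a freshly constructed _TileableSession wrapping the same session can evict the instance registered earlier. A minimal sketch of the idiom, with SessionTag as a hypothetical stand-in for _TileableSession:

class SessionTag:
    # equality is defined by the identity of the wrapped session,
    # mirroring _TileableSession.__eq__ in the diff above
    def __init__(self, session):
        self._sess_id = id(session)

    def __eq__(self, other):
        return self._sess_id == other._sess_id


session = object()
registered = [SessionTag(session)]
registered.remove(SessionTag(session))  # matches by equality, not identity
assert registered == []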

mars/core/entity/output_types.py

Lines changed: 1 addition & 0 deletions

@@ -29,6 +29,7 @@ class OutputType(Enum):
     categorical = 7
     dataframe_groupby = 8
     series_groupby = 9
+    df_or_series = 10
 
     @classmethod
     def serialize_list(cls, output_types):

mars/core/entity/tileables.py

Lines changed: 1 addition & 1 deletion

@@ -270,7 +270,7 @@ class TileableData(EntityData, _ExecutableMixin):
     # `nsplits` means the sizes of chunks for each dimension
     _nsplits = TupleField(
         "nsplits",
-        FieldTypes.tuple(FieldTypes.uint64),
+        FieldTypes.tuple(FieldTypes.tuple(FieldTypes.uint64)),
         on_serialize=on_serialize_nsplits,
     )
     # cache tileable data, if true, this data will be materialized
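The fix matters because nsplits is inherently two levels deep: one inner tuple of chunk sizes per dimension, so the field type must be a tuple of tuples of unsigned integers. A small sketch of the convention, with illustrative sizes:

# nsplits for a 10 x 6 tileable cut into 2 x 2 chunks:
# axis 0 is split into sizes (5, 5), axis 1 into (3, 3)
nsplits = ((5, 5), (3, 3))

shape = tuple(sum(axis_sizes) for axis_sizes in nsplits)
assert shape == (10, 6)

chunk_count = 1
for axis_sizes in nsplits:
    chunk_count *= len(axis_sizes)
assert chunk_count == 4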
