Skip to content

Commit 277bae7

Browse files
authored
Make clearer separation of sessions and allow customized implementations (#3335)
1 parent 9b08d4a commit 277bae7

35 files changed

+1485
-1431
lines changed

benchmarks/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ This script assumes `tpch-dbgen` is in the same directory. If you downloaded it
4848

4949
### Installation
5050

51-
Follow the intstructions [here](https://docs.pymars.org/en/latest/installation/index.html).
51+
Follow the instructions [here](https://docs.pymars.org/en/latest/installation/index.html).
5252

5353
### Running queries
5454

mars/contrib/dask/converter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
from dask import is_dask_collection, optimize
1616
from dask.bag import Bag
1717

18+
from ...remote import spawn
1819
from .scheduler import mars_dask_get
1920
from .utils import reduce
20-
from ...remote import spawn
2121

2222

2323
def convert_dask_collection(dc):

mars/contrib/dask/scheduler.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from typing import List, Tuple, Union
16+
1517
from dask.core import istask, ishashable
1618

17-
from typing import List, Tuple, Union
18-
from .utils import reduce
1919
from ...remote import spawn
20-
from ...deploy.oscar.session import execute
20+
from ...session import execute
21+
from .utils import reduce
2122

2223

2324
def mars_scheduler(dsk: dict, keys: Union[List[List[str]], List[str]]):

mars/core/entity/executable.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ def start(self):
3838
self._decref_thread.start()
3939

4040
def _thread_body(self):
41-
from ...deploy.oscar.session import SyncSession
4241
from ...oscar.errors import ActorNotExist
42+
from ...session import SyncSession
4343

4444
while True:
4545
key, session_ref, fut = self._queue.get()
@@ -130,7 +130,7 @@ def register(self, tileable: TileableType, session: SessionType):
130130

131131

132132
def _get_session(executable: "_ExecutableMixin", session: SessionType = None):
133-
from ...deploy.oscar.session import get_default_session
133+
from ...session import get_default_session
134134

135135
# if session is not specified, use default session
136136
if session is None:
@@ -144,7 +144,7 @@ class _ExecutableMixin:
144144
_executed_sessions: List[SessionType]
145145

146146
def execute(self, session: SessionType = None, **kw):
147-
from ...deploy.oscar.session import execute
147+
from ...session import execute
148148

149149
session = _get_session(self, session)
150150
return execute(self, session=session, **kw)
@@ -160,7 +160,7 @@ def _check_session(self, session: SessionType, action: str):
160160
)
161161

162162
def _fetch(self, session: SessionType = None, **kw):
163-
from ...deploy.oscar.session import fetch
163+
from ...session import fetch
164164

165165
session = _get_session(self, session)
166166
self._check_session(session, "fetch")
@@ -175,14 +175,14 @@ def fetch_log(
175175
offsets: List[int] = None,
176176
sizes: List[int] = None,
177177
):
178-
from ...deploy.oscar.session import fetch_log
178+
from ...session import fetch_log
179179

180180
session = _get_session(self, session)
181181
self._check_session(session, "fetch_log")
182182
return fetch_log(self, session=session, offsets=offsets, sizes=sizes)[0]
183183

184184
def _fetch_infos(self, fields=None, session=None, **kw):
185-
from ...deploy.oscar.session import fetch_infos
185+
from ...session import fetch_infos
186186

187187
session = _get_session(self, session)
188188
self._check_session(session, "fetch_infos")
@@ -207,7 +207,7 @@ class _ExecuteAndFetchMixin:
207207
__slots__ = ()
208208

209209
def _execute_and_fetch(self, session: SessionType = None, **kw):
210-
from ...deploy.oscar.session import ExecutionInfo, SyncSession, fetch
210+
from ...session import ExecutionInfo, SyncSession, fetch
211211

212212
session = _get_session(self, session)
213213
fetch_kwargs = kw.pop("fetch_kwargs", dict())
@@ -273,7 +273,7 @@ def __repr__(self):
273273
return "%s(%s)" % (self._raw_type.__name__, ", ".join(items))
274274

275275
def execute(self, session: SessionType = None, **kw):
276-
from ...deploy.oscar.session import execute
276+
from ...session import execute
277277

278278
if len(self) == 0:
279279
return self
@@ -290,14 +290,14 @@ def execute(self, session: SessionType = None, **kw):
290290
return ret
291291

292292
def _fetch(self, session: SessionType = None, **kw):
293-
from ...deploy.oscar.session import fetch
293+
from ...session import fetch
294294

295295
session = _get_session(self, session)
296296
self._check_session(session, "fetch")
297297
return fetch(*self, session=session, **kw)
298298

299299
def _fetch_infos(self, fields=None, session=None, **kw):
300-
from ...deploy.oscar.session import fetch_infos
300+
from ...session import fetch_infos
301301

302302
session = _get_session(self, session)
303303
self._check_session(session, "fetch_infos")
@@ -321,7 +321,7 @@ def fetch_log(
321321
offsets: List[int] = None,
322322
sizes: List[int] = None,
323323
):
324-
from ...deploy.oscar.session import fetch_log
324+
from ...session import fetch_log
325325

326326
if len(self) == 0:
327327
return []

mars/dataframe/base/drop.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def df_drop(
308308
309309
Drop columns and/or rows of MultiIndex DataFrame
310310
311-
>>> midx = pd.MultiIndex(levels=[['lama', 'cow', 'falcon'],
311+
>>> midx = pd.MultiIndex(levels=[['lame', 'cow', 'falcon'],
312312
... ['speed', 'weight', 'length']],
313313
... codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2],
314314
... [0, 1, 2, 0, 1, 2, 0, 1, 2]])
@@ -318,7 +318,7 @@ def df_drop(
318318
... [1, 0.8], [0.3, 0.2]])
319319
>>> df.execute()
320320
big small
321-
lama speed 45.0 30.0
321+
lame speed 45.0 30.0
322322
weight 200.0 100.0
323323
length 1.5 1.0
324324
cow speed 30.0 20.0
@@ -330,7 +330,7 @@ def df_drop(
330330
331331
>>> df.drop(index='cow', columns='small').execute()
332332
big
333-
lama speed 45.0
333+
lame speed 45.0
334334
weight 200.0
335335
length 1.5
336336
falcon speed 320.0
@@ -339,7 +339,7 @@ def df_drop(
339339
340340
>>> df.drop(index='length', level=1).execute()
341341
big small
342-
lama speed 45.0 30.0
342+
lame speed 45.0 30.0
343343
weight 200.0 100.0
344344
cow speed 30.0 20.0
345345
weight 250.0 150.0
@@ -483,14 +483,14 @@ def series_drop(
483483
484484
Drop 2nd level label in MultiIndex Series
485485
486-
>>> midx = pd.MultiIndex(levels=[['lama', 'cow', 'falcon'],
486+
>>> midx = pd.MultiIndex(levels=[['lame', 'cow', 'falcon'],
487487
... ['speed', 'weight', 'length']],
488488
... codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2],
489489
... [0, 1, 2, 0, 1, 2, 0, 1, 2]])
490490
>>> s = md.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3],
491491
... index=midx)
492492
>>> s.execute()
493-
lama speed 45.0
493+
lame speed 45.0
494494
weight 200.0
495495
length 1.2
496496
cow speed 30.0
@@ -502,7 +502,7 @@ def series_drop(
502502
dtype: float64
503503
504504
>>> s.drop(labels='weight', level=1).execute()
505-
lama speed 45.0
505+
lame speed 45.0
506506
length 1.2
507507
cow speed 30.0
508508
length 1.5

mars/dataframe/base/drop_duplicates.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,14 @@ def series_drop_duplicates(series, keep="first", inplace=False, method="auto"):
314314
Generate a Series with duplicated entries.
315315
316316
>>> import mars.dataframe as md
317-
>>> s = md.Series(['lama', 'cow', 'lama', 'beetle', 'lama', 'hippo'],
317+
>>> s = md.Series(['lame', 'cow', 'lame', 'beetle', 'lame', 'hippo'],
318318
... name='animal')
319319
>>> s.execute()
320-
0 lama
320+
0 lame
321321
1 cow
322-
2 lama
322+
2 lame
323323
3 beetle
324-
4 lama
324+
4 lame
325325
5 hippo
326326
Name: animal, dtype: object
327327
@@ -330,7 +330,7 @@ def series_drop_duplicates(series, keep="first", inplace=False, method="auto"):
330330
set of duplicated entries. The default value of keep is 'first'.
331331
332332
>>> s.drop_duplicates().execute()
333-
0 lama
333+
0 lame
334334
1 cow
335335
3 beetle
336336
5 hippo
@@ -342,7 +342,7 @@ def series_drop_duplicates(series, keep="first", inplace=False, method="auto"):
342342
>>> s.drop_duplicates(keep='last').execute()
343343
1 cow
344344
3 beetle
345-
4 lama
345+
4 lame
346346
5 hippo
347347
Name: animal, dtype: object
348348
@@ -393,20 +393,20 @@ def index_drop_duplicates(index, keep="first", method="auto"):
393393
394394
>>> import mars.dataframe as md
395395
396-
>>> idx = md.Index(['lama', 'cow', 'lama', 'beetle', 'lama', 'hippo'])
396+
>>> idx = md.Index(['lame', 'cow', 'lame', 'beetle', 'lame', 'hippo'])
397397
398398
The `keep` parameter controls which duplicate values are removed.
399399
The value 'first' keeps the first occurrence for each
400400
set of duplicated entries. The default value of keep is 'first'.
401401
402402
>>> idx.drop_duplicates(keep='first').execute()
403-
Index(['lama', 'cow', 'beetle', 'hippo'], dtype='object')
403+
Index(['lame', 'cow', 'beetle', 'hippo'], dtype='object')
404404
405405
The value 'last' keeps the last occurrence for each set of duplicated
406406
entries.
407407
408408
>>> idx.drop_duplicates(keep='last').execute()
409-
Index(['cow', 'beetle', 'lama', 'hippo'], dtype='object')
409+
Index(['cow', 'beetle', 'lame', 'hippo'], dtype='object')
410410
411411
The value ``False`` discards all sets of duplicated entries.
412412

mars/dataframe/base/duplicated.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ def series_duplicated(series, keep="first", method="auto"):
427427
428428
>>> import mars.dataframe as md
429429
430-
>>> animals = md.Series(['lama', 'cow', 'lama', 'beetle', 'lama'])
430+
>>> animals = md.Series(['lame', 'cow', 'lame', 'beetle', 'lame'])
431431
>>> animals.duplicated().execute()
432432
0 False
433433
1 False
@@ -510,7 +510,7 @@ def index_duplicated(index, keep="first"):
510510
511511
>>> import mars.dataframe as md
512512
513-
>>> idx = md.Index(['lama', 'cow', 'lama', 'beetle', 'lama'])
513+
>>> idx = md.Index(['lame', 'cow', 'lame', 'beetle', 'lame'])
514514
>>> idx.duplicated().execute()
515515
array([False, False, True, False, True])
516516

mars/dataframe/base/isin.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,9 @@ def series_isin(elements, values):
291291
Examples
292292
--------
293293
>>> import mars.dataframe as md
294-
>>> s = md.Series(['lama', 'cow', 'lama', 'beetle', 'lama',
294+
>>> s = md.Series(['lame', 'cow', 'lame', 'beetle', 'lame',
295295
... 'hippo'], name='animal')
296-
>>> s.isin(['cow', 'lama']).execute()
296+
>>> s.isin(['cow', 'lame']).execute()
297297
0 True
298298
1 True
299299
2 True
@@ -302,10 +302,10 @@ def series_isin(elements, values):
302302
5 False
303303
Name: animal, dtype: bool
304304
305-
Passing a single string as ``s.isin('lama')`` will raise an error. Use
305+
Passing a single string as ``s.isin('lame')`` will raise an error. Use
306306
a list of one element instead:
307307
308-
>>> s.isin(['lama']).execute()
308+
>>> s.isin(['lame']).execute()
309309
0 True
310310
1 False
311311
2 True

mars/dataframe/contrib/raydataset/tests/test_mldataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ..... import dataframe as md
2121
from .....conftest import MARS_CI_BACKEND
2222
from .....deploy.oscar.ray import new_cluster
23-
from .....deploy.oscar.session import new_session
23+
from .....session import new_session
2424
from .....tests.core import require_ray
2525
from .....utils import lazy_import
2626
from ....utils import ray_deprecate_ml_dataset

mars/dataframe/contrib/raydataset/tests/test_raydataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ..... import dataframe as md
2121
from .....conftest import MARS_CI_BACKEND
2222
from .....deploy.oscar.ray import new_cluster
23-
from .....deploy.oscar.session import new_session
23+
from .....session import new_session
2424
from .....tests.core import require_ray
2525
from .....utils import lazy_import
2626
from ....contrib import raydataset as mdd

0 commit comments

Comments
 (0)