Skip to content

Commit 0a42ba8

Browse files
authored
Fix conversion from arrow column to arrow RecordBatch (#3359)
1 parent 2175678 commit 0a42ba8

File tree

19 files changed

+71
-41
lines changed

19 files changed

+71
-41
lines changed

mars/dataframe/arrays.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,12 @@ def __repr__(self):
301301
def _array(self):
302302
return self._arrow_array if self._use_arrow else self._ndarray
303303

304+
def __arrow_array__(self, type=None):
305+
if self._use_arrow:
306+
combined = self._arrow_array.combine_chunks()
307+
return combined.cast(type) if type else combined
308+
return super().__arrow_array__(type=type)
309+
304310
@property
305311
def dtype(self) -> "Type[ArrowDtype]":
306312
return self._dtype

mars/dataframe/core.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
tokenize,
6868
estimate_pandas_size,
6969
calc_nsplits,
70+
is_debugger_repr_thread,
7071
)
7172
from .utils import fetch_corner_data, ReprSeries, parse_index, merge_index_value
7273

@@ -1430,6 +1431,10 @@ def __mars_tensor__(self, dtype=None, order="K"):
14301431
return tensor.astype(dtype=dtype, order=order, copy=False)
14311432

14321433
def iteritems(self, batch_size=10000, session=None):
1434+
if is_build_mode():
1435+
raise NotImplementedError("Not implemented when building dags")
1436+
if is_debugger_repr_thread() and len(self._executed_sessions) == 0:
1437+
raise NotImplementedError("Not implemented when not executed under debug")
14331438
for batch_data in self.iterbatch(batch_size=batch_size, session=session):
14341439
yield from getattr(batch_data, "iteritems")()
14351440

mars/dataframe/tests/test_arrays.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,3 +480,7 @@ def test_to_pandas():
480480
s2 = df2["b"].str[:2]
481481
expected = df["b"].astype("string").str[:2]
482482
pd.testing.assert_series_equal(s2, expected)
483+
484+
# test reverse conversion to arrow
485+
arrow_data = pa.RecordBatch.from_pandas(df2)
486+
assert arrow_data.num_rows == len(df2)

mars/learn/contrib/joblib/backend.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def configure(self, n_jobs=1, parallel=None, **backend_args):
5959

6060
def effective_n_jobs(self, n_jobs):
6161
eff_n_jobs = super(MarsDistributedBackend, self).effective_n_jobs(n_jobs)
62-
if n_jobs == -1:
62+
if n_jobs == -1 or not eff_n_jobs:
6363
eff_n_jobs = self.n_parallel
6464
return eff_n_jobs
6565

mars/learn/contrib/lightgbm/classifier.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
lightgbm = None
2525

2626

27-
LGBMClassifier = make_import_error_func("lightgbm")
28-
if lightgbm:
27+
if not lightgbm:
28+
LGBMClassifier = make_import_error_func("lightgbm")
29+
else:
2930

3031
class LGBMClassifier(LGBMScikitLearnBase, lightgbm.LGBMClassifier):
3132
def fit(

mars/learn/contrib/lightgbm/core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
import numpy as np
2020
import pandas as pd
2121

22-
from ....tensor import tensor as mars_tensor
2322
from ....dataframe import DataFrame as MarsDataFrame, Series as MarsSeries
23+
from ....lib.version import parse as parse_version
24+
from ....tensor import tensor as mars_tensor
2425

2526

2627
class LGBMModelType(enum.Enum):

mars/learn/contrib/lightgbm/ranker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
lightgbm = None
2525

2626

27-
LGBMRanker = make_import_error_func("lightgbm")
28-
if lightgbm:
27+
if not lightgbm:
28+
LGBMRanker = make_import_error_func("lightgbm")
29+
else:
2930

3031
class LGBMRanker(LGBMScikitLearnBase, lightgbm.LGBMRanker):
3132
def fit(

mars/learn/contrib/lightgbm/regressor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
lightgbm = None
2525

2626

27-
LGBMRegressor = make_import_error_func("lightgbm")
28-
if lightgbm:
27+
if not lightgbm:
28+
LGBMRegressor = make_import_error_func("lightgbm")
29+
else:
2930

3031
class LGBMRegressor(LGBMScikitLearnBase, lightgbm.LGBMRegressor):
3132
def fit(

mars/learn/contrib/lightgbm/tests/test_classifier.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async def create_cluster():
6464
def test_local_classifier(create_cluster):
6565
y_data = (y * 10).astype(mt.int32)
6666
classifier = LGBMClassifier(n_estimators=2)
67-
classifier.fit(X, y_data, eval_set=[(X, y_data)], verbose=True)
67+
classifier.fit(X, y_data, eval_set=[(X, y_data)])
6868
prediction = classifier.predict(X)
6969

7070
assert prediction.ndim == 1
@@ -76,7 +76,7 @@ def test_local_classifier(create_cluster):
7676
X_sparse_data = X_sparse
7777
classifier = LGBMClassifier(n_estimators=2)
7878
classifier.fit(
79-
X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)], verbose=True
79+
X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)]
8080
)
8181
prediction = classifier.predict(X_sparse_data)
8282

@@ -94,7 +94,7 @@ def test_local_classifier(create_cluster):
9494
# test dataframe
9595
X_df_data = X_df
9696
classifier = LGBMClassifier(n_estimators=2)
97-
classifier.fit(X_df_data, y_data, verbose=True)
97+
classifier.fit(X_df_data, y_data)
9898
prediction = classifier.predict(X_df_data)
9999

100100
assert prediction.ndim == 1
@@ -110,7 +110,7 @@ def test_local_classifier(create_cluster):
110110
y_df = md.DataFrame(y_data)
111111
for weight in weights:
112112
classifier = LGBMClassifier(n_estimators=2)
113-
classifier.fit(X, y_df, sample_weight=weight, verbose=True)
113+
classifier.fit(X, y_df, sample_weight=weight)
114114
prediction = classifier.predict(X)
115115

116116
assert prediction.ndim == 1
@@ -119,13 +119,13 @@ def test_local_classifier(create_cluster):
119119
# should raise error if weight.ndim > 1
120120
with pytest.raises(ValueError):
121121
LGBMClassifier(n_estimators=2).fit(
122-
X, y_df, sample_weight=mt.random.rand(1, 1), verbose=True
122+
X, y_df, sample_weight=mt.random.rand(1, 1)
123123
)
124124

125125
# test binary classifier
126126
new_y = (y_data > 0.5).astype(mt.int32)
127127
classifier = LGBMClassifier(n_estimators=2)
128-
classifier.fit(X, new_y, verbose=True)
128+
classifier.fit(X, new_y)
129129

130130
prediction = classifier.predict(X)
131131
assert prediction.ndim == 1
@@ -139,7 +139,7 @@ def test_local_classifier(create_cluster):
139139
X_np = X.execute().fetch()
140140
new_y_np = new_y.execute().fetch()
141141
raw_classifier = lightgbm.LGBMClassifier(n_estimators=2)
142-
raw_classifier.fit(X_np, new_y_np, verbose=True)
142+
raw_classifier.fit(X_np, new_y_np)
143143

144144
classifier = LGBMClassifier(raw_classifier)
145145
label_result = classifier.predict(X_df)
@@ -162,7 +162,7 @@ def test_local_classifier_from_to_parquet(setup):
162162

163163
# test with existing model
164164
classifier = lightgbm.LGBMClassifier(n_estimators=2)
165-
classifier.fit(X, y, verbose=True)
165+
classifier.fit(X, y)
166166

167167
with tempfile.TemporaryDirectory() as d:
168168
result_dir = os.path.join(d, "result")
@@ -239,7 +239,7 @@ def fit(
239239

240240
y_data = (y * 10).astype(mt.int32)
241241
classifier = MockLGBMClassifier(n_estimators=2)
242-
classifier.fit(X, y_data, eval_set=[(X, y_data)], verbose=True)
242+
classifier.fit(X, y_data, eval_set=[(X, y_data)])
243243
prediction = classifier.predict(X)
244244

245245
assert prediction.ndim == 1

mars/learn/contrib/lightgbm/tests/test_ranker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
def test_local_ranker(setup):
3636
y = (y_raw * 10).astype(mt.int32)
3737
ranker = LGBMRanker(n_estimators=2)
38-
ranker.fit(X_raw, y, group=[X_raw.shape[0]], verbose=True)
38+
ranker.fit(X_raw, y, group=[X_raw.shape[0]])
3939
prediction = ranker.predict(X_raw)
4040

4141
assert prediction.ndim == 1

0 commit comments

Comments
 (0)