
[SPARK-52904][PYTHON] Enable convertToArrowArraySafely by default #51596

Status: Closed · 7 commits
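For context: `spark.sql.execution.pandas.convertToArrowArraySafely` makes the pandas-to-Arrow conversion use Arrow's safe cast, so lossy conversions (integer overflow, truncated nanosecond timestamps) raise an error instead of silently corrupting data. A minimal sketch of the behavior this PR changes, assuming a local SparkSession with Arrow enabled:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# The new default after this PR; previously this had to be enabled manually.
spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "true")

pdf = pd.DataFrame({"ts": [pd.Timestamp(1)]})  # 1 ns since the epoch, not representable in µs
# spark.createDataFrame(pdf)  # would now raise an Arrow precision-loss error instead of truncating
```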
13 changes: 8 additions & 5 deletions python/pyspark/pandas/tests/computation/test_describe.py
@@ -135,7 +135,7 @@ def test_describe(self):
         psdf = ps.DataFrame(
             {
                 "a": [1, 2, 3],
-                "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
Review comment (Contributor): I feel we can skip the changes under python/pyspark/pandas for now and instead explicitly set the config to false in setUpClass.

Also cc @xinrong-meng on the behavior change in pandas-on-Spark.
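A sketch of that suggestion, assuming the suite's base class exposes the usual `cls.spark` session; the exact hook may differ per test utility:

```python
@classmethod
def setUpClass(cls):
    super().setUpClass()
    # Keep the legacy unsafe conversion for pandas-on-Spark tests only
    cls.spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "false")
```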

Review comment (Author): Alright, I've disabled the conf for the tests in python/pyspark/pandas.

Review comment (Author): Let me know if you'd also want to avoid changes to the other tests.

"b": [pd.Timestamp(1000), pd.Timestamp(1000), pd.Timestamp(1000)],
"c": [None, None, None],
}
)
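Why `pd.Timestamp(1)` had to change: a bare integer is interpreted as nanoseconds since the epoch, while Spark timestamps carry microsecond precision, so 1 ns cannot be converted without loss; 1000 ns is exactly 1 µs and round-trips. For illustration:

```python
import pandas as pd

pd.Timestamp(1)     # 1970-01-01 00:00:00.000000001 — sub-microsecond, lossy under safe conversion
pd.Timestamp(1000)  # 1970-01-01 00:00:00.000001    — exactly 1 µs, converts cleanly
```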
@@ -184,8 +184,8 @@ def test_describe_empty(self):
         # Explicit empty DataFrame timestamp only
         psdf = ps.DataFrame(
             {
-                "a": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
-                "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
+                "a": [pd.Timestamp(1000), pd.Timestamp(1000), pd.Timestamp(1000)],
+                "b": [pd.Timestamp(1000), pd.Timestamp(1000), pd.Timestamp(1000)],
             }
         )
         pdf = psdf._to_pandas()
@@ -199,7 +199,7 @@ def test_describe_empty(self):

         # Explicit empty DataFrame numeric & timestamp
         psdf = ps.DataFrame(
-            {"a": [1, 2, 3], "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)]}
+            {"a": [1, 2, 3], "b": [pd.Timestamp(1000), pd.Timestamp(1000), pd.Timestamp(1000)]}
         )
         pdf = psdf._to_pandas()
         pdf_result = pdf[pdf.a != pdf.a].describe()
@@ -219,7 +219,7 @@ def test_describe_empty(self):

         # Explicit empty DataFrame string & timestamp
         psdf = ps.DataFrame(
-            {"a": ["a", "b", "c"], "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)]}
+            {
+                "a": ["a", "b", "c"],
+                "b": [pd.Timestamp(1000), pd.Timestamp(1000), pd.Timestamp(1000)],
+            }
         )
         pdf = psdf._to_pandas()
         pdf_result = pdf[pdf.a != pdf.a].describe()
4 changes: 2 additions & 2 deletions python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -80,7 +80,7 @@ def non_numeric_pdf(self):
             "date": pd.Series(
                 [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3)]
             ),
-            "datetime": pd.to_datetime(pd.Series([1, 2, 3])),
+            "datetime": pd.to_datetime(pd.Series([1, 2, 3]), unit="s"),
             "timedelta": pd.Series(
                 [datetime.timedelta(1), datetime.timedelta(hours=2), datetime.timedelta(weeks=3)]
             ),
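The same reasoning applies to `pd.to_datetime`: without a `unit` argument the integers are read as nanoseconds; `unit="s"` yields whole seconds, which are exactly representable at microsecond precision:

```python
import pandas as pd

pd.to_datetime(pd.Series([1, 2, 3]))            # 1, 2, 3 ns — lossy at µs precision
pd.to_datetime(pd.Series([1, 2, 3]), unit="s")  # 1, 2, 3 s — safe to convert
```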
@@ -127,7 +127,7 @@ def numeric_pser_psser_pairs(self):
     def non_numeric_psers(self):
         psers = {
             "string": pd.Series(["x", "y", "z"]),
-            "datetime": pd.to_datetime(pd.Series([1, 2, 3])),
+            "datetime": pd.to_datetime(pd.Series([1, 2, 3]), unit="s"),
             "bool": pd.Series([True, True, False]),
             "date": pd.Series(
                 [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3)]
82 changes: 43 additions & 39 deletions python/pyspark/pandas/tests/test_numpy_compat.py
@@ -85,52 +85,56 @@ def test_np_unsupported_frame(self):
     def test_np_spark_compat_series(self):
         from pyspark.pandas.numpy_compat import unary_np_spark_mappings, binary_np_spark_mappings

-        # Use randomly generated dataFrame
-        pdf = pd.DataFrame(
-            np.random.randint(-100, 100, size=(np.random.randint(100), 2)), columns=["a", "b"]
-        )
-        pdf2 = pd.DataFrame(
-            np.random.randint(-100, 100, size=(len(pdf), len(pdf.columns))), columns=["a", "b"]
-        )
-        psdf = ps.from_pandas(pdf)
-        psdf2 = ps.from_pandas(pdf2)
-
-        for np_name, spark_func in unary_np_spark_mappings.items():
-            np_func = getattr(np, np_name)
-            if np_name not in self.blacklist:
-                try:
-                    # unary ufunc
-                    self.assert_eq(np_func(pdf.a), np_func(psdf.a), almost=True)
-                except Exception as e:
-                    raise AssertionError("Test in '%s' function was failed." % np_name) from e
-
-        for np_name, spark_func in binary_np_spark_mappings.items():
-            np_func = getattr(np, np_name)
-            if np_name not in self.blacklist:
-                try:
-                    # binary ufunc
-                    self.assert_eq(np_func(pdf.a, pdf.b), np_func(psdf.a, psdf.b), almost=True)
-                    self.assert_eq(np_func(pdf.a, 1), np_func(psdf.a, 1), almost=True)
-                except Exception as e:
-                    raise AssertionError("Test in '%s' function was failed." % np_name) from e
-
-        # Test only top 5 for now. 'compute.ops_on_diff_frames' option increases too much time.
-        try:
-            set_option("compute.ops_on_diff_frames", True)
-            for np_name, spark_func in list(binary_np_spark_mappings.items())[:5]:
-                np_func = getattr(np, np_name)
-                if np_name not in self.blacklist:
-                    try:
-                        # binary ufunc
-                        self.assert_eq(
-                            np_func(pdf.a, pdf2.b).sort_index(),
-                            np_func(psdf.a, psdf2.b).sort_index(),
-                            almost=True,
-                        )
-                    except Exception as e:
-                        raise AssertionError("Test in '%s' function was failed." % np_name) from e
-        finally:
-            reset_option("compute.ops_on_diff_frames")
+        # Disable arrow errors, some numpy functions produce results that exceed value ranges
+        with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
+            # Use randomly generated dataFrame
+            pdf = pd.DataFrame(
+                np.random.randint(-100, 100, size=(np.random.randint(100), 2)), columns=["a", "b"]
+            )
+            pdf2 = pd.DataFrame(
+                np.random.randint(-100, 100, size=(len(pdf), len(pdf.columns))), columns=["a", "b"]
+            )
+            psdf = ps.from_pandas(pdf)
+            psdf2 = ps.from_pandas(pdf2)
+
+            for np_name, spark_func in unary_np_spark_mappings.items():
+                np_func = getattr(np, np_name)
+                if np_name not in self.blacklist:
+                    try:
+                        # unary ufunc
+                        self.assert_eq(np_func(pdf.a), np_func(psdf.a), almost=True)
+                    except Exception as e:
+                        raise AssertionError("Test in '%s' function was failed." % np_name) from e
+
+            for np_name, spark_func in binary_np_spark_mappings.items():
+                np_func = getattr(np, np_name)
+                if np_name not in self.blacklist:
+                    try:
+                        # binary ufunc
+                        self.assert_eq(np_func(pdf.a, pdf.b), np_func(psdf.a, psdf.b), almost=True)
+                        self.assert_eq(np_func(pdf.a, 1), np_func(psdf.a, 1), almost=True)
+                    except Exception as e:
+                        raise AssertionError("Test in '%s' function was failed." % np_name) from e
+
+            # Test only top 5 for now. 'compute.ops_on_diff_frames' option increases too much time.
+            try:
+                set_option("compute.ops_on_diff_frames", True)
+                for np_name, spark_func in list(binary_np_spark_mappings.items())[:5]:
+                    np_func = getattr(np, np_name)
+                    if np_name not in self.blacklist:
+                        try:
+                            # binary ufunc
+                            self.assert_eq(
+                                np_func(pdf.a, pdf2.b).sort_index(),
+                                np_func(psdf.a, psdf2.b).sort_index(),
+                                almost=True,
+                            )
+                        except Exception as e:
+                            raise AssertionError(
+                                "Test in '%s' function was failed." % np_name
+                            ) from e
+            finally:
+                reset_option("compute.ops_on_diff_frames")

     @unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
     def test_np_spark_compat_frame(self):
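To illustrate the value-range problem the new comment refers to, a hedged sketch using pyarrow directly (the test itself goes through Spark's serializer, not this API):

```python
import numpy as np
import pyarrow as pa

vals = np.exp(np.array([50.0]))   # ~5.18e21, far beyond the int64 range
arr = pa.array(vals)
arr.cast(pa.int64(), safe=True)   # raises pyarrow.lib.ArrowInvalid
arr.cast(pa.int64(), safe=False)  # overflows silently
```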
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/arrow/test_arrow.py
@@ -707,8 +707,8 @@ def check_createDataFrame_with_single_data_type(self):
     def test_createDataFrame_does_not_modify_input(self):
         # Some series get converted for Spark to consume, this makes sure input is unchanged
         pdf = self.create_pandas_data_frame()
-        # Use a nanosecond value to make sure it is not truncated
-        pdf.iloc[0, 7] = pd.Timestamp(1)
+        # Use a nanosecond value that converts to microseconds without precision loss
+        pdf.iloc[0, 7] = pd.Timestamp(1000)
         # Integers with nulls will get NaNs filled with 0 and will be casted
         pdf.iloc[1, 1] = None
         pdf_copy = pdf.copy(deep=True)
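Under the new default the serializer effectively applies Arrow's safe cast, which is why a 1 ns timestamp would now fail here; a sketch of the equivalent pyarrow call (assumed to mirror what the conversion path does):

```python
import pandas as pd
import pyarrow as pa

s = pd.Series([pd.Timestamp(1)])  # timestamp[ns] with a sub-microsecond component
pa.Array.from_pandas(s, type=pa.timestamp("us"), safe=True)   # raises ArrowInvalid: would lose data
pa.Array.from_pandas(s, type=pa.timestamp("us"), safe=False)  # truncates silently
```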
5 changes: 4 additions & 1 deletion python/pyspark/sql/tests/connect/test_connect_creation.py
@@ -530,12 +530,15 @@ def test_create_dataframe_from_pandas_with_ns_timestamp(self):
         from pandas import Timestamp
         import pandas as pd

-        # Nanoseconds are truncated to microseconds in the serializer
+        # Arrow will throw an error if precision is lost
+        # (i.e., nanoseconds cannot be represented in microseconds)
         pdf = pd.DataFrame(
             {
                 "naive": [datetime(2019, 1, 1, 0)],
                 "aware": [
                     Timestamp(
-                        year=2019, month=1, day=1, nanosecond=500, tz=timezone(timedelta(hours=-8))
+                        year=2019, month=1, day=1, nanosecond=0, tz=timezone(timedelta(hours=-8))
                     )
                 ],
             }
7 changes: 5 additions & 2 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -517,7 +517,10 @@ def test_vectorized_udf_struct_complex(self):

         def _scalar_f(id):
             return pd.DataFrame(
-                {"ts": id.apply(lambda i: pd.Timestamp(i)), "arr": id.apply(lambda i: [i, i + 1])}
+                {
+                    "ts": id.apply(lambda i: pd.Timestamp(i, unit="s")),
+                    "arr": id.apply(lambda i: [i, i + 1]),
+                }
             )

         scalar_f = pandas_udf(_scalar_f, returnType=return_type)
@@ -532,7 +535,7 @@ def iter_f(it):
         for i, row in enumerate(actual):
             id, f = row
             self.assertEqual(i, id)
-            self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0])
+            self.assertEqual(pd.Timestamp(i, unit="s").to_pydatetime(), f[0])
             self.assertListEqual([i, i + 1], f[1])

     def test_vectorized_udf_struct_empty(self):
@@ -3990,7 +3990,7 @@ object SQLConf {
"check and do type conversions anyway. This config only works for Arrow 0.11.0+.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val PYSPARK_WORKER_PYTHON_EXECUTABLE =
buildConf("spark.sql.execution.pyspark.python")