
[SPARK-52904][PYTHON] Enable convertToArrowArraySafely by default #51596

Open · wants to merge 7 commits into master
2 changes: 2 additions & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -28,6 +28,8 @@ Upgrading from PySpark 4.0 to 4.1

* In Spark 4.1, unnecessary conversion to pandas instances is removed when ``spark.sql.execution.pythonUDTF.arrow.enabled`` is enabled. As a result, the type coercion changes when the produced output has a schema different from the specified schema. To restore the previous behavior, enable ``spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled``.

* In Spark 4.1, the ``spark.sql.execution.pandas.convertToArrowArraySafely`` configuration is enabled by default. When this setting is enabled, PyArrow raises errors for unsafe conversions such as integer overflow, floating-point truncation, and loss of precision. This change affects the serialization of return data from Arrow-enabled UDFs and pandas UDFs, as well as the creation of PySpark DataFrames. To restore the previous behavior, set the configuration to ``false``.

Upgrading from PySpark 3.5 to 4.0
---------------------------------

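The migration note added above is the user-facing summary of this change. The following is a minimal sketch of what the new default means for `createDataFrame` from pandas: it assumes a local PySpark 4.1+ session with Arrow enabled, disables the non-Arrow fallback only so that the error surfaces instead of being retried without Arrow, and the exact exception type may vary with the installed PyArrow version.

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

pdf = pd.DataFrame({"v": [3.7, 1.2]})

# With convertToArrowArraySafely=true (the new default), casting a float column
# to an integer schema is an unsafe, truncating conversion and is rejected.
try:
    spark.createDataFrame(pdf, schema="v long").show()
except Exception as exc:  # typically surfaces as a PyArrow "unsafe conversion" error
    print("rejected by the Arrow safe-conversion check:", type(exc).__name__)

# Restoring the Spark 4.0 behavior: turn the safe-conversion check back off.
spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "false")
spark.createDataFrame(pdf, schema="v long").show()  # 3.7 is silently truncated to 3
```

The same check applies when Arrow-based UDFs serialize their return values back to the JVM, which is why several tests in this PR adjust their fixtures or declared return types.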
6 changes: 6 additions & 0 deletions python/pyspark/pandas/tests/computation/test_describe.py
@@ -26,6 +26,12 @@


class FrameDescribeMixin:
    @classmethod
    def setUpClass(cls):
        super(FrameDescribeMixin, cls).setUpClass()
        # Some nanosecond->microsecond conversions throw loss of precision errors
        cls.spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "false")

    @property
    def pdf(self):
        return pd.DataFrame(
6 changes: 6 additions & 0 deletions python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -41,6 +41,12 @@
class OpsTestBase:
"""The test base for arithmetic operations of different data types."""

@classmethod
def setUpClass(cls):
super(OpsTestBase, cls).setUpClass()
# Some nanosecond->microsecond conversions throw loss of precision errors
cls.spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "false")

@property
def numeric_pdf(self):
dtypes = [np.int32, int, np.float32, float]
6 changes: 6 additions & 0 deletions python/pyspark/pandas/tests/test_numpy_compat.py
@@ -27,6 +27,12 @@


class NumPyCompatTestsMixin:
    @classmethod
    def setUpClass(cls):
        super(NumPyCompatTestsMixin, cls).setUpClass()
        # Some nanosecond->microsecond conversions throw loss of precision errors
        cls.spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "false")

    blacklist = [
        # Pandas-on-Spark does not currently support
        "conj",
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/arrow/test_arrow.py
@@ -707,8 +707,8 @@ def check_createDataFrame_with_single_data_type(self):
    def test_createDataFrame_does_not_modify_input(self):
        # Some series get converted for Spark to consume, this makes sure input is unchanged
        pdf = self.create_pandas_data_frame()
-       # Use a nanosecond value to make sure it is not truncated
-       pdf.iloc[0, 7] = pd.Timestamp(1)
+       # Use a nanosecond value that converts to microseconds without precision loss
+       pdf.iloc[0, 7] = pd.Timestamp(1000)
        # Integers with nulls will get NaNs filled with 0 and will be casted
        pdf.iloc[1, 1] = None
        pdf_copy = pdf.copy(deep=True)
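The nanosecond comments in the test changes above and below all trace back to the same PyArrow behavior: with safe conversion enabled, a nanosecond timestamp that cannot be represented exactly in microseconds is rejected instead of being silently truncated. A small standalone sketch using pyarrow directly (already a PySpark dependency); the exact error message depends on the PyArrow version:

```python
import pandas as pd
import pyarrow as pa

exact_us = pd.Series([pd.Timestamp(1000)])  # 1000 ns == exactly 1 microsecond
lossy_ns = pd.Series([pd.Timestamp(1)])     # 1 ns cannot be expressed in microseconds

# No precision is lost, so the safe conversion succeeds.
pa.Array.from_pandas(exact_us, type=pa.timestamp("us"), safe=True)

# Precision would be lost, so the safe conversion raises instead of truncating.
try:
    pa.Array.from_pandas(lossy_ns, type=pa.timestamp("us"), safe=True)
except pa.ArrowInvalid as exc:
    print("rejected:", exc)
```

This is why `test_createDataFrame_does_not_modify_input` now uses `pd.Timestamp(1000)`, and why the pandas-on-Spark test bases above disable the check in `setUpClass` rather than rewriting every nanosecond fixture.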
5 changes: 4 additions & 1 deletion python/pyspark/sql/tests/connect/test_connect_creation.py
@@ -530,12 +530,15 @@ def test_create_dataframe_from_pandas_with_ns_timestamp(self):
        from pandas import Timestamp
        import pandas as pd

-       # Nanoseconds are truncated to microseconds in the serializer
+       # Arrow will throw an error if precision is lost
+       # (i.e., nanoseconds cannot be represented in microseconds)
        pdf = pd.DataFrame(
            {
                "naive": [datetime(2019, 1, 1, 0)],
                "aware": [
                    Timestamp(
-                       year=2019, month=1, day=1, nanosecond=500, tz=timezone(timedelta(hours=-8))
+                       year=2019, month=1, day=1, nanosecond=0, tz=timezone(timedelta(hours=-8))
                    )
                ],
            }
7 changes: 5 additions & 2 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -517,7 +517,10 @@ def test_vectorized_udf_struct_complex(self):

        def _scalar_f(id):
            return pd.DataFrame(
-               {"ts": id.apply(lambda i: pd.Timestamp(i)), "arr": id.apply(lambda i: [i, i + 1])}
+               {
+                   "ts": id.apply(lambda i: pd.Timestamp(i, unit="s")),
+                   "arr": id.apply(lambda i: [i, i + 1]),
+               }
            )

        scalar_f = pandas_udf(_scalar_f, returnType=return_type)
@@ -532,7 +535,7 @@ def iter_f(it):
        for i, row in enumerate(actual):
            id, f = row
            self.assertEqual(i, id)
-           self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0])
+           self.assertEqual(pd.Timestamp(i, unit="s").to_pydatetime(), f[0])
            self.assertListEqual([i, i + 1], f[1])

    def test_vectorized_udf_struct_empty(self):
2 changes: 1 addition & 1 deletion python/pyspark/tests/test_memory_profiler.py
@@ -167,7 +167,7 @@ def ser_to_ser(ser: pd.Series) -> pd.Series:
    def exec_pandas_udf_ser_to_scalar(self):
        import pandas as pd

-       @pandas_udf("int")
+       @pandas_udf("double")
        def ser_to_scalar(ser: pd.Series) -> float:
            return ser.median()

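The memory-profiler fix above illustrates the UDF-return side of the new default: `Series.median()` over integers can produce a fractional float, and a pandas UDF declared as `int` would then fail the safe-conversion check when its result is serialized back to Spark. A hedged sketch of that failure mode, assuming a local PySpark 4.1+ environment; the failing call is left commented out:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.master("local[1]").getOrCreate()

@pandas_udf("int")
def median_as_int(ser: pd.Series) -> float:
    return ser.median()  # 0..3 has median 1.5; casting 1.5 to int is unsafe

@pandas_udf("double")
def median_as_double(ser: pd.Series) -> float:
    return ser.median()  # declared type matches the produced float

df = spark.range(4)
df.agg(median_as_double(df.id)).show()  # fine: returns 1.5
# df.agg(median_as_int(df.id)).show()   # expected to fail the safe-conversion check
```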
@@ -3990,7 +3990,7 @@ object SQLConf {
"check and do type conversions anyway. This config only works for Arrow 0.11.0+.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val PYSPARK_WORKER_PYTHON_EXECUTABLE =
buildConf("spark.sql.execution.pyspark.python")