[SPARK-53696][PYTHON][CONNECT][SQL] Default to bytes for BinaryType in PySpark #52467
base: master
Conversation
safecheck,
input_types,
int_to_decimal_coercion_enabled=False,
int_to_decimal_coercion_enabled,
The default value for int_to_decimal_coercion_enabled is not used at all.
python/pyspark/sql/tests/arrow/test_arrow_binary_as_bytes_udf.py
Can we also add this change to the migration guide?
Where shall I add it? I found the PySpark migration guide is archived: https://github.com/apache/spark/blob/master/docs/pyspark-migration-guide.md
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@unittest.skip("Duplicate test as it is already tested in ArrowPythonUDFLegacyTests.")
def test_udf_binary_type(self):
    super().test_udf_binary_type(self)

@unittest.skip("Duplicate test as it is already tested in ArrowPythonUDFLegacyTests.")
def test_udf_binary_type_in_nested_structures(self):
    super().test_udf_binary_type_in_nested_structures(self)
why do we add tests then skip them?
class ArrowPythonUDFParityLegacyTestsMixin extends ArrowPythonUDFTestsMixin, and ArrowPythonUDFTestsMixin already contains this test. We already run this test in test_arrow_python_udf.py; it is not meaningful to re-run it, and skipping saves some test resources.
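For readers unfamiliar with the test layout, here is a minimal sketch of the pattern being discussed (hypothetical class names, not the actual Spark test modules): a shared mixin contributes the test to a base suite, so a second suite that also inherits the mixin skips the duplicate run.

```python
import unittest


class BinaryTypeTestsMixin:
    def test_udf_binary_type(self):
        # Shared test body; it only needs to run once in the base suite.
        self.assertEqual(type(b"abc").__name__, "bytes")


class BaseSuite(BinaryTypeTestsMixin, unittest.TestCase):
    pass  # runs test_udf_binary_type


class ParityLegacySuite(BinaryTypeTestsMixin, unittest.TestCase):
    @unittest.skip("Duplicate test as it is already run in BaseSuite.")
    def test_udf_binary_type(self):
        super().test_udf_binary_type()
```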
schema: StructType,
*,
return_as_tuples: bool = False,
binary_as_bytes: bool = True,
Maybe this should be controlled by a flag?
It is already controlled by a flag: `binary_as_bytes` is passed at the caller's side as the value of the SQL conf `spark.sql.execution.pyspark.binaryAsBytes`. It is not possible, or at least against the style, to access the SQL conf in this conversion.py.
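A rough sketch of that flow (hypothetical function names; the real call site lives in the Connect/SQL layer): the conf is resolved once at the caller and threaded into the converter as a plain boolean, so conversion.py never touches the SQL conf itself.

```python
def create_binary_converter(binary_as_bytes: bool):
    """Build a converter for BinaryType values; no SQL conf access in here."""
    if binary_as_bytes:
        return lambda value: bytes(value) if value is not None else None
    return lambda value: bytearray(value) if value is not None else None


def convert_result(raw_values, spark):
    # Caller's side: read the flag from the SQL conf once, then pass it down.
    binary_as_bytes = (
        spark.conf.get("spark.sql.execution.pyspark.binaryAsBytes", "true") == "true"
    )
    conv = create_binary_converter(binary_as_bytes)
    return [conv(v) for v in raw_values]
```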
@ueshin @Yicong-Huang do you have other concerns? Thanks a lot!
Let's fix up the linter failure and merge.
Add type: ignore[misc] comments to suppress mypy errors about "None not callable" in converter functions. These comments are needed because mypy cannot infer that converters are callable after None checks when the binary_as_bytes parameter is added to the type signature. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Add type ignore comment to suppress mypy error about "None" not callable in ArrowTableToRowsConversion._create_converter. The field_convs[i] can be None, but the conditional expression structure causes mypy to analyze the call before the None check. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
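A minimal illustration of the mypy limitation those commits describe (hypothetical names, not the actual conversion.py code): mypy does not narrow an `Optional` element accessed through a variable index inside a conditional expression, so the guarded call is still reported as `"None" not callable` and needs the suppression comment.

```python
from typing import Callable, List, Optional

Converter = Callable[[object], object]


def convert_row(row: List[object], field_convs: List[Optional[Converter]]) -> List[object]:
    # mypy cannot narrow field_convs[i] from Optional[Converter] to Converter
    # across the two halves of the conditional expression, so it reports
    # '"None" not callable' even though the None check guards the call.
    return [
        field_convs[i](v) if field_convs[i] is not None else v  # type: ignore[misc]
        for i, v in enumerate(row)
    ]
```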
I did another pass and fixed the linter error. The PR is good for merge from my point of view. @HyukjinKwon @ueshin @allisonwang-db thank you!
Looks good!
val PYSPARK_BINARY_AS_BYTES =
  buildConf("spark.sql.execution.pyspark.binaryAsBytes")
    .doc("When true, BinaryType is mapped consistently to bytes in PySpark." +
"Mapped" here means the function input will be mapped as bytes right?
exactly
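As a hedged usage sketch of what that means in practice (assumes an active SparkSession named `spark`): with the conf enabled, the Python function receives its BinaryType argument as `bytes`.

```python
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType


@udf(returnType=StringType())
def input_type_name(value):
    # With spark.sql.execution.pyspark.binaryAsBytes=true this is expected to
    # return "bytes"; with the conf disabled, "bytearray".
    return type(value).__name__


spark.conf.set("spark.sql.execution.pyspark.binaryAsBytes", "true")
spark.range(1).select(input_type_name(lit(b"abc"))).show()
```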
for idx, field in enumerate(result_df.schema.fields):
    self.assertEqual(field.dataType, expected_output_types[idx])

def test_udtf_binary_type(self):
Can we also add one more test for test_arrow_udtf?
What test did you mean? The same test case but applied to the Arrow UDTF? See the other thread: since the class for the Arrow UDTF inherits the class for the regular UDTF, it is already tested.
for conf_value in ["true", "false"]:
    with self.sql_conf({"spark.sql.execution.pyspark.binaryAsBytes": conf_value}):
        result = BinaryTypeUDTF(lit(b"test")).collect()
        self.assertEqual(result[0]["type_name"], "bytes")
@allisonwang-db arrow udtf with legacy conversion is tested here
def test_udtf_binary_type(self):
    # For Arrow Python UDTF with non-legacy conversion, BinaryType is mapped to
    # bytes or bytearray consistently with non-Arrow Python UDTF behavior.
    BaseUDTFTestsMixin.test_udtf_binary_type(self)
@allisonwang-db arrow udtf without legacy conversion is tested here
DataFrame APIs (both Spark Classic and Spark Connect) ``bytearray``
Data sources ``bytearray``
Arrow-optimized UDF and UDTF with unnecessary conversion to pandas instances ``bytes``
=============================================================================== ==============================
@ueshin @allisonwang-db migration guide is added!
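For illustration, a small sketch of the user-facing difference for the DataFrame API row listed above (assumes an active SparkSession `spark`; the exact behavior is as described in this PR):

```python
spark.conf.set("spark.sql.execution.pyspark.binaryAsBytes", "true")
row = spark.sql("SELECT CAST('ab' AS BINARY) AS b").collect()[0]

# Under the new default the collected value is expected to be bytes rather
# than the bytearray listed for the legacy behavior above.
print(type(row.b).__name__)
```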
What changes were proposed in this pull request?
Currently, `BinaryType` is mapped inconsistently in PySpark.

Cases when it is mapped to `bytearray`:

Cases when it is mapped to `bytes`: regular UDF with arrow optimization and legacy pandas conversion.

This complicates the data mapping model. With this PR, `BinaryType` will be consistently mapped to `bytes` in all the aforementioned cases. We gate the change with a SQL conf and enable the conversion to bytes by default.
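For example (a sketch; assumes an active SparkSession `spark`), users who rely on the old behavior should be able to opt back into it by disabling the gating conf:

```python
# Default after this PR: conversion to bytes is enabled.
print(spark.conf.get("spark.sql.execution.pyspark.binaryAsBytes"))  # expected: "true"

# Opt back into the legacy bytearray mapping for the affected cases.
spark.conf.set("spark.sql.execution.pyspark.binaryAsBytes", "false")
```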
This PR is based on #52370
Why are the changes needed?
`bytes` is more efficient as it is immutable and requires zero copying.
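A small illustration of that point (plain Python; the no-copy behavior is a CPython detail):

```python
payload = b"binary payload"

# bytes(...) on an object that is already bytes returns the same object in
# CPython (no copy), whereas bytearray(...) always allocates and copies.
assert bytes(payload) is payload
copy = bytearray(payload)
assert copy is not payload

# Immutability also makes bytes hashable (usable as dict keys); bytearray is not.
assert hash(payload) == hash(b"binary payload")
```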
Does this PR introduce any user-facing change?
Yes. For the aforementioned cases where `BinaryType` was mapped to `bytearray`, we changed the mapping to `bytes`.
How was this patch tested?
Many tests.
Was this patch authored or co-authored using generative AI tooling?
Yes, with the help of Claude Code.