[WIP][SPARK-53917][CONNECT] Support large local relations #52613
What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-53917
Problem description
LocalRelation is a Catalyst logical operator that represents a dataset of rows inline as part of the LogicalPlan. LocalRelations back DataFrames created directly from client-side objects: Python and Scala collections, pandas DataFrames, CSV files loaded into memory, and so on.
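For example, building a DataFrame from client-side data (assuming an active Spark Connect session named spark) produces a LocalRelation node in the plan:

```python
import pandas as pd

# Building a DataFrame from client-side data; in Spark Connect the
# resulting plan contains a LocalRelation node holding the rows inline.
pdf = pd.DataFrame({"id": range(3), "name": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.explain(extended=True)  # the analyzed plan shows a LocalRelation
```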
In Spark Connect, local relations are transferred over gRPC using two messages: LocalRelation (for relations under 64MB) and CachedLocalRelation (for relations over 64MB).
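For reference, the two existing messages in relations.proto look roughly like this (simplified; see the actual definitions for the exact fields and numbering):

```proto
message LocalRelation {
  // Serialized data as an Arrow IPC stream with a single record batch.
  optional bytes data = 1;
  // Schema of the relation as a JSON/DDL string.
  optional string schema = 2;
}

message CachedLocalRelation {
  // Hash of the artifact under which the serialized relation was cached.
  string hash = 3;
}
```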
CachedLocalRelations currently have a hard size limit of 2GB, which means that Spark users can't execute queries over local client data (pandas DataFrames, CSV files, etc.) larger than 2GB.
Design
In Spark Connect, the client needs to serialize the local relation before transferring it to the server. It serializes the data as an Arrow IPC stream containing a single record batch, and the schema as a JSON string. It then embeds both in a LocalRelation{schema, data} proto message.
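For illustration, the client-side serialization looks roughly like this with pyarrow (a sketch, not the clients' exact code; the real clients serialize the Spark schema, not the Arrow schema):

```python
import pyarrow as pa

def serialize_local_relation(table: pa.Table) -> tuple[bytes, str]:
    """Serialize a table as a single-batch Arrow IPC stream plus a schema string."""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        # combine_chunks() merges all chunks, so the stream holds one batch.
        for batch in table.combine_chunks().to_batches():
            writer.write_batch(batch)
    # Stand-in for the JSON schema string the real clients produce.
    return sink.getvalue().to_pybytes(), str(table.schema)
```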
Small local relations (under 64MB) are sent directly as part of the ExecutePlanRequest.
Larger local relations are first sent to the server via addArtifact and stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is sent containing CachedLocalRelation{hash}, where hash is the artifact hash. The server retrieves the cached LocalRelation from the BlockManager via the hash, deserializes it, adds it to the LogicalPlan and then executes it.
The server reads the data from the BlockManager as a stream and tries to parse it into a proto.LocalRelation with protobuf's parseFrom. This fails because the Java protobuf library has a 2GB limit on deserialized messages.
To fix this, I propose avoiding the protobuf layer during serialization on the client and deserialization on the server. Instead of caching the full protobuf LocalRelation message, we cache the data and the schema as separate artifacts, send the two hashes {data_hash, schema_hash} to the server, load both directly from the BlockManager, and create the LocalRelation on the server from the unpacked data and schema.
After creating a prototype with the new proto message, I discovered additional limits for CachedLocalRelations: both the Scala client and the server store the data in a single Java Array[Byte], which is capped at 2GB. To avoid this limit, I propose transferring the data in chunks. The Python and Scala clients will split the data into multiple Arrow batches and upload each batch as a separate artifact. The server will then load and process each batch separately. We will keep batch sizes around 16MB (TBD), well below the 2GB limit. This way we avoid the 2GB limit on both the clients and the server.
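A minimal sketch of the client-side chunking with pyarrow, assuming a ~16MB target; the helper name and the row-count heuristic are illustrative, not the PR's actual code:

```python
import pyarrow as pa

TARGET_CHUNK_BYTES = 16 * 1024 * 1024  # ~16MB per batch (TBD in the PR)

def chunk_table(table: pa.Table) -> list[bytes]:
    """Split a table into Arrow IPC streams of roughly TARGET_CHUNK_BYTES each."""
    # Derive a per-batch row count from the average row width.
    avg_row_bytes = max(1, table.nbytes // max(1, table.num_rows))
    rows_per_batch = max(1, TARGET_CHUNK_BYTES // avg_row_bytes)

    chunks = []
    for batch in table.to_batches(max_chunksize=rows_per_batch):
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, batch.schema) as writer:
            writer.write_batch(batch)
        chunks.append(sink.getvalue().to_pybytes())
    return chunks

# Each element of chunk_table(t) would be uploaded as its own artifact;
# the server then loads and deserializes the batches one by one.
```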
The final proto message looks like this:
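(The exact definition was omitted from this description; the following is a plausible reconstruction from the design above, and the field names and numbers are illustrative assumptions, as is whether the existing CachedLocalRelation message is changed or a new message is added.)

```proto
message CachedLocalRelation {
  // Artifact hash of the schema, serialized as a JSON string.
  string schema_hash = 1;
  // Artifact hashes of the Arrow IPC data chunks, in upload order.
  repeated string data_hashes = 2;
}
```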
Why are the changes needed?
LocalRelations currently have a hard size limit of 2GB, which means that Spark users can't execute queries over local client data (pandas DataFrames, CSV files, etc.) larger than 2GB.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New Python and Scala tests for large local relations.
Was this patch authored or co-authored using generative AI tooling?
No