Conversation

@khakhlyuk (Contributor) commented Oct 14, 2025

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-53917

Problem description

LocalRelation is a Catalyst logical operator used to represent a dataset of rows inline as part of the LogicalPlan. LocalRelations represent DataFrames created directly from Python and Scala objects, e.g., Python and Scala lists, pandas DataFrames, CSV files loaded in memory, etc.
In Spark Connect, local relations are transferred over gRPC using LocalRelation messages (for relations under 64MB) and CachedLocalRelation messages (for relations over 64MB).
CachedLocalRelations currently have a hard size limit of 2GB, which means that Spark users can't execute queries over local client data (pandas DataFrames, in-memory CSV files, etc.) larger than 2GB.

Design

In Spark Connect, the client needs to serialize a local relation before transferring it to the server. The data is serialized as a single record batch in an Arrow IPC stream, and the schema as a JSON string; both are then embedded in a LocalRelation{schema, data} proto message.
Small local relations (under 64MB) are sent directly as part of the ExecutePlanRequest, roughly as sketched below.
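For reference, the client-side packing step amounts to roughly the following (a hedged sketch, not the exact client code; `proto` stands for the generated org.apache.spark.connect.proto classes, and producing the Arrow bytes is assumed to have already happened):

import com.google.protobuf.ByteString
import org.apache.spark.connect.proto

// Sketch: arrowData is an Arrow IPC stream holding a single record batch,
// schemaJson is the schema serialized as a JSON string.
def packLocalRelation(arrowData: Array[Byte], schemaJson: String): proto.Relation =
  proto.Relation
    .newBuilder()
    .setLocalRelation(
      proto.LocalRelation
        .newBuilder()
        .setData(ByteString.copyFrom(arrowData))
        .setSchema(schemaJson))
    .build()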


Larger local relations are first sent to the server via addArtifact and stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is sent containing CachedLocalRelation{hash}, where hash is the artifact hash. The server retrieves the cached LocalRelation from the BlockManager via the hash, deserializes it, adds it to the LogicalPlan and then executes it.
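For contrast with the new message introduced further below, the existing CachedLocalRelation message carries a single hash for the whole serialized relation. Its shape in relations.proto is roughly:

message CachedLocalRelation {
  // Field numbers 1 and 2 (formerly userId and sessionId) are reserved.
  // Hash of the serialized LocalRelation proto, cached via addArtifact.
  string hash = 3;
}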


The server reads the data from the BlockManager as a stream and tries to create proto.LocalRelation via

proto.Relation
  .newBuilder()
  .getLocalRelation        // default LocalRelation instance
  .getParserForType        // parser for the LocalRelation message type
  .parseFrom(blockData.toInputStream())

This fails because the Java protobuf library has a 2GB limit when deserializing a protobuf message:

org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException: CodedInputStream encountered an embedded string or message which claimed to have negative size.

To fix this, I propose bypassing the protobuf layer during serialization on the client and deserialization on the server. Instead of caching the full protobuf LocalRelation message, we cache the data and schema as separate artifacts, send the two hashes {data_hash, schema_hash} to the server, load both directly from the BlockManager, and create the LocalRelation on the server from the unpacked data and schema, as sketched below.
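On the server this means reading both artifacts straight from the BlockManager (a hedged sketch; getBlock is a hypothetical helper that resolves an artifact hash to its BlockData, not an actual Spark API):

import java.nio.charset.StandardCharsets

// Hypothetical helper: resolve an artifact hash to its BlockData.
// def getBlock(hash: String): BlockData = ...

val schemaJson = new String(
  getBlock(schemaHash).toInputStream().readAllBytes(), StandardCharsets.UTF_8)
val dataStream = getBlock(dataHash).toInputStream()
// The Arrow IPC stream is consumed directly; no protobuf parseFrom is
// involved, so the 2GB CodedInputStream limit no longer applies.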


After creating a prototype with the new proto message, I discovered additional limits for CachedLocalRelations. Both the Scala client and the server store the data in a single Java Array[Byte], which has a 2GB size limit on the JVM. To avoid this limit, I propose transferring the data in chunks. The Python and Scala clients will split the data into multiple Arrow batches and upload them separately to the server; each batch will be uploaded and stored as a separate artifact, as sketched below. The server will then load and process each batch separately. We will keep batch sizes around 16MB (TBD), well below the 2GB limit. This way we avoid the 2GB limit on both the clients and the server.
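On the client, each chunk can be written as its own self-contained Arrow IPC stream, so the server can deserialize every chunk independently. A minimal sketch with the Arrow Java API (the ~16MB splitting policy itself is elided):

import java.io.ByteArrayOutputStream
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter

// Serialize one batch (already loaded into `root`) as a standalone
// Arrow IPC stream, ready to be uploaded as a separate artifact.
def serializeChunk(root: VectorSchemaRoot): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val writer = new ArrowStreamWriter(root, null, out)
  try {
    writer.start()       // writes the stream header with the schema
    writer.writeBatch()  // writes the single record batch
    writer.end()         // writes the end-of-stream marker
  } finally writer.close()
  out.toByteArray
}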


The final proto message looks like this:

message ChunkedCachedLocalRelation {
  repeated string dataHashes = 1;
  optional string schemaHash = 2;
}
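The server can then reassemble the relation chunk by chunk, so no single byte array ever approaches the 2GB JVM limit. A hedged sketch of the reading side (loadBlockStream is a hypothetical helper that fetches a cached artifact's InputStream by hash; `relation` is a parsed ChunkedCachedLocalRelation):

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader

val allocator = new RootAllocator()
relation.getDataHashesList.forEach { hash =>
  // Each chunk is a self-contained Arrow IPC stream.
  val reader = new ArrowStreamReader(loadBlockStream(hash), allocator)
  try {
    while (reader.loadNextBatch()) {
      val root = reader.getVectorSchemaRoot
      // Process `root` batch by batch (e.g., convert rows to InternalRows).
    }
  } finally reader.close()
}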

Why are the changes needed?

LocalRelations currently have a hard size limit of 2GB, which means that Spark users can't execute queries over local client data (pandas DataFrames, CSV files, etc.) larger than 2GB.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New Python and Scala tests for large local relations.

Was this patch authored or co-authored using generative AI tooling?

No

@khakhlyuk khakhlyuk changed the title [WIP][CONNECT] Support large local relations [SPARK-53917][CONNECT] Support large local relations Oct 15, 2025
@khakhlyuk khakhlyuk changed the title [SPARK-53917][CONNECT] Support large local relations [WIP][SPARK-53917][CONNECT] Support large local relations Oct 15, 2025
@pan3793 (Member) commented Oct 17, 2025

Constructing large local relations on the driver sounds dangerous; is it possible to offload the data from the driver to the executor side (an RDD)?

@khakhlyuk (Contributor, Author) commented Oct 17, 2025

> Constructing large local relations on the driver sounds dangerous; is it possible to offload the data from the driver to the executor side (an RDD)?

Hey @pan3793!
Thanks for the feedback, you are totally correct. Spark already materializes LocalRelations fully on the driver today (in both classic and Connect mode), so this PR is a net-positive improvement over the existing behaviour. My changes remove the hard 2GB limit; the new limit can be controlled via the spark.sql.session.localRelationSizeLimit conf, which is set to 3GB by default (example below).
Offloading the data materialization from the driver to the executors would be the next important improvement, and I believe it should be done, but it's out of scope for this PR and outside of my expertise (I'm mainly familiar with Spark Connect). I may be able to work on the executor changes in several months. If someone else can pick up the executor work, I'm happy with that. Also happy to create a JIRA ticket to track the follow-up.
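For illustration, raising the new cap could look like this (a sketch: the conf name is from this PR, the 4GB value is just an example, and the accepted value type is assumed to be a byte count):

// Raise the local relation size limit from the 3GB default to 4GB.
spark.conf.set("spark.sql.session.localRelationSizeLimit", 4L * 1024 * 1024 * 1024)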

@khakhlyuk force-pushed the largelocalrelations branch from c34471f to 6cbb246 on Oct 17, 2025