Skip to content

Commit 4953f89

Browse files
xi-dbyhuang-db
authored andcommitted
[SPARK-52397][CONNECT] Idempotent ExecutePlan: the second ExecutePlan with same operationId and plan reattaches
### What changes were proposed in this pull request? In Spark Connect, queries can fail with the error INVALID_HANDLE.OPERATION_ALREADY_EXISTS, when a client retries an ExecutePlan RPC—often due to transient network issues—causing the server to receive the same request multiple times. Since each ExecutePlan request includes an operation_id, the server interprets the duplicate as an attempt to create an already existing operation, which results in the OPERATION_ALREADY_EXISTS exception. This behavior interrupts query execution and breaks the user experience under otherwise recoverable conditions. To resolve this, the PR introduces idempotent handling of ExecutePlan on the server side. When a request with a previously seen operation_id and the same plan is received, instead of returning an error, the server now reattaches the response stream to the already running execution associated with that operation. This ensures that retries due to network flakiness no longer result in failed queries, thereby improving the resilience and robustness of query executions. ### Why are the changes needed? It will improve the stability of Spark Connect in case of transient network issues. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Yes, new tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#51084 from xi-db/fix_operation_already_exists. Authored-by: Xi Lyu <xi.lyu@databricks.com> Signed-off-by: Herman van Hovell <herman@databricks.com>
1 parent 766abe9 commit 4953f89

File tree

4 files changed

+159
-49
lines changed

4 files changed

+159
-49
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,35 @@ package org.apache.spark.sql.connect.service
1919

2020
import io.grpc.stub.StreamObserver
2121

22+
import org.apache.spark.SparkSQLException
2223
import org.apache.spark.connect.proto
2324
import org.apache.spark.internal.Logging
24-
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
2525

2626
class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse])
2727
extends Logging {
2828

2929
def handle(v: proto.ExecutePlanRequest): Unit = {
30-
val executeHolder = SparkConnectService.executionManager.createExecuteHolder(v)
31-
try {
32-
executeHolder.eventsManager.postStarted()
33-
executeHolder.start()
34-
} catch {
35-
// Errors raised before the execution holder has finished spawning a thread are considered
36-
// plan execution failure, and the client should not try reattaching it afterwards.
37-
case t: Throwable =>
38-
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
39-
throw t
30+
val previousSessionId = v.hasClientObservedServerSideSessionId match {
31+
case true => Some(v.getClientObservedServerSideSessionId)
32+
case false => None
4033
}
34+
val sessionHolder = SparkConnectService
35+
.getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
36+
val executeKey = ExecuteKey(v, sessionHolder)
4137

42-
try {
43-
val responseSender =
44-
new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
45-
executeHolder.runGrpcResponseSender(responseSender)
46-
} finally {
47-
executeHolder.afterInitialRPC()
38+
SparkConnectService.executionManager.getExecuteHolder(executeKey) match {
39+
case None =>
40+
// Create a new execute holder and attach to it.
41+
SparkConnectService.executionManager
42+
.createExecuteHolderAndAttach(executeKey, v, sessionHolder, responseObserver)
43+
case Some(executeHolder) if executeHolder.request.getPlan.equals(v.getPlan) =>
44+
// If the execute holder already exists with the same plan, reattach to it.
45+
SparkConnectService.executionManager
46+
.reattachExecuteHolder(executeHolder, responseObserver, None)
47+
case Some(_) =>
48+
throw new SparkSQLException(
49+
errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
50+
messageParameters = Map("handle" -> executeKey.operationId))
4851
}
4952
}
5053
}

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ import scala.jdk.CollectionConverters._
2626
import scala.util.control.NonFatal
2727

2828
import com.google.common.cache.CacheBuilder
29+
import io.grpc.stub.StreamObserver
2930

3031
import org.apache.spark.{SparkEnv, SparkSQLException}
3132
import org.apache.spark.connect.proto
3233
import org.apache.spark.internal.{Logging, LogKeys, MDC}
3334
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
3435
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
36+
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
3537
import org.apache.spark.util.ThreadUtils
3638

3739
// Unique key identifying execution by combination of user, session and operation id
@@ -83,17 +85,10 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
8385
/**
8486
* Create a new ExecuteHolder and register it with this global manager and with its session.
8587
*/
86-
private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = {
87-
val previousSessionId = request.hasClientObservedServerSideSessionId match {
88-
case true => Some(request.getClientObservedServerSideSessionId)
89-
case false => None
90-
}
91-
val sessionHolder = SparkConnectService
92-
.getOrCreateIsolatedSession(
93-
request.getUserContext.getUserId,
94-
request.getSessionId,
95-
previousSessionId)
96-
val executeKey = ExecuteKey(request, sessionHolder)
88+
private[connect] def createExecuteHolder(
89+
executeKey: ExecuteKey,
90+
request: proto.ExecutePlanRequest,
91+
sessionHolder: SessionHolder): ExecuteHolder = {
9792
val executeHolder = executions.compute(
9893
executeKey,
9994
(executeKey, oldExecuteHolder) => {
@@ -123,6 +118,20 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
123118
executeHolder
124119
}
125120

121+
/**
122+
* Create a new ExecuteHolder and register it with this global manager and with its session.
123+
*/
124+
private[connect] def createExecuteHolder(v: proto.ExecutePlanRequest): ExecuteHolder = {
125+
val previousSessionId = v.hasClientObservedServerSideSessionId match {
126+
case true => Some(v.getClientObservedServerSideSessionId)
127+
case false => None
128+
}
129+
val sessionHolder = SparkConnectService
130+
.getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
131+
val executeKey = ExecuteKey(v, sessionHolder)
132+
createExecuteHolder(executeKey, v, sessionHolder)
133+
}
134+
126135
/**
127136
* Remove an ExecuteHolder from this global manager and from its session. Interrupt the
128137
* execution if still running, free all resources.
@@ -159,6 +168,67 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
159168
Option(executions.get(key))
160169
}
161170

171+
/**
172+
* Create a new ExecuteHolder, register it with this global manager and with its session, and
173+
* attach the given response observer to it.
174+
*/
175+
private[connect] def createExecuteHolderAndAttach(
176+
executeKey: ExecuteKey,
177+
request: proto.ExecutePlanRequest,
178+
sessionHolder: SessionHolder,
179+
responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = {
180+
val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
181+
try {
182+
executeHolder.eventsManager.postStarted()
183+
executeHolder.start()
184+
} catch {
185+
// Errors raised before the execution holder has finished spawning a thread are considered
186+
// plan execution failure, and the client should not try reattaching it afterwards.
187+
case t: Throwable =>
188+
removeExecuteHolder(executeHolder.key)
189+
throw t
190+
}
191+
192+
try {
193+
val responseSender =
194+
new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
195+
executeHolder.runGrpcResponseSender(responseSender)
196+
} finally {
197+
executeHolder.afterInitialRPC()
198+
}
199+
executeHolder
200+
}
201+
202+
/**
203+
* Reattach the given response observer to the given ExecuteHolder.
204+
*/
205+
private[connect] def reattachExecuteHolder(
206+
executeHolder: ExecuteHolder,
207+
responseObserver: StreamObserver[proto.ExecutePlanResponse],
208+
lastConsumedResponseId: Option[String]): Unit = {
209+
if (!executeHolder.reattachable) {
210+
logWarning(log"Reattach to not reattachable operation.")
211+
throw new SparkSQLException(
212+
errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
213+
messageParameters = Map.empty)
214+
} else if (executeHolder.isOrphan()) {
215+
logWarning(log"Reattach to an orphan operation.")
216+
removeExecuteHolder(executeHolder.key)
217+
throw new IllegalStateException("Operation was orphaned because of an internal error.")
218+
}
219+
220+
val responseSender =
221+
new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
222+
lastConsumedResponseId match {
223+
case Some(lastResponseId) =>
224+
// start from response after lastResponseId
225+
executeHolder.runGrpcResponseSender(responseSender, lastResponseId)
226+
case None =>
227+
// start from the start of the stream.
228+
executeHolder.runGrpcResponseSender(responseSender)
229+
}
230+
}
231+
162232
private[connect] def removeAllExecutionsForSession(key: SessionKey): Unit = {
163233
executions.forEach((_, executeHolder) => {
164234
if (executeHolder.sessionHolder.key == key) {

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import io.grpc.stub.StreamObserver
2222
import org.apache.spark.SparkSQLException
2323
import org.apache.spark.connect.proto
2424
import org.apache.spark.internal.Logging
25-
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
26-
import org.apache.spark.sql.connect.service.ExecuteKey
2725

2826
class SparkConnectReattachExecuteHandler(
2927
responseObserver: StreamObserver[proto.ExecutePlanResponse])
@@ -57,25 +55,13 @@ class SparkConnectReattachExecuteHandler(
5755
messageParameters = Map("handle" -> v.getOperationId))
5856
}
5957
}
60-
if (!executeHolder.reattachable) {
61-
logWarning(s"Reattach to not reattachable operation.")
62-
throw new SparkSQLException(
63-
errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
64-
messageParameters = Map.empty)
65-
} else if (executeHolder.isOrphan()) {
66-
logWarning("Reattach to an orphan operation.")
67-
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
68-
throw new IllegalStateException("Operation was orphaned because of an internal error.")
69-
}
7058

71-
val responseSender =
72-
new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
73-
if (v.hasLastResponseId) {
74-
// start from response after lastResponseId
75-
executeHolder.runGrpcResponseSender(responseSender, v.getLastResponseId)
76-
} else {
77-
// start from the start of the stream.
78-
executeHolder.runGrpcResponseSender(responseSender)
79-
}
59+
SparkConnectService.executionManager.reattachExecuteHolder(
60+
executeHolder,
61+
responseObserver,
62+
v.hasLastResponseId match {
63+
case true => Some(v.getLastResponseId)
64+
case false => None
65+
})
8066
}
8167
}

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,4 +452,55 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
452452
assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND"))
453453
}
454454
}
455+
456+
test("ExecutePlan RPC is idempotent: second ExecutePlan with same operationId reattaches") {
457+
withRawBlockingStub { stub =>
458+
val operationId = UUID.randomUUID().toString
459+
val iter = stub.executePlan(
460+
buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
461+
// open the iterator, guarantees that the RPC reached the server,
462+
// and get the responseId of the first response
463+
val firstResponseId = iter.next().getResponseId
464+
465+
// send execute plan again, it will attach to the same execution,
466+
// instead of throwing INVALID_HANDLE.OPERATION_ALREADY_EXISTS error
467+
val iter2 = stub.executePlan(
468+
buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
469+
// the first response should be the same as the one we got before, because
470+
// lastConsumedResponseId is unset and the server will start from the start of the stream.
471+
assert(iter2.next().getResponseId == firstResponseId)
472+
473+
// should result in INVALID_CURSOR.DISCONNECTED error on the original iterator
474+
val e = intercept[StatusRuntimeException] {
475+
while (iter.hasNext) iter.next()
476+
}
477+
assert(e.getMessage.contains("INVALID_CURSOR.DISCONNECTED"))
478+
479+
// the second iterator should be able to continue
480+
while (iter2.hasNext) {
481+
iter2.next()
482+
}
483+
}
484+
}
485+
486+
test("The second ExecutePlan with same operationId but a different plan will fail") {
487+
withRawBlockingStub { stub =>
488+
val operationId = UUID.randomUUID().toString
489+
val iter = stub.executePlan(
490+
buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
491+
// open the iterator, guarantees that the RPC reached the server,
492+
// and get the responseId of the first response
493+
iter.next()
494+
495+
// send execute plan has the same operation id but different plan,
496+
// it will fail with INVALID_HANDLE.OPERATION_ALREADY_EXISTS error
497+
val SMALL_RESULTS_QUERY = "select * from range(1000)"
498+
val iter2 = stub.executePlan(
499+
buildExecutePlanRequest(buildPlan(SMALL_RESULTS_QUERY), operationId = operationId))
500+
val e = intercept[StatusRuntimeException] {
501+
iter2.next()
502+
}
503+
assert(e.getMessage.contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS"))
504+
}
505+
}
455506
}

0 commit comments

Comments
 (0)