[SPARK-52321] Add SessionClosed/SqlConfNotFound/ParseSyntaxError to SparkConnectError #180

Closed · wants to merge 1 commit
2 changes: 2 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
@@ -311,6 +311,8 @@ public actor DataFrame: Sendable {
throw SparkConnectError.TableOrViewNotFound
case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"):
throw SparkConnectError.ColumnNotFound
case let m where m.contains("PARSE_SYNTAX_ERROR"):
throw SparkConnectError.ParseSyntaxError
default:
throw error
}
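
With this mapping in place, a caller can treat a malformed SQL statement differently from other failures. A minimal caller-side sketch, assuming a reachable Spark Connect server at sc://localhost and that SparkSession.builder, sql(_:), and count() behave as exercised by this repository's tests; the misspelled query is only an illustration:

import SparkConnect

let spark = try await SparkSession.builder.remote("sc://localhost").getOrCreate()
do {
  // "SELEC" is an intentional typo, so the server reports PARSE_SYNTAX_ERROR,
  // which the DataFrame layer now surfaces as ParseSyntaxError.
  _ = try await spark.sql("SELEC 1").count()
} catch SparkConnectError.ParseSyntaxError {
  print("The SQL statement could not be parsed.")
}
await spark.stop()
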
4 changes: 4 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
@@ -131,6 +131,10 @@ public actor SparkConnectClient {
return try await f(client)
} catch let error as RPCError where error.code == .internalError {
switch error.message {
case let m where m.contains("INVALID_HANDLE.SESSION_CLOSED"):
throw SparkConnectError.SessionClosed
case let m where m.contains("SQL_CONF_NOT_FOUND"):
throw SparkConnectError.SqlConfNotFound
case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"):
throw SparkConnectError.TableOrViewAlreadyExists
case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
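
The client classifies gRPC internal errors by substring-matching the error-class tag embedded in the server's message. The same technique as a standalone sketch; mapServerMessage is a hypothetical helper, not part of the client's API:

import SparkConnect

// Hypothetical helper mirroring the switch above; nil means the message was
// not recognized and the original RPCError should be rethrown.
func mapServerMessage(_ message: String) -> SparkConnectError? {
  switch message {
  case let m where m.contains("INVALID_HANDLE.SESSION_CLOSED"):
    return .SessionClosed
  case let m where m.contains("SQL_CONF_NOT_FOUND"):
    return .SqlConfNotFound
  case let m where m.contains("PARSE_SYNTAX_ERROR"):
    return .ParseSyntaxError
  default:
    return nil
  }
}
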
3 changes: 3 additions & 0 deletions Sources/SparkConnect/SparkConnectError.swift
@@ -25,7 +25,10 @@ public enum SparkConnectError: Error {
case InvalidSessionID
case InvalidType
case InvalidViewName
+ case ParseSyntaxError
case SchemaNotFound
+ case SessionClosed
+ case SqlConfNotFound
case TableOrViewAlreadyExists
case TableOrViewNotFound
case UnsupportedOperation
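
None of these cases carry associated values, so the enum is Equatable and Hashable without an explicit declaration; that is what lets the updated tests below pass a concrete case such as SparkConnectError.SqlConfNotFound to #require(throws:) instead of the loose Error.self. A small sketch, where describe is a hypothetical helper:

import SparkConnect

func describe(_ error: SparkConnectError) -> String {
  switch error {
  case .ParseSyntaxError: return "the SQL text could not be parsed"
  case .SessionClosed: return "the Spark Connect session is already closed"
  case .SqlConfNotFound: return "the requested SQL configuration key does not exist"
  default: return "another Spark Connect failure"
  }
}

// Cases without associated values compare with ==, which the value-based
// #require(throws:) overload relies on.
assert(SparkConnectError.SessionClosed == .SessionClosed)
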
4 changes: 2 additions & 2 deletions Tests/SparkConnectTests/CatalogTests.swift
@@ -144,7 +144,7 @@ struct CatalogTests {
})
#expect(try await spark.catalog.tableExists(tableName) == false)

- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.ParseSyntaxError) {
try await spark.catalog.tableExists("invalid table name")
}
await spark.stop()
@@ -190,7 +190,7 @@
#expect(try await spark.catalog.functionExists("base64"))
#expect(try await spark.catalog.functionExists("non_exist_function") == false)

- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.ParseSyntaxError) {
try await spark.catalog.functionExists("invalid function name")
}
await spark.stop()
4 changes: 2 additions & 2 deletions Tests/SparkConnectTests/RuntimeConfTests.swift
@@ -35,7 +35,7 @@ struct RuntimeConfTests {

#expect(try await !conf.get("spark.app.name").isEmpty)

- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.SqlConfNotFound) {
try await conf.get("spark.test.non-exist")
}

@@ -86,7 +86,7 @@
#expect(try await conf.get("spark.test.key1") == "value1")

try await conf.unset("spark.test.key1")
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.SqlConfNotFound) {
try await conf.get("spark.test.key1")
}

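
A typed SqlConfNotFound also lets application code fall back to a default instead of failing outright. A sketch, assuming spark.conf exposes the same get API exercised above; the configuration key and fallback value are purely illustrative:

import SparkConnect

let spark = try await SparkSession.builder.remote("sc://localhost").getOrCreate()
let batchSize: String
do {
  batchSize = try await spark.conf.get("spark.example.batch.size")  // Hypothetical key.
} catch SparkConnectError.SqlConfNotFound {
  batchSize = "1000"  // Illustrative default when the key is not set on the server.
}
print(batchSize)
await spark.stop()
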
2 changes: 1 addition & 1 deletion Tests/SparkConnectTests/SparkSessionTests.swift
@@ -72,7 +72,7 @@ struct SparkSessionTests {
let sessionID = spark1.sessionID
await spark1.stop()
let remote = ProcessInfo.processInfo.environment["SPARK_REMOTE"] ?? "sc://localhost"
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.SessionClosed) {
try await SparkSession.builder.remote("\(remote)/;session_id=\(sessionID)").getOrCreate()
}
}
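
A typed SessionClosed likewise gives reconnection logic something concrete to key on. A sketch that mirrors the test above; the fallback policy is illustrative:

import Foundation
import SparkConnect

let remote = ProcessInfo.processInfo.environment["SPARK_REMOTE"] ?? "sc://localhost"
let spark1 = try await SparkSession.builder.remote(remote).getOrCreate()
let sessionID = spark1.sessionID
await spark1.stop()

let spark2: SparkSession
do {
  // Pinning the ID of a stopped session surfaces INVALID_HANDLE.SESSION_CLOSED as SessionClosed.
  spark2 = try await SparkSession.builder.remote("\(remote)/;session_id=\(sessionID)").getOrCreate()
} catch SparkConnectError.SessionClosed {
  // Fall back to a brand-new session instead of the stale one.
  spark2 = try await SparkSession.builder.remote(remote).getOrCreate()
}
await spark2.stop()
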