From ebfd2cb0a19cf17846ffc63a5784c93ca97c9d4c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 26 May 2025 21:43:57 -0700 Subject: [PATCH 1/2] [SPARK-52320] Add `ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` to `SparkConnectError` --- Sources/SparkConnect/DataFrame.swift | 2 ++ Sources/SparkConnect/SparkConnectClient.swift | 15 ++++++++++++++- Sources/SparkConnect/SparkConnectError.swift | 3 +++ Tests/SparkConnectTests/CatalogTests.swift | 12 ++++++------ Tests/SparkConnectTests/DataFrameTests.swift | 4 ++-- .../SparkConnectTests/DataFrameWriterTests.swift | 5 ++--- .../DataFrameWriterV2Tests.swift | 2 +- .../SparkConnectTests/MergeIntoWriterTests.swift | 12 ++++++------ 8 files changed, 36 insertions(+), 19 deletions(-) diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index eda8bda..fa1eb48 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -309,6 +309,8 @@ public actor DataFrame: Sendable { throw SparkConnectError.SchemaNotFound case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"): throw SparkConnectError.TableOrViewNotFound + case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"): + throw SparkConnectError.ColumnNotFound default: throw error } diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index 93889ce..c9fc5b0 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -127,7 +127,20 @@ public actor SparkConnectClient { ), interceptors: self.intercepters ) { client in - return try await f(client) + do { + return try await f(client) + } catch let error as RPCError where error.code == .internalError { + switch error.message { + case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"): + throw SparkConnectError.TableOrViewAlreadyExists + case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"): + throw SparkConnectError.TableOrViewNotFound + case let m where m.contains("Invalid view name:"): + throw SparkConnectError.InvalidViewName + default: + throw error + } + } } } diff --git a/Sources/SparkConnect/SparkConnectError.swift b/Sources/SparkConnect/SparkConnectError.swift index c300075..f235859 100644 --- a/Sources/SparkConnect/SparkConnectError.swift +++ b/Sources/SparkConnect/SparkConnectError.swift @@ -20,10 +20,13 @@ /// A enum for ``SparkConnect`` package errors public enum SparkConnectError: Error { case CatalogNotFound + case ColumnNotFound case InvalidArgument case InvalidSessionID case InvalidType + case InvalidViewName case SchemaNotFound + case TableOrViewAlreadyExists case TableOrViewNotFound case UnsupportedOperation } diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift index 30f91b0..b63fd35 100644 --- a/Tests/SparkConnectTests/CatalogTests.swift +++ b/Tests/SparkConnectTests/CatalogTests.swift @@ -205,12 +205,12 @@ struct CatalogTests { try await spark.range(1).createTempView(viewName) #expect(try await spark.catalog.tableExists(viewName)) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) { try await spark.range(1).createTempView(viewName) } }) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.InvalidViewName) { try await spark.range(1).createTempView("invalid view name") } @@ -228,7 +228,7 @@ struct CatalogTests { try await spark.range(1).createOrReplaceTempView(viewName) }) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.InvalidViewName) { try await spark.range(1).createOrReplaceTempView("invalid view name") } @@ -244,13 +244,13 @@ struct CatalogTests { try await spark.range(1).createGlobalTempView(viewName) #expect(try await spark.catalog.tableExists("global_temp.\(viewName)")) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) { try await spark.range(1).createGlobalTempView(viewName) } }) #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.InvalidViewName) { try await spark.range(1).createGlobalTempView("invalid view name") } @@ -269,7 +269,7 @@ struct CatalogTests { }) #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.InvalidViewName) { try await spark.range(1).createOrReplaceGlobalTempView("invalid view name") } diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift index 15cc6d6..f5c6eeb 100644 --- a/Tests/SparkConnectTests/DataFrameTests.swift +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -235,10 +235,10 @@ struct DataFrameTests { @Test func selectInvalidColumn() async throws { let spark = try await SparkSession.builder.getOrCreate() - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.ColumnNotFound) { try await spark.range(1).select("invalid").schema } - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.ColumnNotFound) { try await spark.range(1).select("id + 1").schema } await spark.stop() diff --git a/Tests/SparkConnectTests/DataFrameWriterTests.swift b/Tests/SparkConnectTests/DataFrameWriterTests.swift index 3ad1234..5228667 100644 --- a/Tests/SparkConnectTests/DataFrameWriterTests.swift +++ b/Tests/SparkConnectTests/DataFrameWriterTests.swift @@ -112,7 +112,7 @@ struct DataFrameWriterTests { try await spark.range(1).write.saveAsTable(tableName) #expect(try await spark.read.table(tableName).count() == 1) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) { try await spark.range(1).write.saveAsTable(tableName) } @@ -130,8 +130,7 @@ struct DataFrameWriterTests { let spark = try await SparkSession.builder.getOrCreate() let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ - // Table doesn't exist. - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await spark.range(1).write.insertInto(tableName) } diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift index 938caa8..60f47c2 100644 --- a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift +++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift @@ -34,7 +34,7 @@ struct DataFrameWriterV2Tests { let write = try await spark.range(2).writeTo(tableName).using("orc") try await write.create() #expect(try await spark.table(tableName).count() == 2) - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) { try await write.create() } }) diff --git a/Tests/SparkConnectTests/MergeIntoWriterTests.swift b/Tests/SparkConnectTests/MergeIntoWriterTests.swift index d7d693d..332999d 100644 --- a/Tests/SparkConnectTests/MergeIntoWriterTests.swift +++ b/Tests/SparkConnectTests/MergeIntoWriterTests.swift @@ -31,10 +31,10 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenMatched().delete().merge() } - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenMatched("true").delete().merge() } }) @@ -47,10 +47,10 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenNotMatched().insertAll().merge() } - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenNotMatched("true").insertAll().merge() } }) @@ -63,10 +63,10 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenNotMatchedBySource().delete().merge() } - try await #require(throws: Error.self) { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { try await mergeInto.whenNotMatchedBySource("true").delete().merge() } }) From b15c15fde2399024d3db186d3e74aa597254da7a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 26 May 2025 22:05:46 -0700 Subject: [PATCH 2/2] Handle Spark 3 --- .../MergeIntoWriterTests.swift | 57 ++++++++++++++----- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/Tests/SparkConnectTests/MergeIntoWriterTests.swift b/Tests/SparkConnectTests/MergeIntoWriterTests.swift index 332999d..9881653 100644 --- a/Tests/SparkConnectTests/MergeIntoWriterTests.swift +++ b/Tests/SparkConnectTests/MergeIntoWriterTests.swift @@ -31,11 +31,20 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenMatched().delete().merge() - } - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenMatched("true").delete().merge() + if await spark.version >= "4.0.0" { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenMatched().delete().merge() + } + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenMatched("true").delete().merge() + } + } else { + try await #require(throws: Error.self) { + try await mergeInto.whenMatched().delete().merge() + } + try await #require(throws: Error.self) { + try await mergeInto.whenMatched("true").delete().merge() + } } }) await spark.stop() @@ -47,11 +56,20 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenNotMatched().insertAll().merge() - } - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenNotMatched("true").insertAll().merge() + if await spark.version >= "4.0.0" { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenNotMatched().insertAll().merge() + } + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenNotMatched("true").insertAll().merge() + } + } else { + try await #require(throws: Error.self) { + try await mergeInto.whenNotMatched().insertAll().merge() + } + try await #require(throws: Error.self) { + try await mergeInto.whenNotMatched("true").insertAll().merge() + } } }) await spark.stop() @@ -63,11 +81,20 @@ struct MergeIntoWriterTests { let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "") try await SQLHelper.withTable(spark, tableName)({ let mergeInto = try await spark.range(1).mergeInto(tableName, "true") - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenNotMatchedBySource().delete().merge() - } - try await #require(throws: SparkConnectError.TableOrViewNotFound) { - try await mergeInto.whenNotMatchedBySource("true").delete().merge() + if await spark.version >= "4.0.0" { + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenNotMatchedBySource().delete().merge() + } + try await #require(throws: SparkConnectError.TableOrViewNotFound) { + try await mergeInto.whenNotMatchedBySource("true").delete().merge() + } + } else { + try await #require(throws: Error.self) { + try await mergeInto.whenNotMatchedBySource().delete().merge() + } + try await #require(throws: Error.self) { + try await mergeInto.whenNotMatchedBySource("true").delete().merge() + } } }) await spark.stop()