From 603f0e4975138b99e8e5e661d855379e9c6e7fa5 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 26 May 2025 20:02:20 -0700
Subject: [PATCH 1/2] [SPARK-52319] Add `(Catalog|Schema|TableOrView)NotFound`
 to `SparkConnectError`

---
 Sources/SparkConnect/DataFrame.swift         | 15 ++++++++++++++-
 Sources/SparkConnect/SparkConnectError.swift |  3 +++
 Tests/SparkConnectTests/CatalogTests.swift   | 14 +++++++-------
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 393168e..eda8bda 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -299,7 +299,20 @@ public actor DataFrame: Sendable {
       ),
       interceptors: spark.client.getIntercepters()
     ) { client in
-      return try await f(client)
+      do {
+        return try await f(client)
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("CATALOG_NOT_FOUND"):
+          throw SparkConnectError.CatalogNotFound
+        case let m where m.contains("SCHEMA_NOT_FOUND"):
+          throw SparkConnectError.SchemaNotFound
+        case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+          throw SparkConnectError.TableOrViewNotFound
+        default:
+          throw error
+        }
+      }
     }
   }
 
diff --git a/Sources/SparkConnect/SparkConnectError.swift b/Sources/SparkConnect/SparkConnectError.swift
index cff40d7..c300075 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -19,8 +19,11 @@
 
 /// A enum for ``SparkConnect`` package errors
 public enum SparkConnectError: Error {
+  case CatalogNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
+  case SchemaNotFound
+  case TableOrViewNotFound
   case UnsupportedOperation
 }
diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift
index 053daf9..26924f5 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -37,7 +37,7 @@ struct CatalogTests {
   func setCurrentCatalog() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await spark.catalog.setCurrentCatalog("spark_catalog")
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.CatalogNotFound) {
       try await spark.catalog.setCurrentCatalog("not_exist_catalog")
     }
     await spark.stop()
@@ -63,7 +63,7 @@ struct CatalogTests {
   func setCurrentDatabase() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await spark.catalog.setCurrentDatabase("default")
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.SchemaNotFound) {
       try await spark.catalog.setCurrentDatabase("not_exist_database")
     }
     await spark.stop()
@@ -91,7 +91,7 @@ struct CatalogTests {
     #expect(db.catalog == "spark_catalog")
     #expect(db.description == "default database")
     #expect(db.locationUri.hasSuffix("spark-warehouse"))
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.SchemaNotFound) {
       try await spark.catalog.getDatabase("not_exist_database")
     }
     await spark.stop()
@@ -313,7 +313,7 @@ struct CatalogTests {
       try await spark.catalog.cacheTable(tableName, StorageLevel.MEMORY_ONLY)
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
      try await spark.catalog.cacheTable("not_exist_table")
     }
     await spark.stop()
@@ -330,7 +330,7 @@ struct CatalogTests {
       #expect(try await spark.catalog.isCached(tableName))
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
      try await spark.catalog.isCached("not_exist_table")
     }
     await spark.stop()
@@ -351,7 +351,7 @@ struct CatalogTests {
       #expect(try await spark.catalog.isCached(tableName))
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
      try await spark.catalog.refreshTable("not_exist_table")
     }
     await spark.stop()
@@ -386,7 +386,7 @@ struct CatalogTests {
       #expect(try await spark.catalog.isCached(tableName) == false)
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
      try await spark.catalog.uncacheTable("not_exist_table")
     }
     await spark.stop()

From b8473d49571080671b48470f7ee6279126a5c6e4 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 26 May 2025 20:18:19 -0700
Subject: [PATCH 2/2] handle Spark 3.5

---
 Tests/SparkConnectTests/CatalogTests.swift | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift
index 26924f5..30f91b0 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -37,8 +37,14 @@ struct CatalogTests {
   func setCurrentCatalog() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await spark.catalog.setCurrentCatalog("spark_catalog")
-    try await #require(throws: SparkConnectError.CatalogNotFound) {
-      try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+    if await spark.version >= "4.0.0" {
+      try await #require(throws: SparkConnectError.CatalogNotFound) {
+        try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+      }
+    } else {
+      try await #require(throws: Error.self) {
+        try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+      }
     }
     await spark.stop()
   }
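
Usage note (not part of the patches above): a minimal sketch of how calling code could rely on the new error cases instead of catching a generic `Error`. It assumes a reachable Spark 4 Connect server and uses only APIs that already appear in this series (`SparkSession.builder.getOrCreate()`, `spark.catalog.getDatabase`, `SparkConnectError.SchemaNotFound`); the helper name `printDatabaseLocation` is illustrative, not part of the package.

import SparkConnect

// Hypothetical helper: look up a database and report a missing schema
// instead of surfacing the server's raw internal RPC error.
func printDatabaseLocation(_ name: String) async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  do {
    // With this patch, a Spark 4 server's SCHEMA_NOT_FOUND response is
    // rethrown by the client as SparkConnectError.SchemaNotFound.
    let db = try await spark.catalog.getDatabase(name)
    print("Location: \(db.locationUri)")
  } catch SparkConnectError.SchemaNotFound {
    print("Database '\(name)' does not exist.")
  }
  await spark.stop()
}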