diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 393168e..eda8bda 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -299,7 +299,20 @@ public actor DataFrame: Sendable {
       ),
       interceptors: spark.client.getIntercepters()
     ) { client in
-      return try await f(client)
+      do {
+        return try await f(client)
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("CATALOG_NOT_FOUND"):
+          throw SparkConnectError.CatalogNotFound
+        case let m where m.contains("SCHEMA_NOT_FOUND"):
+          throw SparkConnectError.SchemaNotFound
+        case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+          throw SparkConnectError.TableOrViewNotFound
+        default:
+          throw error
+        }
+      }
     }
   }
 
diff --git a/Sources/SparkConnect/SparkConnectError.swift b/Sources/SparkConnect/SparkConnectError.swift
index cff40d7..c300075 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -19,8 +19,11 @@
 
 /// A enum for ``SparkConnect`` package errors
 public enum SparkConnectError: Error {
+  case CatalogNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
+  case SchemaNotFound
+  case TableOrViewNotFound
   case UnsupportedOperation
 }
diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift
index 053daf9..30f91b0 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -37,8 +37,14 @@ struct CatalogTests {
   func setCurrentCatalog() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await spark.catalog.setCurrentCatalog("spark_catalog")
-    try await #require(throws: Error.self) {
-      try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+    if await spark.version >= "4.0.0" {
+      try await #require(throws: SparkConnectError.CatalogNotFound) {
+        try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+      }
+    } else {
+      try await #require(throws: Error.self) {
+        try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+      }
     }
     await spark.stop()
   }
@@ -63,7 +69,7 @@
   func setCurrentDatabase() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await spark.catalog.setCurrentDatabase("default")
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.SchemaNotFound) {
       try await spark.catalog.setCurrentDatabase("not_exist_database")
     }
     await spark.stop()
@@ -91,7 +97,7 @@
     #expect(db.catalog == "spark_catalog")
     #expect(db.description == "default database")
     #expect(db.locationUri.hasSuffix("spark-warehouse"))
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.SchemaNotFound) {
       try await spark.catalog.getDatabase("not_exist_database")
     }
     await spark.stop()
@@ -313,7 +319,7 @@
       try await spark.catalog.cacheTable(tableName, StorageLevel.MEMORY_ONLY)
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
       try await spark.catalog.cacheTable("not_exist_table")
     }
     await spark.stop()
@@ -330,7 +336,7 @@
       #expect(try await spark.catalog.isCached(tableName))
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
       try await spark.catalog.isCached("not_exist_table")
     }
     await spark.stop()
@@ -351,7 +357,7 @@
       #expect(try await spark.catalog.isCached(tableName))
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
       try await spark.catalog.refreshTable("not_exist_table")
     }
     await spark.stop()
@@ -386,7 +392,7 @@
       #expect(try await spark.catalog.isCached(tableName) == false)
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.TableOrViewNotFound) {
       try await spark.catalog.uncacheTable("not_exist_table")
    }
     await spark.stop()