From 8044c3076bdd0657f4a23747589214567bfe2fa4 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 30 May 2025 16:58:19 -0700
Subject: [PATCH] [SPARK-52361] Support `executeCommand` in `SparkSession`

---
 Sources/SparkConnect/SparkConnectClient.swift | 34 ++++++++++++++++++++
 Sources/SparkConnect/SparkConnectError.swift  |  1 +
 Sources/SparkConnect/SparkSession.swift       | 14 ++++++++
 .../SparkConnectTests/SparkSessionTests.swift | 12 +++++++
 4 files changed, 61 insertions(+)

diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index f742973..03a6ffe 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -141,6 +141,8 @@ public actor SparkConnectClient {
           throw SparkConnectError.TableOrViewNotFound
         case let m where m.contains("Invalid view name:"):
           throw SparkConnectError.InvalidViewName
+        case let m where m.contains("DATA_SOURCE_NOT_FOUND"):
+          throw SparkConnectError.DataSourceNotFound
         default:
           throw error
         }
@@ -715,6 +717,38 @@ public actor SparkConnectClient {
     return result
   }
 
+  /// Runs an external command through `runner` and returns a plan rooted at its result.
+  /// - Parameters:
+  ///   - runner: The runner name set on the `ExecuteExternalCommand` message.
+  ///   - command: The command string set on the message.
+  ///   - options: The options map set on the message.
+  /// - Returns: A `Plan` rooted at the relation of the first response's SQL command result.
+  /// - Throws: `SparkConnectError.InvalidSessionID` when no session ID is set, or any error
+  ///   thrown by `execute`.
+  func getExecuteExternalCommand(
+    _ runner: String,
+    _ command: String,
+    _ options: [String: String]
+  ) async throws -> Plan {
+    // Throw instead of trapping when the session ID has not been set.
+    guard let sessionID = self.sessionID else {
+      throw SparkConnectError.InvalidSessionID
+    }
+    var executeExternalCommand = Spark_Connect_ExecuteExternalCommand()
+    executeExternalCommand.runner = runner
+    executeExternalCommand.command = command
+    executeExternalCommand.options = options
+    // Named `request` so the `command` parameter is not shadowed.
+    var request = Command()
+    request.commandType = .executeExternalCommand(executeExternalCommand)
+    let response = try await execute(sessionID, request)
+    // With no response at all, use a default-initialized relation instead of force-unwrapping.
+    let relation = response.first?.sqlCommandResult.relation ?? .init()
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   /// Add a tag to be assigned to all the operations started by this thread in this session.
   /// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
   public func addTag(tag: String) throws {
diff --git a/Sources/SparkConnect/SparkConnectError.swift b/Sources/SparkConnect/SparkConnectError.swift
index 890e16c..9c8d1c6 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -21,6 +21,7 @@
 public enum SparkConnectError: Error {
   case CatalogNotFound
   case ColumnNotFound
+  case DataSourceNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index ed5bd38..b22ee14 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -267,6 +267,20 @@ public actor SparkSession {
     return await read.table(tableName)
   }
 
+  /// Executes an external command via `runner` and returns the result as a `DataFrame`.
+  /// - Parameters:
+  ///   - runner: The runner name passed to the client.
+  ///   - command: The command string passed to the client.
+  ///   - options: The options passed to the client along with the command.
+  /// - Returns: A `DataFrame` bound to this session and built on the plan the client returns.
+  /// - Throws: Any error thrown by the client while executing the command.
+  public func executeCommand(_ runner: String, _ command: String, _ options: [String: String])
+    async throws -> DataFrame
+  {
+    let plan = try await self.client.getExecuteExternalCommand(runner, command, options)
+    return DataFrame(spark: self, plan: plan)
+  }
+
   /// Executes a code block and prints the execution time.
   ///
   /// This utility method is useful for performance testing and optimization.
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index ab3af28..deece09 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -141,6 +141,18 @@ struct SparkSessionTests {
       }
       await spark.stop()
     }
+
+    @Test
+    func executeCommand() async throws {
+      await SparkSession.builder.clear()
+      let spark = try await SparkSession.builder.getOrCreate()
+      if await spark.version.starts(with: "4.") {
+        await #expect(throws: SparkConnectError.DataSourceNotFound) {
+          try await spark.executeCommand("runner", "command", [:]).show()
+        }
+      }
+      await spark.stop()
+    }
   #endif
 
   @Test