Skip to content

[SPARK-52361] Support executeCommand in SparkSession #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public actor SparkConnectClient {
throw SparkConnectError.TableOrViewNotFound
case let m where m.contains("Invalid view name:"):
throw SparkConnectError.InvalidViewName
case let m where m.contains("DATA_SOURCE_NOT_FOUND"):
throw SparkConnectError.DataSourceNotFound
default:
throw error
}
Expand Down Expand Up @@ -715,6 +717,24 @@ public actor SparkConnectClient {
return result
}

/// Executes an external command on the server and wraps the returned relation in a `Plan`.
///
/// Builds a `Spark_Connect_ExecuteExternalCommand` from the arguments, sends it through
/// `execute`, and uses the relation carried by the first response's `sqlCommandResult`
/// as the root of a new plan.
/// - Parameters:
///   - runner: Value assigned to the request's `runner` field.
///   - command: Value assigned to the request's `command` field.
///   - options: Key-value pairs assigned to the request's `options` field.
/// - Returns: A `Plan` whose root is the relation returned by the server.
/// - Throws: `SparkConnectError.InvalidSessionID` if no session ID is set,
///   `SparkConnectError.InvalidArgument` if the server returns no response,
///   or any error thrown by `execute`.
func getExecuteExternalCommand(
  _ runner: String,
  _ command: String,
  _ options: [String: String]
) async throws -> Plan {
  // Fail with a catchable error instead of crashing when the session is not established.
  guard let sessionID = self.sessionID else {
    throw SparkConnectError.InvalidSessionID
  }

  var externalCommand = Spark_Connect_ExecuteExternalCommand()
  externalCommand.runner = runner
  externalCommand.command = command
  externalCommand.options = options

  // Named `request` so the local does not shadow the `command` parameter.
  var request = Command()
  request.commandType = .executeExternalCommand(externalCommand)

  let responses = try await execute(sessionID, request)

  // NOTE(review): there is no dedicated "empty response" error case visible here;
  // `InvalidArgument` is used as the closest existing case — confirm or add a specific one.
  guard let firstResponse = responses.first else {
    throw SparkConnectError.InvalidArgument
  }

  var plan = Plan()
  plan.opType = .root(firstResponse.sqlCommandResult.relation)
  return plan
}

/// Add a tag to be assigned to all the operations started by this thread in this session.
/// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
public func addTag(tag: String) throws {
Expand Down
1 change: 1 addition & 0 deletions Sources/SparkConnect/SparkConnectError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public enum SparkConnectError: Error {
case CatalogNotFound
case ColumnNotFound
case DataSourceNotFound
case InvalidArgument
case InvalidSessionID
case InvalidType
Expand Down
7 changes: 7 additions & 0 deletions Sources/SparkConnect/SparkSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ public actor SparkSession {
return await read.table(tableName)
}

/// Executes an external command and returns its result as a `DataFrame`.
///
/// The arguments are forwarded unchanged to the client's `getExecuteExternalCommand`,
/// and the resulting plan is bound to this session.
/// - Parameters:
///   - runner: The runner argument passed to the client.
///   - command: The command argument passed to the client.
///   - options: The options dictionary passed to the client.
/// - Returns: A `DataFrame` backed by the plan produced by the client.
/// - Throws: Any error thrown by the client while building the plan.
public func executeCommand(
  _ runner: String,
  _ command: String,
  _ options: [String: String]
) async throws -> DataFrame {
  return try await DataFrame(
    spark: self,
    plan: client.getExecuteExternalCommand(runner, command, options)
  )
}

/// Executes a code block and prints the execution time.
///
/// This utility method is useful for performance testing and optimization.
Expand Down
12 changes: 12 additions & 0 deletions Tests/SparkConnectTests/SparkSessionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ struct SparkSessionTests {
}
await spark.stop()
}

@Test
func executeCommand() async throws {
  // Reset the builder before obtaining the session used by this test.
  await SparkSession.builder.clear()
  let spark = try await SparkSession.builder.getOrCreate()

  // The expectation only runs against servers whose version string starts with "4.".
  let isVersion4 = await spark.version.starts(with: "4.")
  if isVersion4 {
    // "runner" is not a registered data source, so the call is expected to fail.
    await #expect(throws: SparkConnectError.DataSourceNotFound) {
      let df = try await spark.executeCommand("runner", "command", [:])
      try await df.show()
    }
  }

  await spark.stop()
}
#endif

@Test
Expand Down
Loading