From 263d200bdcdbfd83536b8d28dd8609d0f3fc0b48 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 11 Mar 2025 20:52:26 -0700 Subject: [PATCH] [SPARK-51481] Add `RuntimeConf` actor --- Sources/SparkConnect/RuntimeConf.swift | 57 ++++++++++++ Sources/SparkConnect/SparkConnectClient.swift | 29 +++++++ .../SparkConnectTests/RuntimeConfTests.swift | 87 +++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 Sources/SparkConnect/RuntimeConf.swift create mode 100644 Tests/SparkConnectTests/RuntimeConfTests.swift diff --git a/Sources/SparkConnect/RuntimeConf.swift b/Sources/SparkConnect/RuntimeConf.swift new file mode 100644 index 0000000..62dfa3d --- /dev/null +++ b/Sources/SparkConnect/RuntimeConf.swift @@ -0,0 +1,57 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +/// User-facing configuration API, accessible through `SparkSession.conf`. +public actor RuntimeConf { + private let client: SparkConnectClient + + /// Create a `RuntimeConf` instance with the given client. + /// - Parameter client: A client to talk to the Spark Connect server. + init(_ client: SparkConnectClient) { + self.client = client + } + + /// Set a new configuration. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A string for the configuration value. + public func set(_ key: String, _ value: String) async throws { + _ = try await client.setConf(map: [key: value]) + } + + /// Reset a configuration. + /// - Parameters: + /// - key: A string for the configuration key. + public func unset(_ key: String) async throws { + _ = try await client.unsetConf(keys: [key]) + } + + /// Get a configuration. + /// - Parameter key: A string for the configuration look-up. + /// - Returns: A string for the configuration. + public func get(_ key: String) async throws -> String { + return try await client.getConf(key) + } + + /// Get all configurations. + /// - Returns: A map of configuration key-values. + public func getAll() async throws -> [String: String] { + return try await client.getConfAll() + } +} diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index 0fa3d15..92cf56e 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -107,6 +107,35 @@ public actor SparkConnectClient { } } + /// Create a ``ConfigRequest`` instance for `Unset` operation. + /// - Parameter key: A string for key to unset. + /// - Returns: A ``ConfigRequest`` instance. + func getConfigRequestUnset(keys: [String]) -> ConfigRequest { + var request = ConfigRequest() + request.operation = ConfigRequest.Operation() + var unset = ConfigRequest.Unset() + unset.keys = keys + request.operation.opType = .unset(unset) + return request + } + + func unsetConf(keys: [String]) async throws -> Bool { + try await withGRPCClient( + transport: .http2NIOPosix( + target: .dns(host: self.host, port: self.port), + transportSecurity: .plaintext + ) + ) { client in + let service = SparkConnectService.Client(wrapping: client) + var request = getConfigRequestUnset(keys: keys) + request.clientType = clientType + request.userContext = userContext + request.sessionID = self.sessionID! + let _ = try await service.config(request) + return true + } + } + /// Create a ``ConfigRequest`` instance for `Get` operation. /// - Parameter keys: An array of keys to get. /// - Returns: A `ConfigRequest` instance. diff --git a/Tests/SparkConnectTests/RuntimeConfTests.swift b/Tests/SparkConnectTests/RuntimeConfTests.swift new file mode 100644 index 0000000..5b1fa3c --- /dev/null +++ b/Tests/SparkConnectTests/RuntimeConfTests.swift @@ -0,0 +1,87 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +import Foundation +import Testing + +@testable import SparkConnect + +/// A test suite for `RuntimeConf` +@Suite(.serialized) +struct RuntimeConfTests { + @Test + func get() async throws { + let client = SparkConnectClient(remote: "sc://localhost", user: "test") + _ = try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + + #expect(try await conf.get("spark.app.name") == "Spark Connect server") + + try await #require(throws: Error.self) { + try await conf.get("spark.test.non-exist") + } + + await client.stop() + } + + @Test + func set() async throws { + let client = SparkConnectClient(remote: "sc://localhost", user: "test") + _ = try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + try await conf.set("spark.test.key1", "value1") + #expect(try await conf.get("spark.test.key1") == "value1") + await client.stop() + } + + @Test + func reset() async throws { + let client = SparkConnectClient(remote: "sc://localhost", user: "test") + _ = try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + + // Success with a key that doesn't exist + try await conf.unset("spark.test.key1") + + // Make it sure that `spark.test.key1` exists before testing `reset`. + try await conf.set("spark.test.key1", "value1") + #expect(try await conf.get("spark.test.key1") == "value1") + + try await conf.unset("spark.test.key1") + try await #require(throws: Error.self) { + try await conf.get("spark.test.key1") + } + + await client.stop() + } + + @Test + func getAll() async throws { + let client = SparkConnectClient(remote: "sc://localhost", user: "test") + _ = try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + let map = try await conf.getAll() + #expect(map.count > 0) + #expect(map["spark.app.id"] != nil) + #expect(map["spark.app.startTime"] != nil) + #expect(map["spark.executor.id"] == "driver") + #expect(map["spark.master"] != nil) + await client.stop() + } +}