diff --git a/README.md b/README.md index f7cfb41..078df2e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # Apache Spark Connect Client for Swift [![GitHub Actions Build](https://github.com/apache/spark-connect-swift/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/apache/spark-connect-swift/blob/main/.github/workflows/build_and_test.yml) +[![Swift Version Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift) +[![Platform Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift) This is an experimental Swift library to show how to connect to a remote Apache Spark Connect Server and run SQL statements to manipulate remote data. @@ -13,3 +15,96 @@ So far, this library project is tracking the upstream changes like the [Apache S - [gRPC Swift Protobuf 1.0 (March 2025)](https://github.com/grpc/grpc-swift-protobuf/releases/tag/1.1.0) - [gRPC Swift NIO Transport 1.0 (March 2025)](https://github.com/grpc/grpc-swift-nio-transport/releases/tag/1.0.1) - [Apache Arrow Swift](https://github.com/apache/arrow/tree/main/swift) + +## Run `Apache Spark 4.0.0 RC2 Connect Server` + + $ curl -LO https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc2-bin/spark-4.0.0-bin-hadoop3.tgz + $ tar xvfz spark-4.0.0-bin-hadoop3.tgz + $ cd spark-4.0.0-bin-hadoop3 + $ sbin/start-connect-server.sh + +## Run tests + +``` +$ cd spark-connect-swift +$ swift test +``` + +## How to use in your apps + +Create a Swift project. +``` +$ mkdir SparkConnectSwiftApp +$ cd SparkConnectSwiftApp +$ swift package init --name SparkConnectSwiftApp --type executable +``` + +Add `SparkConnect` package to the dependency like the following +``` +$ cat Package.swift +import PackageDescription + +let package = Package( + name: "SparkConnectSwiftApp", + platforms: [ + .macOS(.v15) + ], + dependencies: [ + .package(url: "https://github.com/dongjoon-hyun/spark-connect-swift.git", branch: "main") + ], + targets: [ + .executableTarget( + name: "SparkConnectSwiftApp", + dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")] + ) + ] +) +``` + +Use `SparkSession` of `SparkConnect` module in Swift. + +``` +$ cat Sources/main.swift + +import SparkConnect + +let spark = try await SparkSession.builder.getOrCreate() +print("Connected to Apache Spark \(await spark.version) Server") + +let statements = [ + "DROP TABLE IF EXISTS t", + "CREATE TABLE IF NOT EXISTS t(a INT)", + "INSERT INTO t VALUES (1), (2), (3)", +] + +for s in statements { + print("EXECUTE: \(s)") + _ = try await spark.sql(s).count() +} +print("SELECT * FROM t") +try await spark.sql("SELECT * FROM t").show() + +await spark.stop() +``` + +Run your Swift application. + +``` +$ swift run +... +Connected to Apache Spark 4.0.0 Server +EXECUTE: DROP TABLE IF EXISTS t +EXECUTE: CREATE TABLE IF NOT EXISTS t(a INT) +EXECUTE: INSERT INTO t VALUES (1), (2), (3) +SELECT * FROM t ++---+ +| a | ++---+ +| 2 | +| 1 | +| 3 | ++---+ +``` + +You can find this example in the following repository. 
+- https://github.com/dongjoon-hyun/spark-connect-swift-app diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift new file mode 100644 index 0000000..ab81072 --- /dev/null +++ b/Sources/SparkConnect/DataFrame.swift @@ -0,0 +1,195 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +import Atomics +import Foundation +import GRPCCore +import GRPCNIOTransportHTTP2 +import GRPCProtobuf +import NIOCore +import SwiftyTextTable +import Synchronization + +/// A DataFrame which supports only SQL queries +public actor DataFrame: Sendable { + var spark: SparkSession + var plan: Plan + var schema: DataType? = nil + private var batches: [RecordBatch] = [RecordBatch]() + + /// Create a new `DataFrame`instance with the given Spark session and plan. + /// - Parameters: + /// - spark: A ``SparkSession`` instance to use. + /// - plan: A plan to execute. + init(spark: SparkSession, plan: Plan) async throws { + self.spark = spark + self.plan = plan + } + + /// Create a new `DataFrame` instance with the given SparkSession and a SQL statement. + /// - Parameters: + /// - spark: A `SparkSession` instance to use. + /// - sqlText: A SQL statement. + init(spark: SparkSession, sqlText: String) async throws { + self.spark = spark + self.plan = sqlText.toSparkConnectPlan + } + + /// Set the schema. This is used to store the analized schema response from `Spark Connect` server. + /// - Parameter schema: <#schema description#> + private func setSchema(_ schema: DataType) { + self.schema = schema + } + + /// Add `Apache Arrow`'s `RecordBatch`s to the intenal array. + /// - Parameter batches: A ``RecordBatch`` instance. + private func addBathes(_ batches: [RecordBatch]) { + self.batches.append(contentsOf: batches) + } + + /// A method to access the underlying Spark's `RDD`. + /// In `Spark Connect`, this feature is not allowed by design. + public func rdd() throws { + // SQLSTATE: 0A000 + // [UNSUPPORTED_CONNECT_FEATURE.RDD] + // Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs). + throw SparkConnectError.UnsupportedOperationException + } + + /// Return a `JSON` string of data type because we cannot expose the internal type ``DataType``. + /// - Returns: a `JSON` string. + public func schema() async throws -> String { + var dataType: String? = nil + + try await withGRPCClient( + transport: .http2NIOPosix( + target: .dns(host: spark.client.host, port: spark.client.port), + transportSecurity: .plaintext + ) + ) { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + let response = try await service.analyzePlan( + spark.client.getAnalyzePlanRequest(spark.sessionID, plan)) + dataType = try response.schema.schema.jsonString() + } + return dataType! 
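+    // For example (the exact strings asserted in DataFrameTests), `SELECT 'a' AS col1` yields
+    //   {"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}
+    // and a command such as `DROP TABLE IF EXISTS t` yields an empty struct: {"struct":{}}.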
+ } + + /// Return the total number of rows. + /// - Returns: a `Int64` value. + public func count() async throws -> Int64 { + let counter = Atomic(Int64(0)) + + try await withGRPCClient( + transport: .http2NIOPosix( + target: .dns(host: spark.client.host, port: spark.client.port), + transportSecurity: .plaintext + ) + ) { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) { + response in + for try await m in response.messages { + counter.add(m.arrowBatch.rowCount, ordering: .relaxed) + } + } + } + return counter.load(ordering: .relaxed) + } + + /// Execute the plan and try to fill `schema` and `batches`. + private func execute() async throws { + try await withGRPCClient( + transport: .http2NIOPosix( + target: .dns(host: spark.client.host, port: spark.client.port), + transportSecurity: .plaintext + ) + ) { client in + let service = Spark_Connect_SparkConnectService.Client(wrapping: client) + try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) { + response in + for try await m in response.messages { + if m.hasSchema { + // The original schema should arrive before ArrowBatches + await self.setSchema(m.schema) + } + let ipcStreamBytes = m.arrowBatch.data + if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 { + let IPC_CONTINUATION_TOKEN = Int32(-1) + // Schema + assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN) + let schemaSize = Int64(ipcStreamBytes[4..<8].int32) + let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)]) + + // Arrow IPC Data + assert( + ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32 + == IPC_CONTINUATION_TOKEN) + var pos: Int64 = 8 + schemaSize + 4 + let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32) + pos += 4 + let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)]) + pos += dataHeaderSize + let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8 + let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)]) + + // Read ArrowBatches + let reader = ArrowReader() + let arrowResult = ArrowReader.makeArrowReaderResult() + _ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult) + _ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult) + await self.addBathes(arrowResult.batches) + } + } + } + } + } + + /// This is designed not to support this feature in order to simplify the Swift client. + public func collect() async throws { + throw SparkConnectError.UnsupportedOperationException + } + + /// Execute the plan and show the result. + public func show() async throws { + try await execute() + + if let schema = self.schema { + var columns: [TextTableColumn] = [] + for f in schema.struct.fields { + columns.append(TextTableColumn(header: f.name)) + } + var table = TextTable(columns: columns) + for batch in self.batches { + for i in 0.. String { + sessionID + } + + /// A server-side generated session ID. This is supposed to be overwritten during establishing connections. + var serverSideSessionID: String = "" + + /// A variable for ``SparkContext``. This is designed to throw exceptions by Apache Spark. + var sparkContext: SparkContext { + get throws { + // SQLSTATE: 0A000 + // [UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT] + // Feature is not supported in Spark Connect: Access to the SparkContext. + throw SparkConnectError.UnsupportedOperationException + } + } + + /// Stop the current client. 
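+  /// This delegates to the underlying client's `stop()`; the remote Spark Connect server itself is not shut down.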
+ public func stop() async { + await client.stop() + } + + public func range(_ end: Int64) async throws -> DataFrame { + return try await range(0, end) + } + + public func range(_ start: Int64, _ end: Int64, _ step: Int64 = 1) async throws -> DataFrame { + return try await DataFrame(spark: self, plan: client.getPlanRange(start, end, step)) + } + + /// Create a ``DataFrame`` for the given SQL statement. + /// - Parameter sqlText: A SQL string. + /// - Returns: A ``DataFrame`` instance. + public func sql(_ sqlText: String) async throws -> DataFrame { + return try await DataFrame(spark: self, sqlText: sqlText) + } + + /// This is defined as the return type of `SparkSession.sparkContext` method. + /// This is an empty `Struct` type because `sparkContext` method is designed to throw + /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`. + struct SparkContext { + } + + /// A builder to create ``SparkSession`` + public actor Builder { + var sparkConf: [String: String] = [:] + + /// Set a new configuration. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A string for the configuration value. + /// - Returns: self + public func config(_ key: String, _ value: String) -> Builder { + sparkConf[key] = value + return self + } + + /// Remove all stored configurations. + /// - Returns: self + func clear() -> Builder { + sparkConf.removeAll() + return self + } + + /// Set a url for remote connection. + /// - Parameter url: A connection string in a pattern, `sc://{host}:{post}`. + /// - Returns: self + public func remote(_ url: String) -> Builder { + return config("spark.remote", url) + } + + /// Set `appName` of this session. + /// - Parameter name: A string for application name + /// - Returns: self + public func appName(_ name: String) -> Builder { + return config("spark.app.name", name) + } + + /// Enable `Apache Hive` metastore support configuration. + /// - Returns: self + func enableHiveSupport() -> Builder { + return config("spark.sql.catalogImplementation", "hive") + } + + /// Create a new ``SparkSession``. If `spark.remote` is not given, `sc://localhost:15002` is used. + /// - Returns: A newly created `SparkSession`. + func create() async throws -> SparkSession { + let session = SparkSession(sparkConf["spark.remote"] ?? "sc://localhost:15002") + let response = try await session.client.connect(session.sessionID) + await session.setVersion(response.sparkVersion.version) + let isSuccess = try await session.client.setConf(map: sparkConf) + assert(isSuccess) + return session + } + + /// Create a ``SparkSession`` from the given configurations. + /// - Returns: A spark session. 
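+    /// - Note: Despite its name, this currently always creates a new session by calling `create()`;
+    ///   no caching of an existing session is performed yet.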
+ public func getOrCreate() async throws -> SparkSession { + return try await create() + } + } +} diff --git a/Tests/SparkConnectTests/BuilderTests.swift b/Tests/SparkConnectTests/BuilderTests.swift index f264525..91fee76 100644 --- a/Tests/SparkConnectTests/BuilderTests.swift +++ b/Tests/SparkConnectTests/BuilderTests.swift @@ -26,13 +26,25 @@ import Testing struct BuilderTests { @Test func builderDefault() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(await spark.client.clientType == "swift") + #expect(await spark.client.url.host() == "localhost") + #expect(await spark.client.url.port == 15002) + await spark.stop() } @Test func remote() async throws { + // Don't try to connect + let builder = await SparkSession.builder.remote("sc://spark:1234") + #expect(await builder.sparkConf["spark.remote"] == "sc://spark:1234") + _ = await builder.clear() } @Test func appName() async throws { + let builder = await SparkSession.builder.appName("TestApp") + #expect(await builder.sparkConf["spark.app.name"] == "TestApp") + try await builder.getOrCreate().stop() } } diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift new file mode 100644 index 0000000..c7d15c0 --- /dev/null +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -0,0 +1,109 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// + +import Testing + +@testable import SparkConnect + +/// A test suite for `DataFrame` +struct DataFrameTests { + @Test + func rdd() async throws { + let spark = try await SparkSession.builder.getOrCreate() + await #expect(throws: SparkConnectError.UnsupportedOperationException) { + try await spark.range(1).rdd() + } + await spark.stop() + } + + @Test + func schema() async throws { + let spark = try await SparkSession.builder.getOrCreate() + + let schema1 = try await spark.sql("SELECT 'a' as col1").schema() + #expect( + schema1 + == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"# + ) + + let schema2 = try await spark.sql("SELECT 'a' as col1, 'b' as col2").schema() + #expect( + schema2 + == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"# + ) + + let emptySchema = try await spark.sql("DROP TABLE IF EXISTS nonexistent").schema() + #expect(emptySchema == #"{"struct":{}}"#) + await spark.stop() + } + + @Test + func count() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.sql("SELECT 'spark' as swift").count() == 1) + #expect(try await spark.sql("SELECT * FROM RANGE(2025)").count() == 2025) + await spark.stop() + } + + @Test + func countNull() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.sql("SELECT null").count() == 1) + await spark.stop() + } + + @Test + func table() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.sql("DROP TABLE IF EXISTS t").count() == 0) + #expect(try await spark.sql("SHOW TABLES").count() == 0) + #expect(try await spark.sql("CREATE TABLE IF NOT EXISTS t(a INT)").count() == 0) + #expect(try await spark.sql("SHOW TABLES").count() == 1) + #expect(try await spark.sql("SELECT * FROM t").count() == 0) + #expect(try await spark.sql("INSERT INTO t VALUES (1), (2), (3)").count() == 0) + #expect(try await spark.sql("SELECT * FROM t").count() == 3) + await spark.stop() + } + + @Test + func show() async throws { + let spark = try await SparkSession.builder.getOrCreate() + try await spark.sql("SHOW TABLES").show() + try await spark.sql("SELECT * FROM VALUES (true, false)").show() + try await spark.sql("SELECT * FROM VALUES (1, 2)").show() + try await spark.sql("SELECT * FROM VALUES ('abc', 'def'), ('ghi', 'jkl')").show() + await spark.stop() + } + + @Test + func showNull() async throws { + let spark = try await SparkSession.builder.getOrCreate() + try await spark.sql( + "SELECT * FROM VALUES (1, true, 'abc'), (null, null, null), (3, false, 'def')" + ).show() + await spark.stop() + } + + @Test + func showCommand() async throws { + let spark = try await SparkSession.builder.getOrCreate() + try await spark.sql("DROP TABLE IF EXISTS t").show() + await spark.stop() + } +} diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift new file mode 100644 index 0000000..c1837ab --- /dev/null +++ b/Tests/SparkConnectTests/SparkSessionTests.swift @@ -0,0 +1,74 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +import Foundation +import Testing + +@testable import SparkConnect + +/// A test suite for `SparkSession` +struct SparkSessionTests { + @Test + func sparkContext() async throws { + let spark = try await SparkSession.builder.getOrCreate() + await #expect(throws: SparkConnectError.UnsupportedOperationException) { + try await spark.sparkContext + } + await spark.stop() + } + + @Test + func stop() async throws { + let spark = try await SparkSession.builder.getOrCreate() + await spark.stop() + } + + @Test func userContext() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let defaultUserContext = ProcessInfo.processInfo.userName.toUserContext + #expect(await spark.client.userContext == defaultUserContext) + await spark.stop() + } + + @Test + func version() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(await spark.version.starts(with: "4.0.0")) + await spark.stop() + } + + @Test + func conf() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.conf.get("spark.app.name") == "Spark Connect server") + try await spark.conf.set("spark.x", "y") + #expect(try await spark.conf.get("spark.x") == "y") + #expect(try await spark.conf.getAll().count > 10) + await spark.stop() + } + + @Test + func range() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.range(10).count() == 10) + #expect(try await spark.range(0, 100).count() == 100) + #expect(try await spark.range(0, 100, 2).count() == 50) + await spark.stop() + } +}
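
For reference, here is how the pieces introduced in this change fit together from an application's point of view. This is a minimal sketch rather than part of the diff: the application name `SketchApp` is made up, and it assumes a Spark Connect server is reachable at the default `sc://localhost:15002` (otherwise pass a `sc://{host}:{port}` URL to `remote(_:)`).

```
import SparkConnect

// Configure the builder first (mirrors BuilderTests), then connect.
let builder = await SparkSession.builder.appName("SketchApp")
let spark = try await builder.getOrCreate()
print("Connected to Apache Spark \(await spark.version) Server")

// DataFrames come from ranges or SQL text; `count()` and `show()` trigger execution.
print(try await spark.range(10).count())  // 10
try await spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b')").show()

// `collect()` and `rdd()` deliberately throw SparkConnectError.UnsupportedOperationException.
await spark.stop()
```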