Initial Implementation #1

README.md (95 additions, 0 deletions)
@@ -1,6 +1,8 @@
# Apache Spark Connect Client for Swift

[![GitHub Actions Build](https://github.com/apache/spark-connect-swift/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/apache/spark-connect-swift/blob/main/.github/workflows/build_and_test.yml)
[![Swift Version Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)
[![Platform Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)

This is an experimental Swift library to show how to connect to a remote Apache Spark Connect Server and run SQL statements to manipulate remote data.

@@ -13,3 +15,96 @@ So far, this library project is tracking the upstream changes like the [Apache S
- [gRPC Swift Protobuf 1.0 (March 2025)](https://github.com/grpc/grpc-swift-protobuf/releases/tag/1.1.0)
- [gRPC Swift NIO Transport 1.0 (March 2025)](https://github.com/grpc/grpc-swift-nio-transport/releases/tag/1.0.1)
- [Apache Arrow Swift](https://github.com/apache/arrow/tree/main/swift)

## Run `Apache Spark 4.0.0 RC2 Connect Server`

```
$ curl -LO https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc2-bin/spark-4.0.0-bin-hadoop3.tgz
$ tar xvfz spark-4.0.0-bin-hadoop3.tgz
$ cd spark-4.0.0-bin-hadoop3
$ sbin/start-connect-server.sh
```

## Run tests

```
$ cd spark-connect-swift
$ swift test
```

## How to use in your apps

Create a Swift project.
```
$ mkdir SparkConnectSwiftApp
$ cd SparkConnectSwiftApp
$ swift package init --name SparkConnectSwiftApp --type executable
```

Add the `SparkConnect` package as a dependency, like the following.
```
$ cat Package.swift
import PackageDescription

let package = Package(
name: "SparkConnectSwiftApp",
platforms: [
.macOS(.v15)
],
dependencies: [
.package(url: "https://github.com/dongjoon-hyun/spark-connect-swift.git", branch: "main")
],
targets: [
.executableTarget(
name: "SparkConnectSwiftApp",
dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
)
]
)
```

Use `SparkSession` from the `SparkConnect` module in Swift.

```
$ cat Sources/main.swift

import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
print("Connected to Apache Spark \(await spark.version) Server")

let statements = [
"DROP TABLE IF EXISTS t",
"CREATE TABLE IF NOT EXISTS t(a INT)",
"INSERT INTO t VALUES (1), (2), (3)",
]

for s in statements {
print("EXECUTE: \(s)")
_ = try await spark.sql(s).count()
}
print("SELECT * FROM t")
try await spark.sql("SELECT * FROM t").show()

await spark.stop()
```

Run your Swift application.

```
$ swift run
...
Connected to Apache Spark 4.0.0 Server
EXECUTE: DROP TABLE IF EXISTS t
EXECUTE: CREATE TABLE IF NOT EXISTS t(a INT)
EXECUTE: INSERT INTO t VALUES (1), (2), (3)
SELECT * FROM t
+---+
| a |
+---+
| 2 |
| 1 |
| 3 |
+---+
```
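
Note that a plain `SELECT` does not guarantee row order, which is why the rows above appear as 2, 1, 3. If you want a deterministic order, add an `ORDER BY` clause; for example, this hypothetical variation of the last statement sorts the result by column `a`.

```
try await spark.sql("SELECT * FROM t ORDER BY a").show()
```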

You can find this example in the following repository.
- https://github.com/dongjoon-hyun/spark-connect-swift-app
Sources/SparkConnect/DataFrame.swift (195 additions, 0 deletions)
@@ -0,0 +1,195 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Atomics
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import NIOCore
import SwiftyTextTable
import Synchronization

/// A DataFrame which supports only SQL queries.
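///
/// A minimal usage sketch (not part of this patch; it assumes a reachable
/// `Spark Connect` server and the `SparkSession` API shown in README.md):
///
///     let spark = try await SparkSession.builder.getOrCreate()
///     let df = try await spark.sql("SELECT * FROM t")
///     print(try await df.count())
///     try await df.show()
///     await spark.stop()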
public actor DataFrame: Sendable {
var spark: SparkSession
var plan: Plan
var schema: DataType? = nil
private var batches: [RecordBatch] = [RecordBatch]()

/// Create a new `DataFrame` instance with the given Spark session and plan.
/// - Parameters:
/// - spark: A ``SparkSession`` instance to use.
/// - plan: A plan to execute.
init(spark: SparkSession, plan: Plan) async throws {
self.spark = spark
self.plan = plan
}

/// Create a new `DataFrame` instance with the given SparkSession and a SQL statement.
/// - Parameters:
/// - spark: A `SparkSession` instance to use.
/// - sqlText: A SQL statement.
init(spark: SparkSession, sqlText: String) async throws {
self.spark = spark
self.plan = sqlText.toSparkConnectPlan
}

/// Set the schema. This is used to store the analyzed schema response from the `Spark Connect` server.
/// - Parameter schema: A ``DataType`` instance of the analyzed schema.
private func setSchema(_ schema: DataType) {
self.schema = schema
}

/// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
/// - Parameter batches: An array of ``RecordBatch`` instances.
private func addBatches(_ batches: [RecordBatch]) {
self.batches.append(contentsOf: batches)
}

/// A method to access the underlying Spark `RDD`.
/// In `Spark Connect`, this feature is not allowed by design.
public func rdd() throws {
// SQLSTATE: 0A000
// [UNSUPPORTED_CONNECT_FEATURE.RDD]
// Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs).
throw SparkConnectError.UnsupportedOperationException
}

/// Return the schema as a `JSON` string because we cannot expose the internal type ``DataType``.
/// - Returns: a `JSON` string.
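///
/// A hypothetical call (assuming an existing table `t`); the returned string is
/// the JSON form of the `Spark Connect` `DataType` protobuf message:
///
///     let json = try await spark.sql("SELECT * FROM t").schema()
///     print(json)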
public func schema() async throws -> String {
var dataType: String? = nil

try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(
spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
dataType = try response.schema.schema.jsonString()
}
return dataType!
}

/// Return the total number of rows.
/// - Returns: an `Int64` value.
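///
/// A hypothetical usage (assuming the table `t` from the README example):
///
///     let numRows: Int64 = try await spark.sql("SELECT * FROM t").count()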
public func count() async throws -> Int64 {
let counter = Atomic(Int64(0))

try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
response in
for try await m in response.messages {
counter.add(m.arrowBatch.rowCount, ordering: .relaxed)
}
}
}
return counter.load(ordering: .relaxed)
}

/// Execute the plan and try to fill `schema` and `batches`.
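///
/// This opens a gRPC connection to the server, sends an `ExecutePlan` request,
/// and consumes the streamed responses: the analyzed schema is expected to
/// arrive before any `ArrowBatch` message, and each non-empty batch is decoded
/// from the Arrow IPC stream bytes and appended to `batches`.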
private func execute() async throws {
try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
response in
for try await m in response.messages {
if m.hasSchema {
// The original schema should arrive before ArrowBatches
await self.setSchema(m.schema)
}
let ipcStreamBytes = m.arrowBatch.data
if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 {
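// The Arrow IPC stream frames each encapsulated message as a 4-byte
// continuation marker (0xFFFFFFFF, i.e. -1), a 4-byte metadata length,
// the Flatbuffers metadata, and an optional body. The code below manually
// splits the first message (the schema, which has no body) from the second
// message (the record batch header and its data body); the final 8 bytes
// are the end-of-stream marker and are dropped.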
let IPC_CONTINUATION_TOKEN = Int32(-1)
// Schema
assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN)
let schemaSize = Int64(ipcStreamBytes[4..<8].int32)
let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)])

// Arrow IPC Data
assert(
ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32
== IPC_CONTINUATION_TOKEN)
var pos: Int64 = 8 + schemaSize + 4
let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32)
pos += 4
let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)])
pos += dataHeaderSize
let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8
let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)])

// Read ArrowBatches
let reader = ArrowReader()
let arrowResult = ArrowReader.makeArrowReaderResult()
_ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
_ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
await self.addBatches(arrowResult.batches)
}
}
}
}
}

/// This feature is intentionally not supported in order to simplify the Swift client.
public func collect() async throws {
throw SparkConnectError.UnsupportedOperationException
}

/// Execute the plan and show the result.
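///
/// The rows are rendered as a text table via `SwiftyTextTable`; a hypothetical
/// output for the README example looks like the following:
///
///     +---+
///     | a |
///     +---+
///     | 1 |
///     | 2 |
///     | 3 |
///     +---+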
public func show() async throws {
try await execute()

if let schema = self.schema {
var columns: [TextTableColumn] = []
for f in schema.struct.fields {
columns.append(TextTableColumn(header: f.name))
}
var table = TextTable(columns: columns)
for batch in self.batches {
for i in 0..<batch.length {
var values: [String] = []
for column in batch.columns {
let str = column.array as! AsString
if column.data.isNull(i) {
values.append("NULL")
} else {
values.append(str.asString(i))
}
}
table.addRow(values: values)
}
}
print(table.render())
}
}
}