[SPARK-51483] Add SparkSession and DataFrame actors #10

Closed · wants to merge 3 commits
195 changes: 195 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
@@ -0,0 +1,195 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Atomics
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import NIOCore
import SwiftyTextTable
import Synchronization

/// A DataFrame which supports only SQL queries
public actor DataFrame: Sendable {
var spark: SparkSession
var plan: Plan
var schema: DataType? = nil
private var batches: [RecordBatch] = [RecordBatch]()

/// Create a new `DataFrame` instance with the given Spark session and plan.
/// - Parameters:
/// - spark: A ``SparkSession`` instance to use.
/// - plan: A plan to execute.
init(spark: SparkSession, plan: Plan) async throws {
self.spark = spark
self.plan = plan
}

/// Create a new `DataFrame` instance with the given SparkSession and a SQL statement.
/// - Parameters:
/// - spark: A `SparkSession` instance to use.
/// - sqlText: A SQL statement.
init(spark: SparkSession, sqlText: String) async throws {
self.spark = spark
self.plan = sqlText.toSparkConnectPlan
}

/// Set the schema. This is used to store the analyzed schema response from the `Spark Connect` server.
/// - Parameter schema: A ``DataType`` instance for the analyzed schema.
private func setSchema(_ schema: DataType) {
self.schema = schema
}

/// Add `Apache Arrow`'s `RecordBatch`es to the internal array.
/// - Parameter batches: An array of ``RecordBatch``.
private func addBatches(_ batches: [RecordBatch]) {
self.batches.append(contentsOf: batches)
}

/// A method to access the underlying Spark `RDD`.
/// In `Spark Connect`, this feature is not allowed by design.
public func rdd() throws {
// SQLSTATE: 0A000
// [UNSUPPORTED_CONNECT_FEATURE.RDD]
// Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs).
throw SparkConnectError.UnsupportedOperationException
}

/// Return the schema as a `JSON` string because the internal type ``DataType`` cannot be exposed.
/// - Returns: A `JSON` string.
public func schema() async throws -> String {
var dataType: String? = nil

try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(
spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
dataType = try response.schema.schema.jsonString()
}
return dataType!
}

/// Return the total number of rows.
/// - Returns: An `Int64` value.
public func count() async throws -> Int64 {
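// Note: `executePlan` streams multiple responses; each response's `arrowBatch` reports
// its own `rowCount`, so accumulating those counts yields the total number of rows.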
let counter = Atomic(Int64(0))

try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
response in
for try await m in response.messages {
counter.add(m.arrowBatch.rowCount, ordering: .relaxed)
}
}
}
return counter.load(ordering: .relaxed)
}

/// Execute the plan and try to fill `schema` and `batches`.
private func execute() async throws {
try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
response in
for try await m in response.messages {
if m.hasSchema {
// The original schema should arrive before ArrowBatches
await self.setSchema(m.schema)
}
let ipcStreamBytes = m.arrowBatch.data
if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 {
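// Each Arrow IPC encapsulated message starts with a 4-byte continuation marker
// (0xFFFFFFFF, i.e. -1), followed by a 4-byte metadata length, the Flatbuffers
// metadata, and then the message body. The stream below is parsed as a schema
// message followed by a single record-batch message.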
let IPC_CONTINUATION_TOKEN = Int32(-1)
// Schema
assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN)
let schemaSize = Int64(ipcStreamBytes[4..<8].int32)
let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)])

// Arrow IPC Data
assert(
ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32
== IPC_CONTINUATION_TOKEN)
var pos: Int64 = 8 + schemaSize + 4
let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32)
pos += 4
let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)])
pos += dataHeaderSize
let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8
let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)])

// Read ArrowBatches
let reader = ArrowReader()
let arrowResult = ArrowReader.makeArrowReaderResult()
_ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
_ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
await self.addBatches(arrowResult.batches)
}
}
}
}
}

/// This feature is deliberately unsupported in order to simplify the Swift client.
public func collect() async throws {
throw SparkConnectError.UnsupportedOperationException
}

/// Execute the plan and show the result.
public func show() async throws {
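// Materialize `schema` and `batches` via execute(), then render them with
// SwiftyTextTable: one column per schema field, "NULL" for null cells.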
try await execute()

if let schema = self.schema {
var columns: [TextTableColumn] = []
for f in schema.struct.fields {
columns.append(TextTableColumn(header: f.name))
}
var table = TextTable(columns: columns)
for batch in self.batches {
for i in 0..<batch.length {
var values: [String] = []
for column in batch.columns {
let str = column.array as! AsString
if column.data.isNull(i) {
values.append("NULL")
} else {
values.append(str.asString(i))
}
}
table.addRow(values: values)
}
}
print(table.render())
}
}
}
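
For context, a minimal usage sketch of the `DataFrame` actor added above, assuming a local Spark Connect server at `sc://localhost:15002` (the default used by the `Builder` in the next file); the method names follow this diff, everything else is illustrative:

```swift
// Minimal sketch (assumes a reachable Spark Connect server at sc://localhost:15002).
let spark = try await SparkSession.builder.getOrCreate()

// Build a DataFrame from a SQL statement and inspect it.
let df = try await spark.sql("SELECT * FROM range(5)")
print(try await df.schema())   // JSON string of the analyzed schema
print(try await df.count())    // 5
try await df.show()            // renders a text table of the rows

await spark.stop()
```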
179 changes: 179 additions & 0 deletions Sources/SparkConnect/SparkSession.swift
@@ -0,0 +1,179 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import Synchronization

/// The entry point to programming Spark with ``DataFrame`` API.
///
/// Use the builder to get a session.
///
/// ```swift
/// let spark = try await SparkSession.builder.getOrCreate()
/// ```
public actor SparkSession {

public static let builder: Builder = Builder()

let client: SparkConnectClient

/// Runtime configuration interface for Spark.
public let conf: RuntimeConf

/// Create a session that uses the specified connection string and userID.
/// - Parameters:
/// - connection: A connection string in the pattern `sc://{host}:{port}`.
/// - userID: An optional user ID. If absent, the `SPARK_USER` environment variable or ``ProcessInfo.processInfo.userName`` is used.
init(_ connection: String, _ userID: String? = nil) {
let processInfo = ProcessInfo.processInfo
let userName = processInfo.environment["SPARK_USER"] ?? processInfo.userName
self.client = SparkConnectClient(remote: connection, user: userID ?? userName)
self.conf = RuntimeConf(self.client)
}

/// The Spark version of the Spark Connect server. This is expected to be overwritten while establishing the connection.
public var version: String = ""

func setVersion(_ version: String) {
self.version = version
}

/// A unique session ID for this session, generated on the client side.
var sessionID: String = UUID().uuidString

/// Get the current session ID
/// - Returns: the current session ID
func getSessionID() -> String {
sessionID
}

/// A server-side generated session ID. This is expected to be overwritten while establishing the connection.
var serverSideSessionID: String = ""

/// A variable for ``SparkContext``. By design, Apache Spark throws an exception when this is accessed from Spark Connect.
var sparkContext: SparkContext {
get throws {
// SQLSTATE: 0A000
// [UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT]
// Feature is not supported in Spark Connect: Access to the SparkContext.
throw SparkConnectError.UnsupportedOperationException
}
}

/// Stop the current client.
public func stop() async {
await client.stop()
}

/// Create a ``DataFrame`` with a single ``Int64`` column named `id`, containing elements in a
/// range from 0 to `end` (exclusive) with step value 1.
///
/// - Parameter end: A value for the end of range.
/// - Returns: A ``DataFrame`` instance.
public func range(_ end: Int64) async throws -> DataFrame {
return try await range(0, end)
}

/// Create a ``DataFrame`` with a single ``Int64`` column named `id`, containing elements in a
/// range from `start` to `end` (exclusive) with a step value (default: 1).
///
/// - Parameters:
/// - start: A value for the start of range.
/// - end: A value for the end of range.
/// - step: A value for the step.
/// - Returns: A ``DataFrame`` instance.
public func range(_ start: Int64, _ end: Int64, _ step: Int64 = 1) async throws -> DataFrame {
return try await DataFrame(spark: self, plan: client.getPlanRange(start, end, step))
}

/// Create a ``DataFrame`` for the given SQL statement.
/// - Parameter sqlText: A SQL string.
/// - Returns: A ``DataFrame`` instance.
public func sql(_ sqlText: String) async throws -> DataFrame {
return try await DataFrame(spark: self, sqlText: sqlText)
}

/// This is defined as the return type of the `SparkSession.sparkContext` property.
/// This is an empty struct because `sparkContext` is designed to throw
/// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
struct SparkContext {
}

/// A builder to create a ``SparkSession``.
public actor Builder {
var sparkConf: [String: String] = [:]

/// Set a new configuration.
/// - Parameters:
/// - key: A string for the configuration key.
/// - value: A string for the configuration value.
/// - Returns: self
public func config(_ key: String, _ value: String) -> Builder {
sparkConf[key] = value
return self
}

/// Remove all stored configurations.
/// - Returns: self
func clear() -> Builder {
sparkConf.removeAll()
return self
}

/// Set a url for remote connection.
/// - Parameter url: A connection string in the pattern `sc://{host}:{port}`.
/// - Returns: self
public func remote(_ url: String) -> Builder {
return config("spark.remote", url)
}

/// Set `appName` of this session.
/// - Parameter name: A string for the application name.
/// - Returns: self
public func appName(_ name: String) -> Builder {
return config("spark.app.name", name)
}

/// Enable `Apache Hive` metastore support configuration.
/// - Returns: self
func enableHiveSupport() -> Builder {
return config("spark.sql.catalogImplementation", "hive")
}

/// Create a new ``SparkSession``. If `spark.remote` is not given, `sc://localhost:15002` is used.
/// - Returns: A newly created `SparkSession`.
func create() async throws -> SparkSession {
let session = SparkSession(sparkConf["spark.remote"] ?? "sc://localhost:15002")
let response = try await session.client.connect(session.sessionID)
await session.setVersion(response.sparkVersion.version)
let isSuccess = try await session.client.setConf(map: sparkConf)
assert(isSuccess)
return session
}

/// Create a ``SparkSession`` from the given configurations.
/// - Returns: A spark session.
public func getOrCreate() async throws -> SparkSession {
return try await create()
}
}
}
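
Likewise, a hedged sketch of the `Builder` flow defined above, assuming a reachable Spark Connect endpoint; the `remote` and `appName` calls map to the `spark.remote` and `spark.app.name` keys as in this diff, and all concrete values are illustrative:

```swift
// Configure and create a session through the Builder (illustrative values).
let spark = try await SparkSession.builder
  .remote("sc://localhost:15002")   // spark.remote
  .appName("spark-connect-swift")   // spark.app.name
  .getOrCreate()

print(await spark.version)          // filled in from the server's connect response

// range(start, end, step) produces a single Int64 `id` column.
let df = try await spark.range(0, 10, 2)
print(try await df.count())         // 5

await spark.stop()
```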