[SPARK-51483] Add `SparkSession` and `DataFrame` actors #10
Closed
Changes from 1 commit
@@ -0,0 +1,195 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Atomics
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import NIOCore
import SwiftyTextTable
import Synchronization

/// A DataFrame which supports only SQL queries.
public actor DataFrame: Sendable {
  var spark: SparkSession
  var plan: Plan
  var schema: DataType? = nil
  private var batches: [RecordBatch] = [RecordBatch]()

  /// Create a new `DataFrame` instance with the given Spark session and plan.
  /// - Parameters:
  ///   - spark: A ``SparkSession`` instance to use.
  ///   - plan: A plan to execute.
  init(spark: SparkSession, plan: Plan) async throws {
    self.spark = spark
    self.plan = plan
  }

  /// Create a new `DataFrame` instance with the given SparkSession and a SQL statement.
  /// - Parameters:
  ///   - spark: A `SparkSession` instance to use.
  ///   - sqlText: A SQL statement.
  init(spark: SparkSession, sqlText: String) async throws {
    self.spark = spark
    self.plan = sqlText.toSparkConnectPlan
  }

  /// Set the schema. This is used to store the analyzed schema response from the `Spark Connect` server.
  /// - Parameter schema: A ``DataType`` instance for the analyzed schema.
  private func setSchema(_ schema: DataType) {
    self.schema = schema
  }

  /// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
  /// - Parameter batches: An array of ``RecordBatch`` instances.
Review comment: Shouldn't it be an array?
Author reply: Oops. typo. Let me fix it.
  private func addBatches(_ batches: [RecordBatch]) {
    self.batches.append(contentsOf: batches)
  }

  /// A method to access the underlying Spark's `RDD`.
  /// In `Spark Connect`, this feature is not allowed by design.
  public func rdd() throws {
    // SQLSTATE: 0A000
    // [UNSUPPORTED_CONNECT_FEATURE.RDD]
    // Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs).
    throw SparkConnectError.UnsupportedOperationException
  }

  /// Return a `JSON` string of the data type because we cannot expose the internal type ``DataType``.
  /// - Returns: a `JSON` string.
  public func schema() async throws -> String {
    var dataType: String? = nil

    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      let response = try await service.analyzePlan(
        spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
      dataType = try response.schema.schema.jsonString()
    }
    return dataType!
  }

  /// Return the total number of rows.
  /// - Returns: an `Int64` value.
  public func count() async throws -> Int64 {
    let counter = Atomic(Int64(0))

    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
        response in
        for try await m in response.messages {
          counter.add(m.arrowBatch.rowCount, ordering: .relaxed)
        }
      }
    }
    return counter.load(ordering: .relaxed)
  }

  /// Execute the plan and try to fill `schema` and `batches`.
  private func execute() async throws {
    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
        response in
        for try await m in response.messages {
          if m.hasSchema {
            // The original schema should arrive before ArrowBatches
            await self.setSchema(m.schema)
          }
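          // Note: each non-empty `arrowBatch.data` below is an Arrow IPC stream. Every IPC
          // message is framed as a 4-byte continuation marker (0xFFFFFFFF, i.e. -1), a 4-byte
          // metadata length, the Flatbuffers metadata, and then the message body. Here the
          // first message carries the schema (no body), the second carries the record batch,
          // and the trailing 8 bytes are the end-of-stream marker, which is why they are
          // excluded from `dataBodySize`.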
          let ipcStreamBytes = m.arrowBatch.data
          if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 {
            let IPC_CONTINUATION_TOKEN = Int32(-1)
            // Schema
            assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN)
            let schemaSize = Int64(ipcStreamBytes[4..<8].int32)
            let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)])

            // Arrow IPC Data
            assert(
              ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32
                == IPC_CONTINUATION_TOKEN)
            var pos: Int64 = 8 + schemaSize + 4
            let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32)
            pos += 4
            let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)])
            pos += dataHeaderSize
            let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8
            let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)])

            // Read ArrowBatches
            let reader = ArrowReader()
            let arrowResult = ArrowReader.makeArrowReaderResult()
            _ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
            _ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
            await self.addBatches(arrowResult.batches)
          }
        }
      }
    }
  }

  /// This feature is intentionally unsupported in order to simplify the Swift client.
  public func collect() async throws {
    throw SparkConnectError.UnsupportedOperationException
  }

  /// Execute the plan and show the result.
  public func show() async throws {
    try await execute()

    if let schema = self.schema {
      var columns: [TextTableColumn] = []
      for f in schema.struct.fields {
        columns.append(TextTableColumn(header: f.name))
      }
      var table = TextTable(columns: columns)
      for batch in self.batches {
        for i in 0..<batch.length {
          var values: [String] = []
          for column in batch.columns {
            let str = column.array as! AsString
            if column.data.isNull(i) {
              values.append("NULL")
            } else {
              values.append(str.asString(i))
            }
          }
          table.addRow(values: values)
        }
      }
      print(table.render())
    }
  }
}
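For context, here is a minimal usage sketch of the `DataFrame` API introduced in this file. The SQL text is illustrative, and it assumes a reachable Spark Connect server (the builder defaults to `sc://localhost:15002`):

```swift
// A minimal sketch, assuming a running Spark Connect server at the builder's default endpoint.
let spark = try await SparkSession.builder.getOrCreate()
let df = try await spark.sql("SELECT 1 AS id, 'spark' AS name")  // illustrative query

print(try await df.schema())  // JSON string of the analyzed schema
print(try await df.count())   // 1
try await df.show()           // prints a text table with columns `id` and `name`

await spark.stop()
```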
@@ -0,0 +1,179 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import Synchronization

/// The entry point to programming Spark with the ``DataFrame`` API.
///
/// Use the builder to get a session.
///
/// ```swift
/// let spark = try await SparkSession.builder.getOrCreate()
/// ```
public actor SparkSession {

  public static let builder: Builder = Builder()

  let client: SparkConnectClient

  /// Runtime configuration interface for Spark.
  public let conf: RuntimeConf

  /// Create a session that uses the specified connection string and userID.
  /// - Parameters:
  ///   - connection: a connection string in the pattern `sc://{host}:{port}`.
  ///   - userID: an optional user ID. If absent, the `SPARK_USER` environment variable or ``ProcessInfo.processInfo.userName`` is used.
  init(_ connection: String, _ userID: String? = nil) {
    let processInfo = ProcessInfo.processInfo
    let userName = processInfo.environment["SPARK_USER"] ?? processInfo.userName
    self.client = SparkConnectClient(remote: connection, user: userID ?? userName)
    self.conf = RuntimeConf(self.client)
  }

  /// The Spark version of the Spark Connect server. This is supposed to be overwritten when a connection is established.
  public var version: String = ""

  func setVersion(_ version: String) {
    self.version = version
  }

  /// A unique client-side session ID for this session.
  var sessionID: String = UUID().uuidString

  /// Get the current session ID.
  /// - Returns: the current session ID
  func getSessionID() -> String {
    sessionID
  }

  /// A server-side generated session ID. This is supposed to be overwritten when a connection is established.
  var serverSideSessionID: String = ""

  /// A variable for ``SparkContext``. This is designed to throw an exception, matching Apache Spark's Spark Connect behavior.
  var sparkContext: SparkContext {
    get throws {
      // SQLSTATE: 0A000
      // [UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT]
      // Feature is not supported in Spark Connect: Access to the SparkContext.
      throw SparkConnectError.UnsupportedOperationException
    }
  }

  /// Stop the current client.
  public func stop() async {
    await client.stop()
  }

  /// Create a ``DataFrame`` with a single ``Int64`` column named `id`, containing elements in a
  /// range from 0 to `end` (exclusive) with step value 1.
  ///
  /// - Parameter end: A value for the end of range.
  /// - Returns: A ``DataFrame`` instance.
  public func range(_ end: Int64) async throws -> DataFrame {
    return try await range(0, end)
  }

  /// Create a ``DataFrame`` with a single ``Int64`` column named `id`, containing elements in a
  /// range from `start` to `end` (exclusive) with a step value (default: 1).
  ///
  /// - Parameters:
  ///   - start: A value for the start of range.
  ///   - end: A value for the end of range.
  ///   - step: A value for the step.
  /// - Returns: A ``DataFrame`` instance.
  public func range(_ start: Int64, _ end: Int64, _ step: Int64 = 1) async throws -> DataFrame {
    return try await DataFrame(spark: self, plan: client.getPlanRange(start, end, step))
  }

  /// Create a ``DataFrame`` for the given SQL statement.
  /// - Parameter sqlText: A SQL string.
  /// - Returns: A ``DataFrame`` instance.
  public func sql(_ sqlText: String) async throws -> DataFrame {
    return try await DataFrame(spark: self, sqlText: sqlText)
  }

  /// This is defined as the return type of the `SparkSession.sparkContext` method.
  /// This is an empty `Struct` type because the `sparkContext` method is designed to throw
  /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
  struct SparkContext {
  }

  /// A builder to create a ``SparkSession``.
  public actor Builder {
    var sparkConf: [String: String] = [:]

    /// Set a new configuration.
    /// - Parameters:
    ///   - key: A string for the configuration key.
    ///   - value: A string for the configuration value.
    /// - Returns: self
    public func config(_ key: String, _ value: String) -> Builder {
      sparkConf[key] = value
      return self
    }

    /// Remove all stored configurations.
    /// - Returns: self
    func clear() -> Builder {
      sparkConf.removeAll()
      return self
    }

    /// Set a URL for the remote connection.
    /// - Parameter url: A connection string in the pattern `sc://{host}:{port}`.
    /// - Returns: self
    public func remote(_ url: String) -> Builder {
      return config("spark.remote", url)
    }

    /// Set the `appName` of this session.
    /// - Parameter name: A string for the application name.
    /// - Returns: self
    public func appName(_ name: String) -> Builder {
      return config("spark.app.name", name)
    }

    /// Enable `Apache Hive` metastore support configuration.
    /// - Returns: self
    func enableHiveSupport() -> Builder {
      return config("spark.sql.catalogImplementation", "hive")
    }

    /// Create a new ``SparkSession``. If `spark.remote` is not given, `sc://localhost:15002` is used.
    /// - Returns: A newly created `SparkSession`.
    func create() async throws -> SparkSession {
      let session = SparkSession(sparkConf["spark.remote"] ?? "sc://localhost:15002")
      let response = try await session.client.connect(session.sessionID)
      await session.setVersion(response.sparkVersion.version)
      let isSuccess = try await session.client.setConf(map: sparkConf)
      assert(isSuccess)
      return session
    }

    /// Create a ``SparkSession`` from the given configurations.
    /// - Returns: A Spark session.
    public func getOrCreate() async throws -> SparkSession {
      return try await create()
    }
  }
}
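For reference, a small sketch of how the `Builder` configuration chaining composes with `range`; the remote URL and application name below are example values:

```swift
// Example values only; any reachable `sc://{host}:{port}` endpoint works.
let spark = try await SparkSession.builder
  .remote("sc://localhost:15002")
  .appName("spark-connect-swift-example")
  .getOrCreate()

print(await spark.version)                // server-reported Spark version

let df = try await spark.range(0, 10, 2)  // a single `id` column: 0, 2, 4, 6, 8
try await df.show()

await spark.stop()
```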