From b828523d1547122106eeddced080fdfefcb8a611 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 15 May 2025 15:40:16 -0700
Subject: [PATCH] [SPARK-52168] Support `to` for `DataFrame`

---
 Sources/SparkConnect/DataFrame.swift          | 15 +++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift | 16 ++++++++++++++++
 Tests/SparkConnectTests/DataFrameTests.swift  | 19 +++++++++++++++++++
 3 files changed, 50 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index f5acc02..90c72b2 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -94,6 +94,7 @@ import Synchronization
 /// - ``show(_:_:_:)``
 ///
 /// ### Transformation Operations
+/// - ``to(_:)``
 /// - ``toDF(_:)``
 /// - ``toJSON()``
 /// - ``select(_:)``
@@ -478,6 +479,20 @@ public actor DataFrame: Sendable {
     return df
   }
 
+  /// Returns a new DataFrame where each row is reconciled to match the specified schema.
+  /// - Parameter schema: The given schema.
+  /// - Returns: A ``DataFrame`` with the given schema.
+  /// - Throws: `SparkConnectError.InvalidTypeException` if parsing `schema` fails for any reason.
+  public func to(_ schema: String) async throws -> DataFrame {
+    // Parse the schema string into a data type; any failure is reported as an invalid type.
+    do {
+      let dataType = try await sparkSession.client.ddlParse(schema)
+      return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
+    } catch {
+      throw SparkConnectError.InvalidTypeException
+    }
+  }
+
   /// Returns the content of the Dataset as a Dataset of JSON strings.
   /// - Returns: A ``DataFrame`` with a single string column whose content is JSON.
   public func toJSON() -> DataFrame {
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 5c48c55..ab08721 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -505,6 +505,22 @@ public actor SparkConnectClient {
     return plan
   }
 
+  /// Builds a plan whose root relation is a `ToSchema` over the given child.
+  /// - Parameters:
+  ///   - child: The input relation.
+  ///   - schema: The data type assigned as the target schema.
+  /// - Returns: A ``Plan`` rooted at the new `ToSchema` relation.
+  static func getToSchema(_ child: Relation, _ schema: Spark_Connect_DataType) -> Plan {
+    var toSchema = Spark_Connect_ToSchema()
+    toSchema.input = child
+    toSchema.schema = schema
+    var relation = Relation()
+    relation.toSchema = toSchema
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   static func getProjectExprs(_ child: Relation, _ exprs: [String]) -> Plan {
     var project = Project()
     project.input = child
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 7b6c4c2..3df06e1 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -201,6 +201,25 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func to() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    let schema1 = try await spark.range(1).to("shortID SHORT").schema
+    #expect(
+      schema1
+        == #"{"struct":{"fields":[{"name":"shortID","dataType":{"short":{}},"nullable":true}]}}"#
+    )
+
+    let schema2 = try await spark.sql("SELECT '1'").to("id INT").schema
+    #expect(
+      schema2
+        == #"{"struct":{"fields":[{"name":"id","dataType":{"integer":{}},"nullable":true}]}}"#
+    )
+
+    await spark.stop()
+  }
+
   @Test
   func selectMultipleColumns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()