diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 7ab4fc6..81b74b1 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -58,7 +58,7 @@ public actor DataFrame: Sendable {
 
   /// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
   /// - Parameter batches: An array of ``RecordBatch``.
-  private func addBathes(_ batches: [RecordBatch]) {
+  private func addBatches(_ batches: [RecordBatch]) {
     self.batches.append(contentsOf: batches)
   }
 
@@ -153,16 +153,35 @@ public actor DataFrame: Sendable {
             let arrowResult = ArrowReader.makeArrowReaderResult()
             _ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
             _ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
-            await self.addBathes(arrowResult.batches)
+            await self.addBatches(arrowResult.batches)
           }
         }
       }
     }
   }
 
-  /// This is designed not to support this feature in order to simplify the Swift client.
-  public func collect() async throws {
-    throw SparkConnectError.UnsupportedOperationException
+  /// Execute the plan and return the result as ``[[String?]]``.
+  /// - Returns: ``[[String?]]``
+  public func collect() async throws -> [[String?]] {
+    try await execute()
+
+    var result: [[String?]] = []
+    for batch in self.batches {
+      for i in 0..
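
The second hunk above is truncated after `for i in 0..`, so the rest of the new `collect()` body is not shown here. What remains is presumably the row-major conversion of each buffered `RecordBatch` into `[[String?]]`. Below is a minimal sketch of that shape only, assuming the Arrow Swift `RecordBatch` exposes `length` and `columns` and that each column's array offers an `asAny(_:)` element accessor; it is an illustration, not the verbatim content of the truncated hunk.

```swift
// Hedged sketch of the remainder of collect(); not the diff's actual code.
// Assumes Arrow Swift's RecordBatch exposes `length` (row count) and
// `columns` (one array holder per field) with an `asAny(_:)` element accessor.
public func collect() async throws -> [[String?]] {
  try await execute()

  var result: [[String?]] = []
  for batch in self.batches {
    for i in 0..<batch.length {
      var row: [String?] = []
      for column in batch.columns {
        // Render every cell as an optional String so SQL NULLs survive as nil.
        if let value = column.array.asAny(i) {
          row.append(String(describing: value))
        } else {
          row.append(nil)
        }
      }
      result.append(row)
    }
  }
  return result
}
```

Returning `[[String?]]` keeps the result type simple for callers: every cell is rendered as text and NULL values come back as `nil`, which matches the return type documented in the new doc comment.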