diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index 75f555f..945651e 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -117,6 +117,7 @@ import Synchronization /// - ``transpose()`` /// - ``transpose(_:)`` /// - ``hint(_:_:)`` +/// - ``withWatermark(_:_:)`` /// /// ### Join Operations /// - ``join(_:)`` @@ -1444,6 +1445,19 @@ public actor DataFrame: Sendable { try await checkpoint(eager, false, storageLevel) } + /// Defines an event time watermark for this ``DataFrame``. A watermark tracks a point in time + /// before which we assume no more late data is going to arrive. + /// - Parameters: + /// - eventTime: the name of the column that contains the event time of the row. + /// - delayThreshold: the minimum delay to wait to data to arrive late, relative to + /// the latest record that has been processed in the form of an interval (e.g. "1 minute" or "5 hours"). + /// NOTE: This should not be negative. + /// - Returns: A ``DataFrame`` instance. + public func withWatermark(_ eventTime: String, _ delayThreshold: String) -> DataFrame { + let plan = SparkConnectClient.getWithWatermark(self.plan.root, eventTime, delayThreshold) + return DataFrame(spark: self.spark, plan: plan) + } + /// Returns a ``DataFrameWriter`` that can be used to write non-streaming data. public var write: DataFrameWriter { get { diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index c69888f..a5306dc 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -1022,6 +1022,23 @@ public actor SparkConnectClient { return plan } + static func getWithWatermark( + _ child: Relation, + _ eventTime: String, + _ delayThreshold: String + ) -> Plan { + var withWatermark = Spark_Connect_WithWatermark() + withWatermark.input = child + withWatermark.eventTime = eventTime + withWatermark.delayThreshold = delayThreshold + + var relation = Relation() + relation.withWatermark = withWatermark + var plan = Plan() + plan.opType = .root(relation) + return plan + } + func createTempView( _ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool ) async throws { diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift index fd15496..09aab96 100644 --- a/Tests/SparkConnectTests/DataFrameTests.swift +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -744,6 +744,22 @@ struct DataFrameTests { await spark.stop() } + @Test + func withWatermark() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let df = try await spark + .sql(""" + SELECT * FROM VALUES + (1, now()), + (1, now() - INTERVAL 1 HOUR), + (1, now() - INTERVAL 2 HOUR) + T(data, eventTime) + """) + .withWatermark("eventTime", "1 minute") // This tests only API for now + #expect(try await df.dropDuplicatesWithinWatermark("data").count() == 1) + await spark.stop() + } + @Test func describe() async throws { let spark = try await SparkSession.builder.getOrCreate()