From 40de8fc3479a493bd7c9dbbacead868c6dd3238d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 17 Jun 2025 20:55:49 -0700 Subject: [PATCH] [SPARK-52523] Update `arrow-swift` code for Timestamp --- Sources/SparkConnect/ArrowArray.swift | 82 ++++++++++++++++++++ Sources/SparkConnect/ArrowArrayBuilder.swift | 17 ++++ Sources/SparkConnect/ArrowReaderHelper.swift | 39 +++++++++- Sources/SparkConnect/ArrowType.swift | 72 +++++++++++++++++ Sources/SparkConnect/ArrowWriterHelper.swift | 28 +++++++ Sources/SparkConnect/ProtoUtil.swift | 16 ++++ 6 files changed, 253 insertions(+), 1 deletion(-) diff --git a/Sources/SparkConnect/ArrowArray.swift b/Sources/SparkConnect/ArrowArray.swift index cc348ed..61a4e49 100644 --- a/Sources/SparkConnect/ArrowArray.swift +++ b/Sources/SparkConnect/ArrowArray.swift @@ -111,6 +111,8 @@ public class ArrowArrayHolderImpl: ArrowArrayHolder { return try ArrowArrayHolderImpl(Time32Array(with)) case .time64: return try ArrowArrayHolderImpl(Time64Array(with)) + case .timestamp: + return try ArrowArrayHolderImpl(TimestampArray(with)) case .string: return try ArrowArrayHolderImpl(StringArray(with)) case .boolean: @@ -269,6 +271,86 @@ public class Decimal128Array: FixedArray { } } +public class TimestampArray: FixedArray { + + public struct FormattingOptions: Equatable { + public var dateFormat: String = "yyyy-MM-dd HH:mm:ss.SSS" + public var locale: Locale = .current + public var includeTimezone: Bool = true + public var fallbackToRaw: Bool = true + + public init( + dateFormat: String = "yyyy-MM-dd HH:mm:ss.SSS", + locale: Locale = .current, + includeTimezone: Bool = true, + fallbackToRaw: Bool = true + ) { + self.dateFormat = dateFormat + self.locale = locale + self.includeTimezone = includeTimezone + self.fallbackToRaw = fallbackToRaw + } + + public static func == (lhs: FormattingOptions, rhs: FormattingOptions) -> Bool { + return lhs.dateFormat == rhs.dateFormat && lhs.locale.identifier == rhs.locale.identifier + && lhs.includeTimezone == rhs.includeTimezone && lhs.fallbackToRaw == rhs.fallbackToRaw + } + } + + private var cachedFormatter: DateFormatter? + private var cachedOptions: FormattingOptions? + + public func formattedDate(at index: UInt, options: FormattingOptions = FormattingOptions()) + -> String? + { + guard let timestamp = self[index] else { return nil } + + guard let timestampType = self.arrowData.type as? ArrowTypeTimestamp else { + return options.fallbackToRaw ? "\(timestamp)" : nil + } + + let date = dateFromTimestamp(timestamp, unit: timestampType.unit) + + if cachedFormatter == nil || cachedOptions != options { + let formatter = DateFormatter() + formatter.dateFormat = options.dateFormat + formatter.locale = options.locale + if options.includeTimezone, let timezone = timestampType.timezone { + formatter.timeZone = TimeZone(identifier: timezone) + } + cachedFormatter = formatter + cachedOptions = options + } + + return cachedFormatter?.string(from: date) + } + + private func dateFromTimestamp(_ timestamp: Int64, unit: ArrowTimestampUnit) -> Date { + let timeInterval: TimeInterval + + switch unit { + case .seconds: + timeInterval = TimeInterval(timestamp) + case .milliseconds: + timeInterval = TimeInterval(timestamp) / 1_000 + case .microseconds: + timeInterval = TimeInterval(timestamp) / 1_000_000 + case .nanoseconds: + timeInterval = TimeInterval(timestamp) / 1_000_000_000 + } + + return Date(timeIntervalSince1970: timeInterval) + } + + public override func asString(_ index: UInt) -> String { + if let formatted = formattedDate(at: index) { + return formatted + } + + return super.asString(index) + } +} + /// @nodoc public class BinaryArray: ArrowArray { public struct Options { diff --git a/Sources/SparkConnect/ArrowArrayBuilder.swift b/Sources/SparkConnect/ArrowArrayBuilder.swift index da7074c..2b977c1 100644 --- a/Sources/SparkConnect/ArrowArrayBuilder.swift +++ b/Sources/SparkConnect/ArrowArrayBuilder.swift @@ -129,6 +129,12 @@ public class Decimal128ArrayBuilder: ArrowArrayBuilder, TimestampArray> { + fileprivate convenience init(_ unit: ArrowTimestampUnit, timezone: String? = nil) throws { + try self.init(ArrowTypeTimestamp(unit, timezone: timezone)) + } +} + public class StructArrayBuilder: ArrowArrayBuilder { let builders: [any ArrowArrayHolderBuilder] let fields: [ArrowField] @@ -293,6 +299,11 @@ public class ArrowArrayBuilders { throw ArrowError.invalid("Expected ArrowTypeDecimal128 for decimal128 type") } return try Decimal128ArrayBuilder(precision: decimalType.precision, scale: decimalType.scale) + case .timestamp: + guard let timestampType = arrowType as? ArrowTypeTimestamp else { + throw ArrowError.invalid("Expected arrow type for \(arrowType.id) not found") + } + return try TimestampArrayBuilder(timestampType.unit) default: throw ArrowError.unknownType("Builder not found for arrow type: \(arrowType.id)") } @@ -355,6 +366,12 @@ public class ArrowArrayBuilders { return try Time64ArrayBuilder(unit) } + public static func loadTimestampArrayBuilder(_ unit: ArrowTimestampUnit, timezone: String? = nil) + throws -> TimestampArrayBuilder + { + return try TimestampArrayBuilder(unit, timezone: timezone) + } + public static func loadDecimal128ArrayBuilder( _ precision: Int32 = 38, _ scale: Int32 = 18 diff --git a/Sources/SparkConnect/ArrowReaderHelper.swift b/Sources/SparkConnect/ArrowReaderHelper.swift index c955d16..6ccca2f 100644 --- a/Sources/SparkConnect/ArrowReaderHelper.swift +++ b/Sources/SparkConnect/ArrowReaderHelper.swift @@ -112,6 +112,25 @@ private func makeTimeHolder( } } +private func makeTimestampHolder( + _ field: ArrowField, + buffers: [ArrowBuffer], + nullCount: UInt +) -> Result { + do { + if let arrowType = field.type as? ArrowTypeTimestamp { + let arrowData = try ArrowData(arrowType, buffers: buffers, nullCount: nullCount) + return .success(ArrowArrayHolderImpl(try TimestampArray(arrowData))) + } else { + return .failure(.invalid("Incorrect field type for timestamp: \(field.type)")) + } + } catch let error as ArrowError { + return .failure(error) + } catch { + return .failure(.unknownError("\(error)")) + } +} + private func makeBoolHolder( _ buffers: [ArrowBuffer], nullCount: UInt @@ -214,6 +233,8 @@ func makeArrayHolder( // swiftlint:disable:this cyclomatic_complexity return makeDateHolder(field, buffers: buffers, nullCount: nullCount) case .time32, .time64: return makeTimeHolder(field, buffers: buffers, nullCount: nullCount) + case .timestamp: + return makeTimestampHolder(field, buffers: buffers, nullCount: nullCount) case .strct: return makeStructHolder( field, buffers: buffers, nullCount: nullCount, children: children!, rbLength: rbLength) @@ -234,7 +255,7 @@ func makeBuffer( func isFixedPrimitive(_ type: org_apache_arrow_flatbuf_Type_) -> Bool { switch type { - case .int, .bool, .floatingpoint, .date, .time, .decimal: + case .int, .bool, .floatingpoint, .date, .time, .timestamp, .decimal: return true default: return false @@ -307,6 +328,22 @@ func findArrowType( // swiftlint:disable:this cyclomatic_complexity function_bo } return ArrowTypeTime64(timeType.unit == .microsecond ? .microseconds : .nanoseconds) + case .timestamp: + let timestampType = field.type(type: org_apache_arrow_flatbuf_Timestamp.self)! + let arrowUnit: ArrowTimestampUnit + switch timestampType.unit { + case .second: + arrowUnit = .seconds + case .millisecond: + arrowUnit = .milliseconds + case .microsecond: + arrowUnit = .microseconds + case .nanosecond: + arrowUnit = .nanoseconds + } + + let timezone = timestampType.timezone + return ArrowTypeTimestamp(arrowUnit, timezone: timezone) case .struct_: _ = field.type(type: org_apache_arrow_flatbuf_Struct_.self)! var fields = [ArrowField]() diff --git a/Sources/SparkConnect/ArrowType.swift b/Sources/SparkConnect/ArrowType.swift index a617b3a..d7b773e 100644 --- a/Sources/SparkConnect/ArrowType.swift +++ b/Sources/SparkConnect/ArrowType.swift @@ -25,6 +25,7 @@ public typealias Time64 = Int64 public typealias Date32 = Int32 /// @nodoc public typealias Date64 = Int64 +public typealias Timestamp = Int64 func FlatBuffersVersion_23_1_4() { // swiftlint:disable:this identifier_name } @@ -70,6 +71,7 @@ public enum ArrowTypeId: Sendable, Equatable { case strct case time32 case time64 + case timestamp case time case uint16 case uint32 @@ -146,6 +148,47 @@ public class ArrowTypeDecimal128: ArrowType { } } +public enum ArrowTimestampUnit { + case seconds + case milliseconds + case microseconds + case nanoseconds +} + +public class ArrowTypeTimestamp: ArrowType { + let unit: ArrowTimestampUnit + let timezone: String? + + public init(_ unit: ArrowTimestampUnit, timezone: String? = nil) { + self.unit = unit + self.timezone = timezone + + super.init(ArrowType.ArrowTimestamp) + } + + public convenience init(type: ArrowTypeId) { + self.init(.milliseconds, timezone: nil) + } + + public override var cDataFormatId: String { + get throws { + let unitChar: String + switch self.unit { + case .seconds: unitChar = "s" + case .milliseconds: unitChar = "m" + case .microseconds: unitChar = "u" + case .nanoseconds: unitChar = "n" + } + + if let timezone = self.timezone { + return "ts\(unitChar):\(timezone)" + } else { + return "ts\(unitChar)" + } + } + } +} + /// @nodoc public class ArrowNestedType: ArrowType { let fields: [ArrowField] @@ -177,6 +220,7 @@ public class ArrowType { public static let ArrowBinary = Info.variableInfo(ArrowTypeId.binary) public static let ArrowTime32 = Info.timeInfo(ArrowTypeId.time32) public static let ArrowTime64 = Info.timeInfo(ArrowTypeId.time64) + public static let ArrowTimestamp = Info.timeInfo(ArrowTypeId.timestamp) public static let ArrowStruct = Info.complexInfo(ArrowTypeId.strct) public init(_ info: ArrowType.Info) { @@ -305,6 +349,8 @@ public class ArrowType { return MemoryLayout.stride case .time64: return MemoryLayout.stride + case .timestamp: + return MemoryLayout.stride case .binary: return MemoryLayout.stride case .string: @@ -357,6 +403,11 @@ public class ArrowType { return try time64.cDataFormatId } return "ttu" + case ArrowTypeId.timestamp: + if let timestamp = self as? ArrowTypeTimestamp { + return try timestamp.cDataFormatId + } + return "tsu" case ArrowTypeId.binary: return "z" case ArrowTypeId.string: @@ -409,6 +460,27 @@ public class ArrowType { return ArrowTypeTime64(.microseconds) } else if from == "ttn" { return ArrowTypeTime64(.nanoseconds) + } else if from.starts(with: "ts") { + let components = from.split(separator: ":", maxSplits: 1) + guard let unitPart = components.first, unitPart.count == 3 else { + throw ArrowError.invalid( + "Invalid timestamp format '\(from)'. Expected format 'ts[s|m|u|n][:timezone]'") + } + + let unitChar = unitPart.suffix(1) + let unit: ArrowTimestampUnit + switch unitChar { + case "s": unit = .seconds + case "m": unit = .milliseconds + case "u": unit = .microseconds + case "n": unit = .nanoseconds + default: + throw ArrowError.invalid( + "Unrecognized timestamp unit '\(unitChar)'. Expected 's', 'm', 'u', or 'n'.") + } + + let timezone = components.count > 1 ? String(components[1]) : nil + return ArrowTypeTimestamp(unit, timezone: timezone) } else if from == "z" { return ArrowType(ArrowType.ArrowBinary) } else if from == "u" { diff --git a/Sources/SparkConnect/ArrowWriterHelper.swift b/Sources/SparkConnect/ArrowWriterHelper.swift index 13856af..bcebe0f 100644 --- a/Sources/SparkConnect/ArrowWriterHelper.swift +++ b/Sources/SparkConnect/ArrowWriterHelper.swift @@ -41,6 +41,8 @@ func toFBTypeEnum(_ arrowType: ArrowType) -> Result