diff --git a/Tests/SparkConnectTests/IcebergTests.swift b/Tests/SparkConnectTests/IcebergTests.swift
index ef3be51..0483f5c 100644
--- a/Tests/SparkConnectTests/IcebergTests.swift
+++ b/Tests/SparkConnectTests/IcebergTests.swift
@@ -18,9 +18,8 @@
 //
 
 import Foundation
-import Testing
-
 import SparkConnect
+import Testing
 
 /// A test suite for `Apache Iceberg` integration
 @Suite(.serialized)
@@ -31,8 +30,10 @@ struct IcebergTests {
   @Test
   func test() async throws {
     guard icebergEnabled else { return }
-    let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
-    let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let t1 =
+      "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let t2 =
+      "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
 
     let spark = try await SparkSession.builder.getOrCreate()
 
@@ -56,14 +57,103 @@ struct IcebergTests {
       try await spark.table(t1).writeTo(t2).append()
       #expect(try await spark.table(t2).count() == 6)
 
+      try await spark.sql("INSERT INTO \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
+      #expect(try await spark.table(t2).count() == 9)
+
       try await spark.table(t1).writeTo(t2).replace()
       #expect(try await spark.table(t2).count() == 3)
 
+      try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
+      #expect(try await spark.table(t2).count() == 2)
+
+      try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
+      #expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)
+
       try await spark.table(t1).writeTo(t2).overwrite("true")
       #expect(try await spark.table(t2).count() == 3)
 
       try await spark.table(t1).writeTo(t2).overwrite("false")
       #expect(try await spark.table(t2).count() == 6)
+
+      try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a')").count()
+      #expect(try await spark.table(t2).count() == 1)
+
+      try await spark.table(t1).writeTo(t2).overwrite("id = 1")
+      #expect(try await spark.table(t2).count() == 3)
+    })
+
+    await spark.stop()
+  }
+
+  @Test
+  func partition() async throws {
+    guard icebergEnabled else { return }
+    let t1 =
+      "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let t2 =
+      "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    try await SQLHelper.withTable(spark, t1, t2)({
+      try await spark.sql(
+        """
+        CREATE TABLE \(t1) (
+          id BIGINT,
+          data STRING,
+          category STRING)
+        USING ICEBERG
+        PARTITIONED BY (category)
+        """
+      ).count()
+      try await spark.sql(
+        """
+        CREATE TABLE \(t2) (
+          id BIGINT,
+          data STRING,
+          category STRING)
+        USING ICEBERG
+        PARTITIONED BY (category)
+        """
+      ).count()
+
+      #expect(try await spark.catalog.tableExists(t1))
+      #expect(try await spark.catalog.tableExists(t2))
+
+      #expect(try await spark.table(t1).count() == 0)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a', 'A'), (2, 'b', 'B'), (3, 'c', 'C')")
+        .count()
+      #expect(try await spark.table(t1).count() == 3)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.table(t1).writeTo(t2).append()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).replace()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
+      #expect(try await spark.table(t2).count() == 2)
+
+      try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
+      #expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)
+
+      try await spark.table(t1).writeTo(t2).overwritePartitions()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1)").count()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1) WHERE category = 'C'").count()
+      #expect(try await spark.table(t2).count() == 1)
+
+      try await spark.table(t1).writeTo(t2).overwrite("category = 'C'")
+      #expect(try await spark.table(t2).count() == 3)
     })
 
     await spark.stop()