Skip to content

[SPARK-52162] Add DELETE/UPDATE and partitioned table tests #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions Tests/SparkConnectTests/IcebergTests.swift
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,9 +18,8 @@
//

import Foundation
import Testing

import SparkConnect
import Testing

/// A test suite for `Apache Iceberg` integration
@Suite(.serialized)
Expand All @@ -31,8 +30,10 @@ struct IcebergTests {
@Test
func test() async throws {
guard icebergEnabled else { return }
let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t1 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")

let spark = try await SparkSession.builder.getOrCreate()

Expand All @@ -56,14 +57,103 @@ struct IcebergTests {
try await spark.table(t1).writeTo(t2).append()
#expect(try await spark.table(t2).count() == 6)

try await spark.sql("INSERT INTO \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
#expect(try await spark.table(t2).count() == 9)

try await spark.table(t1).writeTo(t2).replace()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
#expect(try await spark.table(t2).count() == 2)

try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)

try await spark.table(t1).writeTo(t2).overwrite("true")
#expect(try await spark.table(t2).count() == 3)

try await spark.table(t1).writeTo(t2).overwrite("false")
#expect(try await spark.table(t2).count() == 6)

try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a')").count()
#expect(try await spark.table(t2).count() == 1)

try await spark.table(t1).writeTo(t2).overwrite("id = 1")
#expect(try await spark.table(t2).count() == 3)
})

await spark.stop()
}

@Test
func partition() async throws {
guard icebergEnabled else { return }
let t1 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")

let spark = try await SparkSession.builder.getOrCreate()

try await SQLHelper.withTable(spark, t1, t2)({
try await spark.sql(
"""
CREATE TABLE \(t1) (
id BIGINT,
data STRING,
category STRING)
USING ICEBERG
PARTITIONED BY (category)
"""
).count()
try await spark.sql(
"""
CREATE TABLE \(t2) (
id BIGINT,
data STRING,
category STRING)
USING ICEBERG
PARTITIONED BY (category)
"""
).count()

#expect(try await spark.catalog.tableExists(t1))
#expect(try await spark.catalog.tableExists(t2))

#expect(try await spark.table(t1).count() == 0)
#expect(try await spark.table(t2).count() == 0)

try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a', 'A'), (2, 'b', 'B'), (3, 'c', 'C')")
.count()
#expect(try await spark.table(t1).count() == 3)
#expect(try await spark.table(t2).count() == 0)

try await spark.table(t1).writeTo(t2).append()
#expect(try await spark.table(t2).count() == 3)

try await spark.table(t1).writeTo(t2).replace()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
#expect(try await spark.table(t2).count() == 2)

try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)

try await spark.table(t1).writeTo(t2).overwritePartitions()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1)").count()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1) WHERE category = 'C'").count()
#expect(try await spark.table(t2).count() == 1)

try await spark.table(t1).writeTo(t2).overwrite("category = 'C'")
#expect(try await spark.table(t2).count() == 3)
})

await spark.stop()
Expand Down
Loading