145 changes: 145 additions & 0 deletions tests/integration/reconcile/test_schema_compare.py
@@ -328,3 +328,148 @@ def test_schema_compare(mock_spark):
assert df.count() == 2
assert df.filter("is_valid = 'true'").count() == 2
assert df.filter("is_valid = 'false'").count() == 0


def test_schema_compare_large_column_count_bug_validation(mock_spark):
"""
Test to validate the bug in issue #1973 where schema comparison
dashboard contains only 50 rows even when table has more than 50 columns.
This test creates a schema with more than 50 columns to reproduce the bug.

The bug is likely in the data persistence layer where collect_list might
have a default limit or the explode operation in the dashboard query
might be limited.
"""
# Create 60 columns to test the 50-row limit bug
src_schema = []
tgt_schema = []

for i in range(1, 61): # 60 columns
col_name = f"col_{i:03d}"
src_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))
tgt_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))

spark = mock_spark
table_conf = Table(
source_name="large_table",
target_name="large_table",
drop_columns=[],
column_mapping=[],
)

schema_compare_output = SchemaCompare(spark).compare(
src_schema,
tgt_schema,
get_dialect("databricks"),
table_conf,
)
df = schema_compare_output.compare_df

    # If the bug is present, only 50 of the expected 60 rows come back,
    # and the count assertion below fails.
actual_count = df.count()
valid_count = df.filter("is_valid = 'true'").count()
invalid_count = df.filter("is_valid = 'false'").count()

# Debug information for troubleshooting
print(f"DEBUG: Created {len(src_schema)} source columns and {len(tgt_schema)} target columns")
print(f"DEBUG: Schema comparison returned {actual_count} rows")
print(f"DEBUG: Valid rows: {valid_count}, Invalid rows: {invalid_count}")

assert actual_count == 60, (
f"BUG CONFIRMED (Issue #1973): Expected 60 rows in schema comparison result, "
f"but got {actual_count}. This confirms the 50-row limit bug exists. "
f"Schema comparison processed {len(src_schema)} columns but only returned {actual_count} rows."
)
assert valid_count == 60
assert invalid_count == 0
assert schema_compare_output.is_valid
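

# A minimal probe sketch (not from the original PR): Spark's collect_list
# aggregate has no built-in element cap, so if mock_spark is backed by a real
# SparkSession (an assumption about the fixture), the check below can rule
# collect_list out as the source of the 50-row limit hypothesized above.
def test_collect_list_has_no_fifty_element_cap_sketch(mock_spark):
    from pyspark.sql import functions as F  # assumed available in this test environment

    df = mock_spark.createDataFrame([(1, f"col_{i:03d}") for i in range(60)], ["grp", "name"])
    row = df.groupBy("grp").agg(F.collect_list("name").alias("names")).first()
    # All 60 elements survive the aggregation, so collect_list itself does not truncate.
    assert len(row["names"]) == 60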


def test_schema_compare_exactly_50_columns(mock_spark):
"""
Test with exactly 50 columns to see if this works fine.
This helps isolate whether the issue is specifically with >50 columns.
"""
# Create exactly 50 columns
src_schema = []
tgt_schema = []

for i in range(1, 51): # 50 columns
col_name = f"col_{i:03d}"
src_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))
tgt_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))

spark = mock_spark
table_conf = Table(
source_name="fifty_col_table",
target_name="fifty_col_table",
drop_columns=[],
column_mapping=[],
)

schema_compare_output = SchemaCompare(spark).compare(
src_schema,
tgt_schema,
get_dialect("databricks"),
table_conf,
)
df = schema_compare_output.compare_df

# This should work fine with exactly 50 columns
assert df.count() == 50, f"Expected 50 rows in schema comparison result, but got {df.count()}"
assert df.filter("is_valid = 'true'").count() == 50
assert df.filter("is_valid = 'false'").count() == 0
assert schema_compare_output.is_valid


def test_schema_compare_51_columns_edge_case(mock_spark):
"""
Test with exactly 51 columns to see if the issue starts at >50.
This helps pinpoint the exact threshold where the bug occurs.
"""
# Create 51 columns to test the edge case
src_schema = []
tgt_schema = []

for i in range(1, 52): # 51 columns
col_name = f"col_{i:03d}"
src_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))
tgt_schema.append(schema_fixture_factory(col_name, "string", f"`{col_name}`", f"`{col_name}`"))

spark = mock_spark
table_conf = Table(
source_name="fifty_one_col_table",
target_name="fifty_one_col_table",
drop_columns=[],
column_mapping=[],
)

schema_compare_output = SchemaCompare(spark).compare(
src_schema,
tgt_schema,
get_dialect("databricks"),
table_conf,
)
df = schema_compare_output.compare_df

# If the bug exists, this might return only 50 rows instead of 51
actual_count = df.count()
valid_count = df.filter("is_valid = 'true'").count()
invalid_count = df.filter("is_valid = 'false'").count()

print(f"DEBUG: Edge case test - Created {len(src_schema)} columns, got {actual_count} rows")

    # A count of exactly 50 is the bug signature, so assert it directly
    # instead of branching with if/else around a bare `assert False`.
    assert actual_count != 50, (
        f"BUG DETECTED (Issue #1973): Expected 51 rows but got {actual_count}. "
        f"The 50-row limit bug is confirmed. Schema had {len(src_schema)} columns "
        f"but comparison returned only {actual_count} rows."
    )
    assert actual_count == 51, f"Expected 51 rows in schema comparison result, but got {actual_count}"
    assert valid_count == 51
    assert invalid_count == 0
    assert schema_compare_output.is_valid
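

# Consolidation sketch (an editorial suggestion, not part of the original PR):
# the three column-count tests above differ only in the count and the expected
# row total, so they could be collapsed into one parametrized test. The import
# below is placed here only to keep the sketch self-contained; in practice it
# would sit at the top of the module. The helpers reused are the same ones the
# tests above rely on.
import pytest


@pytest.mark.parametrize("column_count", [50, 51, 60])
def test_schema_compare_column_count_parametrized_sketch(mock_spark, column_count):
    src_schema = [
        schema_fixture_factory(f"col_{i:03d}", "string", f"`col_{i:03d}`", f"`col_{i:03d}`")
        for i in range(1, column_count + 1)
    ]
    tgt_schema = [
        schema_fixture_factory(f"col_{i:03d}", "string", f"`col_{i:03d}`", f"`col_{i:03d}`")
        for i in range(1, column_count + 1)
    ]
    table_conf = Table(
        source_name="param_table",
        target_name="param_table",
        drop_columns=[],
        column_mapping=[],
    )
    output = SchemaCompare(mock_spark).compare(src_schema, tgt_schema, get_dialect("databricks"), table_conf)
    # Every column should yield exactly one comparison row, whatever the count.
    assert output.compare_df.count() == column_count
    assert output.compare_df.filter("is_valid = 'true'").count() == column_count
    assert output.is_valid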