55 changes: 55 additions & 0 deletions dbldatagen/spec/column_spec.py
@@ -0,0 +1,55 @@
from __future__ import annotations

from typing import Any, Literal

from .compat import BaseModel, root_validator


DbldatagenBasicType = Literal[
"string",
"int",
"long",
"float",
"double",
"decimal",
"boolean",
"date",
"timestamp",
"short",
"byte",
"binary",
"integer",
"bigint",
"tinyint",
]


class ColumnDefinition(BaseModel):
name: str
type: DbldatagenBasicType | None = None
primary: bool = False
options: dict[str, Any] | None = {}
nullable: bool | None = False
omit: bool | None = False
baseColumn: str | None = "id"
baseColumnType: str | None = "auto"

@root_validator()
def check_model_constraints(cls, values: dict[str, Any]) -> dict[str, Any]:
"""
Validates constraints across the entire model after individual fields are processed.
"""
is_primary = values.get("primary")
        options = values.get("options") or {}  # guard against an explicit None
name = values.get("name")
is_nullable = values.get("nullable")
column_type = values.get("type")

if is_primary:
if "min" in options or "max" in options:
raise ValueError(f"Primary column '{name}' cannot have min/max options.")

if is_nullable:
raise ValueError(f"Primary column '{name}' cannot be nullable.")

if column_type is None:
raise ValueError(f"Primary column '{name}' must have a type defined.")
return values
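
A short sketch of how these constraints surface at construction time (Pydantic wraps the ValueError raised by the validator in a ValidationError, so the printed message below is illustrative):

from dbldatagen.spec.column_spec import ColumnDefinition

pk = ColumnDefinition(name="id", type="long", primary=True)  # valid

try:
    ColumnDefinition(name="id", type="long", primary=True,
                     options={"min": 0, "max": 100})
except ValueError as exc:
    print(exc)  # Primary column 'id' cannot have min/max options.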
31 changes: 31 additions & 0 deletions dbldatagen/spec/compat.py
@@ -0,0 +1,31 @@
# This module acts as a compatibility layer for Pydantic V1 and V2.

try:
# This will succeed on environments with Pydantic V2.x
# It imports the V1 API that is bundled within V2.
from pydantic.v1 import BaseModel, Field, validator, constr, root_validator

except ImportError:
# This will be executed on environments with only Pydantic V1.x
from pydantic import BaseModel, Field, validator, constr, root_validator

# In your application code, do this:
# from .compat import BaseModel
# NOT this:
# from pydantic import BaseModel

# FastAPI Notes
# https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py


"""
## Why This Approach
No Installation Required: It directly addresses your core requirement.
You don't need to %pip install anything, which avoids conflicts with the pre-installed libraries on Databricks.
Single Codebase: You maintain one set of code that is guaranteed to work with the Pydantic V1 API, which is available in both runtimes.

Environment Agnostic: Your application code in models.py has no idea which version of Pydantic is actually installed. The compat.py module handles that complexity completely.

Future-Ready: When you eventually decide to migrate fully to the Pydantic V2 API (to take advantage of its speed and features),
you only need to change your application code and your compat.py import statements, making the transition much clearer.
"""
278 changes: 278 additions & 0 deletions dbldatagen/spec/generator_spec.py
@@ -0,0 +1,278 @@
from __future__ import annotations

import logging
from typing import Any, Literal

import pandas as pd
from IPython.display import HTML, display

from .column_spec import ColumnDefinition
from .compat import BaseModel, validator

logger = logging.getLogger(__name__)


class UCSchemaTarget(BaseModel):
catalog: str
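    # Trailing underscore avoids clashing with BaseModel.schema() in Pydantic.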
schema_: str
output_format: str = "delta" # Default to delta for UC Schema

@validator("catalog", "schema_")
def validate_identifiers(cls, v): # noqa: N805, pylint: disable=no-self-argument
if not v.strip():
raise ValueError("Identifier must be non-empty.")
if not v.isidentifier():
logger.warning(
f"'{v}' is not a basic Python identifier. Ensure validity for Unity Catalog.")
return v.strip()

def __str__(self):
return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)"


class FilePathTarget(BaseModel):
base_path: str
output_format: Literal["csv", "parquet"] # No default, must be specified

@validator("base_path")
def validate_base_path(cls, v): # noqa: N805, pylint: disable=no-self-argument
if not v.strip():
raise ValueError("base_path must be non-empty.")
return v.strip()

def __str__(self):
return f"{self.base_path} (Format: {self.output_format}, Type: File Path)"


class TableDefinition(BaseModel):
number_of_rows: int
partitions: int | None = None
columns: list[ColumnDefinition]


class ValidationResult:
"""Container for validation results with errors and warnings."""

def __init__(self) -> None:
self.errors: list[str] = []
self.warnings: list[str] = []

def add_error(self, message: str) -> None:
"""Add an error message."""
self.errors.append(message)

def add_warning(self, message: str) -> None:
"""Add a warning message."""
self.warnings.append(message)

def is_valid(self) -> bool:
"""Returns True if there are no errors."""
return len(self.errors) == 0

def __str__(self) -> str:
"""String representation of validation results."""
lines = []
if self.is_valid():
lines.append("✓ Validation passed successfully")
else:
lines.append("✗ Validation failed")

if self.errors:
lines.append(f"\nErrors ({len(self.errors)}):")
for i, error in enumerate(self.errors, 1):
lines.append(f" {i}. {error}")

if self.warnings:
lines.append(f"\nWarnings ({len(self.warnings)}):")
for i, warning in enumerate(self.warnings, 1):
lines.append(f" {i}. {warning}")

return "\n".join(lines)


class DatagenSpec(BaseModel):
tables: dict[str, TableDefinition]
    output_destination: UCSchemaTarget | FilePathTarget | None = None  # TODO: there is an abstraction for this; maybe reuse it? Discuss with Greg.
generator_options: dict[str, Any] | None = {}
    intended_for_databricks: bool | None = None  # May be inferred.

def _check_circular_dependencies(
self,
table_name: str,
columns: list[ColumnDefinition]
) -> list[str]:
"""
Check for circular dependencies in baseColumn references.
Returns a list of error messages if circular dependencies are found.
"""
errors = []
column_map = {col.name: col for col in columns}

for col in columns:
if col.baseColumn and col.baseColumn != "id":
                # Track the dependency chain in order; the set makes cycle checks O(1)
                visited: set[str] = set()
                chain: list[str] = []
                current = col.name

                while current:
                    if current in visited:
                        # Found a cycle; report it in traversal order
                        cycle_path = " -> ".join(chain + [current])
                        errors.append(
                            f"Table '{table_name}': Circular dependency detected in column '{col.name}': {cycle_path}"
                        )
                        break

                    visited.add(current)
                    chain.append(current)
                    current_col = column_map.get(current)

if not current_col:
break

# Move to the next column in the chain
if current_col.baseColumn and current_col.baseColumn != "id":
if current_col.baseColumn not in column_map:
# baseColumn doesn't exist - we'll catch this in another validation
break
current = current_col.baseColumn
else:
# Reached a column that doesn't have a baseColumn or uses "id"
break

return errors
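
    # Illustrative sketch (not part of the implementation): two columns that
    # reference each other are flagged by the walk above.
    #
    #   cols = [ColumnDefinition(name="a", type="int", baseColumn="b"),
    #           ColumnDefinition(name="b", type="int", baseColumn="a")]
    #   spec._check_circular_dependencies("t", cols)
    #   # -> ["Table 't': Circular dependency detected in column 'a': a -> b -> a",
    #   #     "Table 't': Circular dependency detected in column 'b': b -> a -> b"]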

def validate(self, strict: bool = True) -> ValidationResult:
"""
Validates the entire DatagenSpec configuration.
        Always runs all validation checks and collects all errors and warnings.

        Note: this method shadows Pydantic V1's BaseModel.validate classmethod;
        that is safe as long as DatagenSpec is not used as a nested field type.

Args:
strict: If True, raises ValueError if any errors or warnings are found.
If False, only raises ValueError if errors (not warnings) are found.

Returns:
ValidationResult object containing all errors and warnings found.

Raises:
ValueError: If validation fails based on strict mode setting.
The exception message contains all errors and warnings.
"""
result = ValidationResult()

# 1. Check that there's at least one table
if not self.tables:
result.add_error("Spec must contain at least one table definition")

# 2. Validate each table (continue checking all tables even if errors found)
for table_name, table_def in self.tables.items():
# Check table has at least one column
if not table_def.columns:
result.add_error(f"Table '{table_name}' must have at least one column")
continue # Skip further checks for this table since it has no columns

# Check row count is positive
if table_def.number_of_rows <= 0:
result.add_error(
f"Table '{table_name}' has invalid number_of_rows: {table_def.number_of_rows}. "
"Must be a positive integer."
)

# Check partitions if specified
            # TODO: this could be a field-level constraint on the model; we check it here
            # so that all problems are reported together. Can the default field validation be used?
if table_def.partitions is not None and table_def.partitions <= 0:
result.add_error(
f"Table '{table_name}' has invalid partitions: {table_def.partitions}. "
"Must be a positive integer or None."
)

# Check for duplicate column names
            # TODO: not possible once the model enforces unique column names; recheck.
column_names = [col.name for col in table_def.columns]
duplicates = [name for name in set(column_names) if column_names.count(name) > 1]
if duplicates:
result.add_error(
f"Table '{table_name}' has duplicate column names: {', '.join(duplicates)}"
)

# Build column map for reference checking
column_map = {col.name: col for col in table_def.columns}

            # TODO: baseColumn reference checking is tricky; compare with the dbldatagen defaults.
for col in table_def.columns:
if col.baseColumn and col.baseColumn != "id":
if col.baseColumn not in column_map:
result.add_error(
f"Table '{table_name}', column '{col.name}': "
f"baseColumn '{col.baseColumn}' does not exist in the table"
)

# Check for circular dependencies in baseColumn references
circular_errors = self._check_circular_dependencies(table_name, table_def.columns)
for error in circular_errors:
result.add_error(error)

# Check primary key constraints
primary_columns = [col for col in table_def.columns if col.primary]
if len(primary_columns) > 1:
primary_names = [col.name for col in primary_columns]
result.add_warning(
f"Table '{table_name}' has multiple primary columns: {', '.join(primary_names)}. "
"This may not be the intended behavior."
)

            # Warn on columns with no type and no options; they may not generate useful data
for col in table_def.columns:
if not col.primary and not col.type and not col.options:
result.add_warning(
f"Table '{table_name}', column '{col.name}': "
"No type specified and no options provided. "
"Column may not generate data as expected."
)

# 3. Check output destination
if not self.output_destination:
result.add_warning(
"No output_destination specified. Data will be generated but not persisted. "
"Set output_destination to save generated data."
)

# 4. Validate generator options (if any known options)
if self.generator_options:
known_options = [
"random", "randomSeed", "randomSeedMethod", "verbose",
"debug", "seedColumnName"
]
for key in self.generator_options:
if key not in known_options:
result.add_warning(
f"Unknown generator option: '{key}'. "
"This may be ignored during generation."
)

# Now that all validations are complete, decide whether to raise
if (strict and (result.errors or result.warnings)) or (not strict and result.errors):
raise ValueError(str(result))

return result
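
    # Usage sketch: strict mode treats warnings as fatal; non-strict mode raises
    # only on errors and returns the warnings for inspection.
    #
    #   result = spec.validate(strict=False)
    #   for warning in result.warnings:
    #       print(warning)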


    def display_all_tables(self) -> None:
        # Show the output destination once, then each table's column definitions.
        if self.output_destination:
            display(HTML(f"<strong>Output destination:</strong> {self.output_destination}"))
        else:
            message = (
                "<strong>Output destination:</strong> "
                "<span style='color: red; font-weight: bold;'>None</span><br>"
                "<span style='color: gray;'>Set it using the <code>output_destination</code> "
                "attribute on your <code>DatagenSpec</code> object "
                "(e.g., <code>my_spec.output_destination = UCSchemaTarget(...)</code>).</span>"
            )
            display(HTML(message))

        for table_name, table_def in self.tables.items():
            print(f"Table: {table_name}")
            df = pd.DataFrame([col.dict() for col in table_def.columns])
            display(df)
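
A minimal end-to-end sketch of the API above; the catalog and schema names are illustrative:

from dbldatagen.spec.column_spec import ColumnDefinition
from dbldatagen.spec.generator_spec import DatagenSpec, TableDefinition, UCSchemaTarget

spec = DatagenSpec(
    tables={
        "customers": TableDefinition(
            number_of_rows=1_000,
            columns=[
                ColumnDefinition(name="id", type="long", primary=True),
                ColumnDefinition(name="email", type="string", baseColumn="id"),
            ],
        )
    },
    output_destination=UCSchemaTarget(catalog="main", schema_="demo"),
)

result = spec.validate(strict=False)  # warnings are collected; only errors raise
print(result)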