Skip to content

Commit 5a40883

Browse files
fix: Fix materialization details error handling (#282)
* fix: Fix materialization details error handling * pixi update * Somehow sa.select("*") can create a race condition with prefect backend. Thus changed to sa.select(sa.text("*")). This error vanished always on second run: Split tests into 2 parallelization groups: dask_engine, prefect_engine ..............F.................................................ssssssss [ 52%] s..................................................sssssssss.... [100%] =================================== FAILURES =================================== ______________ test_materialize_view_union[prefect_engine-False] _______________ instance = '*' @inspection._inspects(object) def _inspect_mapped_object(instance: _T) -> Optional[InstanceState[_T]]: try: > return instance_state(instance) ^^^^^^^^^^^^^^^^^^^^^^^^ E AttributeError: 'str' object has no attribute '_sa_instance_state' .pixi/envs/py311all/lib/python3.11/site-packages/sqlalchemy/orm/base.py:430: AttributeError * Prevent version conflict for ibm_db=3.2.7 * Convert auto_table error to warning in pipedag tests. It is a bit lame to catch one specific type but this type is helpful in the general auto_table arguments and this is hard to make dependent on pydiverse.transform being installed. * Convert auto_table error to warning in pipedag tests. Prepare release 0.11.0 --------- Co-authored-by: windiana42 <windiana@users.sf.net>
1 parent edb893d commit 5a40883

File tree

13 files changed

+12886
-11551
lines changed

13 files changed

+12886
-11551
lines changed

pixi.lock

Lines changed: 12823 additions & 11507 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pixi.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ dataframely = ">=1.2.1,<2"
101101
platforms = ["linux-64", "osx-arm64", "win-64"]
102102

103103
[feature.ibm-db.dependencies]
104-
ibm_db = ">=3.2.5"
104+
ibm_db = ">=3.2.5, <3.2.7" # somehoew there is a version conflict for 3.2.7 that fails empty queries
105105
ibm_db_sa = ">=0.3.8"
106106

107107
# S3
@@ -127,6 +127,7 @@ sphinx-copybutton = ">=0.5.2"
127127
[tasks]
128128
postinstall = "pip install --no-build-isolation --no-deps --disable-pip-version-check -e ."
129129
zip-examples = "sh -c 'set -ex; for d in example*; do if test -f $d/run_pipeline.py; then echo $d; cd $d; pixi install; cd -; rm -r $d/.pixi $d/__pycache__ || echo ignore; rm docs/source/examples/zip/$d.zip || echo $d.zip is new; zip -r docs/source/examples/zip/$d.zip $d/; fi; done'"
130+
install-all = "sh -c 'cd .pixi/envs/ && for i in */; do pixi install -e $(basename $i /); done && cd -'"
130131

131132
[feature.docs.tasks]
132133
docs = "cd docs && make html "

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "pydiverse-pipedag"
3-
version = "0.10.12"
3+
version = "0.11.0"
44
description = "A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files."
55
authors = [
66
{ name = "QuantCo, Inc." },

src/pydiverse/pipedag/backend/table/sql/ddl.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -876,7 +876,7 @@ def insert_into_in_query(select_sql, schema, table):
876876
def visit_create_alias(create_alias: CreateAlias, compiler, **kw):
877877
from_name = compiler.preparer.quote(create_alias.from_name)
878878
from_schema = compiler.preparer.format_schema(create_alias.from_schema.get())
879-
query = sa.select("*").select_from(sa.text(f"{from_schema}.{from_name}"))
879+
query = sa.select(sa.text("*")).select_from(sa.text(f"{from_schema}.{from_name}"))
880880
return compiler.process(CreateViewAsSelect(create_alias.to_name, create_alias.to_schema, query), **kw)
881881

882882

@@ -912,7 +912,7 @@ def visit_create_alias(create_alias: CreateAlias, compiler, **kw):
912912
def visit_copy_table(copy_table: CopyTable, compiler, **kw):
913913
from_name = compiler.preparer.quote(copy_table.from_name)
914914
from_schema = compiler.preparer.format_schema(copy_table.from_schema.get())
915-
query = sa.select("*").select_from(sa.text(f"{from_schema}.{from_name}"))
915+
query = sa.select(sa.text("*")).select_from(sa.text(f"{from_schema}.{from_name}"))
916916
create = CreateTableAsSelect(
917917
copy_table.to_name,
918918
copy_table.to_schema,

src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,10 @@ def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]:
186186
table_name, schema = super().resolve_alias(table, stage_name)
187187
return PipedagDB2Reflection.resolve_alias(self.engine, table_name, schema)
188188

189-
def check_materialization_details_supported(self, label: str | None) -> None:
190-
_ = label
191-
return
192-
193-
def _set_materialization_details(self, materialization_details: dict[str, dict[str | list[str]]] | None) -> None:
194-
self.materialization_details = IBMDB2MaterializationDetails.create_materialization_details_dict(
189+
def _create_materialization_details(
190+
self, materialization_details: dict[str, dict[str | list[str]]] | None
191+
) -> BaseMaterializationDetails:
192+
return IBMDB2MaterializationDetails.create_materialization_details_dict(
195193
materialization_details,
196194
self.strict_materialization_details,
197195
self.default_materialization_details,

src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,10 @@ def resolve_alias(self, table: Table, stage_name: str):
372372
table_name, schema = super().resolve_alias(table, stage_name)
373373
return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema)
374374

375-
def _set_materialization_details(self, materialization_details: dict[str, dict[str | list[str]]] | None) -> None:
376-
self.materialization_details = MSSqlMaterializationDetails.create_materialization_details_dict(
375+
def _create_materialization_details(
376+
self, materialization_details: dict[str, dict[str | list[str]]] | None
377+
) -> BaseMaterializationDetails:
378+
return MSSqlMaterializationDetails.create_materialization_details_dict(
377379
materialization_details,
378380
self.strict_materialization_details,
379381
self.default_materialization_details,

src/pydiverse/pipedag/backend/table/sql/dialects/postgres.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,10 @@ def lock_source_table(self, table: Table | str, schema: Schema | str, conn: Any
109109
self.execute(stmt, conn=conn)
110110
return [stmt]
111111

112-
def check_materialization_details_supported(self, label: str | None) -> None:
113-
_ = label
114-
return
115-
116-
def _set_materialization_details(self, materialization_details: dict[str, dict[str | list[str]]] | None) -> None:
117-
self.materialization_details = PostgresMaterializationDetails.create_materialization_details_dict(
112+
def _create_materialization_details(
113+
self, materialization_details: dict[str, dict[str | list[str]]] | None
114+
) -> BaseMaterializationDetails:
115+
return PostgresMaterializationDetails.create_materialization_details_dict(
118116
materialization_details,
119117
self.strict_materialization_details,
120118
self.default_materialization_details,

src/pydiverse/pipedag/backend/table/sql/hooks.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ def materialize(
405405
):
406406
query = table.obj
407407
if isinstance(table.obj, (sa.Table, sa.sql.expression.Alias)):
408-
query = sa.select("*").select_from(table.obj)
408+
query = sa.select(sa.text("*")).select_from(table.obj)
409409
tbl = table.obj
410410
while hasattr(tbl, "original"):
411411
tbl = tbl.original
@@ -617,7 +617,7 @@ def retrieve(
617617
@classmethod
618618
def lazy_query_str(cls, store, obj) -> str:
619619
if isinstance(obj, sa.sql.expression.FromClause):
620-
query = sa.select("*").select_from(obj)
620+
query = sa.select(sa.text("*")).select_from(obj)
621621
else:
622622
query = obj
623623
query_str = str(query.compile(store.engine, compile_kwargs={"literal_binds": True}))
@@ -1062,7 +1062,7 @@ def _build_retrieve_query(
10621062
cols, dtypes = cls._adjust_cols_retrieve(store, cols, dtypes)
10631063

10641064
if cols is None:
1065-
query = sa.select("*").select_from(query)
1065+
query = sa.select(sa.text("*")).select_from(query)
10661066
else:
10671067
query = sa.select(*cols.values()).select_from(query)
10681068
if limit is not None:

src/pydiverse/pipedag/backend/table/sql/reflection.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ class PipedagDB2Reflection:
88
@staticmethod
99
def get_alias_names(engine: Engine, schema: str) -> list[str]:
1010
"""Returns all aliases in a schema"""
11-
return PipedagDB2Reflection._get_tabname(engine, schema, "A")
11+
return PipedagDB2Reflection._get_tabnames(engine, schema, "A")
1212

1313
@staticmethod
1414
def get_nickname_names(engine: Engine, schema: str) -> list[str]:
1515
"""Returns all nicknames in a schema"""
16-
return PipedagDB2Reflection._get_tabname(engine, schema, "N")
16+
return PipedagDB2Reflection._get_tabnames(engine, schema, "N")
1717

1818
@staticmethod
19-
def _get_tabname(engine: Engine, schema: str, _type: str):
19+
def _get_tabnames(engine: Engine, schema: str, _type: str):
2020
schema = engine.dialect.denormalize_name(schema)
2121
query = f"""
2222
SELECT TABNAME

src/pydiverse/pipedag/backend/table/sql/sql.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
)
4141
from pydiverse.pipedag.context.run_context import DeferredTableStoreOp
4242
from pydiverse.pipedag.errors import CacheError
43-
from pydiverse.pipedag.materialize.details import resolve_materialization_details_label
43+
from pydiverse.pipedag.materialize.details import BaseMaterializationDetails, resolve_materialization_details_label
4444
from pydiverse.pipedag.materialize.materializing_task import MaterializingTask
4545
from pydiverse.pipedag.materialize.metadata import (
4646
LazyTableMetadata,
@@ -344,7 +344,7 @@ def __init__(
344344

345345
self.default_materialization_details = default_materialization_details
346346

347-
self._set_materialization_details(materialization_details)
347+
self.materialization_details = self._create_materialization_details(materialization_details)
348348

349349
self.logger.info(
350350
"Initialized SQL Table Store",
@@ -611,18 +611,27 @@ def drop_subquery_table(
611611
def check_materialization_details_supported(self, label: str | None) -> None:
612612
if label is None:
613613
return
614-
error_msg = f"Materialization details are not supported for store {type(self).__name__}."
614+
if self.materialization_details and label in self.materialization_details:
615+
return
616+
msg = f"{label} is an unknown materialization details label."
615617
if self.strict_materialization_details:
616-
raise ValueError(f"{error_msg} To silence this exception set strict_materialization_details=False")
618+
raise ValueError(f"{msg} To silence this exception set strict_materialization_details=False")
617619
else:
618-
self.logger.warning(f"{error_msg}")
620+
self.logger.warning(msg)
621+
622+
def _create_materialization_details(
623+
self, materialization_details: dict[str, dict[str | list[str]]] | None
624+
) -> BaseMaterializationDetails | None:
625+
"""This function causes an error or exception if materialization details are specified for a table
626+
store that does not support them.
619627
620-
def _set_materialization_details(self, materialization_details: dict[str, dict[str | list[str]]] | None) -> None:
628+
Any table store that supports materialization details should override this method."""
621629
if materialization_details is not None or self.default_materialization_details is not None:
622-
error_msg = f"{type(self).__name__} does not support materialization details."
630+
msg = f"{type(self).__name__} does not support materialization details."
623631
if self.strict_materialization_details:
624-
raise TypeError(f"{error_msg} To suppress this exception, use strict_materialization_details=False")
625-
self.logger.error(error_msg)
632+
raise ValueError(f"{msg} To silence this exception, use strict_materialization_details=False")
633+
self.logger.warning(msg)
634+
return None # no materialization details object
626635

627636
def get_unlogged(self, materialization_details_label: str | None) -> bool:
628637
_ = materialization_details_label

0 commit comments

Comments
 (0)