From 963da6901e271a7fbfcf7cc6974706dba4031846 Mon Sep 17 00:00:00 2001 From: Anurag Anjaria Date: Thu, 24 Apr 2025 16:32:49 +0100 Subject: [PATCH 1/4] add FreshnessStateChange event --- .../dagster/dagster/_core/definitions/freshness.py | 11 +++++++++++ .../dagster/dagster/_core/events/__init__.py | 4 +++- .../dagster/dagster/_core/instance/__init__.py | 10 +++++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/freshness.py b/python_modules/dagster/dagster/_core/definitions/freshness.py index e941ca08462d0..3f31c94f32294 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness.py @@ -28,6 +28,17 @@ class FreshnessStateEvaluation: freshness_state: FreshnessState +@whitelist_for_serdes +@record +class FreshnessStateChange: + """Event that is emitted when the freshness state of an asset changes.""" + + key: AssetKey + new_state: FreshnessState + previous_state: FreshnessState + state_change_timestamp: float + + INTERNAL_FRESHNESS_POLICY_METADATA_KEY = "dagster/internal_freshness_policy" diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index eed4afe2f06be..d7c8aa2e69711 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -67,7 +67,7 @@ if TYPE_CHECKING: from dagster._core.definitions.events import ObjectStoreOperation - from dagster._core.definitions.freshness import FreshnessStateEvaluation + from dagster._core.definitions.freshness import FreshnessStateChange, FreshnessStateEvaluation from dagster._core.execution.plan.plan import ExecutionPlan from dagster._core.execution.plan.step import StepKind @@ -95,6 +95,7 @@ "AssetFailedToMaterializeData", "RunEnqueuedData", "FreshnessStateEvaluation", + "FreshnessStateChange", ] @@ -172,6 +173,7 @@ class DagsterEventType(str, Enum): LOGS_CAPTURED = "LOGS_CAPTURED" FRESHNESS_STATE_EVALUATION = "FRESHNESS_STATE_EVALUATION" + FRESHNESS_STATE_CHANGE = "FRESHNESS_STATE_CHANGE" EVENT_TYPE_TO_DISPLAY_STRING = { diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index fe50830435796..ece9281897119 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -38,7 +38,11 @@ ) from dagster._core.definitions.data_version import extract_data_provenance_from_entry from dagster._core.definitions.events import AssetKey, AssetObservation -from dagster._core.definitions.freshness import FreshnessStateEvaluation, FreshnessStateRecord +from dagster._core.definitions.freshness import ( + FreshnessStateChange, + FreshnessStateEvaluation, + FreshnessStateRecord, +) from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.errors import ( DagsterHomeNotSetError, @@ -3479,6 +3483,7 @@ def report_runless_asset_event( "AssetObservation", "AssetCheckEvaluation", "FreshnessStateEvaluation", + "FreshnessStateChange", ], ): """Record an event log entry related to assets that does not belong to a Dagster run.""" @@ -3502,6 +3507,9 @@ def report_runless_asset_event( elif isinstance(asset_event, FreshnessStateEvaluation): event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value data_payload = asset_event + elif isinstance(asset_event, FreshnessStateChange): + event_type_value = DagsterEventType.FRESHNESS_STATE_CHANGE.value + data_payload = asset_event else: raise DagsterInvariantViolationError( f"Received unexpected asset event type {asset_event}, expected" From b0c8fccdd528d57da63ff82789b4502b1072c614 Mon Sep 17 00:00:00 2001 From: Anurag Anjaria Date: Fri, 2 May 2025 17:22:08 -0400 Subject: [PATCH 2/4] add _report_runless_asset_event --- .../dagster/_core/instance/__init__.py | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index ece9281897119..183bee1433ff0 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -3483,7 +3483,6 @@ def report_runless_asset_event( "AssetObservation", "AssetCheckEvaluation", "FreshnessStateEvaluation", - "FreshnessStateChange", ], ): """Record an event log entry related to assets that does not belong to a Dagster run.""" @@ -3495,6 +3494,55 @@ def report_runless_asset_event( StepMaterializationData, ) + if isinstance(asset_event, AssetMaterialization): + event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value + data_payload = StepMaterializationData(asset_event) + elif isinstance(asset_event, AssetCheckEvaluation): + event_type_value = DagsterEventType.ASSET_CHECK_EVALUATION.value + data_payload = asset_event + elif isinstance(asset_event, AssetObservation): + event_type_value = DagsterEventType.ASSET_OBSERVATION.value + data_payload = AssetObservationData(asset_event) + elif isinstance(asset_event, FreshnessStateEvaluation): + event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value + data_payload = asset_event + else: + raise DagsterInvariantViolationError( + f"Received unexpected asset event type {asset_event}, expected" + " AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation" + ) + + return self.report_dagster_event( + run_id=RUNLESS_RUN_ID, + dagster_event=DagsterEvent( + event_type_value=event_type_value, + event_specific_data=data_payload, + job_name=RUNLESS_JOB_NAME, + ), + ) + + def _report_runless_asset_event( + self, + asset_event: Union[ + "AssetMaterialization", + "AssetObservation", + "AssetCheckEvaluation", + "FreshnessStateEvaluation", + "FreshnessStateChange", + ], + ): + """Does exactly the same thing as report_runless_asset_event, but includes internal event types as well. + + Use this over report_runless_asset_event for internal events. + """ + from dagster._core.events import ( + AssetMaterialization, + AssetObservationData, + DagsterEvent, + DagsterEventType, + StepMaterializationData, + ) + if isinstance(asset_event, AssetMaterialization): event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value data_payload = StepMaterializationData(asset_event) @@ -3513,7 +3561,7 @@ def report_runless_asset_event( else: raise DagsterInvariantViolationError( f"Received unexpected asset event type {asset_event}, expected" - " AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation" + " AssetMaterialization, AssetObservation, AssetCheckEvaluation, FreshnessStateEvaluation or FreshnessStateChange" ) return self.report_dagster_event( From dd5578b370160f297fdbd0750de6cb5df9d40e5d Mon Sep 17 00:00:00 2001 From: Anurag Anjaria Date: Fri, 2 May 2025 18:01:22 -0400 Subject: [PATCH 3/4] use new _report method in public report method --- .../dagster/_core/instance/__init__.py | 44 +++++-------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 183bee1433ff0..6b70cf690e110 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -3486,40 +3486,23 @@ def report_runless_asset_event( ], ): """Record an event log entry related to assets that does not belong to a Dagster run.""" - from dagster._core.events import ( - AssetMaterialization, - AssetObservationData, - DagsterEvent, - DagsterEventType, - StepMaterializationData, - ) + from dagster._core.events import AssetMaterialization - if isinstance(asset_event, AssetMaterialization): - event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value - data_payload = StepMaterializationData(asset_event) - elif isinstance(asset_event, AssetCheckEvaluation): - event_type_value = DagsterEventType.ASSET_CHECK_EVALUATION.value - data_payload = asset_event - elif isinstance(asset_event, AssetObservation): - event_type_value = DagsterEventType.ASSET_OBSERVATION.value - data_payload = AssetObservationData(asset_event) - elif isinstance(asset_event, FreshnessStateEvaluation): - event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value - data_payload = asset_event - else: + if not isinstance( + asset_event, + ( + AssetMaterialization, + AssetObservation, + AssetCheckEvaluation, + FreshnessStateEvaluation, + ), + ): raise DagsterInvariantViolationError( f"Received unexpected asset event type {asset_event}, expected" " AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation" ) - return self.report_dagster_event( - run_id=RUNLESS_RUN_ID, - dagster_event=DagsterEvent( - event_type_value=event_type_value, - event_specific_data=data_payload, - job_name=RUNLESS_JOB_NAME, - ), - ) + return self._report_runless_asset_event(asset_event) def _report_runless_asset_event( self, @@ -3531,10 +3514,7 @@ def _report_runless_asset_event( "FreshnessStateChange", ], ): - """Does exactly the same thing as report_runless_asset_event, but includes internal event types as well. - - Use this over report_runless_asset_event for internal events. - """ + """Use this directly over report_runless_asset_event to emit internal events.""" from dagster._core.events import ( AssetMaterialization, AssetObservationData, From dd19365e2abe9121bfba3e7f1e2840807888969e Mon Sep 17 00:00:00 2001 From: Anurag Anjaria Date: Mon, 5 May 2025 13:51:48 -0400 Subject: [PATCH 4/4] regen gql --- .../dagster-ui/packages/ui-core/src/graphql/schema.graphql | 1 + js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index b52ec90ce28ff..6082a23a81d18 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -79,6 +79,7 @@ enum DagsterEventType { ALERT_FAILURE LOGS_CAPTURED FRESHNESS_STATE_EVALUATION + FRESHNESS_STATE_CHANGE } type ExecutionStepFailureEvent implements MessageEvent & StepEvent & ErrorEvent { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index e2adeb603e5bf..df30ab1809d54 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -1077,6 +1077,7 @@ export enum DagsterEventType { ASSET_OBSERVATION = 'ASSET_OBSERVATION', ASSET_STORE_OPERATION = 'ASSET_STORE_OPERATION', ENGINE_EVENT = 'ENGINE_EVENT', + FRESHNESS_STATE_CHANGE = 'FRESHNESS_STATE_CHANGE', FRESHNESS_STATE_EVALUATION = 'FRESHNESS_STATE_EVALUATION', HANDLED_OUTPUT = 'HANDLED_OUTPUT', HOOK_COMPLETED = 'HOOK_COMPLETED',