Skip to content

Commit 33fca9f

Browse files
committed
add FreshnessStateChange event
1 parent cea3b6e commit 33fca9f

File tree

3 files changed

+23
-2
lines changed

3 files changed

+23
-2
lines changed

python_modules/dagster/dagster/_core/definitions/freshness.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ class FreshnessStateEvaluation:
2828
freshness_state: FreshnessState
2929

3030

31+
@whitelist_for_serdes
32+
@record
33+
class FreshnessStateChange:
34+
"""Event that is emitted when the freshness state of an asset changes."""
35+
36+
key: AssetKey
37+
new_state: FreshnessState
38+
previous_state: FreshnessState
39+
state_change_timestamp: float
40+
41+
3142
INTERNAL_FRESHNESS_POLICY_METADATA_KEY = "dagster/internal_freshness_policy"
3243

3344

python_modules/dagster/dagster/_core/events/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767

6868
if TYPE_CHECKING:
6969
from dagster._core.definitions.events import ObjectStoreOperation
70-
from dagster._core.definitions.freshness import FreshnessStateEvaluation
70+
from dagster._core.definitions.freshness import FreshnessStateChange, FreshnessStateEvaluation
7171
from dagster._core.execution.plan.plan import ExecutionPlan
7272
from dagster._core.execution.plan.step import StepKind
7373

@@ -95,6 +95,7 @@
9595
"AssetFailedToMaterializeData",
9696
"RunEnqueuedData",
9797
"FreshnessStateEvaluation",
98+
"FreshnessStateChange",
9899
]
99100

100101

@@ -172,6 +173,7 @@ class DagsterEventType(str, Enum):
172173
LOGS_CAPTURED = "LOGS_CAPTURED"
173174

174175
FRESHNESS_STATE_EVALUATION = "FRESHNESS_STATE_EVALUATION"
176+
FRESHNESS_STATE_CHANGE = "FRESHNESS_STATE_CHANGE"
175177

176178

177179
EVENT_TYPE_TO_DISPLAY_STRING = {

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@
3838
)
3939
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
4040
from dagster._core.definitions.events import AssetKey, AssetObservation
41-
from dagster._core.definitions.freshness import FreshnessStateEvaluation, FreshnessStateRecord
41+
from dagster._core.definitions.freshness import (
42+
FreshnessStateChange,
43+
FreshnessStateEvaluation,
44+
FreshnessStateRecord,
45+
)
4246
from dagster._core.definitions.partition_key_range import PartitionKeyRange
4347
from dagster._core.errors import (
4448
DagsterHomeNotSetError,
@@ -3479,6 +3483,7 @@ def report_runless_asset_event(
34793483
"AssetObservation",
34803484
"AssetCheckEvaluation",
34813485
"FreshnessStateEvaluation",
3486+
"FreshnessStateChange",
34823487
],
34833488
):
34843489
"""Record an event log entry related to assets that does not belong to a Dagster run."""
@@ -3502,6 +3507,9 @@ def report_runless_asset_event(
35023507
elif isinstance(asset_event, FreshnessStateEvaluation):
35033508
event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value
35043509
data_payload = asset_event
3510+
elif isinstance(asset_event, FreshnessStateChange):
3511+
event_type_value = DagsterEventType.FRESHNESS_STATE_CHANGE.value
3512+
data_payload = asset_event
35053513
else:
35063514
raise DagsterInvariantViolationError(
35073515
f"Received unexpected asset event type {asset_event}, expected"

0 commit comments

Comments
 (0)