Skip to content

Add FreshnessStateChange event type #29568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ class FreshnessStateEvaluation:
freshness_state: FreshnessState


@whitelist_for_serdes
@record
class FreshnessStateChange:
"""Event that is emitted when the freshness state of an asset changes."""

key: AssetKey
new_state: FreshnessState
previous_state: FreshnessState
state_change_timestamp: float


INTERNAL_FRESHNESS_POLICY_METADATA_KEY = "dagster/internal_freshness_policy"


Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@

if TYPE_CHECKING:
from dagster._core.definitions.events import ObjectStoreOperation
from dagster._core.definitions.freshness import FreshnessStateEvaluation
from dagster._core.definitions.freshness import FreshnessStateChange, FreshnessStateEvaluation
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.step import StepKind

Expand Down Expand Up @@ -95,6 +95,7 @@
"AssetFailedToMaterializeData",
"RunEnqueuedData",
"FreshnessStateEvaluation",
"FreshnessStateChange",
]


Expand Down Expand Up @@ -172,6 +173,7 @@ class DagsterEventType(str, Enum):
LOGS_CAPTURED = "LOGS_CAPTURED"

FRESHNESS_STATE_EVALUATION = "FRESHNESS_STATE_EVALUATION"
FRESHNESS_STATE_CHANGE = "FRESHNESS_STATE_CHANGE"


EVENT_TYPE_TO_DISPLAY_STRING = {
Expand Down
40 changes: 38 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
)
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.freshness import FreshnessStateEvaluation, FreshnessStateRecord
from dagster._core.definitions.freshness import (
FreshnessStateChange,
FreshnessStateEvaluation,
FreshnessStateRecord,
)
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.errors import (
DagsterHomeNotSetError,
Expand Down Expand Up @@ -3482,6 +3486,35 @@ def report_runless_asset_event(
],
):
"""Record an event log entry related to assets that does not belong to a Dagster run."""
from dagster._core.events import AssetMaterialization

if not isinstance(
asset_event,
(
AssetMaterialization,
AssetObservation,
AssetCheckEvaluation,
FreshnessStateEvaluation,
),
):
raise DagsterInvariantViolationError(
f"Received unexpected asset event type {asset_event}, expected"
" AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation"
)

return self._report_runless_asset_event(asset_event)

def _report_runless_asset_event(
self,
asset_event: Union[
"AssetMaterialization",
"AssetObservation",
"AssetCheckEvaluation",
"FreshnessStateEvaluation",
"FreshnessStateChange",
],
):
"""Use this directly over report_runless_asset_event to emit internal events."""
from dagster._core.events import (
AssetMaterialization,
AssetObservationData,
Expand All @@ -3502,10 +3535,13 @@ def report_runless_asset_event(
elif isinstance(asset_event, FreshnessStateEvaluation):
event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value
data_payload = asset_event
elif isinstance(asset_event, FreshnessStateChange):
event_type_value = DagsterEventType.FRESHNESS_STATE_CHANGE.value
data_payload = asset_event
else:
raise DagsterInvariantViolationError(
f"Received unexpected asset event type {asset_event}, expected"
" AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation"
" AssetMaterialization, AssetObservation, AssetCheckEvaluation, FreshnessStateEvaluation or FreshnessStateChange"
)

return self.report_dagster_event(
Expand Down