Skip to content

Commit f8bea01

Browse files
committed
add report_internal_runless_asset_event
1 parent a014e75 commit f8bea01

File tree

1 file changed

+23
-4
lines changed
  • python_modules/dagster/dagster/_core/instance

1 file changed

+23
-4
lines changed

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3483,7 +3483,6 @@ def report_runless_asset_event(
34833483
"AssetObservation",
34843484
"AssetCheckEvaluation",
34853485
"FreshnessStateEvaluation",
3486-
"FreshnessStateChange",
34873486
],
34883487
):
34893488
"""Record an event log entry related to assets that does not belong to a Dagster run."""
@@ -3507,9 +3506,6 @@ def report_runless_asset_event(
35073506
elif isinstance(asset_event, FreshnessStateEvaluation):
35083507
event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value
35093508
data_payload = asset_event
3510-
elif isinstance(asset_event, FreshnessStateChange):
3511-
event_type_value = DagsterEventType.FRESHNESS_STATE_CHANGE.value
3512-
data_payload = asset_event
35133509
else:
35143510
raise DagsterInvariantViolationError(
35153511
f"Received unexpected asset event type {asset_event}, expected"
@@ -3525,6 +3521,29 @@ def report_runless_asset_event(
35253521
),
35263522
)
35273523

3524+
def report_internal_runless_asset_event(self, asset_event: Union["FreshnessStateChange",]):
3525+
"""Does exactly the same thing as report_runless_asset_event, only used for internal events.
3526+
3527+
Use this over report_runless_asset_event for internal events.
3528+
"""
3529+
from dagster._core.events import DagsterEvent, DagsterEventType
3530+
3531+
if isinstance(asset_event, FreshnessStateChange):
3532+
event_type_value = DagsterEventType.FRESHNESS_STATE_CHANGE.value
3533+
data_payload = asset_event
3534+
else:
3535+
raise DagsterInvariantViolationError(
3536+
f"Received unexpected asset event type {asset_event}, expected FreshnessStateChange"
3537+
)
3538+
return self.report_dagster_event(
3539+
run_id=RUNLESS_RUN_ID,
3540+
dagster_event=DagsterEvent(
3541+
event_type_value=event_type_value,
3542+
event_specific_data=data_payload,
3543+
job_name=RUNLESS_JOB_NAME,
3544+
),
3545+
)
3546+
35283547
def get_entity_freshness_state(self, entity_key: AssetKey) -> Optional[FreshnessStateRecord]:
35293548
warnings.warn("`get_entity_freshness_state` is not yet implemented for OSS.")
35303549
return None

0 commit comments

Comments
 (0)