Skip to content

Commit d91593d

Browse files
authored
Add get_current_history_length() to workflow info (#73)
1 parent a47f573 commit d91593d

File tree

3 files changed

+21
-1
lines changed

3 files changed

+21
-1
lines changed

temporalio/worker/workflow_instance.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
149149
self._info = det.info
150150
self._primary_task: Optional[asyncio.Task[None]] = None
151151
self._time = 0.0
152+
self._current_history_length = 0
152153
# Handles which are ready to run on the next event loop iteration
153154
self._ready: Deque[asyncio.Handle] = collections.deque()
154155
self._conditions: List[Tuple[Callable[[], bool], asyncio.Future]] = []
@@ -231,6 +232,7 @@ def activate(
231232
)
232233
self._current_completion.successful.SetInParent()
233234
self._current_activation_error: Optional[Exception] = None
235+
self._current_history_length = act.history_length
234236
self._time = act.timestamp.ToMicroseconds() / 1e6
235237
self._is_replaying = act.is_replaying
236238

@@ -652,6 +654,9 @@ def workflow_continue_as_new(
652654
# TODO(cretz): Why can't MyPy infer the above never returns?
653655
raise RuntimeError("Unreachable")
654656

657+
def workflow_get_current_history_length(self) -> int:
658+
return self._current_history_length
659+
655660
def workflow_get_external_workflow_handle(
656661
self, id: str, *, run_id: Optional[str]
657662
) -> temporalio.workflow.ExternalWorkflowHandle[Any]:

temporalio/workflow.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ def _logger_details(self) -> Mapping[str, Any]:
293293
"workflow_type": self.workflow_type,
294294
}
295295

296+
def get_current_history_length(self) -> int:
297+
"""Get the current number of events in history.
298+
299+
Returns:
300+
Current number of events in history (up until the current task).
301+
"""
302+
return _Runtime.current().workflow_get_current_history_length()
303+
296304

297305
@dataclass(frozen=True)
298306
class ParentInfo:
@@ -347,6 +355,10 @@ def workflow_continue_as_new(
347355
) -> NoReturn:
348356
...
349357

358+
@abstractmethod
359+
def workflow_get_current_history_length(self) -> int:
360+
...
361+
350362
@abstractmethod
351363
def workflow_get_external_workflow_handle(
352364
self, id: str, *, run_id: Optional[str]

tests/worker/test_workflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ class InfoWorkflow:
110110
@workflow.run
111111
async def run(self) -> Dict:
112112
# Convert to JSON and back so it'll stringify un-JSON-able pieces
113-
return json.loads(json.dumps(dataclasses.asdict(workflow.info()), default=str))
113+
ret = dataclasses.asdict(workflow.info())
114+
ret["current_history_length"] = workflow.info().get_current_history_length()
115+
return json.loads(json.dumps(ret, default=str))
114116

115117

116118
async def test_workflow_info(client: Client):
@@ -130,6 +132,7 @@ async def test_workflow_info(client: Client):
130132
)
131133
assert info["attempt"] == 1
132134
assert info["cron_schedule"] is None
135+
assert info["current_history_length"] == 3
133136
assert info["execution_timeout"] is None
134137
assert info["namespace"] == client.namespace
135138
assert info["retry_policy"] == json.loads(

0 commit comments

Comments
 (0)