Skip to content

Commit 232446a

Browse files
authored
Additional high-level describe details (#72)
1 parent b8a80e0 commit 232446a

File tree

3 files changed

+118
-39
lines changed

3 files changed

+118
-39
lines changed

temporalio/client.py

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import uuid
66
import warnings
77
from dataclasses import dataclass
8-
from datetime import timedelta
8+
from datetime import datetime, timedelta, timezone
99
from enum import IntEnum
1010
from typing import (
1111
Any,
@@ -867,7 +867,7 @@ async def cancel(self) -> None:
867867

868868
async def describe(
869869
self,
870-
) -> WorkflowDescription:
870+
) -> WorkflowExecutionDescription:
871871
"""Get workflow details.
872872
873873
This will get details for :py:attr:`run_id` if present. To use a
@@ -1162,29 +1162,95 @@ async def report_cancellation(self, *details: Any) -> None:
11621162
)
11631163

11641164

1165-
class WorkflowDescription:
1166-
"""Description for a workflow."""
1165+
@dataclass
1166+
class WorkflowExecutionDescription:
1167+
"""Description for a single workflow execution run."""
11671168

1168-
def __init__(
1169-
self,
1170-
raw_message: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse,
1171-
):
1172-
"""Create a workflow description from a describe response."""
1173-
self._raw_message = raw_message
1174-
status = raw_message.workflow_execution_info.status
1175-
self._status = WorkflowExecutionStatus(status) if status else None
1169+
close_time: Optional[datetime]
1170+
"""When the workflow was closed if closed."""
11761171

1177-
@property
1178-
def raw_message(
1179-
self,
1180-
) -> temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse:
1181-
"""Underlying workflow description response."""
1182-
return self._raw_message
1172+
execution_time: Optional[datetime]
1173+
"""When this workflow run started or should start."""
11831174

1184-
@property
1185-
def status(self) -> Optional[WorkflowExecutionStatus]:
1186-
"""Status of the workflow."""
1187-
return self._status
1175+
history_length: int
1176+
"""Number of events in the history."""
1177+
1178+
id: str
1179+
"""ID for the workflow."""
1180+
1181+
memo: Mapping[str, Any]
1182+
"""Memo values on the workflow if any."""
1183+
1184+
parent_id: Optional[str]
1185+
"""ID for the parent workflow if this was started as a child."""
1186+
1187+
parent_run_id: Optional[str]
1188+
"""Run ID for the parent workflow if this was started as a child."""
1189+
1190+
raw: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse
1191+
"""Underlying API describe response."""
1192+
1193+
run_id: str
1194+
"""Run ID for this workflow run."""
1195+
1196+
search_attributes: temporalio.common.SearchAttributes
1197+
"""Current set of search attributes if any."""
1198+
1199+
start_time: datetime
1200+
"""When the workflow was created."""
1201+
1202+
status: Optional[WorkflowExecutionStatus]
1203+
"""Status for the workflow."""
1204+
1205+
task_queue: str
1206+
"""Task queue for the workflow."""
1207+
1208+
workflow_type: str
1209+
"""Type name for the workflow."""
1210+
1211+
@staticmethod
1212+
async def from_raw(
1213+
raw: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse,
1214+
converter: temporalio.converter.DataConverter,
1215+
) -> WorkflowExecutionDescription:
1216+
"""Create a description from a raw description response."""
1217+
return WorkflowExecutionDescription(
1218+
close_time=raw.workflow_execution_info.close_time.ToDatetime().replace(
1219+
tzinfo=timezone.utc
1220+
)
1221+
if raw.workflow_execution_info.HasField("close_time")
1222+
else None,
1223+
execution_time=raw.workflow_execution_info.execution_time.ToDatetime().replace(
1224+
tzinfo=timezone.utc
1225+
)
1226+
if raw.workflow_execution_info.HasField("execution_time")
1227+
else None,
1228+
history_length=raw.workflow_execution_info.history_length,
1229+
id=raw.workflow_execution_info.execution.workflow_id,
1230+
memo={
1231+
k: (await converter.decode([v]))[0]
1232+
for k, v in raw.workflow_execution_info.memo.fields.items()
1233+
},
1234+
parent_id=raw.workflow_execution_info.parent_execution.workflow_id
1235+
if raw.workflow_execution_info.HasField("parent_execution")
1236+
else None,
1237+
parent_run_id=raw.workflow_execution_info.parent_execution.run_id
1238+
if raw.workflow_execution_info.HasField("parent_execution")
1239+
else None,
1240+
raw=raw,
1241+
run_id=raw.workflow_execution_info.execution.run_id,
1242+
search_attributes=temporalio.converter.decode_search_attributes(
1243+
raw.workflow_execution_info.search_attributes
1244+
),
1245+
start_time=raw.workflow_execution_info.start_time.ToDatetime().replace(
1246+
tzinfo=timezone.utc
1247+
),
1248+
status=WorkflowExecutionStatus(raw.workflow_execution_info.status)
1249+
if raw.workflow_execution_info.status
1250+
else None,
1251+
task_queue=raw.workflow_execution_info.task_queue,
1252+
workflow_type=raw.workflow_execution_info.type.name,
1253+
)
11881254

11891255

11901256
class WorkflowExecutionStatus(IntEnum):
@@ -1434,7 +1500,7 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
14341500

14351501
async def describe_workflow(
14361502
self, input: DescribeWorkflowInput
1437-
) -> WorkflowDescription:
1503+
) -> WorkflowExecutionDescription:
14381504
"""Called for every :py:meth:`WorkflowHandle.describe` call."""
14391505

14401506
async def query_workflow(self, input: QueryWorkflowInput) -> Any:
@@ -1522,16 +1588,18 @@ async def start_workflow(
15221588
req.cron_schedule = input.cron_schedule
15231589
if input.memo is not None:
15241590
for k, v in input.memo.items():
1525-
req.memo.fields[k] = (await self._client.data_converter.encode([v]))[0]
1591+
req.memo.fields[k].CopyFrom(
1592+
(await self._client.data_converter.encode([v]))[0]
1593+
)
15261594
if input.search_attributes is not None:
15271595
temporalio.converter.encode_search_attributes(
15281596
input.search_attributes, req.search_attributes
15291597
)
15301598
if input.header is not None:
15311599
for k, v in input.header.items():
1532-
req.header.fields[k] = (await self._client.data_converter.encode([v]))[
1533-
0
1534-
]
1600+
req.header.fields[k].CopyFrom(
1601+
(await self._client.data_converter.encode([v]))[0]
1602+
)
15351603

15361604
# Start with signal or just normal start
15371605
resp: Union[
@@ -1574,8 +1642,8 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
15741642

15751643
async def describe_workflow(
15761644
self, input: DescribeWorkflowInput
1577-
) -> WorkflowDescription:
1578-
return WorkflowDescription(
1645+
) -> WorkflowExecutionDescription:
1646+
return await WorkflowExecutionDescription.from_raw(
15791647
await self._client.service.describe_workflow_execution(
15801648
temporalio.api.workflowservice.v1.DescribeWorkflowExecutionRequest(
15811649
namespace=self._client.namespace,
@@ -1585,7 +1653,8 @@ async def describe_workflow(
15851653
),
15861654
),
15871655
retry=True,
1588-
)
1656+
),
1657+
self._client.data_converter,
15891658
)
15901659

15911660
async def query_workflow(self, input: QueryWorkflowInput) -> Any:

tests/test_client.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import uuid
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta, timezone
33
from typing import Any, List, Optional, Tuple
44

55
import pytest
@@ -172,14 +172,25 @@ async def test_describe(client: Client, worker: ExternalWorker):
172172
KSWorkflowParams(actions=[KSAction(result=KSResultAction(value="some value"))]),
173173
id=str(uuid.uuid4()),
174174
task_queue=worker.task_queue,
175+
memo={"foo": "bar"},
175176
)
176177
assert "some value" == await handle.result()
177178
desc = await handle.describe()
179+
assert desc.close_time and abs(
180+
desc.close_time - datetime.now(timezone.utc)
181+
) < timedelta(seconds=20)
182+
assert desc.execution_time and abs(
183+
desc.execution_time - datetime.now(timezone.utc)
184+
) < timedelta(seconds=20)
185+
assert desc.id == handle.id
186+
assert desc.memo == {"foo": "bar"}
187+
assert not desc.parent_id
188+
assert not desc.parent_run_id
189+
assert desc.run_id == handle.first_execution_run_id
190+
assert abs(desc.start_time - datetime.now(timezone.utc)) < timedelta(seconds=20)
178191
assert desc.status == WorkflowExecutionStatus.COMPLETED
179-
assert (
180-
desc.raw_message.workflow_execution_info.status
181-
== temporalio.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
182-
)
192+
assert desc.task_queue == worker.task_queue
193+
assert desc.workflow_type == "kitchen_sink"
183194

184195

185196
async def test_query(client: Client, worker: ExternalWorker):

tests/worker/test_workflow.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,11 +1291,10 @@ async def search_attributes_present() -> bool:
12911291

12921292
# Also confirm it matches describe from the server
12931293
desc = await handle.describe()
1294-
attrs = decode_search_attributes(
1295-
desc.raw_message.workflow_execution_info.search_attributes
1296-
)
12971294
# Remove attrs without our prefix
1298-
attrs = {k: v for k, v in attrs.items() if k.startswith(sa_prefix)}
1295+
attrs = {
1296+
k: v for k, v in desc.search_attributes.items() if k.startswith(sa_prefix)
1297+
}
12991298
assert expected == search_attrs_to_dict_with_type(attrs)
13001299

13011300

0 commit comments

Comments
 (0)