Skip to content

Commit 8ffa58e

Browse files
authored
Search attributes (#43)
1 parent 2d4405e commit 8ffa58e

File tree

21 files changed

+610
-424
lines changed

21 files changed

+610
-424
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
# Needed for tests since they use external server
3636
- uses: actions/setup-go@v2
3737
with:
38-
go-version: "1.17"
38+
go-version: "1.18"
3939
- run: python -m pip install --upgrade wheel poetry poethepoet
4040
- run: poetry install --no-root
4141
- run: poe lint
@@ -97,7 +97,7 @@ jobs:
9797
# Needed for tests since they use external server
9898
- uses: actions/setup-go@v2
9999
with:
100-
go-version: "1.17"
100+
go-version: "1.18"
101101
- run: python -m pip install --upgrade wheel poetry poethepoet
102102
- run: poetry install --no-root
103103
- run: poetry build

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ These steps can be followed to use with a virtual environment and `pip`:
5757

5858
The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.
5959

60+
**NOTE: This README is for the current branch and not necessarily what's released on `PyPI`.**
61+
6062
### Implementing a Workflow
6163

6264
Create the following script at `run_worker.py`:

scripts/gen_protos.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ def fix_generated_output(base_path: Path):
7878
# MyPy protobuf does not document this experimental class, see
7979
# https://github.com/nipunn1313/mypy-protobuf/issues/212#issuecomment-885300106
8080
import_suffix = ""
81-
if stem == "service_pb2_grpc" and message == "WorkflowService":
81+
if stem == "service_pb2_grpc" and (
82+
message == "WorkflowService" or message == "OperatorService"
83+
):
8284
import_suffix = " # type: ignore"
8385
f.write(f"from .{stem} import {message}{import_suffix}\n")
8486
message_names.append(message)

temporalio/api/operatorservice/v1/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
RemoveSearchAttributesRequest,
99
RemoveSearchAttributesResponse,
1010
)
11+
from .service_pb2_grpc import OperatorService # type: ignore
1112
from .service_pb2_grpc import (
12-
OperatorService,
1313
OperatorServiceServicer,
1414
OperatorServiceStub,
1515
add_OperatorServiceServicer_to_server,

temporalio/bridge/worker.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Worker using SDK Core."""
22

33
from dataclasses import dataclass
4-
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, List
4+
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, List, Mapping
55

66
import google.protobuf.internal.containers
77

@@ -366,3 +366,14 @@ async def encode_completion(
366366
)
367367
for val in command.start_child_workflow_execution.memo.values():
368368
await _encode_bridge_payload(val, codec)
369+
370+
371+
def encode_search_attributes(
372+
attrs: temporalio.common.SearchAttributes,
373+
payloads: Mapping[str, temporalio.bridge.proto.common.Payload],
374+
) -> None:
375+
"""Encode search attributes as bridge payloads."""
376+
for k, vals in attrs.items():
377+
payloads[k].CopyFrom(
378+
to_bridge_payload(temporalio.converter.encode_search_attribute_values(vals))
379+
)

temporalio/client.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ async def start_workflow(
209209
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
210210
cron_schedule: str = "",
211211
memo: Optional[Mapping[str, Any]] = None,
212-
search_attributes: Optional[Mapping[str, Any]] = None,
212+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
213213
header: Optional[Mapping[str, Any]] = None,
214214
start_signal: Optional[str] = None,
215215
start_signal_args: Iterable[Any] = [],
@@ -234,7 +234,7 @@ async def start_workflow(
234234
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
235235
cron_schedule: str = "",
236236
memo: Optional[Mapping[str, Any]] = None,
237-
search_attributes: Optional[Mapping[str, Any]] = None,
237+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
238238
header: Optional[Mapping[str, Any]] = None,
239239
start_signal: Optional[str] = None,
240240
start_signal_args: Iterable[Any] = [],
@@ -259,7 +259,7 @@ async def start_workflow(
259259
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
260260
cron_schedule: str = "",
261261
memo: Optional[Mapping[str, Any]] = None,
262-
search_attributes: Optional[Mapping[str, Any]] = None,
262+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
263263
header: Optional[Mapping[str, Any]] = None,
264264
start_signal: Optional[str] = None,
265265
start_signal_args: Iterable[Any] = [],
@@ -283,7 +283,7 @@ async def start_workflow(
283283
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
284284
cron_schedule: str = "",
285285
memo: Optional[Mapping[str, Any]] = None,
286-
search_attributes: Optional[Mapping[str, Any]] = None,
286+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
287287
header: Optional[Mapping[str, Any]] = None,
288288
start_signal: Optional[str] = None,
289289
start_signal_args: Iterable[Any] = [],
@@ -305,7 +305,7 @@ async def start_workflow(
305305
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
306306
cron_schedule: str = "",
307307
memo: Optional[Mapping[str, Any]] = None,
308-
search_attributes: Optional[Mapping[str, Any]] = None,
308+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
309309
header: Optional[Mapping[str, Any]] = None,
310310
start_signal: Optional[str] = None,
311311
start_signal_args: Iterable[Any] = [],
@@ -388,7 +388,7 @@ async def execute_workflow(
388388
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
389389
cron_schedule: str = "",
390390
memo: Optional[Mapping[str, Any]] = None,
391-
search_attributes: Optional[Mapping[str, Any]] = None,
391+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
392392
header: Optional[Mapping[str, Any]] = None,
393393
start_signal: Optional[str] = None,
394394
start_signal_args: Iterable[Any] = [],
@@ -413,7 +413,7 @@ async def execute_workflow(
413413
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
414414
cron_schedule: str = "",
415415
memo: Optional[Mapping[str, Any]] = None,
416-
search_attributes: Optional[Mapping[str, Any]] = None,
416+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
417417
header: Optional[Mapping[str, Any]] = None,
418418
start_signal: Optional[str] = None,
419419
start_signal_args: Iterable[Any] = [],
@@ -438,7 +438,7 @@ async def execute_workflow(
438438
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
439439
cron_schedule: str = "",
440440
memo: Optional[Mapping[str, Any]] = None,
441-
search_attributes: Optional[Mapping[str, Any]] = None,
441+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
442442
header: Optional[Mapping[str, Any]] = None,
443443
start_signal: Optional[str] = None,
444444
start_signal_args: Iterable[Any] = [],
@@ -462,7 +462,7 @@ async def execute_workflow(
462462
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
463463
cron_schedule: str = "",
464464
memo: Optional[Mapping[str, Any]] = None,
465-
search_attributes: Optional[Mapping[str, Any]] = None,
465+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
466466
header: Optional[Mapping[str, Any]] = None,
467467
start_signal: Optional[str] = None,
468468
start_signal_args: Iterable[Any] = [],
@@ -484,7 +484,7 @@ async def execute_workflow(
484484
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
485485
cron_schedule: str = "",
486486
memo: Optional[Mapping[str, Any]] = None,
487-
search_attributes: Optional[Mapping[str, Any]] = None,
487+
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
488488
header: Optional[Mapping[str, Any]] = None,
489489
start_signal: Optional[str] = None,
490490
start_signal_args: Iterable[Any] = [],
@@ -1297,7 +1297,7 @@ class StartWorkflowInput:
12971297
retry_policy: Optional[temporalio.common.RetryPolicy]
12981298
cron_schedule: str
12991299
memo: Optional[Mapping[str, Any]]
1300-
search_attributes: Optional[Mapping[str, Any]]
1300+
search_attributes: Optional[temporalio.common.SearchAttributes]
13011301
header: Optional[Mapping[str, Any]]
13021302
start_signal: Optional[str]
13031303
start_signal_args: Iterable[Any]
@@ -1527,10 +1527,9 @@ async def start_workflow(
15271527
for k, v in input.memo.items():
15281528
req.memo.fields[k] = (await self._client.data_converter.encode([v]))[0]
15291529
if input.search_attributes is not None:
1530-
for k, v in input.search_attributes.items():
1531-
req.search_attributes.indexed_fields[k] = (
1532-
await self._client.data_converter.encode([v])
1533-
)[0]
1530+
temporalio.converter.encode_search_attributes(
1531+
input.search_attributes, req.search_attributes
1532+
)
15341533
if input.header is not None:
15351534
for k, v in input.header.items():
15361535
req.header.fields[k] = (await self._client.data_converter.encode([v]))[

temporalio/common.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
from __future__ import annotations
44

55
from dataclasses import dataclass
6-
from datetime import timedelta
6+
from datetime import datetime, timedelta
77
from enum import IntEnum
8-
from typing import Any, Iterable, Optional
8+
from typing import Any, Iterable, List, Mapping, Optional, Union
9+
10+
from typing_extensions import TypeAlias
911

1012
import temporalio.api.common.v1
1113
import temporalio.api.enums.v1
@@ -128,6 +130,14 @@ class QueryRejectCondition(IntEnum):
128130
"""See :py:attr:`temporalio.api.enums.v1.QueryRejectCondition.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY`."""
129131

130132

133+
SearchAttributeValue: TypeAlias = Union[str, int, float, bool, datetime]
134+
135+
# We choose to make this a list instead of an iterable so we can catch if people
136+
# are not sending lists each time but maybe accidentally sending a string (which
137+
# is iterable)
138+
SearchAttributes: TypeAlias = Mapping[str, List[SearchAttributeValue]]
139+
140+
131141
# Should be set as the "arg" argument for _arg_or_args checks where the argument
132142
# is unset. This is different than None which is a legitimate argument.
133143
_arg_unset = object()

temporalio/converter.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
from abc import ABC, abstractmethod
1010
from dataclasses import dataclass
11+
from datetime import datetime
1112
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Type
1213

1314
import dacite
@@ -16,6 +17,7 @@
1617
import google.protobuf.symbol_database
1718

1819
import temporalio.api.common.v1
20+
import temporalio.common
1921

2022

2123
class PayloadConverter(ABC):
@@ -594,6 +596,80 @@ def default() -> DataConverter:
594596
return _default
595597

596598

599+
def encode_search_attributes(
600+
attrs: temporalio.common.SearchAttributes,
601+
api: temporalio.api.common.v1.SearchAttributes,
602+
) -> None:
603+
"""Convert search attributes into an API message.
604+
605+
Args:
606+
attrs: Search attributes to convert.
607+
api: API message to set converted attributes on.
608+
"""
609+
for k, v in attrs.items():
610+
api.indexed_fields[k].CopyFrom(encode_search_attribute_values(v))
611+
612+
613+
def encode_search_attribute_values(
614+
vals: List[temporalio.common.SearchAttributeValue],
615+
) -> temporalio.api.common.v1.Payload:
616+
"""Convert search attribute values into a payload.
617+
618+
Args:
619+
vals: List of values to convert.
620+
"""
621+
if not isinstance(vals, list):
622+
raise TypeError("Search attribute values must be lists")
623+
# Confirm all types are the same
624+
val_type: Optional[Type] = None
625+
# Convert dates to strings
626+
safe_vals = []
627+
for v in vals:
628+
if isinstance(v, datetime):
629+
if v.tzinfo is None:
630+
raise ValueError(
631+
"Timezone must be present on all search attribute dates"
632+
)
633+
v = v.isoformat()
634+
elif not isinstance(v, (str, int, float, bool)):
635+
raise TypeError(
636+
f"Search attribute value of type {type(v).__name__} not one of str, int, float, bool, or datetime"
637+
)
638+
elif val_type and type(v) is not val_type:
639+
raise TypeError(
640+
f"Search attribute values must have the same type for the same key"
641+
)
642+
elif not val_type:
643+
val_type = type(v)
644+
safe_vals.append(v)
645+
return default().payload_converter.to_payloads([safe_vals])[0]
646+
647+
648+
def decode_search_attributes(
649+
api: temporalio.api.common.v1.SearchAttributes,
650+
) -> temporalio.common.SearchAttributes:
651+
"""Decode API search attributes to values.
652+
653+
Args:
654+
api: API message with search attribute values to convert.
655+
656+
Returns:
657+
Converted search attribute values.
658+
"""
659+
conv = default().payload_converter
660+
ret = {}
661+
for k, v in api.indexed_fields.items():
662+
val = conv.from_payloads([v])[0]
663+
# If a value did not come back as a list, make it a single-item list
664+
if not isinstance(val, list):
665+
val = [val]
666+
# Convert each item to datetime if necessary
667+
if v.metadata.get("type") == b"Datetime":
668+
val = [datetime.fromisoformat(v) for v in val]
669+
ret[k] = val
670+
return ret
671+
672+
597673
class _FunctionTypeLookup:
598674
def __init__(self, type_hint_eval_str: bool) -> None:
599675
# Keyed by callable __qualname__, value is optional arg types and

temporalio/worker/interceptor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class ContinueAsNewInput:
126126
run_timeout: Optional[timedelta]
127127
task_timeout: Optional[timedelta]
128128
memo: Optional[Mapping[str, Any]]
129-
search_attributes: Optional[Mapping[str, Any]]
129+
search_attributes: Optional[temporalio.common.SearchAttributes]
130130
# The types may be absent
131131
arg_types: Optional[List[Type]]
132132

@@ -203,7 +203,7 @@ class StartChildWorkflowInput:
203203
retry_policy: Optional[temporalio.common.RetryPolicy]
204204
cron_schedule: str
205205
memo: Optional[Mapping[str, Any]]
206-
search_attributes: Optional[Mapping[str, Any]]
206+
search_attributes: Optional[temporalio.common.SearchAttributes]
207207
# The types may be absent
208208
arg_types: Optional[List[Type]]
209209
ret_type: Optional[Type]

temporalio/worker/workflow.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ async def _create_workflow_instance(
263263
run_timeout=start.workflow_run_timeout.ToTimedelta()
264264
if start.HasField("workflow_run_timeout")
265265
else None,
266+
search_attributes=temporalio.converter.decode_search_attributes(
267+
start.search_attributes
268+
),
266269
start_time=act.timestamp.ToDatetime().replace(tzinfo=timezone.utc),
267270
task_queue=self._task_queue,
268271
task_timeout=start.workflow_task_timeout.ToTimedelta(),

0 commit comments

Comments
 (0)