From b77fe7e91ce70b2dbfbd4958d720f4c1049473d0 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 3 Jul 2025 10:43:01 -0700 Subject: [PATCH 1/9] Adding encoding of payload headers, currently defaults to true --- temporalio/bridge/worker.py | 63 ++++++++++ temporalio/client.py | 58 +++++++-- temporalio/worker/_activity.py | 11 ++ temporalio/worker/_replayer.py | 4 + temporalio/worker/_worker.py | 2 + temporalio/worker/_workflow.py | 10 +- temporalio/worker/_workflow_instance.py | 1 + tests/worker/test_workflow.py | 156 ++++++++++++++++++++++++ 8 files changed, 294 insertions(+), 11 deletions(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 74cf55bfd..09da7181b 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -271,6 +271,41 @@ async def finalize_shutdown(self) -> None: ) +async def _apply_to_headers( + headers: google.protobuf.internal.containers.MessageMap[ + str, temporalio.api.common.v1.Payload + ], + cb: Callable[ + [Sequence[temporalio.api.common.v1.Payload]], + Awaitable[List[temporalio.api.common.v1.Payload]], + ], +) -> None: + """Apply API payload callback to headers.""" + for payload in headers.values(): + new_payload = (await cb([payload]))[0] + payload.CopyFrom(new_payload) + + +async def _decode_headers( + headers: google.protobuf.internal.containers.MessageMap[ + str, temporalio.api.common.v1.Payload + ], + codec: temporalio.converter.PayloadCodec, +) -> None: + """Decode headers with the given codec.""" + return await _apply_to_headers(headers, codec.decode) + + +async def _encode_headers( + headers: google.protobuf.internal.containers.MessageMap[ + str, temporalio.api.common.v1.Payload + ], + codec: temporalio.converter.PayloadCodec, +) -> None: + """Encode headers with the given codec.""" + return await _apply_to_headers(headers, codec.encode) + + async def _apply_to_payloads( payloads: PayloadContainer, cb: Callable[ @@ -336,11 +371,14 @@ async def _encode_payload( async def decode_activation( act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, codec: temporalio.converter.PayloadCodec, + decode_headers: bool, ) -> None: """Decode the given activation with the codec.""" for job in act.jobs: if job.HasField("query_workflow"): await _decode_payloads(job.query_workflow.arguments, codec) + if decode_headers: + await _decode_headers(job.query_workflow.headers, codec) elif job.HasField("resolve_activity"): if job.resolve_activity.result.HasField("cancelled"): await codec.decode_failure( @@ -385,8 +423,12 @@ async def decode_activation( await codec.decode_failure(job.resolve_signal_external_workflow.failure) elif job.HasField("signal_workflow"): await _decode_payloads(job.signal_workflow.input, codec) + if decode_headers: + await _decode_headers(job.signal_workflow.headers, codec) elif job.HasField("initialize_workflow"): await _decode_payloads(job.initialize_workflow.arguments, codec) + if decode_headers: + await _decode_headers(job.initialize_workflow.headers, codec) if job.initialize_workflow.HasField("continued_failure"): await codec.decode_failure(job.initialize_workflow.continued_failure) for val in job.initialize_workflow.memo.fields.values(): @@ -400,11 +442,14 @@ async def decode_activation( val.data = new_payload.data elif job.HasField("do_update"): await _decode_payloads(job.do_update.input, codec) + if decode_headers: + await _decode_headers(job.do_update.headers, codec) async def encode_completion( comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion, codec: temporalio.converter.PayloadCodec, + encode_headers: bool, ) -> None: """Recursively encode the given completion with the codec.""" if comp.HasField("failed"): @@ -420,6 +465,10 @@ async def encode_completion( await _encode_payloads( command.continue_as_new_workflow_execution.arguments, codec ) + if encode_headers: + await _encode_headers( + command.continue_as_new_workflow_execution.headers, codec + ) for val in command.continue_as_new_workflow_execution.memo.values(): await _encode_payload(val, codec) elif command.HasField("fail_workflow_execution"): @@ -435,16 +484,30 @@ async def encode_completion( ) elif command.HasField("schedule_activity"): await _encode_payloads(command.schedule_activity.arguments, codec) + if encode_headers: + await _encode_headers(command.schedule_activity.headers, codec) elif command.HasField("schedule_local_activity"): await _encode_payloads(command.schedule_local_activity.arguments, codec) + if encode_headers: + await _encode_headers( + command.schedule_local_activity.headers, codec + ) elif command.HasField("signal_external_workflow_execution"): await _encode_payloads( command.signal_external_workflow_execution.args, codec ) + if encode_headers: + await _encode_headers( + command.signal_external_workflow_execution.headers, codec + ) elif command.HasField("start_child_workflow_execution"): await _encode_payloads( command.start_child_workflow_execution.input, codec ) + if encode_headers: + await _encode_headers( + command.start_child_workflow_execution.headers, codec + ) for val in command.start_child_workflow_execution.memo.values(): await _encode_payload(val, codec) elif command.HasField("update_response"): diff --git a/temporalio/client.py b/temporalio/client.py index f46297eb9..8afbe6845 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -27,6 +27,7 @@ Mapping, Optional, Sequence, + Text, Tuple, Type, Union, @@ -37,6 +38,7 @@ import google.protobuf.duration_pb2 import google.protobuf.json_format import google.protobuf.timestamp_pb2 +from google.protobuf.internal.containers import MessageMap from typing_extensions import Concatenate, Required, TypedDict import temporalio.api.common.v1 @@ -191,6 +193,7 @@ def __init__( default_workflow_query_reject_condition: Optional[ temporalio.common.QueryRejectCondition ] = None, + encode_headers: bool = True, ): """Create a Temporal client from a service client. @@ -208,6 +211,7 @@ def __init__( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, + encode_headers=encode_headers, ) def config(self) -> ClientConfig: @@ -1491,6 +1495,7 @@ class ClientConfig(TypedDict, total=False): default_workflow_query_reject_condition: Required[ Optional[temporalio.common.QueryRejectCondition] ] + encode_headers: Required[bool] class WorkflowHistoryEventFilterType(IntEnum): @@ -3849,6 +3854,10 @@ class ScheduleActionStartWorkflow(ScheduleAction): priority: temporalio.common.Priority headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] + """ + Headers may still be encoded by the payload codec if present. + """ + _from_raw: bool @staticmethod def _from_proto( # pyright: ignore @@ -3975,6 +3984,7 @@ def __init__( """ super().__init__() if raw_info: + self._from_raw = True # Ignore other fields self.workflow = raw_info.workflow_type.name self.args = raw_info.input.payloads if raw_info.input else [] @@ -4034,6 +4044,7 @@ def __init__( else temporalio.common.Priority.default ) else: + self._from_raw = False if not id: raise ValueError("ID required") if not task_queue: @@ -4057,7 +4068,7 @@ def __init__( self.memo = memo self.typed_search_attributes = typed_search_attributes self.untyped_search_attributes = untyped_search_attributes - self.headers = headers + self.headers = headers # encode here self.static_summary = static_summary self.static_details = static_details self.priority = priority @@ -4134,9 +4145,13 @@ async def _to_proto( temporalio.converter.encode_search_attributes( self.typed_search_attributes, action.start_workflow.search_attributes ) + print("Schedule Headers:", self.headers) if self.headers: - temporalio.common._apply_headers( - self.headers, action.start_workflow.header.fields + await _apply_headers( + self.headers, + action.start_workflow.header.fields, + client.config()["encode_headers"] and not self._from_raw, + client.data_converter.payload_codec, ) return action @@ -5884,7 +5899,7 @@ async def _populate_start_workflow_execution_request( if input.start_delay is not None: req.workflow_start_delay.FromTimedelta(input.start_delay) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.header.fields) + await self._apply_headers(input.headers, req.header.fields) if input.priority is not None: req.priority.CopyFrom(input.priority._to_proto()) if input.versioning_override is not None: @@ -5970,7 +5985,7 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any: await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.query.header.fields) + await self._apply_headers(input.headers, req.query.header.fields) try: resp = await self._client.workflow_service.query_workflow( req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout @@ -6016,7 +6031,7 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None: await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.header.fields) + await self._apply_headers(input.headers, req.header.fields) await self._client.workflow_service.signal_workflow_execution( req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout ) @@ -6127,9 +6142,7 @@ async def _build_update_workflow_execution_request( await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers( - input.headers, req.request.input.header.fields - ) + await self._apply_headers(input.headers, req.request.input.header.fields) return req async def start_update_with_start_workflow( @@ -6685,6 +6698,33 @@ async def get_worker_task_reachability( ) return WorkerTaskReachability._from_proto(resp) + async def _apply_headers( + self, + source: Optional[Mapping[str, temporalio.api.common.v1.Payload]], + dest: MessageMap[Text, temporalio.api.common.v1.Payload], + ) -> None: + await _apply_headers( + source, + dest, + self._client.config()["encode_headers"], + self._client.data_converter.payload_codec, + ) + + +async def _apply_headers( + source: Optional[Mapping[str, temporalio.api.common.v1.Payload]], + dest: MessageMap[Text, temporalio.api.common.v1.Payload], + encode_headers: bool, + codec: Optional[temporalio.converter.PayloadCodec], +) -> None: + if source is None: + return + if encode_headers and codec is not None: + for payload in source.values(): + new_payload = (await codec.encode([payload]))[0] + payload.CopyFrom(new_payload) + temporalio.common._apply_headers(source, dest) + def _history_from_json( history: Union[str, Dict[str, Any]], diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index fe18d1f18..68dc3f6c2 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -69,6 +69,7 @@ def __init__( data_converter: temporalio.converter.DataConverter, interceptors: Sequence[Interceptor], metric_meter: temporalio.common.MetricMeter, + encode_headers: bool, ) -> None: self._bridge_worker = bridge_worker self._task_queue = task_queue @@ -78,6 +79,7 @@ def __init__( self._data_converter = data_converter self._interceptors = interceptors self._metric_meter = metric_meter + self._encode_headers = encode_headers self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue() # Lazily created on first activity self._worker_shutdown_event: Optional[temporalio.activity._CompositeEvent] = ( @@ -542,6 +544,15 @@ async def _execute_activity( workflow_type=start.workflow_type, priority=temporalio.common.Priority._from_proto(start.priority), ) + + if self._encode_headers and self._data_converter.payload_codec is not None: + logger.info("headers: %s", start.header_fields) + for payload in start.header_fields.values(): + new_payload = ( + await self._data_converter.payload_codec.decode([payload]) + )[0] + payload.CopyFrom(new_payload) + running_activity.info = info input = ExecuteActivityInput( fn=activity_def.fn, diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 7600118d6..54a2dd90e 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -47,6 +47,7 @@ def __init__( debug_mode: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, disable_safe_workflow_eviction: bool = False, + encode_headers: bool = True, ) -> None: """Create a replayer to replay workflows from history. @@ -78,6 +79,7 @@ def __init__( debug_mode=debug_mode, runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, + encode_headers=encode_headers, ) def config(self) -> ReplayerConfig: @@ -217,6 +219,7 @@ def on_eviction_hook( disable_safe_eviction=self._config["disable_safe_workflow_eviction"], should_enforce_versioning_behavior=False, assert_local_activity_valid=lambda a: None, + encode_headers=self._config["encode_headers"], ) # Create bridge worker bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( @@ -335,6 +338,7 @@ class ReplayerConfig(TypedDict, total=False): debug_mode: bool runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool + encode_headers: bool @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 4793f675e..87313e959 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -392,6 +392,7 @@ def __init__( data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, + encode_headers=client_config["encode_headers"], ) self._workflow_worker: Optional[_WorkflowWorker] = None if workflows: @@ -429,6 +430,7 @@ def check_activity(activity): disable_safe_eviction=disable_safe_workflow_eviction, should_enforce_versioning_behavior=should_enforce_versioning_behavior, assert_local_activity_valid=check_activity, + encode_headers=client_config["encode_headers"], ) if tuner is not None: diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 35e7f0578..4f42ca4ae 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -79,6 +79,7 @@ def __init__( disable_safe_eviction: bool, should_enforce_versioning_behavior: bool, assert_local_activity_valid: Callable[[str], None], + encode_headers: bool, ) -> None: self._bridge_worker = bridge_worker self._namespace = namespace @@ -116,6 +117,7 @@ def __init__( self._disable_eager_activity_execution = disable_eager_activity_execution self._on_eviction_hook = on_eviction_hook self._disable_safe_eviction = disable_safe_eviction + self._encode_headers = encode_headers self._throw_after_activation: Optional[Exception] = None # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable @@ -255,7 +257,9 @@ async def _handle_activation( # Decode the activation if there's a codec and not cache remove job if self._data_converter.payload_codec: await temporalio.bridge.worker.decode_activation( - act, self._data_converter.payload_codec + act, + self._data_converter.payload_codec, + decode_headers=self._encode_headers, ) if LOG_PROTOS: @@ -342,7 +346,9 @@ async def _handle_activation( if self._data_converter.payload_codec: try: await temporalio.bridge.worker.encode_completion( - completion, self._data_converter.payload_codec + completion, + self._data_converter.payload_codec, + encode_headers=self._encode_headers, ) except Exception as err: logger.exception( diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index d9de1ba40..ace29c2e6 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2595,6 +2595,7 @@ def _apply_schedule_command( v.seq = self._seq v.activity_id = self._input.activity_id or str(self._seq) v.activity_type = self._input.activity + logger.info("Headers in workflow %s:", self._input.headers) if self._input.headers: temporalio.common._apply_headers(self._input.headers, v.headers) if payloads: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 0b3574563..d130653de 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -58,8 +58,12 @@ from temporalio.client import ( AsyncActivityCancelledError, Client, + CreateScheduleInput, RPCError, RPCStatusCode, + ScheduleActionStartWorkflow, + ScheduleHandle, + SignalWorkflowInput, WorkflowExecutionStatus, WorkflowFailureError, WorkflowHandle, @@ -110,6 +114,8 @@ from temporalio.service import __version__ from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( + ExecuteWorkflowInput, + HandleSignalInput, UnsandboxedWorkflowRunner, Worker, WorkflowInstance, @@ -1518,6 +1524,7 @@ async def test_workflow_with_codec(client: Client, env: WorkflowEnvironment): await test_workflow_signal_and_query_errors(client) await test_workflow_simple_activity(client) await test_workflow_update_handlers_happy(client) + assert False class PassThroughCodec(PayloadCodec): @@ -8117,3 +8124,152 @@ async def test_signal_handler_in_interceptor(client: Client): id=f"workflow-{uuid.uuid4()}", task_queue=worker.task_queue, ) + + +class HeaderWorkerInterceptor(temporalio.worker.Interceptor): + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + return HeaderActivityInboundInterceptor(super().intercept_activity(next)) + + def workflow_interceptor_class( + self, input: temporalio.worker.WorkflowInterceptorClassInput + ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]: + return HeaderWorkflowInboundInterceptor + + +class HeaderActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): + def init(self, outbound: temporalio.worker.ActivityOutboundInterceptor) -> None: + super().init(HeaderActivityOutboundInterceptor(outbound)) + + async def execute_activity( + self, input: temporalio.worker.ExecuteActivityInput + ) -> Any: + # Header should be decoded + assert input.headers["foo"].data == b"bar" + return await super().execute_activity(input) + + +class HeaderActivityOutboundInterceptor(temporalio.worker.ActivityOutboundInterceptor): + def info(self) -> activity.Info: + return super().info() + + def heartbeat(self, *details: Any) -> None: + super().heartbeat(*details) + + +class HeaderWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): + def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: + super().init(HeaderWorkflowOutboundInterceptor(outbound)) + + async def handle_signal(self, input: HandleSignalInput) -> None: + assert input.headers["foo"].data == b"bar" + await super().handle_signal(input) + + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + assert input.headers["foo"].data == b"bar" + return await super().execute_workflow(input) + + +class HeaderWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor): + def start_activity( + self, input: temporalio.worker.StartActivityInput + ) -> workflow.ActivityHandle: + # Add a header to the outbound activity call + input.headers = {"foo": Payload(data=b"bar")} + return super().start_activity(input) + + +class HeaderClientInterceptor(temporalio.client.Interceptor): + def intercept_client( + self, next: temporalio.client.OutboundInterceptor + ) -> temporalio.client.OutboundInterceptor: + return HeaderClientOutboundInterceptor(super().intercept_client(next)) + + +class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor): + def __init__(self, next: temporalio.client.OutboundInterceptor) -> None: + super().__init__(next) + + async def start_workflow( + self, input: temporalio.client.StartWorkflowInput + ) -> WorkflowHandle[Any, Any]: + input.headers = {"foo": Payload(data=b"bar")} + return await super().start_workflow(input) + + async def signal_workflow(self, input: SignalWorkflowInput) -> None: + input.headers = {"foo": Payload(data=b"bar")} + return await super().signal_workflow(input) + + async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: + cast(ScheduleActionStartWorkflow, input.schedule.action).headers = { + "foo": Payload(data=b"bar") + } + return await super().create_schedule(input) + + +async def test_workflow_headers_with_codec(client: Client, env: WorkflowEnvironment): + # Make client with this codec and run a couple of existing tests + config = client.config() + config["data_converter"] = DataConverter(payload_codec=SimpleCodec()) + config["interceptors"] = [HeaderClientInterceptor()] + client = Client(**config) + + async with new_worker( + client, + SimpleActivityWorkflow, + SignalAndQueryWorkflow, + activities=[say_hello], + interceptors=[HeaderWorkerInterceptor()], + ) as worker: + workflow_handle = await client.start_workflow( + SimpleActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert await workflow_handle.result() == "Hello, Temporal!" + + async for e in workflow_handle.fetch_history_events(): + if e.HasField("activity_task_scheduled_event_attributes"): + header = e.activity_task_scheduled_event_attributes.header.fields["foo"] + assert "simple-codec" in header.metadata + + handle = await client.start_workflow( + SignalAndQueryWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Simple signals and queries + await handle.signal(SignalAndQueryWorkflow.signal1, "some arg") + assert "signal1: some arg" == await handle.query( + SignalAndQueryWorkflow.last_event + ) + + async for e in handle.fetch_history_events(): + if e.HasField("workflow_execution_signaled_event_attributes"): + header = e.workflow_execution_signaled_event_attributes.header.fields[ + "foo" + ] + assert "simple-codec" in header.metadata + + schedule_handle = await client.create_schedule( + f"schedule-{uuid.uuid4()}", + temporalio.client.Schedule( + action=temporalio.client.ScheduleActionStartWorkflow( + "SimpleActivityWorkflow", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ), + spec=temporalio.client.ScheduleSpec( + calendars=[temporalio.client.ScheduleCalendarSpec()] + ), + state=temporalio.client.ScheduleState(paused=True), + ), + ) + description = await schedule_handle.describe() + + # Header payload is still encoded due to limitations + headers = cast(ScheduleActionStartWorkflow, description.schedule.action).headers + assert headers is not None and not headers["foo"].data == b"bar" From 9cc6cf164d5502b6d543a41e93d1c4adf36c9b82 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 7 Jul 2025 10:01:49 -0700 Subject: [PATCH 2/9] Switching to enum for header codec behavior --- temporalio/client.py | 12 ++--- temporalio/common.py | 6 +++ temporalio/worker/_worker.py | 6 +-- temporalio/worker/_workflow_instance.py | 1 - tests/worker/test_workflow.py | 62 +++++++++++++++---------- 5 files changed, 52 insertions(+), 35 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 8afbe6845..92d8ebf6c 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -67,6 +67,7 @@ RPCStatusCode, TLSConfig, ) +from .common import HeaderCodecBehavior from .types import ( AnyType, @@ -193,7 +194,7 @@ def __init__( default_workflow_query_reject_condition: Optional[ temporalio.common.QueryRejectCondition ] = None, - encode_headers: bool = True, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ): """Create a Temporal client from a service client. @@ -211,7 +212,7 @@ def __init__( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, - encode_headers=encode_headers, + header_codec_behavior=header_codec_behavior, ) def config(self) -> ClientConfig: @@ -1495,7 +1496,7 @@ class ClientConfig(TypedDict, total=False): default_workflow_query_reject_condition: Required[ Optional[temporalio.common.QueryRejectCondition] ] - encode_headers: Required[bool] + header_codec_behavior: Required[HeaderCodecBehavior] class WorkflowHistoryEventFilterType(IntEnum): @@ -4145,12 +4146,11 @@ async def _to_proto( temporalio.converter.encode_search_attributes( self.typed_search_attributes, action.start_workflow.search_attributes ) - print("Schedule Headers:", self.headers) if self.headers: await _apply_headers( self.headers, action.start_workflow.header.fields, - client.config()["encode_headers"] and not self._from_raw, + client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC and not self._from_raw, client.data_converter.payload_codec, ) return action @@ -6706,7 +6706,7 @@ async def _apply_headers( await _apply_headers( source, dest, - self._client.config()["encode_headers"], + self._client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC, self._client.data_converter.payload_codec, ) diff --git a/temporalio/common.py b/temporalio/common.py index 3349f70e9..230d7a128 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -1230,3 +1230,9 @@ def _type_hints_from_func( # necessarily args.append(arg_hint) # type: ignore return args, ret + + +class HeaderCodecBehavior(IntEnum): + NO_CODEC = 1 + CODEC = 2 + WORKFLOW_ONLY_CODEC = 3 diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 87313e959..767864421 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -37,7 +37,7 @@ import temporalio.exceptions import temporalio.runtime import temporalio.service -from temporalio.common import VersioningBehavior, WorkerDeploymentVersion +from temporalio.common import VersioningBehavior, WorkerDeploymentVersion, HeaderCodecBehavior from ._activity import SharedStateManager, _ActivityWorker from ._interceptor import Interceptor @@ -392,7 +392,7 @@ def __init__( data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, - encode_headers=client_config["encode_headers"], + encode_headers=client_config["header_codec_behavior"] == HeaderCodecBehavior.CODEC, ) self._workflow_worker: Optional[_WorkflowWorker] = None if workflows: @@ -430,7 +430,7 @@ def check_activity(activity): disable_safe_eviction=disable_safe_workflow_eviction, should_enforce_versioning_behavior=should_enforce_versioning_behavior, assert_local_activity_valid=check_activity, - encode_headers=client_config["encode_headers"], + encode_headers=client_config["header_codec_behavior"] != HeaderCodecBehavior.NO_CODEC, ) if tuner is not None: diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index ace29c2e6..d9de1ba40 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2595,7 +2595,6 @@ def _apply_schedule_command( v.seq = self._seq v.activity_id = self._input.activity_id or str(self._seq) v.activity_type = self._input.activity - logger.info("Headers in workflow %s:", self._input.headers) if self._input.headers: temporalio.common._apply_headers(self._input.headers, v.headers) if payloads: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index d130653de..3f51af06f 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -82,7 +82,7 @@ SearchAttributes, SearchAttributeValues, TypedSearchAttributes, - WorkflowIDConflictPolicy, + WorkflowIDConflictPolicy, HeaderCodecBehavior, ) from temporalio.converter import ( DataConverter, @@ -1524,7 +1524,6 @@ async def test_workflow_with_codec(client: Client, env: WorkflowEnvironment): await test_workflow_signal_and_query_errors(client) await test_workflow_simple_activity(client) await test_workflow_update_handlers_happy(client) - assert False class PassThroughCodec(PayloadCodec): @@ -8137,25 +8136,18 @@ def workflow_interceptor_class( ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]: return HeaderWorkflowInboundInterceptor +global_header_codec_behavior: HeaderCodecBehavior class HeaderActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): - def init(self, outbound: temporalio.worker.ActivityOutboundInterceptor) -> None: - super().init(HeaderActivityOutboundInterceptor(outbound)) - async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput ) -> Any: - # Header should be decoded - assert input.headers["foo"].data == b"bar" - return await super().execute_activity(input) - - -class HeaderActivityOutboundInterceptor(temporalio.worker.ActivityOutboundInterceptor): - def info(self) -> activity.Info: - return super().info() + if global_header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: + assert input.headers["foo"].data == b"\n\x05\x12\x03bar" + else: + assert input.headers["foo"].data == b"bar" - def heartbeat(self, *details: Any) -> None: - super().heartbeat(*details) + return await super().execute_activity(input) class HeaderWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): @@ -8181,40 +8173,54 @@ def start_activity( class HeaderClientInterceptor(temporalio.client.Interceptor): + def __init__(self, header: Payload): + self.header = header + super().__init__() + def intercept_client( self, next: temporalio.client.OutboundInterceptor ) -> temporalio.client.OutboundInterceptor: - return HeaderClientOutboundInterceptor(super().intercept_client(next)) + return HeaderClientOutboundInterceptor(super().intercept_client(next), self.header) class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor): - def __init__(self, next: temporalio.client.OutboundInterceptor) -> None: + def __init__(self, next: temporalio.client.OutboundInterceptor, header: Payload) -> None: + self.header = header super().__init__(next) async def start_workflow( self, input: temporalio.client.StartWorkflowInput ) -> WorkflowHandle[Any, Any]: - input.headers = {"foo": Payload(data=b"bar")} + input.headers = {"foo": self.header.__deepcopy__()} return await super().start_workflow(input) async def signal_workflow(self, input: SignalWorkflowInput) -> None: - input.headers = {"foo": Payload(data=b"bar")} + input.headers = {"foo": self.header.__deepcopy__()} return await super().signal_workflow(input) async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: cast(ScheduleActionStartWorkflow, input.schedule.action).headers = { - "foo": Payload(data=b"bar") + "foo": self.header.__deepcopy__() } return await super().create_schedule(input) -async def test_workflow_headers_with_codec(client: Client, env: WorkflowEnvironment): +@pytest.mark.parametrize("header_codec_behavior", [HeaderCodecBehavior.NO_CODEC, HeaderCodecBehavior.CODEC, HeaderCodecBehavior.WORKFLOW_ONLY_CODEC]) +async def test_workflow_headers_with_codec(client: Client, header_codec_behavior: HeaderCodecBehavior): + header_payload = Payload(data=b"bar") + if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: + header_payload = (await SimpleCodec().encode([header_payload]))[0] + # Make client with this codec and run a couple of existing tests config = client.config() config["data_converter"] = DataConverter(payload_codec=SimpleCodec()) - config["interceptors"] = [HeaderClientInterceptor()] + config["interceptors"] = [HeaderClientInterceptor(header_payload)] + config["header_codec_behavior"] = header_codec_behavior client = Client(**config) + global global_header_codec_behavior + global_header_codec_behavior = header_codec_behavior + async with new_worker( client, SimpleActivityWorkflow, @@ -8233,7 +8239,8 @@ async def test_workflow_headers_with_codec(client: Client, env: WorkflowEnvironm async for e in workflow_handle.fetch_history_events(): if e.HasField("activity_task_scheduled_event_attributes"): header = e.activity_task_scheduled_event_attributes.header.fields["foo"] - assert "simple-codec" in header.metadata + if header_codec_behavior == HeaderCodecBehavior.CODEC: + assert "simple-codec" in header.metadata handle = await client.start_workflow( SignalAndQueryWorkflow.run, @@ -8252,7 +8259,8 @@ async def test_workflow_headers_with_codec(client: Client, env: WorkflowEnvironm header = e.workflow_execution_signaled_event_attributes.header.fields[ "foo" ] - assert "simple-codec" in header.metadata + if header_codec_behavior == HeaderCodecBehavior.CODEC: + assert "simple-codec" in header.metadata schedule_handle = await client.create_schedule( f"schedule-{uuid.uuid4()}", @@ -8272,4 +8280,8 @@ async def test_workflow_headers_with_codec(client: Client, env: WorkflowEnvironm # Header payload is still encoded due to limitations headers = cast(ScheduleActionStartWorkflow, description.schedule.action).headers - assert headers is not None and not headers["foo"].data == b"bar" + assert headers is not None + if header_codec_behavior == HeaderCodecBehavior.NO_CODEC: + assert headers["foo"].data == b"bar" + else: + assert headers["foo"].data != b"bar" From d4e88b25dd34c5abd3f2fa80172664fba238bec9 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 7 Jul 2025 10:05:37 -0700 Subject: [PATCH 3/9] Linting --- temporalio/client.py | 5 +++-- temporalio/worker/_worker.py | 12 +++++++++--- tests/worker/test_workflow.py | 26 +++++++++++++++++++++----- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 92d8ebf6c..00b50ec20 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -67,8 +67,8 @@ RPCStatusCode, TLSConfig, ) -from .common import HeaderCodecBehavior +from .common import HeaderCodecBehavior from .types import ( AnyType, LocalReturnType, @@ -4150,7 +4150,8 @@ async def _to_proto( await _apply_headers( self.headers, action.start_workflow.header.fields, - client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC and not self._from_raw, + client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC + and not self._from_raw, client.data_converter.payload_codec, ) return action diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 767864421..82089a6c4 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -37,7 +37,11 @@ import temporalio.exceptions import temporalio.runtime import temporalio.service -from temporalio.common import VersioningBehavior, WorkerDeploymentVersion, HeaderCodecBehavior +from temporalio.common import ( + HeaderCodecBehavior, + VersioningBehavior, + WorkerDeploymentVersion, +) from ._activity import SharedStateManager, _ActivityWorker from ._interceptor import Interceptor @@ -392,7 +396,8 @@ def __init__( data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, - encode_headers=client_config["header_codec_behavior"] == HeaderCodecBehavior.CODEC, + encode_headers=client_config["header_codec_behavior"] + == HeaderCodecBehavior.CODEC, ) self._workflow_worker: Optional[_WorkflowWorker] = None if workflows: @@ -430,7 +435,8 @@ def check_activity(activity): disable_safe_eviction=disable_safe_workflow_eviction, should_enforce_versioning_behavior=should_enforce_versioning_behavior, assert_local_activity_valid=check_activity, - encode_headers=client_config["header_codec_behavior"] != HeaderCodecBehavior.NO_CODEC, + encode_headers=client_config["header_codec_behavior"] + != HeaderCodecBehavior.NO_CODEC, ) if tuner is not None: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 2321f55c3..994ce4e9c 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -74,6 +74,7 @@ WorkflowUpdateStage, ) from temporalio.common import ( + HeaderCodecBehavior, Priority, RawValue, RetryPolicy, @@ -82,7 +83,7 @@ SearchAttributes, SearchAttributeValues, TypedSearchAttributes, - WorkflowIDConflictPolicy, HeaderCodecBehavior, + WorkflowIDConflictPolicy, ) from temporalio.converter import ( DataConverter, @@ -8132,8 +8133,10 @@ def workflow_interceptor_class( ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]: return HeaderWorkflowInboundInterceptor + global_header_codec_behavior: HeaderCodecBehavior + class HeaderActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput @@ -8176,11 +8179,15 @@ def __init__(self, header: Payload): def intercept_client( self, next: temporalio.client.OutboundInterceptor ) -> temporalio.client.OutboundInterceptor: - return HeaderClientOutboundInterceptor(super().intercept_client(next), self.header) + return HeaderClientOutboundInterceptor( + super().intercept_client(next), self.header + ) class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor): - def __init__(self, next: temporalio.client.OutboundInterceptor, header: Payload) -> None: + def __init__( + self, next: temporalio.client.OutboundInterceptor, header: Payload + ) -> None: self.header = header super().__init__(next) @@ -8201,8 +8208,17 @@ async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: return await super().create_schedule(input) -@pytest.mark.parametrize("header_codec_behavior", [HeaderCodecBehavior.NO_CODEC, HeaderCodecBehavior.CODEC, HeaderCodecBehavior.WORKFLOW_ONLY_CODEC]) -async def test_workflow_headers_with_codec(client: Client, header_codec_behavior: HeaderCodecBehavior): +@pytest.mark.parametrize( + "header_codec_behavior", + [ + HeaderCodecBehavior.NO_CODEC, + HeaderCodecBehavior.CODEC, + HeaderCodecBehavior.WORKFLOW_ONLY_CODEC, + ], +) +async def test_workflow_headers_with_codec( + client: Client, header_codec_behavior: HeaderCodecBehavior +): header_payload = Payload(data=b"bar") if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: header_payload = (await SimpleCodec().encode([header_payload]))[0] From a0ac24a1096c790ec7fbcc5893e44afde63973d9 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 7 Jul 2025 15:25:54 -0700 Subject: [PATCH 4/9] Address comments --- temporalio/bridge/worker.py | 13 ++++--------- temporalio/client.py | 5 +++++ temporalio/common.py | 5 +++++ temporalio/worker/_activity.py | 1 - temporalio/worker/_replayer.py | 10 ++++++---- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 09da7181b..8184806a9 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -11,6 +11,7 @@ Awaitable, Callable, List, + Mapping, Optional, Sequence, Set, @@ -272,9 +273,7 @@ async def finalize_shutdown(self) -> None: async def _apply_to_headers( - headers: google.protobuf.internal.containers.MessageMap[ - str, temporalio.api.common.v1.Payload - ], + headers: Mapping[str, temporalio.api.common.v1.Payload], cb: Callable[ [Sequence[temporalio.api.common.v1.Payload]], Awaitable[List[temporalio.api.common.v1.Payload]], @@ -287,9 +286,7 @@ async def _apply_to_headers( async def _decode_headers( - headers: google.protobuf.internal.containers.MessageMap[ - str, temporalio.api.common.v1.Payload - ], + headers: Mapping[str, temporalio.api.common.v1.Payload], codec: temporalio.converter.PayloadCodec, ) -> None: """Decode headers with the given codec.""" @@ -297,9 +294,7 @@ async def _decode_headers( async def _encode_headers( - headers: google.protobuf.internal.containers.MessageMap[ - str, temporalio.api.common.v1.Payload - ], + headers: Mapping[str, temporalio.api.common.v1.Payload], codec: temporalio.converter.PayloadCodec, ) -> None: """Encode headers with the given codec.""" diff --git a/temporalio/client.py b/temporalio/client.py index 00b50ec20..98e6c0550 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -119,6 +119,7 @@ async def connect( lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None, + header_codec_behavior: Optional[HeaderCodecBehavior] = None, ) -> Client: """Connect to a Temporal server. @@ -163,6 +164,7 @@ async def connect( used for workers. runtime: The runtime for this client, or the default if unset. http_connect_proxy_config: Configuration for HTTP CONNECT proxy. + header_codec_behavior: Encoding behavior for headers sent by the client. """ connect_config = temporalio.service.ConnectConfig( target_host=target_host, @@ -182,6 +184,9 @@ async def connect( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, + header_codec_behavior=HeaderCodecBehavior.NO_CODEC + if header_codec_behavior is None + else header_codec_behavior, ) def __init__( diff --git a/temporalio/common.py b/temporalio/common.py index 230d7a128..4d6af75bd 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -1233,6 +1233,11 @@ def _type_hints_from_func( class HeaderCodecBehavior(IntEnum): + """Different ways to handle header encoding""" + NO_CODEC = 1 + """Don't encode or decode any headers automatically""" CODEC = 2 + """Encode and decode all headers automatically""" WORKFLOW_ONLY_CODEC = 3 + """Only automatically encode and decode headers in workflow activation encoding and decoding.""" diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 68dc3f6c2..cd82c4f4e 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -546,7 +546,6 @@ async def _execute_activity( ) if self._encode_headers and self._data_converter.payload_codec is not None: - logger.info("headers: %s", start.header_fields) for payload in start.header_fields.values(): new_payload = ( await self._data_converter.payload_codec.decode([payload]) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 54a2dd90e..bf2132a9c 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -19,6 +19,7 @@ import temporalio.runtime import temporalio.workflow +from ..common import HeaderCodecBehavior from ._interceptor import Interceptor from ._worker import load_default_build_id from ._workflow import _WorkflowWorker @@ -47,7 +48,7 @@ def __init__( debug_mode: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, disable_safe_workflow_eviction: bool = False, - encode_headers: bool = True, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ) -> None: """Create a replayer to replay workflows from history. @@ -79,7 +80,7 @@ def __init__( debug_mode=debug_mode, runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, - encode_headers=encode_headers, + header_codec_behavior=header_codec_behavior, ) def config(self) -> ReplayerConfig: @@ -219,7 +220,8 @@ def on_eviction_hook( disable_safe_eviction=self._config["disable_safe_workflow_eviction"], should_enforce_versioning_behavior=False, assert_local_activity_valid=lambda a: None, - encode_headers=self._config["encode_headers"], + encode_headers=self._config["header_codec_behavior"] + != HeaderCodecBehavior.NO_CODEC, ) # Create bridge worker bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( @@ -338,7 +340,7 @@ class ReplayerConfig(TypedDict, total=False): debug_mode: bool runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool - encode_headers: bool + header_codec_behavior: HeaderCodecBehavior @dataclass(frozen=True) From dafa0bdcdc9b9cf05093ea2b62e3d53b447132f5 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 8 Jul 2025 08:19:33 -0700 Subject: [PATCH 5/9] Fix connect argument --- temporalio/client.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 98e6c0550..cc5437908 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -119,7 +119,7 @@ async def connect( lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None, - header_codec_behavior: Optional[HeaderCodecBehavior] = None, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ) -> Client: """Connect to a Temporal server. @@ -184,9 +184,7 @@ async def connect( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, - header_codec_behavior=HeaderCodecBehavior.NO_CODEC - if header_codec_behavior is None - else header_codec_behavior, + header_codec_behavior=header_codec_behavior, ) def __init__( From 086b91de2bf02b909f6a1dfd1e923c532abc4a5e Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 8 Jul 2025 08:50:57 -0700 Subject: [PATCH 6/9] Exclude _from_raw from eq comparison --- temporalio/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/client.py b/temporalio/client.py index 9184065af..32d287f26 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -3871,7 +3871,7 @@ class ScheduleActionStartWorkflow(ScheduleAction): """ Headers may still be encoded by the payload codec if present. """ - _from_raw: bool + _from_raw: bool = dataclasses.field(compare=False) @staticmethod def _from_proto( # pyright: ignore From 3456b401d58d60442c64c9a4eec3d96f6328532a Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 8 Jul 2025 09:08:30 -0700 Subject: [PATCH 7/9] Exclude _from_raw from init --- temporalio/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/client.py b/temporalio/client.py index 32d287f26..0aab85465 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -3871,7 +3871,7 @@ class ScheduleActionStartWorkflow(ScheduleAction): """ Headers may still be encoded by the payload codec if present. """ - _from_raw: bool = dataclasses.field(compare=False) + _from_raw: bool = dataclasses.field(compare=False, init=False) @staticmethod def _from_proto( # pyright: ignore From c071469f87cd9ab833a5e6e3546638faab157ccd Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 8 Jul 2025 09:58:14 -0700 Subject: [PATCH 8/9] Debugging CI --- tests/worker/test_workflow.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 994ce4e9c..b98d0b005 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -8154,6 +8154,7 @@ def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: super().init(HeaderWorkflowOutboundInterceptor(outbound)) async def handle_signal(self, input: HandleSignalInput) -> None: + print("Signal header inbound:", input.headers) assert input.headers["foo"].data == b"bar" await super().handle_signal(input) @@ -8198,6 +8199,7 @@ async def start_workflow( return await super().start_workflow(input) async def signal_workflow(self, input: SignalWorkflowInput) -> None: + print("Signal header:", self.header) input.headers = {"foo": self.header.__deepcopy__()} return await super().signal_workflow(input) From 85fecdb91197d407aec008e9cd8996a9aac4ca03 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 8 Jul 2025 10:32:23 -0700 Subject: [PATCH 9/9] Skip on time skipping server --- tests/worker/test_workflow.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index b98d0b005..8b333cbae 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -8154,7 +8154,6 @@ def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: super().init(HeaderWorkflowOutboundInterceptor(outbound)) async def handle_signal(self, input: HandleSignalInput) -> None: - print("Signal header inbound:", input.headers) assert input.headers["foo"].data == b"bar" await super().handle_signal(input) @@ -8199,7 +8198,6 @@ async def start_workflow( return await super().start_workflow(input) async def signal_workflow(self, input: SignalWorkflowInput) -> None: - print("Signal header:", self.header) input.headers = {"foo": self.header.__deepcopy__()} return await super().signal_workflow(input) @@ -8219,8 +8217,11 @@ async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: ], ) async def test_workflow_headers_with_codec( - client: Client, header_codec_behavior: HeaderCodecBehavior + client: Client, env: WorkflowEnvironment, header_codec_behavior: HeaderCodecBehavior ): + if env.supports_time_skipping: + pytest.skip("Time skipping server doesn't persist headers.") + header_payload = Payload(data=b"bar") if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: header_payload = (await SimpleCodec().encode([header_payload]))[0]