diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index e97563bf1..e4cb05eee 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -11,6 +11,7 @@ Awaitable, Callable, List, + Mapping, Optional, Sequence, Set, @@ -287,6 +288,35 @@ async def finalize_shutdown(self) -> None: ) +async def _apply_to_headers( + headers: Mapping[str, temporalio.api.common.v1.Payload], + cb: Callable[ + [Sequence[temporalio.api.common.v1.Payload]], + Awaitable[List[temporalio.api.common.v1.Payload]], + ], +) -> None: + """Apply API payload callback to headers.""" + for payload in headers.values(): + new_payload = (await cb([payload]))[0] + payload.CopyFrom(new_payload) + + +async def _decode_headers( + headers: Mapping[str, temporalio.api.common.v1.Payload], + codec: temporalio.converter.PayloadCodec, +) -> None: + """Decode headers with the given codec.""" + return await _apply_to_headers(headers, codec.decode) + + +async def _encode_headers( + headers: Mapping[str, temporalio.api.common.v1.Payload], + codec: temporalio.converter.PayloadCodec, +) -> None: + """Encode headers with the given codec.""" + return await _apply_to_headers(headers, codec.encode) + + async def _apply_to_payloads( payloads: PayloadContainer, cb: Callable[ @@ -352,11 +382,14 @@ async def _encode_payload( async def decode_activation( act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, codec: temporalio.converter.PayloadCodec, + decode_headers: bool, ) -> None: """Decode the given activation with the codec.""" for job in act.jobs: if job.HasField("query_workflow"): await _decode_payloads(job.query_workflow.arguments, codec) + if decode_headers: + await _decode_headers(job.query_workflow.headers, codec) elif job.HasField("resolve_activity"): if job.resolve_activity.result.HasField("cancelled"): await codec.decode_failure( @@ -401,8 +434,12 @@ async def decode_activation( await codec.decode_failure(job.resolve_signal_external_workflow.failure) elif job.HasField("signal_workflow"): await _decode_payloads(job.signal_workflow.input, codec) + if decode_headers: + await _decode_headers(job.signal_workflow.headers, codec) elif job.HasField("initialize_workflow"): await _decode_payloads(job.initialize_workflow.arguments, codec) + if decode_headers: + await _decode_headers(job.initialize_workflow.headers, codec) if job.initialize_workflow.HasField("continued_failure"): await codec.decode_failure(job.initialize_workflow.continued_failure) for val in job.initialize_workflow.memo.fields.values(): @@ -416,11 +453,14 @@ async def decode_activation( val.data = new_payload.data elif job.HasField("do_update"): await _decode_payloads(job.do_update.input, codec) + if decode_headers: + await _decode_headers(job.do_update.headers, codec) async def encode_completion( comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion, codec: temporalio.converter.PayloadCodec, + encode_headers: bool, ) -> None: """Recursively encode the given completion with the codec.""" if comp.HasField("failed"): @@ -436,6 +476,10 @@ async def encode_completion( await _encode_payloads( command.continue_as_new_workflow_execution.arguments, codec ) + if encode_headers: + await _encode_headers( + command.continue_as_new_workflow_execution.headers, codec + ) for val in command.continue_as_new_workflow_execution.memo.values(): await _encode_payload(val, codec) elif command.HasField("fail_workflow_execution"): @@ -451,16 +495,30 @@ async def encode_completion( ) elif command.HasField("schedule_activity"): await _encode_payloads(command.schedule_activity.arguments, codec) + if encode_headers: + await _encode_headers(command.schedule_activity.headers, codec) elif command.HasField("schedule_local_activity"): await _encode_payloads(command.schedule_local_activity.arguments, codec) + if encode_headers: + await _encode_headers( + command.schedule_local_activity.headers, codec + ) elif command.HasField("signal_external_workflow_execution"): await _encode_payloads( command.signal_external_workflow_execution.args, codec ) + if encode_headers: + await _encode_headers( + command.signal_external_workflow_execution.headers, codec + ) elif command.HasField("start_child_workflow_execution"): await _encode_payloads( command.start_child_workflow_execution.input, codec ) + if encode_headers: + await _encode_headers( + command.start_child_workflow_execution.headers, codec + ) for val in command.start_child_workflow_execution.memo.values(): await _encode_payload(val, codec) elif command.HasField("update_response"): diff --git a/temporalio/client.py b/temporalio/client.py index 7110c74df..0aab85465 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -27,6 +27,7 @@ Mapping, Optional, Sequence, + Text, Tuple, Type, Union, @@ -37,6 +38,7 @@ import google.protobuf.duration_pb2 import google.protobuf.json_format import google.protobuf.timestamp_pb2 +from google.protobuf.internal.containers import MessageMap from typing_extensions import Concatenate, Required, TypedDict import temporalio.api.common.v1 @@ -66,6 +68,7 @@ TLSConfig, ) +from .common import HeaderCodecBehavior from .types import ( AnyType, LocalReturnType, @@ -116,6 +119,7 @@ async def connect( lazy: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ) -> Client: """Connect to a Temporal server. @@ -160,6 +164,7 @@ async def connect( used for workers. runtime: The runtime for this client, or the default if unset. http_connect_proxy_config: Configuration for HTTP CONNECT proxy. + header_codec_behavior: Encoding behavior for headers sent by the client. """ connect_config = temporalio.service.ConnectConfig( target_host=target_host, @@ -179,6 +184,7 @@ async def connect( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, + header_codec_behavior=header_codec_behavior, ) def __init__( @@ -191,6 +197,7 @@ def __init__( default_workflow_query_reject_condition: Optional[ temporalio.common.QueryRejectCondition ] = None, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ): """Create a Temporal client from a service client. @@ -208,6 +215,7 @@ def __init__( data_converter=data_converter, interceptors=interceptors, default_workflow_query_reject_condition=default_workflow_query_reject_condition, + header_codec_behavior=header_codec_behavior, ) def config(self) -> ClientConfig: @@ -1501,6 +1509,7 @@ class ClientConfig(TypedDict, total=False): default_workflow_query_reject_condition: Required[ Optional[temporalio.common.QueryRejectCondition] ] + header_codec_behavior: Required[HeaderCodecBehavior] class WorkflowHistoryEventFilterType(IntEnum): @@ -3859,6 +3868,10 @@ class ScheduleActionStartWorkflow(ScheduleAction): priority: temporalio.common.Priority headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] + """ + Headers may still be encoded by the payload codec if present. + """ + _from_raw: bool = dataclasses.field(compare=False, init=False) @staticmethod def _from_proto( # pyright: ignore @@ -3985,6 +3998,7 @@ def __init__( """ super().__init__() if raw_info: + self._from_raw = True # Ignore other fields self.workflow = raw_info.workflow_type.name self.args = raw_info.input.payloads if raw_info.input else [] @@ -4044,6 +4058,7 @@ def __init__( else temporalio.common.Priority.default ) else: + self._from_raw = False if not id: raise ValueError("ID required") if not task_queue: @@ -4067,7 +4082,7 @@ def __init__( self.memo = memo self.typed_search_attributes = typed_search_attributes self.untyped_search_attributes = untyped_search_attributes - self.headers = headers + self.headers = headers # encode here self.static_summary = static_summary self.static_details = static_details self.priority = priority @@ -4145,8 +4160,12 @@ async def _to_proto( self.typed_search_attributes, action.start_workflow.search_attributes ) if self.headers: - temporalio.common._apply_headers( - self.headers, action.start_workflow.header.fields + await _apply_headers( + self.headers, + action.start_workflow.header.fields, + client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC + and not self._from_raw, + client.data_converter.payload_codec, ) return action @@ -5920,7 +5939,7 @@ async def _populate_start_workflow_execution_request( if input.start_delay is not None: req.workflow_start_delay.FromTimedelta(input.start_delay) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.header.fields) + await self._apply_headers(input.headers, req.header.fields) if input.priority is not None: req.priority.CopyFrom(input.priority._to_proto()) if input.versioning_override is not None: @@ -6006,7 +6025,7 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any: await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.query.header.fields) + await self._apply_headers(input.headers, req.query.header.fields) try: resp = await self._client.workflow_service.query_workflow( req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout @@ -6052,7 +6071,7 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None: await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers(input.headers, req.header.fields) + await self._apply_headers(input.headers, req.header.fields) await self._client.workflow_service.signal_workflow_execution( req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout ) @@ -6163,9 +6182,7 @@ async def _build_update_workflow_execution_request( await self._client.data_converter.encode(input.args) ) if input.headers is not None: - temporalio.common._apply_headers( - input.headers, req.request.input.header.fields - ) + await self._apply_headers(input.headers, req.request.input.header.fields) return req async def start_update_with_start_workflow( @@ -6721,6 +6738,33 @@ async def get_worker_task_reachability( ) return WorkerTaskReachability._from_proto(resp) + async def _apply_headers( + self, + source: Optional[Mapping[str, temporalio.api.common.v1.Payload]], + dest: MessageMap[Text, temporalio.api.common.v1.Payload], + ) -> None: + await _apply_headers( + source, + dest, + self._client.config()["header_codec_behavior"] == HeaderCodecBehavior.CODEC, + self._client.data_converter.payload_codec, + ) + + +async def _apply_headers( + source: Optional[Mapping[str, temporalio.api.common.v1.Payload]], + dest: MessageMap[Text, temporalio.api.common.v1.Payload], + encode_headers: bool, + codec: Optional[temporalio.converter.PayloadCodec], +) -> None: + if source is None: + return + if encode_headers and codec is not None: + for payload in source.values(): + new_payload = (await codec.encode([payload]))[0] + payload.CopyFrom(new_payload) + temporalio.common._apply_headers(source, dest) + def _history_from_json( history: Union[str, Dict[str, Any]], diff --git a/temporalio/common.py b/temporalio/common.py index b9b088e86..52cc939a5 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -1230,3 +1230,14 @@ def _type_hints_from_func( # necessarily args.append(arg_hint) # type: ignore return args, ret + + +class HeaderCodecBehavior(IntEnum): + """Different ways to handle header encoding""" + + NO_CODEC = 1 + """Don't encode or decode any headers automatically""" + CODEC = 2 + """Encode and decode all headers automatically""" + WORKFLOW_ONLY_CODEC = 3 + """Only automatically encode and decode headers in workflow activation encoding and decoding.""" diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 9bc373022..741dac510 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -69,6 +69,7 @@ def __init__( data_converter: temporalio.converter.DataConverter, interceptors: Sequence[Interceptor], metric_meter: temporalio.common.MetricMeter, + encode_headers: bool, ) -> None: self._bridge_worker = bridge_worker self._task_queue = task_queue @@ -78,6 +79,7 @@ def __init__( self._data_converter = data_converter self._interceptors = interceptors self._metric_meter = metric_meter + self._encode_headers = encode_headers self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue() # Lazily created on first activity self._worker_shutdown_event: Optional[temporalio.activity._CompositeEvent] = ( @@ -543,6 +545,14 @@ async def _execute_activity( workflow_type=start.workflow_type, priority=temporalio.common.Priority._from_proto(start.priority), ) + + if self._encode_headers and self._data_converter.payload_codec is not None: + for payload in start.header_fields.values(): + new_payload = ( + await self._data_converter.payload_codec.decode([payload]) + )[0] + payload.CopyFrom(new_payload) + running_activity.info = info input = ExecuteActivityInput( fn=activity_def.fn, diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index c016495c7..6e9761b58 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -19,6 +19,7 @@ import temporalio.runtime import temporalio.workflow +from ..common import HeaderCodecBehavior from ._interceptor import Interceptor from ._worker import load_default_build_id from ._workflow import _WorkflowWorker @@ -47,6 +48,7 @@ def __init__( debug_mode: bool = False, runtime: Optional[temporalio.runtime.Runtime] = None, disable_safe_workflow_eviction: bool = False, + header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ) -> None: """Create a replayer to replay workflows from history. @@ -78,6 +80,7 @@ def __init__( debug_mode=debug_mode, runtime=runtime, disable_safe_workflow_eviction=disable_safe_workflow_eviction, + header_codec_behavior=header_codec_behavior, ) def config(self) -> ReplayerConfig: @@ -217,6 +220,8 @@ def on_eviction_hook( disable_safe_eviction=self._config["disable_safe_workflow_eviction"], should_enforce_versioning_behavior=False, assert_local_activity_valid=lambda a: None, + encode_headers=self._config["header_codec_behavior"] + != HeaderCodecBehavior.NO_CODEC, ) # Create bridge worker bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( @@ -338,6 +343,7 @@ class ReplayerConfig(TypedDict, total=False): debug_mode: bool runtime: Optional[temporalio.runtime.Runtime] disable_safe_workflow_eviction: bool + header_codec_behavior: HeaderCodecBehavior @dataclass(frozen=True) diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index d319f71d0..4d77e111e 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -37,7 +37,11 @@ import temporalio.exceptions import temporalio.runtime import temporalio.service -from temporalio.common import VersioningBehavior, WorkerDeploymentVersion +from temporalio.common import ( + HeaderCodecBehavior, + VersioningBehavior, + WorkerDeploymentVersion, +) from ._activity import SharedStateManager, _ActivityWorker from ._interceptor import Interceptor @@ -415,6 +419,8 @@ def __init__( data_converter=client_config["data_converter"], interceptors=interceptors, metric_meter=self._runtime.metric_meter, + encode_headers=client_config["header_codec_behavior"] + == HeaderCodecBehavior.CODEC, ) self._nexus_worker: Optional[_NexusWorker] = None if nexus_service_handlers: @@ -464,6 +470,8 @@ def check_activity(activity): disable_safe_eviction=disable_safe_workflow_eviction, should_enforce_versioning_behavior=should_enforce_versioning_behavior, assert_local_activity_valid=check_activity, + encode_headers=client_config["header_codec_behavior"] + != HeaderCodecBehavior.NO_CODEC, ) if tuner is not None: diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 9f31e3574..c564a0482 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -79,6 +79,7 @@ def __init__( disable_safe_eviction: bool, should_enforce_versioning_behavior: bool, assert_local_activity_valid: Callable[[str], None], + encode_headers: bool, ) -> None: self._bridge_worker = bridge_worker self._namespace = namespace @@ -116,6 +117,7 @@ def __init__( self._disable_eager_activity_execution = disable_eager_activity_execution self._on_eviction_hook = on_eviction_hook self._disable_safe_eviction = disable_safe_eviction + self._encode_headers = encode_headers self._throw_after_activation: Optional[Exception] = None # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable @@ -255,7 +257,9 @@ async def _handle_activation( # Decode the activation if there's a codec and not cache remove job if self._data_converter.payload_codec: await temporalio.bridge.worker.decode_activation( - act, self._data_converter.payload_codec + act, + self._data_converter.payload_codec, + decode_headers=self._encode_headers, ) if LOG_PROTOS: @@ -342,7 +346,9 @@ async def _handle_activation( if self._data_converter.payload_codec: try: await temporalio.bridge.worker.encode_completion( - completion, self._data_converter.payload_codec + completion, + self._data_converter.payload_codec, + encode_headers=self._encode_headers, ) except Exception as err: logger.exception( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index fcf06fa7a..8b333cbae 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -58,8 +58,12 @@ from temporalio.client import ( AsyncActivityCancelledError, Client, + CreateScheduleInput, RPCError, RPCStatusCode, + ScheduleActionStartWorkflow, + ScheduleHandle, + SignalWorkflowInput, WorkflowExecutionStatus, WorkflowFailureError, WorkflowHandle, @@ -70,6 +74,7 @@ WorkflowUpdateStage, ) from temporalio.common import ( + HeaderCodecBehavior, Priority, RawValue, RetryPolicy, @@ -110,6 +115,8 @@ from temporalio.service import __version__ from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( + ExecuteWorkflowInput, + HandleSignalInput, UnsandboxedWorkflowRunner, Worker, WorkflowInstance, @@ -8113,3 +8120,183 @@ async def test_signal_handler_in_interceptor(client: Client): id=f"workflow-{uuid.uuid4()}", task_queue=worker.task_queue, ) + + +class HeaderWorkerInterceptor(temporalio.worker.Interceptor): + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + return HeaderActivityInboundInterceptor(super().intercept_activity(next)) + + def workflow_interceptor_class( + self, input: temporalio.worker.WorkflowInterceptorClassInput + ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]: + return HeaderWorkflowInboundInterceptor + + +global_header_codec_behavior: HeaderCodecBehavior + + +class HeaderActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): + async def execute_activity( + self, input: temporalio.worker.ExecuteActivityInput + ) -> Any: + if global_header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: + assert input.headers["foo"].data == b"\n\x05\x12\x03bar" + else: + assert input.headers["foo"].data == b"bar" + + return await super().execute_activity(input) + + +class HeaderWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): + def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: + super().init(HeaderWorkflowOutboundInterceptor(outbound)) + + async def handle_signal(self, input: HandleSignalInput) -> None: + assert input.headers["foo"].data == b"bar" + await super().handle_signal(input) + + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + assert input.headers["foo"].data == b"bar" + return await super().execute_workflow(input) + + +class HeaderWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor): + def start_activity( + self, input: temporalio.worker.StartActivityInput + ) -> workflow.ActivityHandle: + # Add a header to the outbound activity call + input.headers = {"foo": Payload(data=b"bar")} + return super().start_activity(input) + + +class HeaderClientInterceptor(temporalio.client.Interceptor): + def __init__(self, header: Payload): + self.header = header + super().__init__() + + def intercept_client( + self, next: temporalio.client.OutboundInterceptor + ) -> temporalio.client.OutboundInterceptor: + return HeaderClientOutboundInterceptor( + super().intercept_client(next), self.header + ) + + +class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor): + def __init__( + self, next: temporalio.client.OutboundInterceptor, header: Payload + ) -> None: + self.header = header + super().__init__(next) + + async def start_workflow( + self, input: temporalio.client.StartWorkflowInput + ) -> WorkflowHandle[Any, Any]: + input.headers = {"foo": self.header.__deepcopy__()} + return await super().start_workflow(input) + + async def signal_workflow(self, input: SignalWorkflowInput) -> None: + input.headers = {"foo": self.header.__deepcopy__()} + return await super().signal_workflow(input) + + async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: + cast(ScheduleActionStartWorkflow, input.schedule.action).headers = { + "foo": self.header.__deepcopy__() + } + return await super().create_schedule(input) + + +@pytest.mark.parametrize( + "header_codec_behavior", + [ + HeaderCodecBehavior.NO_CODEC, + HeaderCodecBehavior.CODEC, + HeaderCodecBehavior.WORKFLOW_ONLY_CODEC, + ], +) +async def test_workflow_headers_with_codec( + client: Client, env: WorkflowEnvironment, header_codec_behavior: HeaderCodecBehavior +): + if env.supports_time_skipping: + pytest.skip("Time skipping server doesn't persist headers.") + + header_payload = Payload(data=b"bar") + if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC: + header_payload = (await SimpleCodec().encode([header_payload]))[0] + + # Make client with this codec and run a couple of existing tests + config = client.config() + config["data_converter"] = DataConverter(payload_codec=SimpleCodec()) + config["interceptors"] = [HeaderClientInterceptor(header_payload)] + config["header_codec_behavior"] = header_codec_behavior + client = Client(**config) + + global global_header_codec_behavior + global_header_codec_behavior = header_codec_behavior + + async with new_worker( + client, + SimpleActivityWorkflow, + SignalAndQueryWorkflow, + activities=[say_hello], + interceptors=[HeaderWorkerInterceptor()], + ) as worker: + workflow_handle = await client.start_workflow( + SimpleActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert await workflow_handle.result() == "Hello, Temporal!" + + async for e in workflow_handle.fetch_history_events(): + if e.HasField("activity_task_scheduled_event_attributes"): + header = e.activity_task_scheduled_event_attributes.header.fields["foo"] + if header_codec_behavior == HeaderCodecBehavior.CODEC: + assert "simple-codec" in header.metadata + + handle = await client.start_workflow( + SignalAndQueryWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Simple signals and queries + await handle.signal(SignalAndQueryWorkflow.signal1, "some arg") + assert "signal1: some arg" == await handle.query( + SignalAndQueryWorkflow.last_event + ) + + async for e in handle.fetch_history_events(): + if e.HasField("workflow_execution_signaled_event_attributes"): + header = e.workflow_execution_signaled_event_attributes.header.fields[ + "foo" + ] + if header_codec_behavior == HeaderCodecBehavior.CODEC: + assert "simple-codec" in header.metadata + + schedule_handle = await client.create_schedule( + f"schedule-{uuid.uuid4()}", + temporalio.client.Schedule( + action=temporalio.client.ScheduleActionStartWorkflow( + "SimpleActivityWorkflow", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ), + spec=temporalio.client.ScheduleSpec( + calendars=[temporalio.client.ScheduleCalendarSpec()] + ), + state=temporalio.client.ScheduleState(paused=True), + ), + ) + description = await schedule_handle.describe() + + # Header payload is still encoded due to limitations + headers = cast(ScheduleActionStartWorkflow, description.schedule.action).headers + assert headers is not None + if header_codec_behavior == HeaderCodecBehavior.NO_CODEC: + assert headers["foo"].data == b"bar" + else: + assert headers["foo"].data != b"bar"