|
44 | 44 | )
|
45 | 45 |
|
46 | 46 | import nexusrpc.handler
|
| 47 | +import xray |
| 48 | +from google.protobuf.json_format import MessageToJson |
47 | 49 | from nexusrpc import InputT, OutputT
|
48 | 50 | from typing_extensions import Self, TypeAlias, TypedDict
|
49 | 51 |
|
@@ -355,10 +357,30 @@ def get_thread_id(self) -> Optional[int]:
|
355 | 357 | # "_make_workflow_input", all other calls are "_apply_" + the job field
|
356 | 358 | # name.
|
357 | 359 |
|
| 360 | + # The commented code below had line numbers, which are now removed. |
| 361 | + |
358 | 362 | def activate(
|
359 | 363 | self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
|
360 | 364 | ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion:
|
361 |
| - print("activate -------------------------------------------------------------") |
| 365 | + with xray.start_as_current_span( |
| 366 | + actor=xray.Actor.WORKFLOW_USER, |
| 367 | + workflow_id=self._info.workflow_id, |
| 368 | + name="handle_activation", |
| 369 | + request_payload=MessageToJson(act), |
| 370 | + ) as span: |
| 371 | + print( |
| 372 | + f">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> WFT {self._info.workflow_id}\n" |
| 373 | + ) |
| 374 | + |
| 375 | + completion = self._activate(act) |
| 376 | + print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") |
| 377 | + if span: |
| 378 | + span.set_attribute("sdk.response.payload", MessageToJson(completion)) |
| 379 | + return completion |
| 380 | + |
| 381 | + def _activate( |
| 382 | + self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation |
| 383 | + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: |
362 | 384 | # Reset current completion, time, and whether replaying
|
363 | 385 | self._current_completion = (
|
364 | 386 | temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
|
|
0 commit comments