Skip to content

Commit cf4c7cb

Browse files
authored
Ensure extra data on task fail logs (#502)
Fixes #500
1 parent 1001653 commit cf4c7cb

File tree

2 files changed: +99 −23 lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ def activate(
349349
logger.warning(
350350
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
351351
exc_info=activation_err,
352+
extra={"temporal_workflow": self._info._logger_details()},
352353
)
353354
# Set completion failure
354355
self._current_completion.failed.failure.SetInParent()

tests/worker/test_workflow.py

Lines changed: 98 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import typing
1010
import uuid
1111
from abc import ABC, abstractmethod
12+
from contextlib import contextmanager
1213
from dataclasses import dataclass
1314
from datetime import datetime, timedelta, timezone
1415
from typing import (
@@ -30,6 +31,7 @@
3031
from google.protobuf.timestamp_pb2 import Timestamp
3132
from typing_extensions import Protocol, runtime_checkable
3233

34+
import temporalio.worker
3335
from temporalio import activity, workflow
3436
from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
3537
from temporalio.api.enums.v1 import EventType
@@ -1876,22 +1878,37 @@ def last_signal(self) -> str:
18761878
return self._last_signal
18771879

18781880

1879-
async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
1880-
# Use queue to capture log statements
1881-
log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
1882-
handler = logging.handlers.QueueHandler(log_queue)
1883-
workflow.logger.base_logger.addHandler(handler)
1884-
prev_level = workflow.logger.base_logger.level
1885-
workflow.logger.base_logger.setLevel(logging.INFO)
1886-
workflow.logger.full_workflow_info_on_extra = True
1881+
class LogCapturer:
    """Captures log records from chosen loggers into a queue for inspection.

    Used by tests to assert that specific log lines (and their ``extra``
    payloads) were emitted during workflow/activity execution.
    """

    def __init__(self) -> None:
        # Records are delivered here by the QueueHandler attached in
        # logs_captured; find_log scans this queue directly.
        self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()

    @contextmanager
    def logs_captured(self, *loggers: logging.Logger):
        """Attach a queue handler at INFO level to each given logger.

        On exit, removes the handler and restores each logger's previous
        level. Yields this capturer so callers can query captured records.
        """
        queue_handler = logging.handlers.QueueHandler(self.log_queue)

        # Remember each logger's prior level so it can be restored on exit.
        saved = [(captured_logger, captured_logger.level) for captured_logger in loggers]
        for captured_logger in loggers:
            captured_logger.setLevel(logging.INFO)
            captured_logger.addHandler(queue_handler)
        try:
            yield self
        finally:
            for captured_logger, previous_level in saved:
                captured_logger.removeHandler(queue_handler)
                captured_logger.setLevel(previous_level)

    def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
        """Return the first captured record whose message starts with the
        given prefix, or None if no such record was captured."""
        matches = (
            record
            for record in cast(List[logging.LogRecord], self.log_queue.queue)
            if record.message.startswith(starts_with)
        )
        return next(matches, None)
18931905

1894-
try:
1906+
1907+
async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
1908+
workflow.logger.full_workflow_info_on_extra = True
1909+
with LogCapturer().logs_captured(
1910+
workflow.logger.base_logger, activity.logger.base_logger
1911+
) as capturer:
18951912
# Log two signals and kill worker before completing. Need to disable
18961913
# workflow cache since we restart the worker and don't want to pay the
18971914
# sticky queue penalty.
@@ -1909,11 +1926,11 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
19091926
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
19101927

19111928
# Confirm two logs happened
1912-
assert find_log("Signal: signal 1 ({'attempt':")
1913-
assert find_log("Signal: signal 2")
1914-
assert not find_log("Signal: signal 3")
1929+
assert capturer.find_log("Signal: signal 1 ({'attempt':")
1930+
assert capturer.find_log("Signal: signal 2")
1931+
assert not capturer.find_log("Signal: signal 3")
19151932
# Also make sure it has some workflow info and correct funcName
1916-
record = find_log("Signal: signal 1")
1933+
record = capturer.find_log("Signal: signal 1")
19171934
assert (
19181935
record
19191936
and record.__dict__["temporal_workflow"]["workflow_type"]
@@ -1924,7 +1941,7 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
19241941
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
19251942

19261943
# Clear queue and start a new one with more signals
1927-
log_queue.queue.clear()
1944+
capturer.log_queue.queue.clear()
19281945
async with new_worker(
19291946
client,
19301947
LoggingWorkflow,
@@ -1937,13 +1954,71 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
19371954
await handle.result()
19381955

19391956
# Confirm replayed logs are not present but new ones are
1940-
assert not find_log("Signal: signal 1")
1941-
assert not find_log("Signal: signal 2")
1942-
assert find_log("Signal: signal 3")
1943-
assert find_log("Signal: finish")
1944-
finally:
1945-
workflow.logger.base_logger.removeHandler(handler)
1946-
workflow.logger.base_logger.setLevel(prev_level)
1957+
assert not capturer.find_log("Signal: signal 1")
1958+
assert not capturer.find_log("Signal: signal 2")
1959+
assert capturer.find_log("Signal: signal 3")
1960+
assert capturer.find_log("Signal: finish")
1961+
1962+
1963+
@activity.defn
async def task_fail_once_activity() -> None:
    """Activity that intentionally fails its first attempt and succeeds on retry."""
    first_try = activity.info().attempt == 1
    if first_try:
        raise RuntimeError("Intentional activity task failure")
1967+
1968+
1969+
# Module-level flag so the workflow task fails exactly once per process.
# The workflow is deliberately not sandboxed so this global is shared
# across workflow task retries.
task_fail_once_workflow_has_failed = False


@workflow.defn(sandboxed=False)
class TaskFailOnceWorkflow:
    @workflow.run
    async def run(self) -> None:
        """Fail the first workflow task, then run an activity that fails once."""
        global task_fail_once_workflow_has_failed
        if task_fail_once_workflow_has_failed:
            # Retried workflow task: run an activity configured to fail its
            # first attempt and then succeed on its single allowed retry.
            await workflow.execute_activity(
                task_fail_once_activity,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(milliseconds=1),
                    backoff_coefficient=1.0,
                    maximum_attempts=2,
                ),
            )
            return

        # First attempt: flip the flag, then blow up the workflow task.
        task_fail_once_workflow_has_failed = True
        raise RuntimeError("Intentional workflow task failure")
1992+
1993+
1994+
async def test_workflow_logging_task_fail(client: Client):
    """Workflow and activity task-failure logs must carry their extra details."""
    with LogCapturer().logs_captured(
        activity.logger.base_logger, temporalio.worker._workflow_instance.logger
    ) as capturer:
        async with new_worker(
            client, TaskFailOnceWorkflow, activities=[task_fail_once_activity]
        ) as worker:
            await client.execute_workflow(
                TaskFailOnceWorkflow.run,
                id=f"workflow-{uuid.uuid4()}",
                task_queue=worker.task_queue,
            )

        # The failed workflow task must be logged with temporal_workflow extra.
        workflow_record = capturer.find_log("Failed activation on workflow")
        assert workflow_record
        assert "Intentional workflow task failure" in workflow_record.message
        workflow_extra = getattr(workflow_record, "temporal_workflow")
        assert workflow_extra["workflow_type"] == "TaskFailOnceWorkflow"

        # The failed activity task must be logged with temporal_activity extra.
        activity_record = capturer.find_log("Completing activity as failed")
        assert activity_record
        assert "Intentional activity task failure" in activity_record.message
        activity_extra = getattr(activity_record, "temporal_activity")
        assert activity_extra["activity_type"] == "task_fail_once_activity"
)
19472022

19482023

19492024
@workflow.defn

Comments (0)