Commit 9f14e5d

fix(aci): Retry process_delayed_workflows timeouts (#95379)
Timeouts are expected here and assumed transient, and the existing `retry` decorator and config don't support retrying `ProcessingDeadlineExceeded`. So, we add a new task decorator to trigger retries on timeouts. This avoids opening the door to other BaseExceptions and keeps it opt-in as designed.
1 parent eed44b6 commit 9f14e5d
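
The idea, as a self-contained sketch (all names below are illustrative stand-ins, not Sentry's actual API): a timeout signalled as a BaseException is caught by a small opt-in decorator and converted into an explicit retry request, instead of widening the generic retry handler to accept every BaseException.

from functools import wraps


class DeadlineExceeded(BaseException):
    """Stand-in for ProcessingDeadlineExceeded; deliberately not an Exception."""


class RetryRequested(Exception):
    """Stand-in for the error that retry_task() raises to requeue a task."""


def retry_on_timeout(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except DeadlineExceeded:
            # Only the timeout is converted; other BaseExceptions still propagate.
            raise RetryRequested("deadline exceeded, requeue the task")
    return wrapper


@retry_on_timeout
def process_batch(size: int) -> int:
    if size > 100:
        raise DeadlineExceeded()
    return size * 2


if __name__ == "__main__":
    print(process_batch(3))  # 6
    try:
        process_batch(1_000)
    except RetryRequested as exc:
        print(f"would requeue: {exc}")

The real decorator (retry_timeouts, added in src/sentry/workflow_engine/tasks/utils.py below) also reports the timeout to Sentry at info level before requesting the retry.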

5 files changed: +35, -4 lines


src/sentry/sentry_apps/tasks/sentry_apps.py

Lines changed: 1 addition & 1 deletion
@@ -330,7 +330,7 @@ def _process_resource_change(
     except model.DoesNotExist as e:
         # Explicitly requeue the task, so we don't report this to Sentry until
         # we hit the max number of retries.
-        return retry_task(e)
+        retry_task(e)

     org = None

src/sentry/taskworker/retry.py

Lines changed: 3 additions & 1 deletion
@@ -2,6 +2,7 @@

 from enum import Enum
 from multiprocessing.context import TimeoutError
+from typing import NoReturn

 from celery import current_task
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
@@ -41,7 +42,7 @@ def to_proto(self) -> OnAttemptsExceeded.ValueType:
         raise ValueError(f"Unknown LastAction: {self}")


-def retry_task(exc: Exception | None = None) -> None:
+def retry_task(exc: Exception | None = None) -> NoReturn:
     """
     Helper for triggering retry errors.
     If all retries have been consumed, this will raise a
@@ -55,6 +56,7 @@ def retry_task(exc: Exception | None = None) -> None:
     celery_retry = getattr(current_task, "retry", None)
     if celery_retry:
         current_task.retry(exc=exc)
+        assert False, "unreachable"
     else:
         current = taskworker_current_task()
         if current and not current.retries_remaining:
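
Why the NoReturn annotation and the trailing assert matter, in a simplified, hedged illustration (always_raises stands in for retry_task; lookup is invented for this example): once the type checker knows a call can never return, callers no longer need a return or an else branch after it, which is what lets the `return retry_task(e)` in sentry_apps.py above become a bare `retry_task(e)`.

from typing import NoReturn


def always_raises(exc: Exception | None = None) -> NoReturn:
    """Stand-in for retry_task(): both the celery and taskworker paths raise."""
    raise exc or RuntimeError("retry requested")


def lookup(values: dict[str, int], key: str) -> int:
    if key not in values:
        always_raises(KeyError(key))
    # No `return` or `else` needed above: a NoReturn call ends that branch,
    # so the type checker knows this line runs only when the key exists.
    return values[key]


if __name__ == "__main__":
    print(lookup({"a": 1}, "a"))  # 1

The `assert False, "unreachable"` after current_task.retry(exc=exc) states the same contract from the inside: in normal operation celery's retry() raises a Retry exception, and the assert documents (and enforces, for example when retry is mocked in tests) that execution never continues past it.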

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 2 additions & 0 deletions
@@ -65,6 +65,7 @@
 )
 from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
 from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
+from sentry.workflow_engine.tasks.utils import retry_timeouts
 from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData
 from sentry.workflow_engine.utils import log_context

@@ -760,6 +761,7 @@ def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:
     ),
 )
 @retry
+@retry_timeouts
 @log_context.root()
 def process_delayed_workflows(
     project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any
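
Because decorators apply bottom-up, @retry_timeouts here wraps the task body (and @log_context.root()) inside the existing @retry: a ProcessingDeadlineExceeded raised while delayed workflows are being processed is caught first and converted into the standard retry signal via retry_task(), which then presumably flows through @retry and the task runner the same way any other requested retry would.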

src/sentry/workflow_engine/tasks/utils.py

Lines changed: 22 additions & 0 deletions
@@ -1,3 +1,6 @@
+from functools import wraps
+
+import sentry_sdk
 from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable

 from sentry import nodestore
@@ -6,6 +9,8 @@
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.models.environment import Environment
 from sentry.models.group import Group
+from sentry.taskworker.retry import retry_task
+from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
 from sentry.types.activity import ActivityType
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
 from sentry.workflow_engine.types import WorkflowEventData
@@ -82,3 +87,20 @@ def build_workflow_event_data_from_event(
         has_escalated=has_escalated,
         workflow_env=workflow_env,
     )
+
+
+def retry_timeouts(func):
+    """
+    Schedule a task retry if the function raises ProcessingDeadlineExceeded.
+    This exists because the standard retry decorator doesn't allow BaseExceptions.
+    """
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except ProcessingDeadlineExceeded:
+            sentry_sdk.capture_exception(level="info")
+            retry_task()
+
+    return wrapper
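
Two details of retry_timeouts worth noting: sentry_sdk.capture_exception(level="info") records the timeout for visibility without treating it as an error, and because retry_task() is typed NoReturn (it always raises), the wrapper can never silently fall through and return None after a deadline is hit.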

tests/sentry/tasks/test_base.py

Lines changed: 7 additions & 2 deletions
@@ -160,12 +160,17 @@ def test_validate_parameters_call():
     assert "region_task was called with a parameter that cannot be JSON encoded" in str(err)


+@override_settings(SILO_MODE=SiloMode.CONTROL)
 @patch("sentry.taskworker.retry.current_task")
 @patch("sentry_sdk.capture_exception")
 def test_retry_on(capture_exception, current_task):
+    class ExpectedException(Exception):
+        pass
+
+    current_task.retry.side_effect = ExpectedException("some exception")

-    # In reality current_task.retry will cause the given exception to be re-raised but we patch it here so no need to .raises :bufo-shrug:
-    retry_on_task("bruh")
+    with pytest.raises(ExpectedException):
+        retry_on_task("bruh")

     assert capture_exception.call_count == 1
     assert current_task.retry.call_count == 1
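
The test change follows from the new `assert False, "unreachable"`: previously the mocked current_task.retry simply returned and retry_task returned too, but now the mock must raise or the unreachable assert would fail. A simplified stand-alone analogue of the pattern (trigger_retry is an invented stand-in for retry_task's celery branch):

from unittest import mock

import pytest


def trigger_retry(task) -> None:
    """Stand-in for retry_task()'s celery branch: retry() is expected to raise."""
    task.retry()
    assert False, "unreachable"


def test_trigger_retry_reraises() -> None:
    class ExpectedException(Exception):
        pass

    task = mock.Mock()
    task.retry.side_effect = ExpectedException("some exception")

    # With a raising side_effect the assert is never reached and the
    # configured exception propagates, so the test asserts on it directly.
    with pytest.raises(ExpectedException):
        trigger_retry(task)

    assert task.retry.call_count == 1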
