diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py
index 6c71280cf384df..f1406b49c16287 100644
--- a/src/sentry/sentry_apps/tasks/sentry_apps.py
+++ b/src/sentry/sentry_apps/tasks/sentry_apps.py
@@ -330,7 +330,7 @@ def _process_resource_change(
     except model.DoesNotExist as e:
         # Explicitly requeue the task, so we don't report this to Sentry until
         # we hit the max number of retries.
-        return retry_task(e)
+        retry_task(e)
 
     org = None
 
diff --git a/src/sentry/taskworker/retry.py b/src/sentry/taskworker/retry.py
index 3074608bd15571..04c8fba951a932 100644
--- a/src/sentry/taskworker/retry.py
+++ b/src/sentry/taskworker/retry.py
@@ -2,6 +2,7 @@
 
 from enum import Enum
 from multiprocessing.context import TimeoutError
+from typing import NoReturn
 
 from celery import current_task
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
@@ -41,7 +42,7 @@ def to_proto(self) -> OnAttemptsExceeded.ValueType:
         raise ValueError(f"Unknown LastAction: {self}")
 
 
-def retry_task(exc: Exception | None = None) -> None:
+def retry_task(exc: Exception | None = None) -> NoReturn:
     """
     Helper for triggering retry errors.
     If all retries have been consumed, this will raise a
@@ -55,6 +56,7 @@ def retry_task(exc: Exception | None = None) -> None:
     celery_retry = getattr(current_task, "retry", None)
     if celery_retry:
         current_task.retry(exc=exc)
+        assert False, "unreachable"
     else:
         current = taskworker_current_task()
         if current and not current.retries_remaining:
diff --git a/src/sentry/workflow_engine/processors/delayed_workflow.py b/src/sentry/workflow_engine/processors/delayed_workflow.py
index 7062fb5fc11132..9b8f74e12b3a48 100644
--- a/src/sentry/workflow_engine/processors/delayed_workflow.py
+++ b/src/sentry/workflow_engine/processors/delayed_workflow.py
@@ -65,6 +65,7 @@
 )
 from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
 from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
+from sentry.workflow_engine.tasks.utils import retry_timeouts
 from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData
 from sentry.workflow_engine.utils import log_context
 
@@ -760,6 +761,7 @@ def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:
     ),
 )
 @retry
+@retry_timeouts
 @log_context.root()
 def process_delayed_workflows(
     project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any
diff --git a/src/sentry/workflow_engine/tasks/utils.py b/src/sentry/workflow_engine/tasks/utils.py
index 38bfa8c55f6ea7..b91cd61fdd9ca4 100644
--- a/src/sentry/workflow_engine/tasks/utils.py
+++ b/src/sentry/workflow_engine/tasks/utils.py
@@ -1,3 +1,6 @@
+from functools import wraps
+
+import sentry_sdk
 from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable
 
 from sentry import nodestore
@@ -6,6 +9,8 @@
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.models.environment import Environment
 from sentry.models.group import Group
+from sentry.taskworker.retry import retry_task
+from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
 from sentry.types.activity import ActivityType
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
 from sentry.workflow_engine.types import WorkflowEventData
@@ -82,3 +87,20 @@ def build_workflow_event_data_from_event(
         has_escalated=has_escalated,
         workflow_env=workflow_env,
     )
+
+
+def retry_timeouts(func):
+    """
+    Schedule a task retry if the function raises ProcessingDeadlineExceeded.
+    This exists because the standard retry decorator doesn't allow BaseExceptions.
+    """
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except ProcessingDeadlineExceeded:
+            sentry_sdk.capture_exception(level="info")
+            retry_task()
+
+    return wrapper
diff --git a/tests/sentry/tasks/test_base.py b/tests/sentry/tasks/test_base.py
index 836429b7e99425..7609f0cb933208 100644
--- a/tests/sentry/tasks/test_base.py
+++ b/tests/sentry/tasks/test_base.py
@@ -160,12 +160,17 @@ def test_validate_parameters_call():
     assert "region_task was called with a parameter that cannot be JSON encoded" in str(err)
 
 
+@override_settings(SILO_MODE=SiloMode.CONTROL)
 @patch("sentry.taskworker.retry.current_task")
 @patch("sentry_sdk.capture_exception")
 def test_retry_on(capture_exception, current_task):
+    class ExpectedException(Exception):
+        pass
+
+    current_task.retry.side_effect = ExpectedException("some exception")
 
-    # In reality current_task.retry will cause the given exception to be re-raised but we patch it here so no need to .raises :bufo-shrug:
-    retry_on_task("bruh")
+    with pytest.raises(ExpectedException):
+        retry_on_task("bruh")
     assert capture_exception.call_count == 1
     assert current_task.retry.call_count == 1
 
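
Illustrative note (not part of the patch): the sketch below shows the retry_timeouts pattern introduced in src/sentry/workflow_engine/tasks/utils.py. ProcessingDeadlineExceeded, Retry, and retry_task here are simplified local stand-ins for the Sentry taskworker internals so the example runs on its own; the real decorator also reports the timeout via sentry_sdk.capture_exception before scheduling the retry.

# Illustrative sketch only -- ProcessingDeadlineExceeded, Retry, and retry_task are
# local stand-ins for the Sentry taskworker internals referenced in the diff above.
from functools import wraps
from typing import NoReturn


class ProcessingDeadlineExceeded(BaseException):
    """Stand-in for the worker's deadline signal; deliberately a BaseException."""


class Retry(Exception):
    """Stand-in for the marker exception a task runner uses to requeue work."""


def retry_task() -> NoReturn:
    # Mirrors the NoReturn contract added in the patch: this helper never returns.
    raise Retry()


def retry_timeouts(func):
    """Convert a deadline BaseException into a scheduled retry (simplified)."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ProcessingDeadlineExceeded:
            retry_task()

    return wrapper


@retry_timeouts
def slow_task() -> None:
    raise ProcessingDeadlineExceeded("deadline hit mid-task")


if __name__ == "__main__":
    try:
        slow_task()
    except Retry:
        print("timed-out task was rescheduled instead of failing outright")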