
fix(aci): Retry process_delayed_workflows timeouts #95379


Merged · 4 commits · Jul 14, 2025
2 changes: 1 addition & 1 deletion src/sentry/sentry_apps/tasks/sentry_apps.py
@@ -330,7 +330,7 @@ def _process_resource_change(
    except model.DoesNotExist as e:
        # Explicitly requeue the task, so we don't report this to Sentry until
        # we hit the max number of retries.
-        return retry_task(e)
+        retry_task(e)
Member: qq: why did we have to remove `return` from here?

Member Author: Because I redefined `retry_task` as officially never returning as far as the type system is concerned. So this `return` was kinda incorrect, and due to my change mypy was able to flag it.
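
For readers following along, here is a minimal standalone sketch (not part of the diff) of what the `NoReturn` annotation buys callers; the `retry_task` body below is a stand-in, only the annotation matters:

```python
from typing import NoReturn


def retry_task(exc: Exception | None = None) -> NoReturn:
    # Stand-in for the real helper: it always raises (or triggers a retry,
    # which also raises), so it never returns normally.
    raise RuntimeError("retry requested")


def lookup(value: str) -> int:
    try:
        return int(value)
    except ValueError as e:
        # No `return` needed: mypy knows execution cannot continue past a
        # NoReturn call, so this branch still satisfies the `-> int` signature.
        retry_task(e)
```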


    org = None

4 changes: 3 additions & 1 deletion src/sentry/taskworker/retry.py
@@ -2,6 +2,7 @@

from enum import Enum
from multiprocessing.context import TimeoutError
+from typing import NoReturn

from celery import current_task
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
@@ -41,7 +42,7 @@ def to_proto(self) -> OnAttemptsExceeded.ValueType:
raise ValueError(f"Unknown LastAction: {self}")


-def retry_task(exc: Exception | None = None) -> None:
+def retry_task(exc: Exception | None = None) -> NoReturn:
"""
Helper for triggering retry errors.
If all retries have been consumed, this will raise a
@@ -55,6 +56,7 @@ def retry_task(exc: Exception | None = None) -> None:
    celery_retry = getattr(current_task, "retry", None)
    if celery_retry:
        current_task.retry(exc=exc)
+        assert False, "unreachable"
    else:
        current = taskworker_current_task()
        if current and not current.retries_remaining:
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/processors/delayed_workflow.py
@@ -65,6 +65,7 @@
)
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
+from sentry.workflow_engine.tasks.utils import retry_timeouts
from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData
from sentry.workflow_engine.utils import log_context

@@ -760,6 +761,7 @@ def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:
),
)
@retry
+@retry_timeouts
Member: Any thoughts on some convenience decorator to group the 2 retries together? Something like @retry_including_timeouts.

It would probably increase adoption; there are a lot of ecosystem tasks I would use this for.

Member Author: Follow-up coming soon.
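
Not part of this PR, but a hypothetical sketch of what the suggested convenience decorator could look like; the name `retry_including_timeouts` and the import path of the existing `retry` decorator are assumptions:

```python
from sentry.tasks.base import retry  # assumed location of the existing retry decorator
from sentry.workflow_engine.tasks.utils import retry_timeouts


def retry_including_timeouts(func):
    """Apply the standard retry handling plus retry_timeouts in one step,
    mirroring the @retry / @retry_timeouts stacking used above."""
    return retry(retry_timeouts(func))
```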

@log_context.root()
def process_delayed_workflows(
project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any
22 changes: 22 additions & 0 deletions src/sentry/workflow_engine/tasks/utils.py
@@ -1,3 +1,6 @@
+from functools import wraps
+
+import sentry_sdk
from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable

from sentry import nodestore
@@ -6,6 +9,8 @@
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.environment import Environment
from sentry.models.group import Group
+from sentry.taskworker.retry import retry_task
+from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
from sentry.types.activity import ActivityType
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
from sentry.workflow_engine.types import WorkflowEventData
@@ -82,3 +87,20 @@ def build_workflow_event_data_from_event(
        has_escalated=has_escalated,
        workflow_env=workflow_env,
    )
+
+
+def retry_timeouts(func):
+    """
+    Schedule a task retry if the function raises ProcessingDeadlineExceeded.
+    This exists because the standard retry decorator doesn't allow BaseExceptions.
+    """
Comment on lines +92 to +96
Member: If you find yourself using this frequently, we could introduce deadline-approaching exceptions that happen some time before the processing deadline is crossed.


+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except ProcessingDeadlineExceeded:
+            sentry_sdk.capture_exception(level="info")
+            retry_task()
+
+    return wrapper
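
A quick illustration (not from the diff) of the wrapper's runtime behavior, with its collaborators mocked so it can run outside a task worker; the patch targets simply mirror the imports in this file:

```python
from unittest import mock

from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
from sentry.workflow_engine.tasks.utils import retry_timeouts


@retry_timeouts
def slow_step():
    # Stand-in for work that blows past the child process's processing deadline.
    raise ProcessingDeadlineExceeded("deadline exceeded")


with (
    mock.patch("sentry.workflow_engine.tasks.utils.retry_task") as retry_task,
    mock.patch("sentry.workflow_engine.tasks.utils.sentry_sdk.capture_exception") as capture,
):
    slow_step()  # the BaseException is swallowed and converted into a retry request
    capture.assert_called_once()
    retry_task.assert_called_once_with()
```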
9 changes: 7 additions & 2 deletions tests/sentry/tasks/test_base.py
@@ -160,12 +160,17 @@ def test_validate_parameters_call():
assert "region_task was called with a parameter that cannot be JSON encoded" in str(err)


@override_settings(SILO_MODE=SiloMode.CONTROL)
@patch("sentry.taskworker.retry.current_task")
@patch("sentry_sdk.capture_exception")
def test_retry_on(capture_exception, current_task):
class ExpectedException(Exception):
pass

current_task.retry.side_effect = ExpectedException("some exception")

# In reality current_task.retry will cause the given exception to be re-raised but we patch it here so no need to .raises :bufo-shrug:
retry_on_task("bruh")
with pytest.raises(ExpectedException):
retry_on_task("bruh")

assert capture_exception.call_count == 1
assert current_task.retry.call_count == 1