Skip to content

Commit e3c4bfb

Browse files
authored
✨ feat(aci): add task for trigger actions (#95159)
1 parent 40ba0d0 commit e3c4bfb

File tree

16 files changed

+288
-114
lines changed

16 files changed

+288
-114
lines changed

src/sentry/conf/server.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
853853
"sentry.integrations.vsts.tasks.kickoff_subscription_check",
854854
"sentry.integrations.tasks",
855855
"sentry.demo_mode.tasks",
856+
"sentry.workflow_engine.tasks.workflows",
857+
"sentry.workflow_engine.tasks.actions",
856858
)
857859

858860
# Enable split queue routing
@@ -1023,6 +1025,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
10231025
Queue("release_registry", routing_key="release_registry"),
10241026
Queue("seer.seer_automation", routing_key="seer.seer_automation"),
10251027
Queue("workflow_engine.process_workflows", routing_key="workflow_engine.process_workflows"),
1028+
Queue("workflow_engine.trigger_action", routing_key="workflow_engine.trigger_action"),
10261029
]
10271030

10281031
from celery.schedules import crontab
@@ -1516,6 +1519,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
15161519
"sentry.uptime.rdap.tasks",
15171520
"sentry.uptime.subscriptions.tasks",
15181521
"sentry.workflow_engine.processors.delayed_workflow",
1522+
"sentry.workflow_engine.tasks.workflows",
1523+
"sentry.workflow_engine.tasks.actions",
15191524
# Used for tests
15201525
"sentry.taskworker.tasks.examples",
15211526
)

src/sentry/features/temporary.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,8 @@ def register_temporary_features(manager: FeatureManager):
515515
manager.add("organizations:workflow-engine-rule-serializers", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
516516
# Enable async processing of event workflows in a task rather than as part of post_process.
517517
manager.add("organizations:workflow-engine-post-process-async", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
518+
# Enable async processing of actions in a task rather than as part of post_process.
519+
manager.add("organizations:workflow-engine-action-trigger-async", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
518520
# Enable EventUniqueUserFrequencyConditionWithConditions special alert condition
519521
manager.add("organizations:event-unique-user-frequency-condition-with-conditions", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
520522
# Use spans instead of transactions for dynamic sampling calculations. This will become the new default.

src/sentry/tasks/post_process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,7 @@ def process_workflow_engine(job: PostProcessJob) -> None:
956956
metrics.incr("workflow_engine.issue_platform.payload.received.occurrence")
957957

958958
from sentry.workflow_engine.processors.workflow import process_workflows
959-
from sentry.workflow_engine.tasks import process_workflows_event
959+
from sentry.workflow_engine.tasks.workflows import process_workflows_event
960960

961961
# PostProcessJob event is optional, WorkflowEventData event is required
962962
if "event" not in job:

src/sentry/workflow_engine/models/action.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sentry.backup.scopes import RelocationScope
1515
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
1616
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
17+
from sentry.utils import metrics
1718
from sentry.workflow_engine.models.json_config import JSONConfigBase
1819
from sentry.workflow_engine.registry import action_handler_registry
1920
from sentry.workflow_engine.types import ActionHandler, WorkflowEventData
@@ -88,6 +89,11 @@ def trigger(self, event_data: WorkflowEventData, detector: Detector) -> None:
8889
handler = self.get_handler()
8990
handler.execute(event_data, self, detector)
9091

92+
metrics.incr(
93+
"workflow_engine.action.trigger",
94+
tags={"action_type": self.type, "detector_type": detector.type},
95+
)
96+
9197
logger.info(
9298
"workflow_engine.action.trigger",
9399
extra={

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
evaluate_workflows_action_filters,
6262
)
6363
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
64+
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
6465
from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData
6566
from sentry.workflow_engine.utils import log_context
6667

@@ -646,7 +647,16 @@ def fire_actions_for_groups(
646647
organization,
647648
):
648649
for action in filtered_actions:
649-
action.trigger(workflow_event_data, detector)
650+
if features.has(
651+
"organizations:workflow-engine-action-trigger-async",
652+
organization,
653+
):
654+
task_params = build_trigger_action_task_params(
655+
action, detector, workflow_event_data
656+
)
657+
trigger_action.delay(**task_params)
658+
else:
659+
action.trigger(workflow_event_data, detector)
650660

651661
logger.info(
652662
"workflow_engine.delayed_workflow.triggered_actions_summary",

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from sentry.workflow_engine.processors.data_condition_group import process_data_condition_group
3030
from sentry.workflow_engine.processors.detector import get_detector_by_event
3131
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
32+
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
3233
from sentry.workflow_engine.types import WorkflowEventData
3334
from sentry.workflow_engine.utils import log_context
3435
from sentry.workflow_engine.utils.metrics import metrics_incr
@@ -414,19 +415,14 @@ def process_workflows(
414415
organization,
415416
):
416417
for action in actions:
417-
action.trigger(event_data, detector)
418-
metrics_incr(
419-
"action.trigger",
420-
tags={"action_type": action.type},
421-
)
422-
423-
logger.info(
424-
"workflow_engine.action.trigger",
425-
extra={
426-
"action_id": action.id,
427-
"event_data": asdict(event_data),
428-
},
429-
)
418+
if features.has(
419+
"organizations:workflow-engine-action-trigger-async",
420+
organization,
421+
):
422+
task_params = build_trigger_action_task_params(action, detector, event_data)
423+
trigger_action.delay(**task_params)
424+
else:
425+
action.trigger(event_data, detector)
430426
else:
431427
logger.info(
432428
"workflow_engine.triggered_actions",

src/sentry/workflow_engine/tasks/__init__.py

Whitespace-only changes.
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from django.db.models import Value
2+
3+
from sentry.eventstore.models import GroupEvent
4+
from sentry.eventstream.base import GroupState
5+
from sentry.models.activity import Activity
6+
from sentry.silo.base import SiloMode
7+
from sentry.tasks.base import instrumented_task
8+
from sentry.taskworker import config, namespaces
9+
from sentry.taskworker.retry import Retry
10+
from sentry.utils import metrics
11+
from sentry.workflow_engine.models import Action, Detector
12+
from sentry.workflow_engine.tasks.utils import build_workflow_event_data_from_event
13+
from sentry.workflow_engine.types import WorkflowEventData
14+
from sentry.workflow_engine.utils import log_context
15+
16+
logger = log_context.get_logger(__name__)
17+
18+
19+
def build_trigger_action_task_params(
    action: Action, detector: Detector, event_data: WorkflowEventData
) -> dict:
    """Build keyword parameters for a trigger_action.delay() call.

    Flattens the in-memory WorkflowEventData into primitive, serializable
    values (ids and flags) that can safely cross the task boundary. Exactly
    one of ``event_id`` / ``activity_id`` in the result is non-None,
    depending on whether the triggering event is a GroupEvent or an Activity.
    """
    event_id = None
    activity_id = None
    occurrence_id = None

    if isinstance(event_data.event, GroupEvent):
        event_id = event_data.event.event_id
        occurrence_id = event_data.event.occurrence_id
    elif isinstance(event_data.event, Activity):
        activity_id = event_data.event.id

    return {
        "action_id": action.id,
        "detector_id": detector.id,
        # Actions reached via a workflow carry a workflow_id annotation;
        # fall back to None when the annotation is absent.
        "workflow_id": getattr(action, "workflow_id", None),
        "event_id": event_id,
        "activity_id": activity_id,
        "group_id": event_data.event.group_id,
        "occurrence_id": occurrence_id,
        "group_state": event_data.group_state,
        "has_reappeared": event_data.has_reappeared,
        "has_escalated": event_data.has_escalated,
        "workflow_env_id": event_data.workflow_env.id if event_data.workflow_env else None,
    }
44+
45+
46+
@instrumented_task(
    name="sentry.workflow_engine.tasks.trigger_action",
    queue="workflow_engine.trigger_action",
    acks_late=True,
    default_retry_delay=5,
    max_retries=3,
    soft_time_limit=25,
    time_limit=30,
    silo_mode=SiloMode.REGION,
    taskworker_config=config.TaskworkerConfig(
        namespace=namespaces.workflow_engine_tasks,
        processing_deadline_duration=30,
        retry=Retry(
            times=3,
            delay=5,
        ),
    ),
)
def trigger_action(
    action_id: int,
    detector_id: int,
    # May be None: build_trigger_action_task_params falls back to None when
    # the Action was not annotated with a workflow_id.
    workflow_id: int | None,
    event_id: str | None,
    activity_id: int | None,
    group_id: int,
    occurrence_id: str | None,
    # May be None: callers forward WorkflowEventData.group_state, which is optional.
    group_state: GroupState | None,
    has_reappeared: bool,
    has_escalated: bool,
    workflow_env_id: int | None,
) -> None:
    """Fire a single Action asynchronously, outside of post_process.

    Rehydrates the WorkflowEventData from the primitive parameters produced
    by build_trigger_action_task_params, then invokes the action's handler
    via ``Action.trigger``. Exactly one of ``event_id`` / ``activity_id``
    must be provided.

    Raises:
        ValueError: if neither or both of ``event_id`` / ``activity_id`` are given.
        NotImplementedError: for the activity path, which is not supported yet.
    """
    # XOR check to ensure exactly one of event_id or activity_id is provided
    if (event_id is not None) == (activity_id is not None):
        logger.error(
            "Exactly one of event_id or activity_id must be provided",
            extra={"event_id": event_id, "activity_id": activity_id},
        )
        raise ValueError("Exactly one of event_id or activity_id must be provided")

    # Re-attach the workflow_id annotation the Action carried when the task
    # parameters were built, so downstream consumers can read it again.
    action = Action.objects.annotate(workflow_id=Value(workflow_id)).get(id=action_id)
    detector = Detector.objects.get(id=detector_id)

    metrics.incr(
        "workflow_engine.tasks.trigger_action_task_started",
        tags={"action_type": action.type, "detector_type": detector.type},
        sample_rate=1.0,
    )

    project_id = detector.project_id

    if event_id is not None:
        event_data = build_workflow_event_data_from_event(
            project_id=project_id,
            event_id=event_id,
            group_id=group_id,
            occurrence_id=occurrence_id,
            group_state=group_state,
            has_reappeared=has_reappeared,
            has_escalated=has_escalated,
            workflow_env_id=workflow_env_id,
        )

        # TODO(iamrajjoshi): remove this once we are sure everything is working as expected
        logger.info(
            "workflow_engine.tasks.trigger_action.build_workflow_event_data_from_event",
            extra={
                "action_id": action_id,
                "detector_id": detector_id,
                "workflow_id": workflow_id,
            },
        )

    else:
        # Here, we probably build the event data from the activity
        raise NotImplementedError("Activity ID is not supported yet")

    action.trigger(event_data, detector)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable
2+
3+
from sentry import nodestore
4+
from sentry.eventstore.models import Event, GroupEvent
5+
from sentry.eventstream.base import GroupState
6+
from sentry.issues.issue_occurrence import IssueOccurrence
7+
from sentry.models.environment import Environment
8+
from sentry.models.group import Group
9+
from sentry.types.activity import ActivityType
10+
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
11+
from sentry.workflow_engine.types import WorkflowEventData
12+
from sentry.workflow_engine.utils import log_context
13+
14+
SUPPORTED_ACTIVITIES = [ActivityType.SET_RESOLVED.value]
15+
16+
17+
logger = log_context.get_logger(__name__)
18+
19+
20+
def _should_retry_nodestore_fetch(attempt: int, e: Exception) -> bool:
    """Decide whether a failed nodestore read should be retried.

    Gives up after the third attempt. ServiceUnavailable and DeadlineExceeded
    are generally retriable; RetryError is also included because the nodestore
    interface doesn't let us specify a timeout to BigTable and the default is
    5s; see c5e2b40.
    """
    if attempt > 3:
        return False
    return isinstance(e, (ServiceUnavailable, RetryError, DeadlineExceeded))
27+
28+
29+
def fetch_event(event_id: str, project_id: int) -> Event | None:
    """Fetch a single Event's payload from nodestore, with retries.

    Returns None when no payload exists for the given event/project pair.
    Transient backend errors are retried per _should_retry_nodestore_fetch
    with exponential delay.
    """
    retry_policy = ConditionalRetryPolicy(
        _should_retry_nodestore_fetch, exponential_delay(1.00)
    )
    node_id = Event.generate_node_id(project_id, event_id)
    payload = retry_policy(lambda: nodestore.backend.get(node_id))
    if payload is None:
        return None
    return Event(event_id=event_id, project_id=project_id, data=payload)
45+
46+
47+
def build_workflow_event_data_from_event(
    project_id: int,
    event_id: str,
    group_id: int,
    occurrence_id: str | None = None,
    group_state: GroupState | None = None,
    has_reappeared: bool = False,
    has_escalated: bool = False,
    workflow_env_id: int | None = None,
) -> WorkflowEventData:
    """Reconstruct a WorkflowEventData from primitive identifiers.

    Fetches the event payload from nodestore and the Group (plus the optional
    IssueOccurrence and Environment) from the database, then assembles them
    into the in-memory object the workflow processors expect.

    Raises:
        ValueError: if the event payload cannot be found in nodestore.
    """
    event = fetch_event(event_id, project_id)
    if event is None:
        raise ValueError(f"Event not found: event_id={event_id}, project_id={project_id}")

    # TODO(iamrajjoshi): Should we use get_from_cache here?
    group = Group.objects.get(id=group_id)

    group_event = GroupEvent.from_event(event, group)
    group_event.occurrence = (
        IssueOccurrence.fetch(occurrence_id, project_id) if occurrence_id else None
    )

    # Resolve the workflow environment only when an id was supplied.
    workflow_env = Environment.objects.get(id=workflow_env_id) if workflow_env_id else None

    return WorkflowEventData(
        event=group_event,
        group=group,
        group_state=group_state,
        has_reappeared=has_reappeared,
        has_escalated=has_escalated,
        workflow_env=workflow_env,
    )

0 commit comments

Comments (0)