Skip to content

Commit 76f649b

Browse files
authored
feat(workflow_engine): Hook up activity updates to process_workflows (#94678)
## Description Complete support for `Activity` updates from the issue platform. This will filter to known activities that we can handle (right now, it's just issue resolution). - Needed to fix a few areas of logic to support the full end to end of `process_workflows` with an activity update. - Added Tests to ensure we could create an activity, run the task and it will fetch any associated data an evaluate as exected.
1 parent e68403a commit 76f649b

File tree

4 files changed

+210
-32
lines changed

4 files changed

+210
-32
lines changed

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from sentry import buffer, features
1313
from sentry.eventstore.models import GroupEvent
14+
from sentry.models.activity import Activity
1415
from sentry.models.environment import Environment
1516
from sentry.utils import json
1617
from sentry.workflow_engine.models import (
@@ -182,7 +183,7 @@ def evaluate_workflow_triggers(
182183
"group_id": event_data.group.id,
183184
"event_id": event_id,
184185
"event_data": asdict(event_data),
185-
"event_environment_id": environment.id,
186+
"event_environment_id": environment.id if environment else None,
186187
"triggered_workflows": [workflow.id for workflow in triggered_workflows],
187188
},
188189
)
@@ -271,7 +272,7 @@ def evaluate_workflows_action_filters(
271272
return filtered_action_groups
272273

273274

274-
def get_environment_by_event(event_data: WorkflowEventData) -> Environment:
275+
def get_environment_by_event(event_data: WorkflowEventData) -> Environment | None:
275276
if isinstance(event_data.event, GroupEvent):
276277
try:
277278
environment = event_data.event.get_environment()
@@ -283,22 +284,27 @@ def get_environment_by_event(event_data: WorkflowEventData) -> Environment:
283284
raise Environment.DoesNotExist("Environment does not exist for the event")
284285

285286
return environment
287+
elif isinstance(event_data.event, Activity):
288+
return None
286289

287-
raise TypeError(
288-
"Expected event_data.event to be an instance of GroupEvent, got %s" % type(event_data.event)
289-
)
290+
raise TypeError(f"Cannot access the environment from, {type(event_data.event)}.")
290291

291292

292293
def _get_associated_workflows(
293-
detector: Detector, environment: Environment, event_data: WorkflowEventData
294+
detector: Detector, environment: Environment | None, event_data: WorkflowEventData
294295
) -> set[Workflow]:
295296
"""
296297
This is a wrapper method to get the workflows associated with a detector and environment.
297298
Used in process_workflows to wrap the query + logging into a single method
298299
"""
300+
environment_filter = (
301+
(Q(environment_id=None) | Q(environment_id=environment.id))
302+
if environment
303+
else Q(environment_id=None)
304+
)
299305
workflows = set(
300306
Workflow.objects.filter(
301-
(Q(environment_id=None) | Q(environment_id=environment.id)),
307+
environment_filter,
302308
detectorworkflow__detector_id=detector.id,
303309
enabled=True,
304310
)
@@ -324,7 +330,7 @@ def _get_associated_workflows(
324330
"group_id": event_data.group.id,
325331
"event_id": event_id,
326332
"event_data": asdict(event_data),
327-
"event_environment_id": environment.id,
333+
"event_environment_id": environment.id if environment else None,
328334
"workflows": [workflow.id for workflow in workflows],
329335
"detector_type": detector.type,
330336
},
@@ -335,7 +341,7 @@ def _get_associated_workflows(
335341

336342
@log_context.root()
337343
def process_workflows(
338-
event_data: WorkflowEventData, detector_id: DetectorId = None
344+
event_data: WorkflowEventData, detector: Detector | None = None
339345
) -> set[Workflow]:
340346
"""
341347
This method will get the detector based on the event, and then gather the associated workflows.
@@ -345,13 +351,11 @@ def process_workflows(
345351
Finally, each of the triggered workflows will have their actions evaluated and executed.
346352
"""
347353
try:
348-
detector: Detector
349-
if detector_id is not None:
350-
detector = Detector.objects.get(id=detector_id)
351-
elif isinstance(event_data.event, GroupEvent):
354+
if detector is None and isinstance(event_data.event, GroupEvent):
352355
detector = get_detector_by_event(event_data)
353-
else:
354-
raise ValueError("Unable to determine the detector_id for the event")
356+
357+
if detector is None:
358+
raise ValueError("Unable to determine the detector for the event")
355359

356360
log_context.add_extras(detector_id=detector.id)
357361
organization = detector.project.organization
@@ -397,9 +401,11 @@ def process_workflows(
397401

398402
actions_to_trigger = evaluate_workflows_action_filters(triggered_workflows, event_data)
399403
actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
404+
400405
if not actions:
401406
# If there aren't any actions on the associated workflows, there's nothing to trigger
402407
return triggered_workflows
408+
403409
create_workflow_fire_histories(detector, actions, event_data)
404410

405411
with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):

src/sentry/workflow_engine/tasks.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
from django.db import router, transaction
12
from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable
23

3-
from sentry import nodestore
4+
from sentry import features, nodestore
45
from sentry.eventstore.models import Event, GroupEvent
56
from sentry.eventstream.base import GroupState
67
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -15,6 +16,7 @@
1516
from sentry.types.activity import ActivityType
1617
from sentry.utils import metrics
1718
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
19+
from sentry.workflow_engine.models import Detector
1820
from sentry.workflow_engine.processors.workflow import process_workflows
1921
from sentry.workflow_engine.types import WorkflowEventData
2022
from sentry.workflow_engine.utils import log_context
@@ -42,17 +44,39 @@
4244
),
4345
),
4446
)
45-
def process_workflow_activity(activity_id: int, detector_id: int) -> None:
47+
def process_workflow_activity(activity_id: int, group_id: int, detector_id: int) -> None:
4648
"""
47-
Process a workflow task identified by the given Activity ID and Detector ID.
49+
Process a workflow task identified by the given activity, group, and detector.
4850
4951
The task will get the Activity from the database, create a WorkflowEventData object,
5052
and then process the data in `process_workflows`.
5153
"""
52-
# TODO - @saponifi3d - implement this in a follow-up PR. This update will require WorkflowEventData
53-
# to allow for an activity in the `event` attribute. That refactor is a bit noisy
54-
# and will be done in a subsequent pr.
55-
pass
54+
with transaction.atomic(router.db_for_write(Detector)):
55+
try:
56+
activity = Activity.objects.get(id=activity_id)
57+
group = Group.objects.get(id=group_id)
58+
detector = Detector.objects.get(id=detector_id)
59+
except (Activity.DoesNotExist, Group.DoesNotExist, Detector.DoesNotExist):
60+
logger.exception(
61+
"Unable to fetch data to process workflow activity",
62+
extra={
63+
"activity_id": activity_id,
64+
"group_id": group_id,
65+
"detector_id": detector_id,
66+
},
67+
)
68+
return # Exit execution that we cannot recover from
69+
70+
event_data = WorkflowEventData(
71+
event=activity,
72+
group=group,
73+
)
74+
75+
process_workflows(event_data, detector)
76+
metrics.incr(
77+
"workflow_engine.process_workflow.activity_update.executed",
78+
tags={"activity_type": activity.type},
79+
)
5680

5781

5882
@group_status_update_registry.register("workflow_status_update")
@@ -63,8 +87,10 @@ def workflow_status_update_handler(
6387
Hook the process_workflow_task into the activity creation registry.
6488
6589
Since this handler is called in process for the activity, we want
66-
to queue a task to process workflows asynchronously.
67-
"""
90+
to queue a task to process workflows asynchronously."""
91+
metrics.incr(
92+
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
93+
)
6894
if activity.type not in SUPPORTED_ACTIVITIES:
6995
# If the activity type is not supported, we do not need to process it.
7096
return
@@ -77,11 +103,12 @@ def workflow_status_update_handler(
77103
metrics.incr("workflow_engine.error.tasks.no_detector_id")
78104
return
79105

80-
# TODO - implement in follow-up PR for now, just track a metric that we are seeing the activities.
81-
# process_workflow_task.delay(activity.id, detector_id)
82-
metrics.incr(
83-
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
84-
)
106+
if features.has("organizations:workflow-engine-process-activity", group.organization):
107+
process_workflow_activity.delay(
108+
activity_id=activity.id,
109+
group_id=group.id,
110+
detector_id=detector_id,
111+
)
85112

86113

87114
def _should_retry_nodestore_fetch(attempt: int, e: Exception) -> bool:

tests/sentry/workflow_engine/test_task.py

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@
66
from sentry.models.activity import Activity
77
from sentry.models.group import GroupStatus
88
from sentry.testutils.cases import TestCase
9+
from sentry.testutils.helpers import with_feature
910
from sentry.types.activity import ActivityType
10-
from sentry.workflow_engine.tasks import fetch_event, workflow_status_update_handler
11+
from sentry.workflow_engine.tasks import (
12+
fetch_event,
13+
process_workflow_activity,
14+
workflow_status_update_handler,
15+
)
16+
from sentry.workflow_engine.types import WorkflowEventData
1117

1218

1319
class FetchEventTests(TestCase):
@@ -57,3 +63,142 @@ def test__no_detector_id(self):
5763
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
5864
workflow_status_update_handler(group, message, activity)
5965
mock_incr.assert_called_with("workflow_engine.error.tasks.no_detector_id")
66+
67+
def test__feature_flag(self):
68+
detector = self.create_detector(project=self.project)
69+
group = self.create_group(project=self.project)
70+
activity = Activity(
71+
project=self.project,
72+
group=group,
73+
type=ActivityType.SET_RESOLVED.value,
74+
data={"fingerprint": ["test_fingerprint"]},
75+
)
76+
message = StatusChangeMessageData(
77+
id="test_message_id",
78+
project_id=self.project.id,
79+
new_status=GroupStatus.RESOLVED,
80+
new_substatus=None,
81+
fingerprint=["test_fingerprint"],
82+
detector_id=detector.id,
83+
)
84+
85+
with mock.patch(
86+
"sentry.workflow_engine.tasks.process_workflow_activity.delay"
87+
) as mock_delay:
88+
workflow_status_update_handler(group, message, activity)
89+
mock_delay.assert_not_called()
90+
91+
@with_feature("organizations:workflow-engine-process-activity")
92+
def test(self):
93+
detector = self.create_detector(project=self.project)
94+
group = self.create_group(project=self.project)
95+
activity = Activity(
96+
project=self.project,
97+
group=group,
98+
type=ActivityType.SET_RESOLVED.value,
99+
data={"fingerprint": ["test_fingerprint"]},
100+
)
101+
message = StatusChangeMessageData(
102+
id="test_message_id",
103+
project_id=self.project.id,
104+
new_status=GroupStatus.RESOLVED,
105+
new_substatus=None,
106+
fingerprint=["test_fingerprint"],
107+
detector_id=detector.id,
108+
)
109+
110+
with mock.patch(
111+
"sentry.workflow_engine.tasks.process_workflow_activity.delay"
112+
) as mock_delay:
113+
workflow_status_update_handler(group, message, activity)
114+
mock_delay.assert_called_once_with(
115+
activity_id=activity.id,
116+
group_id=group.id,
117+
detector_id=detector.id,
118+
)
119+
120+
121+
class TestProcessWorkflowActivity(TestCase):
122+
def setUp(self):
123+
self.group = self.create_group(project=self.project)
124+
self.activity = Activity(
125+
project=self.project,
126+
group=self.group,
127+
type=ActivityType.SET_RESOLVED.value,
128+
data={"fingerprint": ["test_fingerprint"]},
129+
)
130+
self.activity.save()
131+
self.detector = self.create_detector()
132+
133+
def test_process_workflow_activity__no_workflows(self):
134+
with mock.patch(
135+
"sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers",
136+
return_value=set(),
137+
) as mock_evaluate:
138+
process_workflow_activity.run(
139+
activity_id=self.activity.id,
140+
group_id=self.group.id,
141+
detector_id=self.detector.id,
142+
)
143+
# Short-circuit evaluation, no workflows associated
144+
assert mock_evaluate.call_count == 0
145+
146+
@mock.patch(
147+
"sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=set()
148+
)
149+
@mock.patch(
150+
"sentry.workflow_engine.processors.workflow.evaluate_workflows_action_filters",
151+
return_value=set(),
152+
)
153+
def test_process_workflow_activity__workflows__no_actions(
154+
self, mock_eval_actions, mock_evaluate
155+
):
156+
self.workflow = self.create_workflow(organization=self.organization)
157+
self.create_detector_workflow(
158+
detector=self.detector,
159+
workflow=self.workflow,
160+
)
161+
162+
process_workflow_activity.run(
163+
activity_id=self.activity.id,
164+
group_id=self.group.id,
165+
detector_id=self.detector.id,
166+
)
167+
168+
event_data = WorkflowEventData(
169+
event=self.activity,
170+
group=self.group,
171+
)
172+
173+
mock_evaluate.assert_called_once_with({self.workflow}, event_data)
174+
assert mock_eval_actions.call_count == 0
175+
176+
@mock.patch("sentry.workflow_engine.processors.workflow.filter_recently_fired_workflow_actions")
177+
def test_process_workflow_activity(self, mock_filter_actions):
178+
self.workflow = self.create_workflow(organization=self.organization)
179+
180+
self.action_group = self.create_data_condition_group(logic_type="any-short")
181+
self.action = self.create_action()
182+
self.create_data_condition_group_action(
183+
condition_group=self.action_group,
184+
action=self.action,
185+
)
186+
self.create_workflow_data_condition_group(self.workflow, self.action_group)
187+
188+
self.create_detector_workflow(
189+
detector=self.detector,
190+
workflow=self.workflow,
191+
)
192+
193+
expected_event_data = WorkflowEventData(
194+
event=self.activity,
195+
group=self.group,
196+
)
197+
198+
process_workflow_activity.run(
199+
activity_id=self.activity.id,
200+
group_id=self.group.id,
201+
detector_id=self.detector.id,
202+
)
203+
204+
mock_filter_actions.assert_called_once_with({self.action_group}, expected_event_data)

tests/sentry/workflow_engine/test_task_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def test_handler_invoked__when_update_status_called(self):
4444
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
4545
_process_message(message)
4646

47-
mock_incr.assert_called_with(
47+
mock_incr.assert_any_call(
4848
"workflow_engine.process_workflow.activity_update",
4949
tags={"activity_type": ActivityType.SET_RESOLVED.value},
5050
)
@@ -66,7 +66,7 @@ def test_handler_invoked__when_resolved(self):
6666

6767
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
6868
update_status(self.group, message)
69-
mock_incr.assert_called_with(
69+
mock_incr.assert_any_call(
7070
"workflow_engine.process_workflow.activity_update",
7171
tags={"activity_type": ActivityType.SET_RESOLVED.value},
7272
)

0 commit comments

Comments
 (0)