Commit b16336f

ref(aci): fix N+1 query for DataConditionGroups in delayed workflow (#95058)

1 parent cd8375f · commit b16336f

File tree: 5 files changed, +455 -61 lines changed

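The commit targets the classic N+1 shape: `fire_actions_for_groups` previously issued a `Detector` lookup per event and a `Workflow` query per group inside its loop. The refactor hoists those into a few bulk queries before the loop and replaces the per-iteration queries with dictionary lookups. A minimal sketch of the pattern, using plain-Python stand-ins rather than the actual Sentry models:

```python
# Hypothetical stand-in for an ORM bulk query: one call fetches every row.
def fetch_detectors_bulk(detector_ids: set[int]) -> dict[int, str]:
    return {i: f"detector-{i}" for i in detector_ids}

events = [
    {"event_id": "a", "detector_id": 1},
    {"event_id": "b", "detector_id": 2},
]

# Before: one query per event inside the loop (N+1).
# After: a single bulk query up front...
detectors = fetch_detectors_bulk({e["detector_id"] for e in events})

# ...then constant-time, query-free lookups inside the loop.
for e in events:
    detector = detectors.get(e["detector_id"])
```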
src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 76 additions & 30 deletions
```diff
@@ -50,16 +50,17 @@
     SLOW_CONDITIONS,
     Condition,
 )
+from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
 from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
 from sentry.workflow_engine.processors.data_condition_group import (
     evaluate_data_conditions,
     get_slow_conditions_for_groups,
 )
-from sentry.workflow_engine.processors.detector import get_detector_by_event
+from sentry.workflow_engine.processors.detector import get_detectors_by_groupevents_bulk
 from sentry.workflow_engine.processors.log_util import log_if_slow, track_batch_performance
 from sentry.workflow_engine.processors.workflow import (
     WORKFLOW_ENGINE_BUFFER_LIST_KEY,
-    evaluate_workflows_action_filters,
+    evaluate_action_filters,
 )
 from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
 from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
```
```diff
@@ -567,6 +568,27 @@ def get_group_to_groupevent(
     return group_to_groupevent


+def get_dcgs_by_group(
+    groups_to_fire: dict[GroupId, set[DataConditionGroup]],
+    event_data: EventRedisData,
+    dcg_type: DataConditionHandler.Group,
+) -> dict[GroupId, set[DataConditionGroup]]:
+    """
+    Extract DataConditionGroups from groups_to_fire, grouped by group ID, for a particular DataConditionGroup type (e.g. workflow trigger)
+    trigger_group_to_dcg_model is the mapping from DataConditionGroup type to DataConditionGroup id to Workflow id
+    Returns a dict mapping GroupId to set of DCGs.
+    """
+    workflow_dcg_ids = set(event_data.trigger_group_to_dcg_model[dcg_type].keys())
+
+    workflow_dcgs_by_group = {}
+    for group_id, dcgs in groups_to_fire.items():
+        workflow_dcgs = {dcg for dcg in dcgs if dcg.id in workflow_dcg_ids}
+        if workflow_dcgs:
+            workflow_dcgs_by_group[group_id] = workflow_dcgs
+
+    return workflow_dcgs_by_group
+
+
 @sentry_sdk.trace
 def fire_actions_for_groups(
     organization: Organization,
```
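A runnable sketch of what the new `get_dcgs_by_group` helper computes, using a frozen dataclass as a stand-in for `DataConditionGroup` (the real function reads the type registry from `EventRedisData.trigger_group_to_dcg_model`):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDCG:  # hashable stand-in for DataConditionGroup
    id: int


# group id -> every DCG queued to fire for that group
groups_to_fire = {10: {FakeDCG(1), FakeDCG(2)}, 20: {FakeDCG(2)}}

# DCG ids registered under the requested type, e.g. WORKFLOW_TRIGGER
workflow_trigger_ids = {1}

# the helper's filtering: keep only DCGs of that type, drop empty groups
by_group = {
    group_id: kept
    for group_id, dcgs in groups_to_fire.items()
    if (kept := {dcg for dcg in dcgs if dcg.id in workflow_trigger_ids})
}
assert by_group == {10: {FakeDCG(1)}}
```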
```diff
@@ -585,6 +607,27 @@ def fire_actions_for_groups(
         },
     )

+    workflow_triggers = get_dcgs_by_group(
+        groups_to_fire, event_data, DataConditionHandler.Group.WORKFLOW_TRIGGER
+    )
+    action_filters = get_dcgs_by_group(
+        groups_to_fire, event_data, DataConditionHandler.Group.ACTION_FILTER
+    )
+    all_workflow_triggers = set().union(*list(workflow_triggers.values()))
+
+    # Bulk fetch detectors
+    event_id_to_detector = get_detectors_by_groupevents_bulk(list(group_to_groupevent.values()))
+
+    # Bulk fetch action filters for workflow triggers
+    workflows = Workflow.objects.filter(when_condition_group_id__in=all_workflow_triggers)
+
+    dcg_to_workflow = {
+        wdcg.condition_group: wdcg.workflow
+        for wdcg in WorkflowDataConditionGroup.objects.select_related(
+            "workflow", "condition_group"
+        ).filter(workflow__in=workflows)
+    }
+
     total_actions = 0
     with track_batch_performance(
         "workflow_engine.delayed_workflow.fire_actions_for_groups.loop",
```
```diff
@@ -594,42 +637,45 @@ def fire_actions_for_groups(
         for group, group_event in group_to_groupevent.items():
             with tracker.track(str(group.id)), log_context.new_context(group_id=group.id):
                 workflow_event_data = WorkflowEventData(event=group_event, group=group)
-                detector = get_detector_by_event(workflow_event_data)
-
-                workflow_triggers: set[DataConditionGroup] = set()
-                action_filters: set[DataConditionGroup] = set()
-                for dcg in groups_to_fire[group.id]:
-                    if (
-                        dcg.id
-                        in event_data.trigger_group_to_dcg_model[
-                            DataConditionHandler.Group.WORKFLOW_TRIGGER
-                        ]
-                    ):
-                        workflow_triggers.add(dcg)
-                    elif (
-                        dcg.id
-                        in event_data.trigger_group_to_dcg_model[
-                            DataConditionHandler.Group.ACTION_FILTER
-                        ]
-                    ):
-                        action_filters.add(dcg)
-
-                # process workflow_triggers
-                workflows = set(
-                    Workflow.objects.filter(when_condition_group_id__in=workflow_triggers)
-                )
+                detector = event_id_to_detector.get(group_event.event_id)
+
+                if detector is None:
+                    logger.warning(
+                        "No detector found for event, skipping",
+                        extra={
+                            "event_id": group_event.event_id,
+                            "group_id": group.id,
+                        },
+                    )
+                    continue
+
+                workflow_triggers_for_group = workflow_triggers.get(group.id, set())
+                action_filters_for_group = action_filters.get(group.id, set())

                 with log_if_slow(
                     logger,
                     "workflow_engine.delayed_workflow.slow_evaluate_workflows_action_filters",
                     extra={"event_data": workflow_event_data},
                     threshold_seconds=1,
                 ):
-                    workflows_actions = evaluate_workflows_action_filters(
-                        workflows, workflow_event_data
+                    # Process workflow filters for passing trigger groups
+                    triggered_workflow_ids = {
+                        event_data.dcg_to_workflow[dcg.id] for dcg in workflow_triggers_for_group
+                    }
+
+                    filter_dcg_to_workflow: dict[DataConditionGroup, Workflow] = {
+                        dcg: workflow
+                        for dcg, workflow in dcg_to_workflow.items()
+                        if workflow.id in triggered_workflow_ids
+                    }
+
+                    workflows_actions = evaluate_action_filters(
+                        workflow_event_data,
+                        filter_dcg_to_workflow,
                     )
+
                 filtered_actions = filter_recently_fired_workflow_actions(
-                    action_filters | workflows_actions, workflow_event_data
+                    action_filters_for_group | workflows_actions, workflow_event_data
                 )
                 create_workflow_fire_histories(detector, filtered_actions, workflow_event_data)

```
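With the queries hoisted out, the loop body reduces to dictionary filtering. A runnable sketch of the `filter_dcg_to_workflow` narrowing, with ints and strings standing in for the `DataConditionGroup` and `Workflow` objects:

```python
# prefetched once, outside the loop: condition group -> owning workflow
dcg_to_workflow = {101: "wf-1", 102: "wf-2", 103: "wf-1"}

# workflows whose trigger groups passed for this event's group
triggered_workflow_ids = {"wf-1"}

# keep only the action-filter DCGs that belong to a triggered workflow
filter_dcg_to_workflow = {
    dcg: wf for dcg, wf in dcg_to_workflow.items() if wf in triggered_workflow_ids
}
assert filter_dcg_to_workflow == {101: "wf-1", 103: "wf-1"}
```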
```diff
@@ -647,7 +693,7 @@ def fire_actions_for_groups(
                 logger.debug(
                     "workflow_engine.delayed_workflow.triggered_actions",
                     extra={
-                        "workflow_ids": [workflow.id for workflow in workflows],
+                        "workflow_ids": triggered_workflow_ids,
                         "actions": [action.id for action in filtered_actions],
                         "event_data": workflow_event_data,
                         "event_id": event_id,
```

src/sentry/workflow_engine/processors/detector.py

Lines changed: 140 additions & 0 deletions
```diff
@@ -1,9 +1,13 @@
 from __future__ import annotations

 import logging
+from collections import defaultdict
+from collections.abc import Callable, Mapping
+from typing import NamedTuple

 import sentry_sdk

+from sentry.db.models.manager.base_query_set import BaseQuerySet
 from sentry.eventstore.models import GroupEvent
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
```
```diff
@@ -57,6 +61,142 @@ def get_detector_by_event(event_data: WorkflowEventData) -> Detector:
     return detector


+class _SplitEvents(NamedTuple):
+    events_with_occurrences: list[tuple[GroupEvent, int]]
+    error_events: list[GroupEvent]
+    events_missing_detectors: list[GroupEvent]
+
+
+def _split_events_by_occurrence(
+    event_list: list[GroupEvent],
+) -> _SplitEvents:
+    from sentry.grouping.grouptype import ErrorGroupType
+
+    events_with_occurrences: list[tuple[GroupEvent, int]] = []
+    error_events: list[GroupEvent] = []  # only error events don't have occurrences
+    events_missing_detectors: list[GroupEvent] = []
+
+    for event in event_list:
+        issue_occurrence = event.occurrence
+        if issue_occurrence is None:
+            assert event.group.issue_type.slug == ErrorGroupType.slug
+            error_events.append(event)
+        elif detector_id := issue_occurrence.evidence_data.get("detector_id"):
+            events_with_occurrences.append((event, detector_id))
+        else:
+            events_missing_detectors.append(event)
+
+    return _SplitEvents(
+        events_with_occurrences,
+        error_events,
+        events_missing_detectors,
+    )
+
+
+def _create_event_detector_map(
+    detectors: BaseQuerySet[Detector],
+    key_event_map: dict[int, list[GroupEvent]],
+    detector_key_extractor: Callable[[Detector], int],
+) -> tuple[dict[str, Detector], set[int]]:
+    result: dict[str, Detector] = {}
+
+    # used to track existing keys (detector_id or project_id) to log missing keys
+    keys = set()
+
+    for detector in detectors:
+        key = detector_key_extractor(detector)
+        keys.add(key)
+        detector_events = key_event_map[key]
+        result.update({event.event_id: detector for event in detector_events})
+
+    return result, keys
+
+
+def get_detectors_by_groupevents_bulk(
+    event_list: list[GroupEvent],
+) -> Mapping[str, Detector]:
+    """
+    Given a list of GroupEvents, return a mapping of event_id to Detector.
+    """
+    from sentry.grouping.grouptype import ErrorGroupType
+
+    if not event_list:
+        return {}
+
+    result: dict[str, Detector] = {}
+
+    # Separate events by whether they have occurrences or not
+    events_with_occurrences, error_events, events_missing_detectors = _split_events_by_occurrence(
+        event_list
+    )
+
+    # Fetch detectors for events with occurrences (by detector_id)
+    missing_detector_ids = set()
+    if events_with_occurrences:
+        detector_id_to_events: dict[int, list[GroupEvent]] = defaultdict(list)
+
+        for event, detector_id in events_with_occurrences:
+            detector_id_to_events[detector_id].append(event)
+
+        def _extract_events_lookup_key(detector: Detector) -> int:
+            return detector.id
+
+        if detector_id_to_events:
+            detectors = Detector.objects.filter(id__in=list(detector_id_to_events.keys()))
+            mapping, found_detector_ids = _create_event_detector_map(
+                detectors,
+                key_event_map=detector_id_to_events,
+                detector_key_extractor=_extract_events_lookup_key,
+            )
+            result.update(mapping)
+
+            missing_detector_ids = set(detector_id_to_events.keys()) - found_detector_ids
+
+    # Fetch detectors for events without occurrences (by project_id)
+    projects_missing_detectors = set()
+    if error_events:
+        # Group events by project_id
+        project_to_events: dict[int, list[GroupEvent]] = defaultdict(list)
+
+        for event in error_events:
+            project_to_events[event.project_id].append(event)
+
+        def _extract_events_lookup_key(detector: Detector) -> int:
+            return detector.project_id
+
+        detectors = Detector.objects.filter(
+            project_id__in=project_to_events.keys(),
+            type=ErrorGroupType.slug,
+        )
+        mapping, projects_with_error_detectors = _create_event_detector_map(
+            detectors,
+            key_event_map=project_to_events,
+            detector_key_extractor=_extract_events_lookup_key,
+        )
+        result.update(mapping)
+
+        projects_missing_detectors = set(project_to_events.keys()) - projects_with_error_detectors
+
+    # Log all missing detectors
+    if missing_detector_ids or projects_missing_detectors or events_missing_detectors:
+        metrics.incr(
+            "workflow_engine.detectors.error",
+            amount=len(projects_missing_detectors) + len(missing_detector_ids),
+        )
+        logger.error(
+            "Detectors not found for events",
+            extra={
+                "projects_missing_error_detectors": projects_missing_detectors,
+                "missing_detectors": missing_detector_ids,
+                "events_missing_detectors": [
+                    (event.event_id, event.group_id) for event in events_missing_detectors
+                ],
+            },
+        )
+
+    return result
+
+
 def create_issue_platform_payload(result: DetectorEvaluationResult) -> None:
     occurrence, status_change = None, None

```
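`get_detectors_by_groupevents_bulk` resolves detectors along two paths: issue-platform events carry a `detector_id` in their occurrence's `evidence_data`, while error events have no occurrence and fall back to their project's error detector. A runnable sketch of that split, using plain dicts in place of `GroupEvent`:

```python
events = [
    {"event_id": "e1", "occurrence": {"detector_id": 5}, "project_id": 1},  # issue platform
    {"event_id": "e2", "occurrence": None, "project_id": 1},                # error event
]

with_detector_id: list[tuple[dict, int]] = []
error_events: list[dict] = []
missing: list[dict] = []

for e in events:
    occ = e["occurrence"]
    if occ is None:
        error_events.append(e)  # resolved later by project_id
    elif detector_id := occ.get("detector_id"):
        with_detector_id.append((e, detector_id))
    else:
        missing.append(e)  # occurrence without detector_id: logged as missing

assert [d for _, d in with_detector_id] == [5]
assert [e["event_id"] for e in error_events] == ["e2"]
```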