Commit 5399776

1 parent 329e27b commit 5399776
File tree

3 files changed: +73 -14 lines changed

src/sentry/workflow_engine/processors/data_source.py

Lines changed: 35 additions & 14 deletions
@@ -9,30 +9,51 @@
 logger = logging.getLogger("sentry.workflow_engine.process_data_source")
 
 
-# TODO - @saponifi3d - change the text choices to an enum
+def bulk_fetch_enabled_detectors(
+    source_ids: set[str], query_type: str
+) -> dict[str, list[Detector]]:
+    """
+    Get all of the enabled detectors for a list of detector source ids and types.
+    This will also prefetch all the subsequent data models for evaluating the detector.
+    """
+    data_sources = (
+        DataSource.objects.filter(
+            source_id__in=source_ids,
+            type=query_type,
+            detectors__enabled=True,
+        )
+        .prefetch_related(
+            Prefetch(
+                "detectors",
+                queryset=Detector.objects.filter(enabled=True)
+                .select_related("workflow_condition_group")
+                .prefetch_related("workflow_condition_group__conditions"),
+            )
+        )
+        .distinct()
+    )
+
+    result: dict[str, list[Detector]] = {}
+    for data_source in data_sources:
+        result[data_source.source_id] = list(data_source.detectors.all())
+
+    return result
+
+
 # TODO - @saponifi3d - make query_type optional override, otherwise infer from the data packet.
 def process_data_sources[
     T
 ](data_packets: list[DataPacket[T]], query_type: str) -> list[tuple[DataPacket[T], list[Detector]]]:
     metrics.incr("workflow_engine.process_data_sources", tags={"query_type": query_type})
 
-    data_packet_ids = {packet.source_id for packet in data_packets}
-
-    # Fetch all data sources and associated detectors for the given data packets
-    with sentry_sdk.start_span(op="workflow_engine.process_data_sources.fetch_data_sources"):
-        data_sources = DataSource.objects.filter(
-            source_id__in=data_packet_ids,
-            type=query_type,
-            detectors__enabled=True,
-        ).prefetch_related(Prefetch("detectors"))
-
-    # Build a lookup dict for source_id to detectors
-    source_id_to_detectors = {ds.source_id: list(ds.detectors.all()) for ds in data_sources}
+    with sentry_sdk.start_span(op="workflow_engine.process_data_sources.get_enabled_detectors"):
+        packet_source_ids = {packet.source_id for packet in data_packets}
+        source_to_detector = bulk_fetch_enabled_detectors(packet_source_ids, query_type)
 
     # Create the result tuples
     result = []
     for packet in data_packets:
-        detectors = source_id_to_detectors.get(packet.source_id)
+        detectors: list[Detector] = source_to_detector.get(packet.source_id, [])
 
         if detectors:
             data_packet_tuple = (packet, detectors)
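For context, a minimal sketch of how the refactored entry point might be exercised. The import paths and the DataPacket constructor below are assumptions for illustration (the diff only shows that packets expose a source_id), so treat this as a sketch rather than the module's documented API:

# Hypothetical usage sketch; import paths and DataPacket fields are assumed.
from sentry.workflow_engine.models import DataPacket
from sentry.workflow_engine.processors.data_source import (
    bulk_fetch_enabled_detectors,
    process_data_sources,
)

packets = [
    DataPacket(source_id="1", packet={"values": {"foo": 1}}),
    DataPacket(source_id="2", packet={"values": {"foo": 2}}),
]

# One batched lookup: every enabled Detector for these sources, with its
# workflow_condition_group and that group's conditions already prefetched.
detectors_by_source = bulk_fetch_enabled_detectors({"1", "2"}, "test")

# Or via the full processor, which pairs each packet with its detectors and
# skips packets that matched no enabled detector.
for packet, detectors in process_data_sources(packets, "test"):
    evaluate(packet, detectors)  # hypothetical downstream step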

src/sentry/workflow_engine/processors/detector.py

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,8 @@
 
 import logging
 
+import sentry_sdk
+
 from sentry.eventstore.models import GroupEvent
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
@@ -76,6 +78,7 @@ def create_issue_platform_payload(result: DetectorEvaluationResult) -> None:
     )
 
 
+@sentry_sdk.trace
 def process_detectors[
     T
 ](data_packet: DataPacket[T], detectors: list[Detector]) -> list[
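The sentry_sdk.trace decorator records the whole decorated call as a span under whatever transaction is currently active, which avoids opening a span manually inside the function body. A minimal sketch of the pattern with a hypothetical function (exact default span op/name vary by SDK version):

import sentry_sdk

@sentry_sdk.trace
def expensive_step(n: int) -> int:
    # Recorded as its own child span when a transaction is active.
    return sum(i * i for i in range(n))

with sentry_sdk.start_transaction(op="task", name="example.handler"):
    # The decorated call shows up as a span inside this transaction.
    total = expensive_step(10_000)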

tests/sentry/workflow_engine/processors/test_data_sources.py

Lines changed: 35 additions & 0 deletions
@@ -28,6 +28,18 @@ def setUp(self):
         self.detector_one = self.create_detector(name="test_detector1")
         self.detector_two = self.create_detector(name="test_detector2", type="metric_issue")
 
+        self.detector_one.workflow_condition_group = self.create_data_condition_group(
+            logic_type="any"
+        )
+
+        self.create_data_condition(
+            condition_group=self.detector_one.workflow_condition_group,
+            type="eq",
+            comparison="bar",
+            condition_result=True,
+        )
+        self.detector_one.save()
+
         self.ds1 = self.create_data_source(source_id=self.query.id, type="test")
         self.ds1.detectors.set([self.detector_one])
 
@@ -141,3 +153,26 @@ def test_metrics_for_many_detectors(self):
             2,
             tags={"query_type": "test"},
         )
+
+    def test_sql_cascades(self):
+        with self.assertNumQueries(3):
+            """
+            There should be 3 total SQL queries for `bulk_fetch_enabled_detectors`:
+            - Get all the detectors
+            - Get all the data condition groups for those detectors
+            - Get all the data conditions for those groups
+            """
+            results = process_data_sources(self.data_packets, "test")
+
+            for packet, detectors in results:
+                # If the detector is not prefetched this will increase the query count
+                assert all(detector.enabled for detector in detectors)
+
+                for detector in detectors:
+                    if detector.workflow_condition_group:
+                        # Trigger a SQL query if not prefetched, and fail the assertion
+                        assert detector.workflow_condition_group.id is not None
+
+                        for condition in detector.workflow_condition_group.conditions.all():
+                            # Trigger a SQL query if not prefetched, and fail the assertion
+                            assert condition.id is not None
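The test encodes a reusable pattern: wrap the code under test in assertNumQueries, then deliberately walk every relation the code claims to prefetch, so any lazy load both bumps the query count and fails the assertion. A sketch of the same idea with hypothetical models (Author with a reverse foreign key "books" to Book; neither exists in this codebase):

from django.test import TestCase

class PrefetchRegressionTest(TestCase):
    def test_no_n_plus_one(self):
        # Author and Book are hypothetical models for this sketch.
        # 1 query for authors + 1 prefetch query for all their books.
        with self.assertNumQueries(2):
            authors = list(Author.objects.prefetch_related("books"))
            for author in authors:
                # Served from the prefetch cache; a lazy load here would
                # push the count past 2 and fail the test.
                assert all(book.id is not None for book in author.books.all())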
