Commit 3097b96

ceorourke and mifu67 authored

feat(ACI): Allow processing dynamic data conditions (#93851)

Enable processing dynamic data conditions through the workflow engine.

Co-authored-by: Michelle Fu <michelle.fu@sentry.io>

1 parent 186e9e8 · commit 3097b96

File tree

8 files changed: +523 -151 lines changed


src/sentry/incidents/grouptype.py
Lines changed: 5 additions & 1 deletion

@@ -100,7 +100,11 @@ def extract_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate])
         return int(data_packet.packet.get("timestamp", datetime.now(UTC)).timestamp())

     def extract_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int:
-        return data_packet.packet["values"]["value"]
+        # this is a bit of a hack - anomaly detection data packets send extra data we need to pass along
+        values = data_packet.packet["values"]
+        if values.get("value") is not None:
+            return values.get("value")
+        return values

     def construct_title(
         self,
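
To make the branching above concrete, here is a minimal standalone sketch of the new extract_value behavior. It assumes plain dict packets; the function and sample packets are illustrative stand-ins, not the Sentry classes.

    from datetime import UTC, datetime

    def extract_value(packet: dict):
        # Static alerts carry a scalar under "value"; anomaly detection
        # packets omit it at the top level, so the whole dict is passed along.
        values = packet["values"]
        if values.get("value") is not None:
            return values["value"]
        return values

    static_packet = {"values": {"value": 42}}
    dynamic_packet = {
        "values": {
            "values": {  # nested shape built in subscription_processor.py
                "value": 42,
                "source_id": "1",
                "subscription_id": "abc",
                "timestamp": datetime.now(UTC),
            }
        }
    }
    assert extract_value(static_packet) == 42
    assert "values" in extract_value(dynamic_packet)  # dict passed through intact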

src/sentry/incidents/handlers/condition/anomaly_detection_handler.py
Lines changed: 18 additions & 8 deletions

@@ -1,18 +1,18 @@
 import logging
-from typing import Any
+from datetime import datetime
+from typing import Any, TypedDict

 from django.conf import settings

 from sentry.net.http import connection_from_url
-from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer
 from sentry.seer.anomaly_detection.types import (
     AnomalyDetectionSeasonality,
     AnomalyDetectionSensitivity,
     AnomalyDetectionThresholdType,
     AnomalyType,
 )
 from sentry.snuba.models import QuerySubscription
-from sentry.workflow_engine.models import Condition, DataPacket
+from sentry.workflow_engine.models import Condition
 from sentry.workflow_engine.models.data_condition import DataConditionEvaluationException
 from sentry.workflow_engine.registry import condition_handler_registry
 from sentry.workflow_engine.types import DataConditionHandler, DetectorPriorityLevel

@@ -31,8 +31,15 @@
 }


+class AnomalyDetectionUpdate(TypedDict):
+    value: int
+    source_id: int
+    subscription_id: int
+    timestamp: datetime
+
+
 @condition_handler_registry.register(Condition.ANOMALY_DETECTION)
-class AnomalyDetectionHandler(DataConditionHandler[DataPacket]):
+class AnomalyDetectionHandler(DataConditionHandler[AnomalyDetectionUpdate]):
     group = DataConditionHandler.Group.DETECTOR_TRIGGER
     comparison_json_schema = {
         "type": "object",

@@ -55,21 +62,24 @@ class AnomalyDetectionHandler(DataConditionHandler[DataPacket]):
     }

     @staticmethod
-    def evaluate_value(update: DataPacket, comparison: Any) -> DetectorPriorityLevel:
+    def evaluate_value(update: AnomalyDetectionUpdate, comparison: Any) -> DetectorPriorityLevel:
+        from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer
+
         sensitivity = comparison["sensitivity"]
         seasonality = comparison["seasonality"]
         threshold_type = comparison["threshold_type"]

-        subscription: QuerySubscription = QuerySubscription.objects.get(id=int(update.source_id))
+        source_id = update.get("source_id")
+        assert source_id

-        subscription_update = update.packet
+        subscription: QuerySubscription = QuerySubscription.objects.get(id=int(source_id))

         anomaly_data = get_anomaly_data_from_seer(
             sensitivity=sensitivity,
             seasonality=seasonality,
             threshold_type=threshold_type,
             subscription=subscription,
-            subscription_update=subscription_update,
+            subscription_update=update,
         )
         # covers both None and []
         if not anomaly_data:
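
Typing the handler against AnomalyDetectionUpdate instead of DataPacket gives evaluate_value a concrete payload contract. Note also that get_anomaly_data_from_seer is now imported inside the method: since get_anomaly_data.py imports AnomalyDetectionUpdate from this module, keeping both imports at module level would create an import cycle. A short sketch of building a payload that satisfies the contract; the field values are illustrative.

    from datetime import UTC, datetime
    from typing import TypedDict

    class AnomalyDetectionUpdate(TypedDict):
        value: int
        source_id: int
        subscription_id: int
        timestamp: datetime

    update: AnomalyDetectionUpdate = {
        "value": 42,
        "source_id": 123,
        "subscription_id": 456,
        "timestamp": datetime.now(UTC),
    }

    # evaluate_value reads source_id first and asserts it is present
    # before resolving the QuerySubscription and calling Seer.
    assert update.get("source_id")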

src/sentry/incidents/subscription_processor.py
Lines changed: 104 additions & 83 deletions

@@ -13,6 +13,7 @@
 from sentry_redis_tools.retrying_cluster import RetryingRedisCluster

 from sentry import features
+from sentry.api.exceptions import ResourceDoesNotExist
 from sentry.constants import ObjectStatus
 from sentry.incidents.logic import (
     CRITICAL_TRIGGER_LABEL,

@@ -50,6 +51,9 @@
 )
 from sentry.models.project import Project
 from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer_legacy
+from sentry.seer.anomaly_detection.get_historical_anomalies import (
+    get_anomaly_evaluation_from_workflow_engine,
+)
 from sentry.seer.anomaly_detection.utils import anomaly_has_confidence, has_anomaly
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.models import QuerySubscription

@@ -253,6 +257,40 @@ def get_aggregation_value(

         return aggregation_value

+    def handle_trigger_anomalies(
+        self,
+        has_anomaly: bool,
+        trigger: AlertRuleTrigger,
+        aggregation_value: float,
+        fired_incident_triggers: list[IncidentTrigger],
+    ) -> list[IncidentTrigger]:
+        trigger_matches_status = self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
+
+        if has_anomaly and not trigger_matches_status:
+            metrics.incr(
+                "incidents.alert_rules.threshold.alert",
+                tags={"detection_type": self.alert_rule.detection_type},
+            )
+            incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
+            if incident_trigger is not None:
+                fired_incident_triggers.append(incident_trigger)
+        else:
+            self.trigger_alert_counts[trigger.id] = 0
+
+        if not has_anomaly and self.active_incident and trigger_matches_status:
+            metrics.incr(
+                "incidents.alert_rules.threshold.resolve",
+                tags={"detection_type": self.alert_rule.detection_type},
+            )
+            incident_trigger = self.trigger_resolve_threshold(trigger, aggregation_value)
+
+            if incident_trigger is not None:
+                fired_incident_triggers.append(incident_trigger)
+        else:
+            self.trigger_resolve_counts[trigger.id] = 0
+
+        return fired_incident_triggers
+
     def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
         """
         This is the core processing method utilized when Query Subscription Consumer fetches updates from kafka

@@ -311,12 +349,14 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
         has_metric_alert_processing = features.has(
             "organizations:workflow-engine-metric-alert-processing", organization
         )
+        has_anomaly_detection = features.has(
+            "organizations:anomaly-detection-alerts", organization
+        ) and features.has("organizations:anomaly-detection-rollout", organization)
+
         comparison_delta = None
+        detector = None

-        if (
-            has_metric_alert_processing
-            and not self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
-        ):
+        if has_metric_alert_processing:
             try:
                 detector = Detector.objects.get(
                     data_sources__source_id=str(self.subscription.id),

@@ -335,51 +375,51 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:

         if aggregation_value is not None:
             if has_metric_alert_processing:
-                packet = QuerySubscriptionUpdate(
-                    entity=subscription_update.get("entity", ""),
-                    subscription_id=subscription_update["subscription_id"],
-                    values={"value": aggregation_value},
-                    timestamp=self.last_update,
-                )
+                if self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC:
+                    packet = QuerySubscriptionUpdate(
+                        entity=subscription_update.get("entity", ""),
+                        subscription_id=subscription_update["subscription_id"],
+                        values={
+                            "values": {
+                                "value": aggregation_value,
+                                "source_id": str(self.subscription.id),
+                                "subscription_id": subscription_update["subscription_id"],
+                                "timestamp": self.last_update,
+                            },
+                        },
+                        timestamp=self.last_update,
+                    )
+                else:
+                    packet = QuerySubscriptionUpdate(
+                        entity=subscription_update.get("entity", ""),
+                        subscription_id=subscription_update["subscription_id"],
+                        values={"value": aggregation_value},
+                        timestamp=self.last_update,
+                    )
                 data_packet = DataPacket[QuerySubscriptionUpdate](
                     source_id=str(self.subscription.id), packet=packet
                 )
-                # temporarily skip processing any anomaly detection alerts
-                if self.alert_rule.detection_type != AlertRuleDetectionType.DYNAMIC:
-                    results = process_data_packets(
-                        [data_packet], DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
-                    )
-                    if features.has(
-                        "organizations:workflow-engine-metric-alert-dual-processing-logs",
-                        self.alert_rule.organization,
-                    ):
-                        logger.info(
-                            "dual processing results for alert rule",
-                            extra={
-                                "results": results,
-                                "num_results": len(results),
-                                "value": aggregation_value,
-                                "rule_id": self.alert_rule.id,
-                            },
-                        )
-
-            has_anomaly_detection = features.has(
-                "organizations:anomaly-detection-alerts", organization
-            ) and features.has("organizations:anomaly-detection-rollout", organization)
+                results = process_data_packets([data_packet], DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
+                if features.has(
+                    "organizations:workflow-engine-metric-alert-dual-processing-logs",
+                    self.alert_rule.organization,
+                ):
+                    logger.info(
+                        "dual processing results for alert rule",
+                        extra={
+                            "results": results,
+                            "num_results": len(results),
+                            "value": aggregation_value,
+                            "rule_id": self.alert_rule.id,
+                        },
+                    )

             potential_anomalies = None
             if (
                 has_anomaly_detection
                 and self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
+                and not has_metric_alert_processing
             ):
-                logger.info(
-                    "Raw subscription update",
-                    extra={
-                        "result": subscription_update,
-                        "aggregation_value": aggregation_value,
-                        "rule_id": self.alert_rule.id,
-                    },
-                )
                 with metrics.timer(
                     "incidents.subscription_processor.process_update.get_anomaly_data_from_seer_legacy"
                 ):
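
The hunk above is where dynamic alerts get the packet shape the anomaly detection handler expects: the aggregation value plus the metadata Seer needs, nested one level deeper than a static alert's flat values. A schematic comparison, with placeholder dicts standing in for QuerySubscriptionUpdate:

    from datetime import UTC, datetime

    now = datetime.now(UTC)

    # static (threshold) alert: flat values
    static_values = {"value": 3.5}

    # dynamic (anomaly detection) alert: nested values carrying extras
    dynamic_values = {
        "values": {
            "value": 3.5,
            "source_id": "123",        # str(self.subscription.id)
            "subscription_id": "abc",  # subscription_update["subscription_id"]
            "timestamp": now,          # self.last_update
        },
    }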
@@ -390,28 +430,37 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                     aggregation_value=aggregation_value,
                 )
             if potential_anomalies is None:
-                logger.info(
-                    "No potential anomalies found",
-                    extra={
-                        "subscription_id": self.subscription.id,
-                        "dataset": self.alert_rule.snuba_query.dataset,
-                        "organization_id": self.subscription.project.organization.id,
-                        "project_id": self.subscription.project_id,
-                        "alert_rule_id": self.alert_rule.id,
-                    },
-                )
                 return

         if aggregation_value is None:
             metrics.incr("incidents.alert_rules.skipping_update_invalid_aggregation_value")
             return

-        fired_incident_triggers = []
+        fired_incident_triggers: list[IncidentTrigger] = []
         with transaction.atomic(router.db_for_write(AlertRule)):
             # Triggers is the threshold - NOT an instance of a trigger
             metrics_incremented = False
             for trigger in self.triggers:
-                if potential_anomalies:
+                # dual processing of anomaly detection alerts
+                if (
+                    has_anomaly_detection
+                    and has_metric_alert_processing
+                    and self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
+                ):
+                    if not detector:
+                        raise ResourceDoesNotExist("Detector not found, cannot evaluate anomaly")
+
+                    is_anomalous = get_anomaly_evaluation_from_workflow_engine(detector, results)
+                    if is_anomalous is None:
+                        # we only care about True and False - None indicates no change
+                        continue
+
+                    assert isinstance(is_anomalous, bool)
+                    fired_incident_triggers = self.handle_trigger_anomalies(
+                        is_anomalous, trigger, aggregation_value, fired_incident_triggers
+                    )
+
+                elif potential_anomalies:
                     # NOTE: There should only be one anomaly in the list
                     for potential_anomaly in potential_anomalies:
                         # check to see if we have enough data for the dynamic alert rule now

@@ -425,38 +474,10 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                            # we don't need to check if the alert should fire if the alert can't fire yet
                            continue

-                        if has_anomaly(
-                            potential_anomaly, trigger.label
-                        ) and not self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE):
-                            metrics.incr(
-                                "incidents.alert_rules.threshold.alert",
-                                tags={"detection_type": self.alert_rule.detection_type},
-                            )
-                            incident_trigger = self.trigger_alert_threshold(
-                                trigger, aggregation_value
-                            )
-                            if incident_trigger is not None:
-                                fired_incident_triggers.append(incident_trigger)
-                        else:
-                            self.trigger_alert_counts[trigger.id] = 0
-
-                        if (
-                            not has_anomaly(potential_anomaly, trigger.label)
-                            and self.active_incident
-                            and self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
-                        ):
-                            metrics.incr(
-                                "incidents.alert_rules.threshold.resolve",
-                                tags={"detection_type": self.alert_rule.detection_type},
-                            )
-                            incident_trigger = self.trigger_resolve_threshold(
-                                trigger, aggregation_value
-                            )
-
-                            if incident_trigger is not None:
-                                fired_incident_triggers.append(incident_trigger)
-                        else:
-                            self.trigger_resolve_counts[trigger.id] = 0
+                        is_anomalous = has_anomaly(potential_anomaly, trigger.label)
+                        fired_incident_triggers = self.handle_trigger_anomalies(
+                            is_anomalous, trigger, aggregation_value, fired_incident_triggers
+                        )
                 else:
                     # OVER/UNDER value trigger
                     alert_operator, resolve_operator = self.THRESHOLD_TYPE_OPERATORS[
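
The refactor folds the duplicated fire/resolve branches into handle_trigger_anomalies, which both the new workflow-engine path and the legacy Seer path now call. A simplified standalone sketch of the decision it centralizes, with the trigger and metrics machinery stubbed out and names chosen for illustration:

    def decide(has_anomaly: bool, trigger_active: bool, incident_open: bool) -> str | None:
        if has_anomaly and not trigger_active:
            return "fire"     # trigger_alert_threshold path
        if not has_anomaly and incident_open and trigger_active:
            return "resolve"  # trigger_resolve_threshold path
        return None           # counters reset, nothing fired

    assert decide(True, False, False) == "fire"
    assert decide(False, True, True) == "resolve"
    assert decide(True, True, True) is None  # already firing: no duplicate alert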

src/sentry/seer/anomaly_detection/get_anomaly_data.py
Lines changed: 6 additions & 6 deletions

@@ -4,8 +4,8 @@
 from urllib3.exceptions import MaxRetryError, TimeoutError

 from sentry.conf.server import SEER_ANOMALY_DETECTION_ENDPOINT_URL
+from sentry.incidents.handlers.condition.anomaly_detection_handler import AnomalyDetectionUpdate
 from sentry.incidents.models.alert_rule import AlertRule
-from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.net.http import connection_from_url
 from sentry.seer.anomaly_detection.types import (
     AlertInSeer,

@@ -166,10 +166,10 @@ def get_anomaly_data_from_seer(
     seasonality: AnomalyDetectionSeasonality,
     threshold_type: AnomalyDetectionThresholdType,
     subscription: QuerySubscription,
-    subscription_update: QuerySubscriptionUpdate,
+    subscription_update: AnomalyDetectionUpdate,
 ) -> list[TimeSeriesPoint] | None:
     snuba_query: SnubaQuery = subscription.snuba_query
-    aggregation_value = subscription_update["values"].get("value")
+    aggregation_value = subscription_update.get("value")
     source_id = subscription.id
     source_type = DataSourceType.SNUBA_QUERY_SUBSCRIPTION
     if aggregation_value is None:

@@ -185,6 +185,8 @@
         "source_id": source_id,
         "source_type": source_type,
     }
+    timestamp = subscription_update.get("timestamp")
+    assert timestamp

     anomaly_detection_config = AnomalyDetectionConfig(
         time_period=int(snuba_query.time_window / 60),

@@ -195,9 +197,7 @@
     context = AlertInSeer(
         source_id=source_id,
         source_type=source_type,
-        cur_window=TimeSeriesPoint(
-            timestamp=subscription_update["timestamp"].timestamp(), value=aggregation_value
-        ),
+        cur_window=TimeSeriesPoint(timestamp=timestamp.timestamp(), value=aggregation_value),
     )
     detect_anomalies_request = DetectAnomaliesRequest(
         organization_id=subscription.project.organization.id,
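
With the signature change, get_anomaly_data_from_seer reads the value and timestamp directly off the flat AnomalyDetectionUpdate instead of digging into a nested QuerySubscriptionUpdate. A sketch of the before/after access pattern, using placeholder dicts rather than the real update objects:

    from datetime import UTC, datetime

    legacy_update = {"values": {"value": 3.5}, "timestamp": datetime.now(UTC)}
    new_update = {"value": 3.5, "timestamp": datetime.now(UTC)}

    # before: nested lookup on QuerySubscriptionUpdate
    old_value = legacy_update["values"].get("value")
    old_ts = legacy_update["timestamp"].timestamp()

    # after: flat lookup, with an assert guarding the optional timestamp
    new_value = new_update.get("value")
    ts = new_update.get("timestamp")
    assert ts
    new_ts = ts.timestamp()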

0 commit comments
