1
1
import functools
2
2
import logging
3
3
from collections .abc import Generator , Iterator
4
- from typing import Any
4
+ from typing import Any , TypedDict
5
5
6
6
import requests
7
7
import sentry_sdk
10
10
from rest_framework .request import Request
11
11
from rest_framework .response import Response
12
12
13
- from sentry import features
13
+ from sentry import features , nodestore
14
14
from sentry .api .api_owners import ApiOwner
15
15
from sentry .api .api_publish_status import ApiPublishStatus
16
16
from sentry .api .base import region_silo_endpoint
17
17
from sentry .api .bases .project import ProjectEndpoint
18
18
from sentry .api .paginator import GenericOffsetPaginator
19
+ from sentry .eventstore .models import Event
20
+ from sentry .models .project import Project
19
21
from sentry .replays .lib .storage import RecordingSegmentStorageMeta , storage
22
+ from sentry .replays .post_process import process_raw_response
23
+ from sentry .replays .query import query_replay_instance
20
24
from sentry .replays .usecases .ingest .event_parser import as_log_message
21
25
from sentry .replays .usecases .reader import fetch_segments_metadata , iter_segment_data
22
26
from sentry .seer .signed_seer_api import sign_with_seer_secret
25
29
logger = logging .getLogger (__name__ )
26
30
27
31
32
class ErrorEvent(TypedDict):
    """Minimal error metadata merged into the replay breadcrumb log."""

    # Event ID of the error associated with the replay.
    id: str
    # Error title (e.g. the exception type/headline).
    title: str
    # Error message text.
    message: str
    # Timestamp used to order errors against replay events.
    # NOTE(review): assumed to be in the same unit as replay event
    # timestamps — confirm (seconds vs. milliseconds) before comparing.
    timestamp: float
    # Log category; populated as "error" for these entries.
    category: str
38
+
39
+
28
40
@region_silo_endpoint
29
41
@extend_schema (tags = ["Replays" ])
30
42
class ProjectReplaySummarizeBreadcrumbsEndpoint (ProjectEndpoint ):
@@ -37,7 +49,7 @@ def __init__(self, **options) -> None:
37
49
storage .initialize_client ()
38
50
super ().__init__ (** options )
39
51
40
- def get (self , request : Request , project , replay_id : str ) -> Response :
52
+ def get (self , request : Request , project : Project , replay_id : str ) -> Response :
41
53
"""Return a collection of replay recording segments."""
42
54
if (
43
55
not features .has (
@@ -52,17 +64,117 @@ def get(self, request: Request, project, replay_id: str) -> Response:
52
64
):
53
65
return self .respond (status = 404 )
54
66
67
+ filter_params = self .get_filter_params (request , project )
68
+
69
+ # Fetch the replay's error IDs from the replay_id.
70
+ snuba_response = query_replay_instance (
71
+ project_id = project .id ,
72
+ replay_id = replay_id ,
73
+ start = filter_params ["start" ],
74
+ end = filter_params ["end" ],
75
+ organization = project .organization ,
76
+ request_user_id = request .user .id ,
77
+ )
78
+
79
+ response = process_raw_response (
80
+ snuba_response ,
81
+ fields = request .query_params .getlist ("field" ),
82
+ )
83
+
84
+ error_ids = response [0 ].get ("error_ids" , []) if response else []
85
+
86
+ # Check if error fetching should be disabled
87
+ disable_error_fetching = (
88
+ request .query_params .get ("enable_error_context" , "true" ).lower () == "false"
89
+ )
90
+
91
+ if disable_error_fetching :
92
+ error_events = []
93
+ else :
94
+ error_events = fetch_error_details (project_id = project .id , error_ids = error_ids )
95
+
55
96
return self .paginate (
56
97
request = request ,
57
98
paginator_cls = GenericOffsetPaginator ,
58
99
data_fn = functools .partial (fetch_segments_metadata , project .id , replay_id ),
59
- on_results = analyze_recording_segments ,
100
+ on_results = functools . partial ( analyze_recording_segments , error_events ) ,
60
101
)
61
102
62
103
104
def fetch_error_details(project_id: int, error_ids: list[str]) -> list[ErrorEvent]:
    """Fetch error details for the given error IDs from nodestore.

    Returns one ErrorEvent per error ID whose payload could be loaded;
    IDs with no stored payload are dropped. Any failure is reported to
    Sentry and an empty list is returned, so the caller degrades
    gracefully to a summary without error context.
    """
    try:
        node_ids = [
            Event.generate_node_id(project_id, event_id=error_id) for error_id in error_ids
        ]
        events = nodestore.backend.get_multi(node_ids)

        # Look each payload up by its node ID rather than zipping against
        # events.values(): the mapping returned by get_multi is not
        # guaranteed to preserve the order of (or contain every one of)
        # the requested keys, and a positional zip would silently pair
        # error IDs with the wrong payloads.
        return [
            ErrorEvent(
                category="error",
                id=error_id,
                title=data.get("title", ""),
                timestamp=data.get("timestamp", 0.0),
                message=data.get("message", ""),
            )
            for error_id, node_id in zip(error_ids, node_ids)
            if (data := events.get(node_id)) is not None
        ]
    except Exception as e:
        sentry_sdk.capture_exception(e)
        return []
124
+
125
+
126
def generate_error_log_message(error: "ErrorEvent") -> str:
    """Render a single error event as a human-readable log line."""
    return "User experienced an error: '{}: {}' at {}".format(
        error["title"], error["message"], error["timestamp"]
    )
132
+
133
+
134
def get_request_data(
    iterator: Iterator[tuple[int, memoryview]],
    error_events: list["ErrorEvent"],
) -> list[str]:
    """Return chronologically ordered log lines for a replay's segments and errors.

    Error events are ordered by timestamp before being merged with the
    segment events. The caller's list is left unmodified.
    """
    # sorted() instead of list.sort(): avoid mutating the caller's list
    # as a side effect of building the request payload.
    ordered_errors = sorted(error_events, key=lambda error: error["timestamp"])
    return list(gen_request_data(iterator, ordered_errors))
140
+
141
+
142
def gen_request_data(
    iterator: Iterator[tuple[int, memoryview]],
    error_events: list["ErrorEvent"],
) -> Generator[str]:
    """Yield log messages for replay events and errors in chronological order.

    Assumes ``error_events`` is already sorted by timestamp; errors are
    interleaved ahead of the first replay event with a later timestamp.
    """
    next_error = 0
    total_errors = len(error_events)

    for _segment_id, segment in iterator:
        for event in json.loads(segment.tobytes().decode("utf-8")):
            event_ts = event.get("timestamp", 0)

            # Flush every error that occurred strictly before this event.
            while next_error < total_errors and (
                error_events[next_error]["timestamp"] < event_ts
            ):
                yield generate_error_log_message(error_events[next_error])
                next_error += 1

            # Emit the event itself, skipping events with no log form.
            log = as_log_message(event)
            if log:
                yield log

    # Errors that occurred after the final replay event.
    for trailing_error in error_events[next_error:]:
        yield generate_error_log_message(trailing_error)
169
+
170
+
63
171
@sentry_sdk .trace
64
- def analyze_recording_segments (segments : list [RecordingSegmentStorageMeta ]) -> dict [str , Any ]:
65
- request_data = json .dumps ({"logs" : get_request_data (iter_segment_data (segments ))})
172
+ def analyze_recording_segments (
173
+ error_events : list [ErrorEvent ],
174
+ segments : list [RecordingSegmentStorageMeta ],
175
+ ) -> dict [str , Any ]:
176
+ # Combine breadcrumbs and error details
177
+ request_data = json .dumps ({"logs" : get_request_data (iter_segment_data (segments ), error_events )})
66
178
67
179
# XXX: I have to deserialize this request so it can be "automatically" reserialized by the
68
180
# paginate method. This is less than ideal.
@@ -94,15 +206,3 @@ def make_seer_request(request_data: str) -> bytes:
94
206
response .raise_for_status ()
95
207
96
208
return response .content
97
-
98
-
99
- def get_request_data (iterator : Iterator [tuple [int , memoryview ]]) -> list [str ]:
100
- return list (gen_request_data (map (lambda r : r [1 ], iterator )))
101
-
102
-
103
- def gen_request_data (segments : Iterator [memoryview ]) -> Generator [str ]:
104
- for segment in segments :
105
- for event in json .loads (segment .tobytes ().decode ("utf-8" )):
106
- message = as_log_message (event )
107
- if message :
108
- yield message
0 commit comments