1
1
import logging
2
+ import zlib
2
3
from collections .abc import Mapping
4
+ from typing import cast
3
5
4
6
import sentry_sdk
7
+ import sentry_sdk .scope
5
8
from arroyo .backends .kafka .consumer import KafkaPayload
6
9
from arroyo .processing .strategies import RunTask , RunTaskInThreads
7
10
from arroyo .processing .strategies .abstract import ProcessingStrategy , ProcessingStrategyFactory
8
11
from arroyo .processing .strategies .commit import CommitOffsets
9
12
from arroyo .types import Commit , FilteredPayload , Message , Partition
10
13
from django .conf import settings
14
+ from sentry_kafka_schemas .codecs import Codec , ValidationError
15
+ from sentry_kafka_schemas .schema_types .ingest_replay_recordings_v1 import ReplayRecording
16
+ from sentry_sdk import set_tag
11
17
18
+ from sentry .conf .types .kafka_definition import Topic , get_topic_codec
12
19
from sentry .filestore .gcs import GCS_RETRYABLE_ERRORS
13
20
from sentry .replays .usecases .ingest import (
14
- DropSilently ,
15
- ProcessedRecordingMessage ,
21
+ DropEvent ,
22
+ Event ,
23
+ ProcessedEvent ,
16
24
commit_recording_message ,
17
- parse_recording_message ,
18
- process_recording_message ,
25
+ process_recording_event ,
19
26
track_recording_metadata ,
20
27
)
28
+ from sentry .utils import json
29
+
30
+ RECORDINGS_CODEC : Codec [ReplayRecording ] = get_topic_codec (Topic .INGEST_REPLAYS_RECORDINGS )
21
31
22
32
logger = logging .getLogger (__name__ )
23
33
24
34
35
+ class DropSilently (Exception ):
36
+ pass
37
+
38
+
25
39
class ProcessReplayRecordingStrategyFactory (ProcessingStrategyFactory [KafkaPayload ]):
26
40
def __init__ (
27
41
self ,
@@ -61,7 +75,10 @@ def create_with_partitions(
61
75
)
62
76
63
77
64
- def process_message (message : Message [KafkaPayload ]) -> ProcessedRecordingMessage | FilteredPayload :
78
+ # Processing Task
79
+
80
+
81
+ def process_message (message : Message [KafkaPayload ]) -> ProcessedEvent | FilteredPayload :
65
82
with sentry_sdk .start_transaction (
66
83
name = "replays.consumer.recording_buffered.process_message" ,
67
84
op = "replays.consumer.recording_buffered.process_message" ,
@@ -70,15 +87,87 @@ def process_message(message: Message[KafkaPayload]) -> ProcessedRecordingMessage
70
87
},
71
88
):
72
89
try :
73
- return process_recording_message (parse_recording_message (message .payload .value ))
90
+ recording_event = parse_recording_event (message .payload .value )
91
+ set_tag ("org_id" , recording_event ["context" ]["org_id" ])
92
+ set_tag ("project_id" , recording_event ["context" ]["project_id" ])
93
+ return process_recording_event (recording_event )
74
94
except DropSilently :
75
95
return FilteredPayload ()
76
96
except Exception :
77
97
logger .exception ("Failed to process replay recording message." )
78
98
return FilteredPayload ()
79
99
80
100
81
- def commit_message (message : Message [ProcessedRecordingMessage ]) -> None :
101
+ @sentry_sdk .trace
102
+ def parse_recording_event (message : bytes ) -> Event :
103
+ recording = parse_request_message (message )
104
+ segment_id , payload = parse_headers (cast (bytes , recording ["payload" ]), recording ["replay_id" ])
105
+ compressed , decompressed = decompress_segment (payload )
106
+
107
+ replay_event_json = recording .get ("replay_event" )
108
+ if replay_event_json :
109
+ replay_event = json .loads (cast (bytes , replay_event_json ))
110
+ else :
111
+ replay_event = None
112
+
113
+ replay_video_raw = recording .get ("replay_video" )
114
+ if replay_video_raw is not None :
115
+ replay_video = cast (bytes , replay_video_raw )
116
+ else :
117
+ replay_video = None
118
+
119
+ return {
120
+ "context" : {
121
+ "key_id" : recording .get ("key_id" ),
122
+ "org_id" : recording ["org_id" ],
123
+ "project_id" : recording ["project_id" ],
124
+ "received" : recording ["received" ],
125
+ "replay_id" : recording ["replay_id" ],
126
+ "retention_days" : recording ["retention_days" ],
127
+ "segment_id" : segment_id ,
128
+ },
129
+ "payload_compressed" : compressed ,
130
+ "payload" : decompressed ,
131
+ "replay_event" : replay_event ,
132
+ "replay_video" : replay_video ,
133
+ }
134
+
135
+
136
+ @sentry_sdk .trace
137
+ def parse_request_message (message : bytes ) -> ReplayRecording :
138
+ try :
139
+ return RECORDINGS_CODEC .decode (message )
140
+ except ValidationError :
141
+ logger .exception ("Could not decode recording message." )
142
+ raise DropSilently ()
143
+
144
+
145
+ @sentry_sdk .trace
146
+ def decompress_segment (segment : bytes ) -> tuple [bytes , bytes ]:
147
+ try :
148
+ return (segment , zlib .decompress (segment ))
149
+ except zlib .error :
150
+ if segment and segment [0 ] == ord ("[" ):
151
+ return (zlib .compress (segment ), segment )
152
+ else :
153
+ logger .exception ("Invalid recording body." )
154
+ raise DropSilently ()
155
+
156
+
157
+ @sentry_sdk .trace
158
+ def parse_headers (recording : bytes , replay_id : str ) -> tuple [int , bytes ]:
159
+ try :
160
+ recording_headers_json , recording_segment = recording .split (b"\n " , 1 )
161
+ return int (json .loads (recording_headers_json )["segment_id" ]), recording_segment
162
+ except Exception :
163
+ logger .exception ("Recording headers could not be extracted %s" , replay_id )
164
+ raise DropSilently ()
165
+
166
+
167
+ # I/O Task
168
+
169
+
170
+ def commit_message (message : Message [ProcessedEvent ]) -> None :
82
171
isolation_scope = sentry_sdk .get_isolation_scope ().fork ()
83
172
with sentry_sdk .scope .use_isolation_scope (isolation_scope ):
84
173
with sentry_sdk .start_transaction (
@@ -96,7 +185,7 @@ def commit_message(message: Message[ProcessedRecordingMessage]) -> None:
96
185
return None
97
186
except GCS_RETRYABLE_ERRORS :
98
187
raise
99
- except DropSilently :
188
+ except DropEvent :
100
189
return None
101
190
except Exception :
102
191
logger .exception ("Failed to commit replay recording message." )
0 commit comments