@@ -18,6 +18,7 @@
 from sentry import killswitches
 from sentry.spans.buffer import Span, SpansBuffer
 from sentry.spans.consumers.process.flusher import SpanFlusher
+from sentry.utils import metrics
 from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing

 logger = logging.getLogger(__name__)
@@ -122,6 +123,7 @@ def shutdown(self) -> None:
         self.__pool.close()


+@metrics.wraps("spans.buffer.process_batch")
 def process_batch(
     buffer: SpansBuffer,
     values: Message[ValuesBatch[tuple[int, KafkaPayload]]],
@@ -136,17 +138,16 @@ def process_batch(
         if min_timestamp is None or timestamp < min_timestamp:
             min_timestamp = timestamp

-        val = cast(SpanEvent, rapidjson.loads(payload.value))
-
-        partition_id = value.partition.index
+        with metrics.timer("spans.buffer.process_batch.decode"):
+            val = cast(SpanEvent, rapidjson.loads(payload.value))

         if killswitches.killswitch_matches_context(
             "spans.drop-in-buffer",
             {
                 "org_id": val.get("organization_id"),
                 "project_id": val.get("project_id"),
                 "trace_id": val.get("trace_id"),
-                "partition_id": partition_id,
+                "partition_id": value.partition.index,
             },
         ):
             continue
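
For context, the two helpers used above come from `sentry.utils.metrics`: `metrics.wraps(key)` decorates a function so each call is reported as a timing metric under `key`, and `metrics.timer(key)` does the same for a `with` block. The following is a minimal standalone sketch of that behavior, not Sentry's actual implementation: the real helpers emit to the configured metrics backend and accept extra options such as tags, whereas the print-based `_emit_timing` here is purely illustrative.

import functools
import time
from contextlib import contextmanager

def _emit_timing(key: str, ms: float) -> None:
    # Stand-in for a real metrics backend; illustrative only.
    print(f"timing {key}: {ms:.2f}ms")

@contextmanager
def timer(key: str):
    # Times the enclosed block, like metrics.timer in the diff above.
    start = time.monotonic()
    try:
        yield
    finally:
        _emit_timing(key, (time.monotonic() - start) * 1000)

def wraps(key: str):
    # Times every call of the decorated function, like metrics.wraps.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            with timer(key):
                return fn(*args, **kwargs)
        return wrapper
    return decorator

@wraps("spans.buffer.process_batch")
def process_batch():
    with timer("spans.buffer.process_batch.decode"):
        time.sleep(0.01)  # stands in for rapidjson.loads

With this layering, the decorator captures the total batch time while the inner timer isolates just the JSON decode, so the two metrics can be compared to see how much of the batch cost is decoding.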