Commit 33f4fdd

feat(uptime): Add ability to use queues to manage parallelism (#95633)
One potential problem we have with batch processing is that any one slow item can clog up the whole batch. This PR implements a queueing method instead, where we keep N queues that each have their own worker. There's still a chance of individual items backlogging a queue, but we can try increased concurrency here to reduce the chances of that happening.
1 parent 5c94383 commit 33f4fdd
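
To make the routing idea described above concrete, here is a minimal, self-contained sketch; the names (NUM_QUEUES, route_to_queue, process, worker) are illustrative placeholders and are not the code added in this commit:

    # Illustrative sketch only; not the classes introduced by this commit.
    import queue
    import threading

    NUM_QUEUES = 20
    queues: list[queue.Queue] = [queue.Queue() for _ in range(NUM_QUEUES)]


    def process(item) -> None:
        """Placeholder for the real per-item processing."""


    def route_to_queue(group_key: str) -> queue.Queue:
        # The same group key always hashes to the same queue, so items within
        # a group keep their order; a slow item only delays its own queue.
        return queues[hash(group_key) % NUM_QUEUES]


    def worker(q: queue.Queue) -> None:
        while True:
            process(q.get())


    for q in queues:
        threading.Thread(target=worker, args=(q,), daemon=True).start()

    route_to_queue("subscription-1").put({"example": "work item"})

Because the queue is chosen by hashing the group key, items for the same group are processed in order, while a slow item only backs up the single queue it was routed to.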

File tree

7 files changed: +1276 -6 lines changed

src/sentry/consumers/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -118,7 +118,7 @@ def uptime_options() -> list[click.Option]:
     options = [
         click.Option(
             ["--mode", "mode"],
-            type=click.Choice(["serial", "parallel", "batched-parallel"]),
+            type=click.Choice(["serial", "parallel", "batched-parallel", "thread-queue-parallel"]),
             default="serial",
             help="The mode to process results in. Parallel uses multithreading.",
         ),
@@ -138,7 +138,7 @@ def uptime_options() -> list[click.Option]:
             ["--max-workers", "max_workers"],
             type=int,
             default=None,
-            help="The maximum number of threads to spawn in parallel mode.",
+            help="The maximum amount of parallelism to use when in a parallel mode.",
         ),
         click.Option(["--processes", "num_processes"], default=1, type=int),
         click.Option(["--input-block-size"], type=int, default=None),
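
Selecting the new mode at consumer start-up would presumably look something like the following; the consumer name and group below are placeholders, and only the --mode and --max-workers flags come from the options shown above:

    # Hypothetical invocation; "uptime-results" is an illustrative consumer name.
    sentry run consumer uptime-results \
        --consumer-group uptime-results \
        --mode thread-queue-parallel \
        --max-workers 4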
Lines changed: 345 additions & 0 deletions
@@ -0,0 +1,345 @@
from __future__ import annotations

import logging
import queue
import threading
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import BrokerValue, FilteredPayload, Message, Partition

from sentry.utils import metrics

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class WorkItem(Generic[T]):
    """Work item that includes the original message for offset tracking."""

    partition: Partition
    offset: int
    result: T
    message: Message[KafkaPayload | FilteredPayload]


class OffsetTracker:
    """
    Tracks outstanding offsets and determines which offsets are safe to commit.

    - Tracks offsets per partition
    - Only commits offsets when all prior offsets are processed
    - Thread-safe for concurrent access with per-partition locks
    """

    def __init__(self) -> None:
        self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
        self.outstanding: dict[Partition, set[int]] = defaultdict(set)
        self.last_committed: dict[Partition, int] = {}
        self.partition_locks: dict[Partition, threading.Lock] = {}

    def _get_partition_lock(self, partition: Partition) -> threading.Lock:
        """Get or create a lock for a partition."""
        lock = self.partition_locks.get(partition)
        if lock:
            return lock
        return self.partition_locks.setdefault(partition, threading.Lock())

    def add_offset(self, partition: Partition, offset: int) -> None:
        """Record that we've started processing an offset."""
        with self._get_partition_lock(partition):
            self.all_offsets[partition].add(offset)
            self.outstanding[partition].add(offset)

    def complete_offset(self, partition: Partition, offset: int) -> None:
        """Mark an offset as completed."""
        with self._get_partition_lock(partition):
            self.outstanding[partition].discard(offset)

    def get_committable_offsets(self) -> dict[Partition, int]:
        """
        Get the highest offset per partition that can be safely committed.

        For each partition, finds the highest contiguous offset that has been processed.
        """
        committable = {}
        for partition in list(self.all_offsets.keys()):
            with self._get_partition_lock(partition):
                all_offsets = self.all_offsets[partition]
                if not all_offsets:
                    continue

                outstanding = self.outstanding[partition]
                last_committed = self.last_committed.get(partition, -1)

                min_offset = min(all_offsets)
                max_offset = max(all_offsets)

                start = max(last_committed + 1, min_offset)

                highest_committable = last_committed
                for offset in range(start, max_offset + 1):
                    if offset in all_offsets and offset not in outstanding:
                        highest_committable = offset
                    else:
                        break

                if highest_committable > last_committed:
                    committable[partition] = highest_committable

        return committable

    def mark_committed(self, partition: Partition, offset: int) -> None:
        """Update the last committed offset for a partition."""
        with self._get_partition_lock(partition):
            self.last_committed[partition] = offset
            # Remove all offsets <= committed offset
            self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}


class OrderedQueueWorker(threading.Thread, Generic[T]):
    """Worker thread that processes items from a queue in order."""

    def __init__(
        self,
        worker_id: int,
        work_queue: queue.Queue[WorkItem[T]],
        result_processor: Callable[[str, T], None],
        identifier: str,
        offset_tracker: OffsetTracker,
    ) -> None:
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.work_queue = work_queue
        self.result_processor = result_processor
        self.identifier = identifier
        self.offset_tracker = offset_tracker
        self.shutdown = False

    def run(self) -> None:
        """Process items from the queue in order."""
        while not self.shutdown:
            try:
                work_item = self.work_queue.get()
            except queue.ShutDown:
                break

            try:
                with sentry_sdk.start_transaction(
                    op="queue_worker.process",
                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                ):
                    self.result_processor(self.identifier, work_item.result)

            except queue.ShutDown:
                break
            except Exception:
                logger.exception(
                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
                )
            finally:
                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
                metrics.gauge(
                    "remote_subscriptions.queue_worker.queue_depth",
                    self.work_queue.qsize(),
                    tags={
                        "identifier": self.identifier,
                    },
                )


class FixedQueuePool(Generic[T]):
    """
    Fixed pool of queues that guarantees order within groups.

    Key properties:
    - Each group is consistently assigned to the same queue
    - Each queue has exactly one worker thread
    - Items within a queue are processed in FIFO order
    - No dynamic reassignment that could break ordering
    - Tracks offset completion for safe commits
    """

    def __init__(
        self,
        result_processor: Callable[[str, T], None],
        identifier: str,
        num_queues: int = 20,
    ) -> None:
        self.result_processor = result_processor
        self.identifier = identifier
        self.num_queues = num_queues
        self.offset_tracker = OffsetTracker()
        self.queues: list[queue.Queue[WorkItem[T]]] = []
        self.workers: list[OrderedQueueWorker[T]] = []

        for i in range(num_queues):
            work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
            self.queues.append(work_queue)

            worker = OrderedQueueWorker[T](
                worker_id=i,
                work_queue=work_queue,
                result_processor=result_processor,
                identifier=identifier,
                offset_tracker=self.offset_tracker,
            )
            worker.start()
            self.workers.append(worker)

    def get_queue_for_group(self, group_key: str) -> int:
        """
        Get queue index for a group using consistent hashing.
        """
        return hash(group_key) % self.num_queues

    def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
        """
        Submit a work item to the appropriate queue.
        """
        queue_index = self.get_queue_for_group(group_key)
        work_queue = self.queues[queue_index]

        self.offset_tracker.add_offset(work_item.partition, work_item.offset)
        work_queue.put(work_item)

    def get_stats(self) -> dict[str, Any]:
        """Get statistics about queue depths."""
        queue_depths = [q.qsize() for q in self.queues]
        return {
            "queue_depths": queue_depths,
            "total_items": sum(queue_depths),
        }

    def wait_until_empty(self, timeout: float = 5.0) -> bool:
        """Wait until all queues are empty. Returns True if successful, False if timeout."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.get_stats()["total_items"] == 0:
                return True
            time.sleep(0.01)
        return False

    def shutdown(self) -> None:
        """Gracefully shutdown all workers."""
        for worker in self.workers:
            worker.shutdown = True

        for q in self.queues:
            try:
                q.shutdown(immediate=False)
            except Exception:
                logger.exception("Error shutting down queue")

        for worker in self.workers:
            worker.join(timeout=5.0)


class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T]):
    """
    Processing strategy that uses a fixed pool of queues.

    Guarantees:
    - Items for the same group are processed in order
    - No item is lost or processed out of order
    - Natural backpressure when queues fill up
    - Only commits offsets after successful processing
    """

    def __init__(
        self,
        queue_pool: FixedQueuePool[T],
        decoder: Callable[[KafkaPayload | FilteredPayload], T | None],
        grouping_fn: Callable[[T], str],
        commit_function: Callable[[dict[Partition, int]], None],
    ) -> None:
        self.queue_pool = queue_pool
        self.decoder = decoder
        self.grouping_fn = grouping_fn
        self.commit_function = commit_function
        self.shutdown_event = threading.Event()

        self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
        self.commit_thread.start()

    def _commit_loop(self) -> None:
        while not self.shutdown_event.is_set():
            try:
                self.shutdown_event.wait(1.0)

                committable = self.queue_pool.offset_tracker.get_committable_offsets()

                if committable:
                    metrics.incr(
                        "remote_subscriptions.queue_pool.offsets_committed",
                        len(committable),
                        tags={"identifier": self.queue_pool.identifier},
                    )

                    self.commit_function(committable)
                    for partition, offset in committable.items():
                        self.queue_pool.offset_tracker.mark_committed(partition, offset)
            except Exception:
                logger.exception("Error in commit loop")

    def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
        try:
            result = self.decoder(message.payload)

            assert isinstance(message.value, BrokerValue)
            partition = message.value.partition
            offset = message.value.offset

            if result is None:
                self.queue_pool.offset_tracker.add_offset(partition, offset)
                self.queue_pool.offset_tracker.complete_offset(partition, offset)
                return

            group_key = self.grouping_fn(result)

            work_item = WorkItem(
                partition=partition,
                offset=offset,
                result=result,
                message=message,
            )

            self.queue_pool.submit(group_key, work_item)

        except Exception:
            logger.exception("Error submitting message to queue")
            if isinstance(message.value, BrokerValue):
                self.queue_pool.offset_tracker.add_offset(
                    message.value.partition, message.value.offset
                )
                self.queue_pool.offset_tracker.complete_offset(
                    message.value.partition, message.value.offset
                )

    def poll(self) -> None:
        stats = self.queue_pool.get_stats()
        metrics.gauge(
            "remote_subscriptions.queue_pool.total_queued",
            stats["total_items"],
            tags={"identifier": self.queue_pool.identifier},
        )

    def close(self) -> None:
        self.shutdown_event.set()
        self.commit_thread.join(timeout=5.0)
        self.queue_pool.shutdown()

    def terminate(self) -> None:
        self.shutdown_event.set()
        self.queue_pool.shutdown()

    def join(self, timeout: float | None = None) -> None:
        self.close()
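
For orientation, here is a rough sketch of how the classes defined in the new file above might be wired together by a consumer. UptimeResult, decode_result, process_result, and commit are toy placeholders rather than code from this commit, and the imports of the pool and strategy themselves are omitted because the file's module path is not shown here:

    # Hypothetical wiring of FixedQueuePool and SimpleQueueProcessingStrategy.
    from dataclasses import dataclass

    from arroyo.backends.kafka.consumer import KafkaPayload
    from arroyo.types import FilteredPayload, Partition


    @dataclass
    class UptimeResult:
        subscription_id: str
        payload: bytes


    def decode_result(payload: KafkaPayload | FilteredPayload) -> UptimeResult | None:
        # Returning None tells the strategy to skip the message; it still marks
        # the offset as complete so commits are not blocked.
        if isinstance(payload, FilteredPayload):
            return None
        return UptimeResult(subscription_id="sub-123", payload=payload.value)


    def process_result(identifier: str, result: UptimeResult) -> None:
        """Placeholder for the real result processor, run on a pool worker thread."""


    def commit(offsets: dict[Partition, int]) -> None:
        """Placeholder for the commit callable the consumer framework provides."""


    queue_pool = FixedQueuePool(
        result_processor=process_result,
        identifier="uptime",
        num_queues=20,
    )
    strategy = SimpleQueueProcessingStrategy(
        queue_pool=queue_pool,
        decoder=decode_result,
        grouping_fn=lambda result: result.subscription_id,
        commit_function=commit,
    )

The strategy's submit() routes each decoded result to the queue owned by its group key, and the background commit thread only ever commits offsets that are contiguous and fully processed within each partition (see OffsetTracker.get_committable_offsets above).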

0 commit comments
