Commit 130345e

fix(grouping): Schedule seer deletion tasks with fewer hashes (#95156)
The original code always passed the full list of hashes to every task it spawned, so we could end up with massive task payloads that caused trouble for taskbroker. We hit such a situation in the last few days, when deleting a project led to hundreds of thousands of hashes being passed to tasks (179k+ hashes -> 6MB+ task payloads). With this change, a task takes all of its hashes, splits them into chunks, and spawns a new task per chunk, each carrying only a small number of hashes. This moves us from sequential scheduling of tasks to parallelized scheduling, which could have an impact on the Seer service if a massive number of hashes is requested for deletion. Ref inc-1236
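To make the scheduling change concrete, here is a minimal, self-contained sketch of the chunk-and-spawn (fan-out) pattern the commit adopts. This is not the Sentry code itself: delete_hashes_task, call_seer, and spawn_task are hypothetical stand-ins for the real task, the Seer HTTP request, and the Celery-style apply_async call shown in the diff below.

    BATCH_SIZE = 100  # mirrors the delete-record-batch-size default in the diff

    def call_seer(project_id: int, hashes: list[str]) -> None:
        # Stand-in for the HTTP request asking Seer to delete these hashes.
        print(f"deleting {len(hashes)} hashes for project {project_id}")

    def spawn_task(project_id: int, chunk: list[str]) -> None:
        # Stand-in for task.apply_async(args=[project_id, chunk, 0]); here we
        # recurse synchronously so the sketch runs on its own.
        delete_hashes_task(project_id, chunk, 0)

    def delete_hashes_task(project_id: int, hashes: list[str], last_deleted_index: int = 0) -> None:
        if len(hashes) <= BATCH_SIZE:
            # Base case: the payload is small enough to send to Seer directly.
            call_seer(project_id, hashes)
            return
        # Fan out: one small task per chunk, so no child payload ever carries
        # the full hash list (previously every hop re-sent all hashes).
        for i in range(last_deleted_index, len(hashes), BATCH_SIZE):
            spawn_task(project_id, hashes[i : i + BATCH_SIZE])

    delete_hashes_task(1, [f"hash-{n}" for n in range(250)])  # spawns chunks of 100, 100, 50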
Parent: a77143a

2 files changed (+20, -5 lines)

src/sentry/tasks/delete_seer_grouping_records.py

Lines changed: 18 additions & 4 deletions
@@ -14,6 +14,7 @@
 from sentry.tasks.base import instrumented_task
 from sentry.taskworker.config import TaskworkerConfig
 from sentry.taskworker.namespaces import seer_tasks
+from sentry.utils import metrics
 from sentry.utils.query import RangeQuerySetWrapper


 logger = logging.getLogger(__name__)
@@ -49,10 +50,23 @@ def delete_seer_grouping_records_by_hash(
 
     batch_size = options.get("embeddings-grouping.seer.delete-record-batch-size") or 100
     len_hashes = len(hashes)
-    end_index = min(last_deleted_index + batch_size, len_hashes)
-    call_seer_to_delete_these_hashes(project_id, hashes[last_deleted_index:end_index])
-    if end_index < len_hashes:
-        delete_seer_grouping_records_by_hash.apply_async(args=[project_id, hashes, end_index])
+    if len_hashes <= batch_size:  # Base case
+        call_seer_to_delete_these_hashes(project_id, hashes)
+    else:
+        if last_deleted_index != 0:
+            # This tracks which tasks are still being scheduled with the whole list of hashes
+            metrics.incr(
+                "grouping.similarity.delete_seer_grouping_records_by_hash.batch_size_exceeded",
+                sample_rate=options.get("seer.similarity.metrics_sample_rate"),
+            )
+
+        # Iterate through hashes in chunks and schedule a task for each chunk
+        # There are tasks passing last_deleted_index, thus, we need to start from that index
+        # Eventually all tasks will pass 0
+        for i in range(last_deleted_index, len_hashes, batch_size):
+            # Slice operations are safe and will not raise IndexError
+            chunked_hashes = hashes[i : i + batch_size]
+            delete_seer_grouping_records_by_hash.apply_async(args=[project_id, chunked_hashes, 0])
 
 
 def call_delete_seer_grouping_records_by_hash(
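For example, with 250 hashes and a batch size of 100, the initial task takes the else branch and schedules three child tasks carrying 100, 100, and 50 hashes; each child is at or under the batch size, hits the base case, and calls Seer directly. No payload after the first ever carries the full list, unlike the old code, which re-sent all hashes on every hop.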

tests/sentry/tasks/test_delete_seer_grouping_records.py

Lines changed: 2 additions & 1 deletion
@@ -32,7 +32,8 @@ def test_delete_seer_grouping_records_by_hash_batches(
     # We call it as a function and will schedule a task for the extra hash
     delete_seer_grouping_records_by_hash(project_id, hashes, 0)
     assert mock_delete_seer_grouping_records_by_hash_apply_async.call_args[1] == {
-        "args": [project_id, hashes, 100]
+        # We do not schedule the task with all the hashes, but only the extra ones
+        "args": [project_id, hashes[batch_size:], 0]
     }
 
 @patch(
