Commit 22bd89f

cmanallen and andrewshie-sentry authored and committed
ref(replays): Retry deletes on failure and record failed bulk delete jobs as failed (#93996)
Deletes may sometimes fail due to service flakes, so we should make an attempt to retry. If we still can't delete the replay, the job goes into the failed state. We should offer retry logic in a follow-up PR, or a beat task to automate the retry; but if the error is deterministic then we'll just spin.
1 parent 0365952 commit 22bd89f
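
As a rough illustration of the approach described in the commit message above (the helper names below are hypothetical, not Sentry's actual API): retry the delete a bounded number of times when the error looks transient, and record the job as failed instead of spinning when it does not.

    import time

    # Hypothetical stand-ins for illustration only; the real code retries specific
    # GCS gateway errors via its retry-policy utilities.
    TRANSIENT_ERRORS = (TimeoutError, ConnectionError)
    MAX_ATTEMPTS = 5

    def delete_with_retry(delete_fn, mark_job_failed):
        """Retry transient failures a bounded number of times; otherwise fail the job."""
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                return delete_fn()
            except TRANSIENT_ERRORS:
                if attempt == MAX_ATTEMPTS:
                    # Retries exhausted: record the failure rather than retrying forever.
                    mark_job_failed()
                    raise
                time.sleep(1)  # simplified pause; the real policy uses a sigmoid delay
            except Exception:
                # A deterministic error would fail the same way every time, so don't retry.
                mark_job_failed()
                raise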

File tree

4 files changed: +58, -24 lines

src/sentry/filestore/gcs.py

Lines changed: 14 additions & 8 deletions
@@ -393,22 +393,28 @@ def _try(self, callable: Callable[[], None]) -> None:
 class GoogleCloudStorageWithReplayUploadPolicy(GoogleCloudStorage):
     """Google cloud storage class with replay upload policy."""

-    # "try_del" and "try_get" inherit the default behavior. We don't want to exponentially
-    # wait in those contexts. We're maintaining the status-quo for now but in the future we
-    # can add policies for these methods or use no policy at all and implement retries at a
-    # higher, more contextual level.
+    # "try_get" inherits the default behavior. We don't want to exponentially wait in that
+    # context. We're maintaining the status-quo for now but in the future we can add policies for
+    # these methods or use no policy at all and implement retries at a higher, more contextual
+    # level.
     #
-    # def try_del(self, callable: Callable[[], None]) -> None:
     # def try_get(self, callable: Callable[[], None]) -> None:

-    def try_set(self, callable: Callable[[], None]) -> None:
-        """Upload a blob with exponential delay for a maximum of five attempts."""
+    def create_retry_policy(self):
+        """Retry an action with sigmoid delay for a maximum of five attempts."""

         def should_retry(attempt: int, e: Exception) -> bool:
             """Retry gateway timeout exceptions up to the limit."""
             return attempt <= REPLAY_GCS_RETRIES and isinstance(e, GCS_RETRYABLE_ERRORS)

         # Retry cadence: After a brief period of fast retries the function will retry once
         # per second for two minutes.
-        policy = ConditionalRetryPolicy(should_retry, sigmoid_delay())
+        return ConditionalRetryPolicy(should_retry, sigmoid_delay())
+
+    def try_set(self, callable: Callable[[], None]) -> None:
+        policy = self.create_retry_policy()
+        policy(callable)
+
+    def try_del(self, callable: Callable[[], None]) -> None:
+        policy = self.create_retry_policy()
         policy(callable)
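
For readers unfamiliar with the retry utilities used above, here is a minimal, self-contained sketch of the pattern: a single factory builds a conditional retry policy that both try_set and try_del invoke. The pieces below only mirror the roles of Sentry's ConditionalRetryPolicy and sigmoid_delay; they are simplified stand-ins, not the real implementations.

    import itertools
    import math
    import time
    from typing import Callable

    def simple_sigmoid_delay(midpoint: float = 5.0, ceiling: float = 1.0) -> Callable[[int], float]:
        """S-curve delay: near-zero pauses for the first few attempts, then roughly one second."""
        def delay(attempt: int) -> float:
            return ceiling / (1 + math.exp(midpoint - attempt))
        return delay

    class SimpleConditionalRetryPolicy:
        """Call a function, retrying while should_retry(attempt, exception) returns True."""

        def __init__(self, should_retry: Callable[[int, Exception], bool], delay: Callable[[int], float]) -> None:
            self.should_retry = should_retry
            self.delay = delay

        def __call__(self, callable_: Callable[[], None]) -> None:
            for attempt in itertools.count(1):
                try:
                    return callable_()
                except Exception as e:
                    if not self.should_retry(attempt, e):
                        raise
                    time.sleep(self.delay(attempt))

With the factory factored out as create_retry_policy, try_set and try_del each build a fresh policy and hand it the callable to execute, so uploads and deletes share one retry cadence.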

src/sentry/replays/models.py

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ class DeletionJobStatus(models.TextChoices):
     PENDING = "pending", gettext_lazy("Pending")
     IN_PROGRESS = "in-progress", gettext_lazy("In Progress")
     COMPLETED = "completed", gettext_lazy("Completed")
+    FAILED = "failed", gettext_lazy("Failed")


 @region_silo_model
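
A side note on why the tests further down can compare the status field directly against a plain string: Django TextChoices members are str subclasses, so DeletionJobStatus.FAILED compares equal to "failed". A minimal standalone illustration of that behaviour (not Sentry's actual module) follows.

    from django.db import models
    from django.utils.translation import gettext_lazy

    class DeletionJobStatus(models.TextChoices):
        IN_PROGRESS = "in-progress", gettext_lazy("In Progress")
        FAILED = "failed", gettext_lazy("Failed")

    # TextChoices members are str subclasses, so they compare equal to their raw values.
    assert DeletionJobStatus.FAILED == "failed"
    assert DeletionJobStatus.FAILED.value == "failed"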

src/sentry/replays/tasks.py

Lines changed: 24 additions & 15 deletions
@@ -186,25 +186,34 @@ def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: in
     job = ReplayDeletionJobModel.objects.get(id=replay_delete_job_id)

     # If this is the first run of the task we set the model to in-progress.
-    if offset == 0:
+    if job.status == DeletionJobStatus.PENDING:
         job.status = DeletionJobStatus.IN_PROGRESS
         job.save()

-    # Delete the replays within a limited range. If more replays exist an incremented offset value
-    # is returned.
-    results = fetch_rows_matching_pattern(
-        project_id=job.project_id,
-        start=job.range_start,
-        end=job.range_end,
-        query=job.query,
-        environment=job.environments,
-        limit=limit,
-        offset=offset,
-    )
+    # Exit if the job status is failed or completed.
+    if job.status != DeletionJobStatus.IN_PROGRESS:
+        return None
+
+    try:
+        # Delete the replays within a limited range. If more replays exist an incremented offset value
+        # is returned.
+        results = fetch_rows_matching_pattern(
+            project_id=job.project_id,
+            start=job.range_start,
+            end=job.range_end,
+            query=job.query,
+            environment=job.environments,
+            limit=limit,
+            offset=offset,
+        )

-    # Delete the matched rows if any rows were returned.
-    if len(results["rows"]) > 0:
-        delete_matched_rows(job.project_id, results["rows"])
+        # Delete the matched rows if any rows were returned.
+        if len(results["rows"]) > 0:
+            delete_matched_rows(job.project_id, results["rows"])
+    except Exception:
+        job.status = DeletionJobStatus.FAILED
+        job.save()
+        raise

     # Compute the next offset to start from. If no further processing is required then this serves
     # as a count of replays deleted.
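
The commit description leaves retrying failed jobs to a follow-up. Purely as a hypothetical sketch of what that follow-up could look like (none of this exists in this commit), a periodic task could reset failed jobs to pending and re-enqueue them from their recorded offset.

    # Hypothetical follow-up, not part of this commit.
    from sentry.replays.models import DeletionJobStatus, ReplayDeletionJobModel
    from sentry.replays.tasks import run_bulk_replay_delete_job

    def retry_failed_bulk_delete_jobs(max_jobs: int = 100) -> None:
        failed_jobs = ReplayDeletionJobModel.objects.filter(status=DeletionJobStatus.FAILED)[:max_jobs]
        for job in failed_jobs:
            # Reset to pending so run_bulk_replay_delete_job moves it back to in-progress.
            job.status = DeletionJobStatus.PENDING
            job.save()
            # Resume from the job's recorded offset rather than starting over.
            run_bulk_replay_delete_job.delay(job.id, offset=job.offset)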

tests/sentry/replays/tasks/test_delete_replays_bulk.py

Lines changed: 19 additions & 1 deletion
@@ -4,7 +4,7 @@
 import uuid
 from unittest.mock import patch

-from sentry.replays.models import ReplayDeletionJobModel
+from sentry.replays.models import DeletionJobStatus, ReplayDeletionJobModel
 from sentry.replays.tasks import run_bulk_replay_delete_job
 from sentry.replays.testutils import mock_replay
 from sentry.testutils.cases import APITestCase, ReplaysSnubaTestCase
@@ -180,6 +180,24 @@ def test_run_bulk_replay_delete_job_chained_runs(self):
         assert self.job.status == "completed"
         assert self.job.offset == 2

+    def test_run_bulk_replay_delete_job_already_failed(self):
+        t1 = datetime.datetime.now() - datetime.timedelta(seconds=10)
+        replay_id1 = uuid.uuid4().hex
+        self.store_replays(
+            mock_replay(t1, self.project.id, replay_id1, segment_id=0, environment="prod")
+        )
+
+        self.job.status = DeletionJobStatus.FAILED
+        self.job.save()
+
+        with TaskRunner():
+            run_bulk_replay_delete_job.delay(self.job.id, offset=0, limit=0)
+
+        # Runs were chained.
+        self.job.refresh_from_db()
+        assert self.job.status == "failed"
+        assert self.job.offset == 0
+
     def test_run_bulk_replay_delete_job_no_matches(self):
         with TaskRunner():
             run_bulk_replay_delete_job.delay(self.job.id, offset=0)
