Skip to content

Commit 28fb1c6

Browse files
juspenceJustin Spencer
andauthored
Add retry logic for Django and Psycopg2 InterfaceErrors (#290)
* Add retry logic for Django and Psycopg2 InterfaceErrors * Add unit tests for retry logic Co-authored-by: Justin Spencer <juspence+gitlab@redhat.com>
1 parent 6167179 commit 28fb1c6

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

django_celery_results/backends/database.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77
from celery.result import GroupResult, allow_join_result, result_from_tuple
88
from celery.utils.log import get_logger
99
from celery.utils.serialization import b64decode, b64encode
10-
from django.db import transaction
10+
from django.db import connection, transaction
11+
from django.db.utils import InterfaceError
1112
from kombu.exceptions import DecodeError
13+
from psycopg2 import InterfaceError as Psycopg2InterfaceError
1214

1315
from ..models import ChordCounter
1416
from ..models import GroupResult as GroupResultModel
1517
from ..models import TaskResult
1618

19+
EXCEPTIONS_TO_CATCH = (InterfaceError, Psycopg2InterfaceError)
20+
1721
logger = get_logger(__name__)
1822

1923

@@ -24,6 +28,27 @@ class DatabaseBackend(BaseDictBackend):
2428
GroupModel = GroupResultModel
2529
subpolling_interval = 0.5
2630

31+
def exception_safe_to_retry(self, exc):
32+
"""Check if an exception is safe to retry.
33+
34+
Backends have to overload this method with correct predicates
35+
dealing with their exceptions.
36+
37+
By default no exception is safe to retry, it's up to
38+
backend implementation to define which exceptions are safe.
39+
40+
For Celery / django-celery-results, retry Django / Psycopg2
41+
InterfaceErrors, like "Connection already closed", with new connection.
42+
43+
Set result_backend_always_retry to True in order to enable retries.
44+
"""
45+
for exc_type in EXCEPTIONS_TO_CATCH:
46+
if isinstance(exc, exc_type):
47+
# Only called if InterfaceError occurs and always_retry is True
48+
connection.close()
49+
return True
50+
return False
51+
2752
def _store_result(
2853
self,
2954
task_id,

t/unit/test_models.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from datetime import datetime, timedelta
2+
from unittest.mock import patch
23

34
import pytest
45
from celery import states, uuid
56
from django.db import transaction
7+
from django.db.utils import InterfaceError
68
from django.test import TransactionTestCase
79

10+
from django_celery_results.backends import DatabaseBackend
811
from django_celery_results.models import GroupResult, TaskResult
912
from django_celery_results.utils import now
1013

@@ -93,6 +96,94 @@ class TransactionError(Exception):
9396
assert TaskResult.objects.get_task(m1.task_id).status != states.SUCCESS
9497
assert TaskResult.objects.get_task(m2.task_id).status == states.SUCCESS
9598

99+
def test_retry_store_result_fails(self):
100+
"""
101+
Test the retry logic for InterfaceErrors.
102+
When result_backend_always_retry is False,
103+
and an InterfaceError is raised during _store_result(),
104+
then the InterfaceError will be re-raised.
105+
"""
106+
m = self.create_task_result()
107+
assert set(TaskResult.objects.all()) == set(
108+
TaskResult.objects.using('secondary').all()
109+
)
110+
111+
always_retry = self.app.conf.get('result_backend_always_retry')
112+
self.app.conf.result_backend_always_retry = False
113+
backend = DatabaseBackend(self.app)
114+
115+
with patch.object(
116+
backend,
117+
'_store_result',
118+
side_effect=[
119+
InterfaceError('Connection closed')
120+
]
121+
) as patched_store_result:
122+
with patch.object(
123+
backend,
124+
'exception_safe_to_retry',
125+
return_value=backend.exception_safe_to_retry
126+
) as patched_safe_to_retry:
127+
# InterfaceError should be re-raised
128+
with pytest.raises(InterfaceError):
129+
backend.store_result(
130+
m.task_id,
131+
result=states.SUCCESS,
132+
state=states.SUCCESS
133+
)
134+
assert patched_safe_to_retry.call_count == 0
135+
assert patched_store_result.call_count == 1
136+
137+
self.app.conf.result_backend_always_retry = always_retry
138+
if always_retry is None:
139+
del self.app.conf.result_backend_always_retry
140+
141+
def test_retry_store_result_succeeds(self):
142+
"""
143+
Test the retry logic for InterfaceErrors.
144+
When result_backend_always_retry is True,
145+
and an InterfaceError is raised during _store_result(),
146+
then the InterfaceError will be hidden,
147+
the connection to the database will be closed,
148+
and then automatically reopened for the next retry.
149+
"""
150+
m = self.create_task_result()
151+
assert set(TaskResult.objects.all()) == set(
152+
TaskResult.objects.using('secondary').all()
153+
)
154+
155+
always_retry = self.app.conf.get('result_backend_always_retry')
156+
self.app.conf.result_backend_always_retry = True
157+
backend = DatabaseBackend(self.app)
158+
159+
with patch.object(
160+
backend,
161+
'_store_result',
162+
side_effect=[
163+
InterfaceError('Connection closed'),
164+
backend._store_result
165+
]
166+
) as patched_store_result:
167+
with patch.object(
168+
backend,
169+
'exception_safe_to_retry',
170+
return_value=backend.exception_safe_to_retry
171+
) as patched_safe_to_retry:
172+
# InterfaceError should be hidden
173+
# And new connection opened
174+
# Then unpatched function called for retry
175+
backend.store_result(
176+
m.task_id,
177+
result=states.SUCCESS,
178+
state=states.SUCCESS
179+
)
180+
assert patched_safe_to_retry.call_count == 1
181+
assert patched_store_result.call_count == 2
182+
183+
self.app.conf.result_backend_always_retry = always_retry
184+
if always_retry is None:
185+
del self.app.conf.result_backend_always_retry
186+
96187
def create_group_result(self):
97188
id = uuid()
98189
taskmeta, created = GroupResult.objects.get_or_create(group_id=id)

0 commit comments

Comments
 (0)