Skip to content

Commit c1bd4c2

Browse files
committed
feat(models): add AbstractChordCounter and update ChordCounter
- Added new abstract model `AbstractChordCounter` in `abstract.py` for Chord synchronization, including fields for `group_id`, `sub_tasks`, and `count`. - Updated `ChordCounter` model in `generic.py` to inherit from `AbstractChordCounter`. - Moved `group_result` method from `ChordCounter` to `AbstractChordCounter`. - Added helper function `chordcounter_model` in `helpers.py` to return the active `ChordCounter` model. - Updated import statements in `database.py` to use `chordcounter_model` helper function. - Added `ChordCounterModel` attribute to `DatabaseBackend` class, and updated `create` and `get` methods to use `ChordCounterModel` instead of `ChordCounter`. Relates to: #305
1 parent 1cf3b52 commit c1bd4c2

File tree

4 files changed

+84
-49
lines changed

4 files changed

+84
-49
lines changed

django_celery_results/backends/database.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
from django.db.utils import InterfaceError
1313
from kombu.exceptions import DecodeError
1414

15-
from ..models import ChordCounter
16-
from ..models.helpers import groupresult_model, taskresult_model
15+
from ..models.helpers import chordcounter_model, groupresult_model, taskresult_model
1716
from ..settings import get_task_props_extension
1817

1918
EXCEPTIONS_TO_CATCH = (InterfaceError,)
@@ -32,6 +31,7 @@ class DatabaseBackend(BaseDictBackend):
3231

3332
TaskModel = taskresult_model()
3433
GroupModel = groupresult_model()
34+
ChordCounterModel = chordcounter_model()
3535
subpolling_interval = 0.5
3636

3737
def exception_safe_to_retry(self, exc):
@@ -248,7 +248,7 @@ def apply_chord(self, header_result_args, body, **kwargs):
248248
results = [r.as_tuple() for r in header_result]
249249
chord_size = body.get("chord_size", None) or len(results)
250250
data = json.dumps(results)
251-
ChordCounter.objects.create(
251+
self.ChordCounterModel.objects.create(
252252
group_id=header_result.id, sub_tasks=data, count=chord_size
253253
)
254254

@@ -265,10 +265,10 @@ def on_chord_part_return(self, request, state, result, **kwargs):
265265
# SELECT FOR UPDATE is not supported on all databases
266266
try:
267267
chord_counter = (
268-
ChordCounter.objects.select_for_update()
268+
self.ChordCounterModel.objects.select_for_update()
269269
.get(group_id=gid)
270270
)
271-
except ChordCounter.DoesNotExist:
271+
except self.ChordCounterModel.DoesNotExist:
272272
logger.warning("Can't find ChordCounter for Group %s", gid)
273273
return
274274
chord_counter.count -= 1

django_celery_results/models/abstract.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
"""Abstract models."""
22

3+
import json
4+
35
from celery import states
6+
from celery.result import result_from_tuple
47
from django.conf import settings
58
from django.db import models
69
from django.utils.translation import gettext_lazy as _
710

811
from .. import managers
12+
from ..models.helpers import groupresult_model
913

1014
ALL_STATES = sorted(states.ALL_STATES)
1115
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))
@@ -132,6 +136,53 @@ def __str__(self):
132136
return '<Task: {0.task_id} ({0.status})>'.format(self)
133137

134138

139+
class AbstractChordCounter(models.Model):
140+
"""Abstract Chord synchronisation."""
141+
142+
group_id = models.CharField(
143+
max_length=getattr(
144+
settings,
145+
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
146+
255
147+
),
148+
unique=True,
149+
verbose_name=_("Group ID"),
150+
help_text=_("Celery ID for the Chord header group"),
151+
)
152+
sub_tasks = models.TextField(
153+
help_text=_(
154+
"JSON serialized list of task result tuples. "
155+
"use .group_result() to decode"
156+
)
157+
)
158+
count = models.PositiveIntegerField(
159+
help_text=_(
160+
"Starts at len(chord header) and decrements after each task is "
161+
"finished"
162+
)
163+
)
164+
165+
class Meta:
166+
"""Table information."""
167+
168+
abstract = True
169+
170+
def group_result(self, app=None):
171+
"""Return the GroupResult of self.
172+
173+
Arguments:
174+
---------
175+
app (Celery): app instance to create the GroupResult with.
176+
177+
"""
178+
return groupresult_model()(
179+
self.group_id,
180+
[result_from_tuple(r, app=app)
181+
for r in json.loads(self.sub_tasks)],
182+
app=app
183+
)
184+
185+
135186
class AbstractGroupResult(models.Model):
136187
"""Abstract Task Group result/status."""
137188

django_celery_results/models/generic.py

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
import json
44

5-
from celery.result import GroupResult as CeleryGroupResult
6-
from celery.result import result_from_tuple
7-
from django.conf import settings
8-
from django.db import models
95
from django.utils.translation import gettext_lazy as _
106

117
from django_celery_results.models.abstract import (
8+
AbstractChordCounter,
129
AbstractGroupResult,
1310
AbstractTaskResult,
1411
)
@@ -24,49 +21,13 @@ class Meta(AbstractTaskResult.Meta):
2421
app_label = "django_celery_results"
2522

2623

27-
class ChordCounter(models.Model):
24+
class ChordCounter(AbstractChordCounter):
2825
"""Chord synchronisation."""
2926

30-
group_id = models.CharField(
31-
max_length=getattr(
32-
settings,
33-
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
34-
255),
35-
unique=True,
36-
verbose_name=_("Group ID"),
37-
help_text=_("Celery ID for the Chord header group"),
38-
)
39-
sub_tasks = models.TextField(
40-
help_text=_(
41-
"JSON serialized list of task result tuples. "
42-
"use .group_result() to decode"
43-
)
44-
)
45-
count = models.PositiveIntegerField(
46-
help_text=_(
47-
"Starts at len(chord header) and decrements after each task is "
48-
"finished"
49-
)
50-
)
51-
52-
class Meta:
27+
class Meta(AbstractChordCounter.Meta):
28+
abstract = False
5329
app_label = "django_celery_results"
5430

55-
def group_result(self, app=None):
56-
"""Return the GroupResult of self.
57-
58-
Arguments:
59-
---------
60-
app (Celery): app instance to create the GroupResult with.
61-
62-
"""
63-
return CeleryGroupResult(
64-
self.group_id,
65-
[result_from_tuple(r, app=app)
66-
for r in json.loads(self.sub_tasks)],
67-
app=app
68-
)
69-
7031

7132
class GroupResult(AbstractGroupResult):
7233
"""Task Group result/status."""

django_celery_results/models/helpers.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from django.conf import settings
33
from django.core.exceptions import ImproperlyConfigured
44

5-
from .generic import GroupResult, TaskResult
5+
from .generic import ChordCounter, GroupResult, TaskResult
66

77

88
def taskresult_model():
@@ -27,6 +27,29 @@ def taskresult_model():
2727
)
2828

2929

30+
def chordcounter_model():
31+
"""Return the ChordCounter model that is active in this project."""
32+
33+
if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'):
34+
return ChordCounter
35+
36+
try:
37+
return apps.get_model(
38+
settings.CELERY_RESULTS_CHORDCOUNTER_MODEL
39+
)
40+
except ValueError:
41+
raise ImproperlyConfigured(
42+
"CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form "
43+
"'app_label.model_name'"
44+
)
45+
except LookupError:
46+
raise ImproperlyConfigured(
47+
"CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model "
48+
f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not "
49+
"been installed"
50+
)
51+
52+
3053
def groupresult_model():
3154
"""Return the GroupResult model that is active in this project."""
3255
if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):

0 commit comments

Comments
 (0)