Skip to content

resolve #305 abstract models #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5e10a6a
[#305] Added abstract models.
jose-padin May 4, 2022
1b1facb
[#305] `ChordCounter` moved from `abstract` module to `generic` module.
jose-padin May 4, 2022
b597c1c
Issue 305: abstract models
jose-padin May 12, 2022
1311f5e
Update django_celery_results/models/__init__.py
auvipy Jun 7, 2022
ab74d4d
[#305]: Improving abstract models implementation.
diegocastrum Aug 10, 2022
8c83433
[#305]: `extend_task_props_callback` relocated.
diegocastrum Aug 10, 2022
0ea8ab8
[#305]: Added a default callable to `get_callback_function`
diegocastrum Aug 15, 2022
603637b
Added newline to the end of
diegocastrum Aug 16, 2022
20a03e0
[#305] Added a sanity check to `task_props_extension`
diegocastrum Aug 16, 2022
97880a2
Fixed a NoneType error when the callback is not defined in project se…
diegocastrum Aug 16, 2022
49874bc
[#305] Added documentation about this feature.
diegocastrum Oct 13, 2022
8509a64
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 17, 2022
d72b7e8
Fixed a "wrong" description for the `ImproperlyConfigured` exception …
diegocastrum Aug 17, 2022
4b9dfef
[#305] Fixed some pre-commit failures
diegocastrum Oct 13, 2022
60fd681
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
c2042fa
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
1cf3b52
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
c1bd4c2
feat(models): add `AbstractChordCounter` and update `ChordCounter`
diegocastrum Oct 21, 2024
3591140
fix: refactor helper functions to avoid circular dependencies
diegocastrum Oct 29, 2024
885f08c
fix: undefined name 'ChordCounter' and minor fixes
diegocastrum Oct 29, 2024
3f1bfac
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 29, 2024
271b080
fix: 'TypeError' introduced in previous commit
diegocastrum Oct 31, 2024
7395269
fix: include 'extra_fields' conditionally
diegocastrum Oct 31, 2024
ed17294
fix: 'get_task_props_extensions()' missing 1 required argument
diegocastrum Nov 1, 2024
7be45b5
fix: TypeError introducedn on prev commit on 'AbstractChordCounter'
diegocastrum Nov 5, 2024
e7fb95e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 5, 2024
6245d83
fix: ImportError introduced in previous commit in 'abstract.py'
diegocastrum Nov 6, 2024
34e36cd
style: Reformat import statements for better readability in 'database…
diegocastrum Nov 8, 2024
4c24fde
fix: Update configuration for isort and black to enforce line length …
diegocastrum Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ cover/
htmlcov/
coverage.xml
.env
*.ignore
*.ignore
.vscode
5 changes: 4 additions & 1 deletion django_celery_results/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
ALLOW_EDITS = False
pass

from .models import GroupResult, TaskResult
from .models.helpers import groupresult_model, taskresult_model

GroupResult = groupresult_model()
TaskResult = taskresult_model()


class TaskResultAdmin(admin.ModelAdmin):
Expand Down
37 changes: 25 additions & 12 deletions django_celery_results/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
from django.db.utils import InterfaceError
from kombu.exceptions import DecodeError

from ..models import ChordCounter
from ..models import GroupResult as GroupResultModel
from ..models import TaskResult
from ..models.helpers import (
chordcounter_model,
groupresult_model,
taskresult_model,
)
from ..settings import get_task_props_extension

EXCEPTIONS_TO_CATCH = (InterfaceError,)

Expand All @@ -30,8 +33,9 @@
class DatabaseBackend(BaseDictBackend):
"""The Django database backend, using models to store task state."""

TaskModel = TaskResult
GroupModel = GroupResultModel
TaskModel = taskresult_model()
GroupModel = groupresult_model()
ChordCounterModel = chordcounter_model()
subpolling_interval = 0.5

def exception_safe_to_retry(self, exc):
Expand Down Expand Up @@ -80,6 +84,14 @@ def _get_extended_properties(self, request, traceback):
# task protocol 1
task_kwargs = getattr(request, 'kwargs', None)

# TODO: We assume that task protocol 1 could be always in use. :/
extra_fields = get_task_props_extension(
request,
getattr(request, 'kwargs', None)
)
if extra_fields:
extended_props.update({"extra_fields": extra_fields})

# Encode input arguments
if task_args is not None:
_, _, task_args = self.encode_content(task_args)
Expand Down Expand Up @@ -141,9 +153,8 @@ def _store_result(
'using': using,
}

task_props.update(
self._get_extended_properties(request, traceback)
)
task_props.update(self._get_extended_properties(request, traceback))
task_props.update(get_task_props_extension(request, dict(task_props)))

if status == states.STARTED:
task_props['date_started'] = Now()
Expand Down Expand Up @@ -242,7 +253,7 @@ def apply_chord(self, header_result_args, body, **kwargs):
results = [r.as_tuple() for r in header_result]
chord_size = body.get("chord_size", None) or len(results)
data = json.dumps(results)
ChordCounter.objects.create(
self.ChordCounterModel.objects.create(
group_id=header_result.id, sub_tasks=data, count=chord_size
)

Expand All @@ -252,17 +263,19 @@ def on_chord_part_return(self, request, state, result, **kwargs):
if not gid or not tid:
return
call_callback = False
with transaction.atomic(using=router.db_for_write(ChordCounter)):
with transaction.atomic(
using=router.db_for_write(self.ChordCounterModel)
):
# We need to know if `count` hits 0.
# wrap the update in a transaction
# with a `select_for_update` lock to prevent race conditions.
# SELECT FOR UPDATE is not supported on all databases
try:
chord_counter = (
ChordCounter.objects.select_for_update()
self.ChordCounterModel.objects.select_for_update()
.get(group_id=gid)
)
except ChordCounter.DoesNotExist:
except self.ChordCounterModel.DoesNotExist:
logger.warning("Can't find ChordCounter for Group %s", gid)
return
chord_counter.count -= 1
Expand Down
9 changes: 7 additions & 2 deletions django_celery_results/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def store_result(self, content_type, content_encoding,
traceback=None, meta=None,
periodic_task_name=None,
task_name=None, task_args=None, task_kwargs=None,
worker=None, using=None, **kwargs):
worker=None, using=None, extra_fields=None, **kwargs):
"""Store the result and status of a task.

Arguments:
Expand All @@ -140,6 +140,7 @@ def store_result(self, content_type, content_encoding,
exception (only passed if the task failed).
meta (str): Serialized result meta data (this contains e.g.
children).
extra_fields (dict, optional): Extra (model) fields to store.

Keyword Arguments:
exception_retry_count (int): How many times to retry by
Expand All @@ -159,8 +160,12 @@ def store_result(self, content_type, content_encoding,
'task_name': task_name,
'task_args': task_args,
'task_kwargs': task_kwargs,
'worker': worker
'worker': worker,
}

if extra_fields is not None:
fields.update(extra_fields)

if 'date_started' in kwargs:
fields['date_started'] = kwargs['date_started']

Expand Down
3 changes: 3 additions & 0 deletions django_celery_results/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .generic import ChordCounter, GroupResult, TaskResult

__all__ = ["ChordCounter", "GroupResult", "TaskResult"]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Database models."""
"""Abstract models."""

import json

Expand All @@ -9,14 +9,14 @@
from django.db import models
from django.utils.translation import gettext_lazy as _

from . import managers
from .. import managers

ALL_STATES = sorted(states.ALL_STATES)
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))


class TaskResult(models.Model):
"""Task result/status."""
class AbstractTaskResult(models.Model):
"""Abstract Task result/status."""

task_id = models.CharField(
max_length=getattr(
Expand Down Expand Up @@ -97,8 +97,8 @@ class TaskResult(models.Model):
class Meta:
"""Table information."""

abstract = True
ordering = ['-date_done']

verbose_name = _('task result')
verbose_name_plural = _('task results')

Expand Down Expand Up @@ -136,14 +136,15 @@ def __str__(self):
return '<Task: {0.task_id} ({0.status})>'.format(self)


class ChordCounter(models.Model):
"""Chord synchronisation."""
class AbstractChordCounter(models.Model):
"""Abstract Chord synchronisation."""

group_id = models.CharField(
max_length=getattr(
settings,
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
255),
255
),
unique=True,
verbose_name=_("Group ID"),
help_text=_("Celery ID for the Chord header group"),
Expand All @@ -161,12 +162,17 @@ class ChordCounter(models.Model):
)
)

class Meta:
"""Table information."""

abstract = True

def group_result(self, app=None):
"""Return the :class:`celery.result.GroupResult` of self.
"""Return the GroupResult of self.

Arguments:
app (celery.app.base.Celery): app instance to create the
:class:`celery.result.GroupResult` with.
---------
app (Celery): app instance to create the GroupResult with.

"""
return CeleryGroupResult(
Expand All @@ -177,8 +183,8 @@ def group_result(self, app=None):
)


class GroupResult(models.Model):
"""Task Group result/status."""
class AbstractGroupResult(models.Model):
"""Abstract Task Group result/status."""

group_id = models.CharField(
max_length=getattr(
Expand Down Expand Up @@ -231,10 +237,10 @@ def __str__(self):
class Meta:
"""Table information."""

ordering = ['-date_done']

abstract = True
verbose_name = _('group result')
verbose_name_plural = _('group results')
ordering = ['-date_done']

# Explicit names to solve https://code.djangoproject.com/ticket/33483
indexes = [
Expand Down
37 changes: 37 additions & 0 deletions django_celery_results/models/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Database models."""

from django_celery_results.models.abstract import (
AbstractChordCounter,
AbstractGroupResult,
AbstractTaskResult,
)


class TaskResult(AbstractTaskResult):
"""Task result/status."""

class Meta(AbstractTaskResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"


class ChordCounter(AbstractChordCounter):
"""Chord synchronisation."""

class Meta(AbstractChordCounter.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"


class GroupResult(AbstractGroupResult):
"""Task Group result/status."""

class Meta(AbstractGroupResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"
72 changes: 72 additions & 0 deletions django_celery_results/models/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from .generic import ChordCounter, GroupResult, TaskResult


def taskresult_model():
"""Return the TaskResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_TASKRESULT_MODEL'):
return TaskResult

try:
return apps.get_model(
settings.CELERY_RESULTS_TASKRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_TASKRESULT_MODEL}' that has not "
"been installed"
)


def chordcounter_model():
"""Return the ChordCounter model that is active in this project."""

if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'):
return ChordCounter

try:
return apps.get_model(
settings.CELERY_RESULTS_CHORDCOUNTER_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model "
f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not "
"been installed"
)


def groupresult_model():
"""Return the GroupResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):
return GroupResult

try:
return apps.get_model(
settings.CELERY_RESULTS_GROUPRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_GROUPRESULT_MODEL}' that has not "
"been installed"
)
35 changes: 35 additions & 0 deletions django_celery_results/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from collections.abc import Mapping

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured


def get_callback_function(settings_name, default=None):
"""Return the callback function for the given settings name."""
callback = getattr(settings, settings_name, None)
if not callback:
return default

if not callable(callback):
raise ImproperlyConfigured(f"{settings_name} must be callable.")

return callback


extend_task_props_callback = get_callback_function(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding some sanity checks on the return value of the callback.
For example that it complies to the Mapping protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, get_callback_function is quite generic, and could be used in the future for another purposes, returning different types of data. So far I can see, the sanity checks could be in _store_results() where extend_task_props_callback() is called. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm personally not a fan of this generic function, I'd err of the side of caution and make the callback handling as clean and explicit as possible so any errors we need to raise have a clear source.
But this is more of a code-style thing so maybe one of the maintainers (@auvipy) can comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @AllexVeldman, what do you think @auvipy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AllexVeldman & @auvipy I think I found a proper way to control the callback internally being able to check explicitly that return value.

I just created a new function called get_task_props_extension() into the settings module in charge to return an empty dict in case that the CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK is undefined and otherwise will check that the return value complies with the Mapping protocol.

Let me know what you think!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

"CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK"
)


def get_task_props_extension(request, task_props):
"""Extend the task properties with custom props to fill custom models."""
if not extend_task_props_callback:
return {}

task_props_extension = extend_task_props_callback(request, task_props) or {} # noqa E501
if not isinstance(task_props_extension, Mapping):
raise ImproperlyConfigured(
"CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK must return a Mapping."
)

return task_props_extension
Loading