diff --git a/.gitignore b/.gitignore index 1eb8fa24..cb8ce423 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,5 @@ cover/ htmlcov/ coverage.xml .env -*.ignore \ No newline at end of file +*.ignore +.vscode diff --git a/django_celery_results/admin.py b/django_celery_results/admin.py index de5172af..aff82117 100644 --- a/django_celery_results/admin.py +++ b/django_celery_results/admin.py @@ -10,7 +10,10 @@ ALLOW_EDITS = False pass -from .models import GroupResult, TaskResult +from .models.helpers import groupresult_model, taskresult_model + +GroupResult = groupresult_model() +TaskResult = taskresult_model() class TaskResultAdmin(admin.ModelAdmin): diff --git a/django_celery_results/backends/database.py b/django_celery_results/backends/database.py index a4e364a6..dc20f2c1 100644 --- a/django_celery_results/backends/database.py +++ b/django_celery_results/backends/database.py @@ -12,9 +12,12 @@ from django.db.utils import InterfaceError from kombu.exceptions import DecodeError -from ..models import ChordCounter -from ..models import GroupResult as GroupResultModel -from ..models import TaskResult +from ..models.helpers import ( + chordcounter_model, + groupresult_model, + taskresult_model, +) +from ..settings import get_task_props_extension EXCEPTIONS_TO_CATCH = (InterfaceError,) @@ -30,8 +33,9 @@ class DatabaseBackend(BaseDictBackend): """The Django database backend, using models to store task state.""" - TaskModel = TaskResult - GroupModel = GroupResultModel + TaskModel = taskresult_model() + GroupModel = groupresult_model() + ChordCounterModel = chordcounter_model() subpolling_interval = 0.5 def exception_safe_to_retry(self, exc): @@ -80,6 +84,14 @@ def _get_extended_properties(self, request, traceback): # task protocol 1 task_kwargs = getattr(request, 'kwargs', None) + # TODO: We assume that task protocol 1 could be always in use. :/ + extra_fields = get_task_props_extension( + request, + getattr(request, 'kwargs', None) + ) + if extra_fields: + extended_props.update({"extra_fields": extra_fields}) + # Encode input arguments if task_args is not None: _, _, task_args = self.encode_content(task_args) @@ -141,9 +153,8 @@ def _store_result( 'using': using, } - task_props.update( - self._get_extended_properties(request, traceback) - ) + task_props.update(self._get_extended_properties(request, traceback)) + task_props.update(get_task_props_extension(request, dict(task_props))) if status == states.STARTED: task_props['date_started'] = Now() @@ -242,7 +253,7 @@ def apply_chord(self, header_result_args, body, **kwargs): results = [r.as_tuple() for r in header_result] chord_size = body.get("chord_size", None) or len(results) data = json.dumps(results) - ChordCounter.objects.create( + self.ChordCounterModel.objects.create( group_id=header_result.id, sub_tasks=data, count=chord_size ) @@ -252,17 +263,19 @@ def on_chord_part_return(self, request, state, result, **kwargs): if not gid or not tid: return call_callback = False - with transaction.atomic(using=router.db_for_write(ChordCounter)): + with transaction.atomic( + using=router.db_for_write(self.ChordCounterModel) + ): # We need to know if `count` hits 0. # wrap the update in a transaction # with a `select_for_update` lock to prevent race conditions. # SELECT FOR UPDATE is not supported on all databases try: chord_counter = ( - ChordCounter.objects.select_for_update() + self.ChordCounterModel.objects.select_for_update() .get(group_id=gid) ) - except ChordCounter.DoesNotExist: + except self.ChordCounterModel.DoesNotExist: logger.warning("Can't find ChordCounter for Group %s", gid) return chord_counter.count -= 1 diff --git a/django_celery_results/managers.py b/django_celery_results/managers.py index 1caa124b..274b85cf 100644 --- a/django_celery_results/managers.py +++ b/django_celery_results/managers.py @@ -119,7 +119,7 @@ def store_result(self, content_type, content_encoding, traceback=None, meta=None, periodic_task_name=None, task_name=None, task_args=None, task_kwargs=None, - worker=None, using=None, **kwargs): + worker=None, using=None, extra_fields=None, **kwargs): """Store the result and status of a task. Arguments: @@ -140,6 +140,7 @@ def store_result(self, content_type, content_encoding, exception (only passed if the task failed). meta (str): Serialized result meta data (this contains e.g. children). + extra_fields (dict, optional): Extra (model) fields to store. Keyword Arguments: exception_retry_count (int): How many times to retry by @@ -159,8 +160,12 @@ def store_result(self, content_type, content_encoding, 'task_name': task_name, 'task_args': task_args, 'task_kwargs': task_kwargs, - 'worker': worker + 'worker': worker, } + + if extra_fields is not None: + fields.update(extra_fields) + if 'date_started' in kwargs: fields['date_started'] = kwargs['date_started'] diff --git a/django_celery_results/models/__init__.py b/django_celery_results/models/__init__.py new file mode 100644 index 00000000..5e478755 --- /dev/null +++ b/django_celery_results/models/__init__.py @@ -0,0 +1,3 @@ +from .generic import ChordCounter, GroupResult, TaskResult + +__all__ = ["ChordCounter", "GroupResult", "TaskResult"] diff --git a/django_celery_results/models.py b/django_celery_results/models/abstract.py similarity index 93% rename from django_celery_results/models.py rename to django_celery_results/models/abstract.py index b472a5af..f193c479 100644 --- a/django_celery_results/models.py +++ b/django_celery_results/models/abstract.py @@ -1,4 +1,4 @@ -"""Database models.""" +"""Abstract models.""" import json @@ -9,14 +9,14 @@ from django.db import models from django.utils.translation import gettext_lazy as _ -from . import managers +from .. import managers ALL_STATES = sorted(states.ALL_STATES) TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES)) -class TaskResult(models.Model): - """Task result/status.""" +class AbstractTaskResult(models.Model): + """Abstract Task result/status.""" task_id = models.CharField( max_length=getattr( @@ -97,8 +97,8 @@ class TaskResult(models.Model): class Meta: """Table information.""" + abstract = True ordering = ['-date_done'] - verbose_name = _('task result') verbose_name_plural = _('task results') @@ -136,14 +136,15 @@ def __str__(self): return ''.format(self) -class ChordCounter(models.Model): - """Chord synchronisation.""" +class AbstractChordCounter(models.Model): + """Abstract Chord synchronisation.""" group_id = models.CharField( max_length=getattr( settings, "DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH", - 255), + 255 + ), unique=True, verbose_name=_("Group ID"), help_text=_("Celery ID for the Chord header group"), @@ -161,12 +162,17 @@ class ChordCounter(models.Model): ) ) + class Meta: + """Table information.""" + + abstract = True + def group_result(self, app=None): - """Return the :class:`celery.result.GroupResult` of self. + """Return the GroupResult of self. Arguments: - app (celery.app.base.Celery): app instance to create the - :class:`celery.result.GroupResult` with. + --------- + app (Celery): app instance to create the GroupResult with. """ return CeleryGroupResult( @@ -177,8 +183,8 @@ def group_result(self, app=None): ) -class GroupResult(models.Model): - """Task Group result/status.""" +class AbstractGroupResult(models.Model): + """Abstract Task Group result/status.""" group_id = models.CharField( max_length=getattr( @@ -231,10 +237,10 @@ def __str__(self): class Meta: """Table information.""" - ordering = ['-date_done'] - + abstract = True verbose_name = _('group result') verbose_name_plural = _('group results') + ordering = ['-date_done'] # Explicit names to solve https://code.djangoproject.com/ticket/33483 indexes = [ diff --git a/django_celery_results/models/generic.py b/django_celery_results/models/generic.py new file mode 100644 index 00000000..7b626dae --- /dev/null +++ b/django_celery_results/models/generic.py @@ -0,0 +1,37 @@ +"""Database models.""" + +from django_celery_results.models.abstract import ( + AbstractChordCounter, + AbstractGroupResult, + AbstractTaskResult, +) + + +class TaskResult(AbstractTaskResult): + """Task result/status.""" + + class Meta(AbstractTaskResult.Meta): + """Table information.""" + + abstract = False + app_label = "django_celery_results" + + +class ChordCounter(AbstractChordCounter): + """Chord synchronisation.""" + + class Meta(AbstractChordCounter.Meta): + """Table information.""" + + abstract = False + app_label = "django_celery_results" + + +class GroupResult(AbstractGroupResult): + """Task Group result/status.""" + + class Meta(AbstractGroupResult.Meta): + """Table information.""" + + abstract = False + app_label = "django_celery_results" diff --git a/django_celery_results/models/helpers.py b/django_celery_results/models/helpers.py new file mode 100644 index 00000000..0d9da60a --- /dev/null +++ b/django_celery_results/models/helpers.py @@ -0,0 +1,72 @@ +from django.apps import apps +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured + +from .generic import ChordCounter, GroupResult, TaskResult + + +def taskresult_model(): + """Return the TaskResult model that is active in this project.""" + if not hasattr(settings, 'CELERY_RESULTS_TASKRESULT_MODEL'): + return TaskResult + + try: + return apps.get_model( + settings.CELERY_RESULTS_TASKRESULT_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_RESULTS_TASKRESULT_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_RESULTS_TASKRESULT_MODEL refers to model " + f"'{settings.CELERY_RESULTS_TASKRESULT_MODEL}' that has not " + "been installed" + ) + + +def chordcounter_model(): + """Return the ChordCounter model that is active in this project.""" + + if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'): + return ChordCounter + + try: + return apps.get_model( + settings.CELERY_RESULTS_CHORDCOUNTER_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model " + f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not " + "been installed" + ) + + +def groupresult_model(): + """Return the GroupResult model that is active in this project.""" + if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'): + return GroupResult + + try: + return apps.get_model( + settings.CELERY_RESULTS_GROUPRESULT_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_RESULTS_GROUPRESULT_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_RESULTS_GROUPRESULT_MODEL refers to model " + f"'{settings.CELERY_RESULTS_GROUPRESULT_MODEL}' that has not " + "been installed" + ) diff --git a/django_celery_results/settings.py b/django_celery_results/settings.py new file mode 100644 index 00000000..022b52a0 --- /dev/null +++ b/django_celery_results/settings.py @@ -0,0 +1,35 @@ +from collections.abc import Mapping + +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured + + +def get_callback_function(settings_name, default=None): + """Return the callback function for the given settings name.""" + callback = getattr(settings, settings_name, None) + if not callback: + return default + + if not callable(callback): + raise ImproperlyConfigured(f"{settings_name} must be callable.") + + return callback + + +extend_task_props_callback = get_callback_function( + "CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK" +) + + +def get_task_props_extension(request, task_props): + """Extend the task properties with custom props to fill custom models.""" + if not extend_task_props_callback: + return {} + + task_props_extension = extend_task_props_callback(request, task_props) or {} # noqa E501 + if not isinstance(task_props_extension, Mapping): + raise ImproperlyConfigured( + "CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK must return a Mapping." + ) + + return task_props_extension diff --git a/docs/extending_task_results.rst b/docs/extending_task_results.rst new file mode 100644 index 00000000..167422b9 --- /dev/null +++ b/docs/extending_task_results.rst @@ -0,0 +1,43 @@ +Extending Task Results +====================== + +There are situations where you want to extend the Task Results with additional information that will make you able to retrieve information that was important at execution time of the task but not part of the task result itself. For example if you use :pypi:`django-celery-results` to track the task results from a tenant. + +To extend the Task Results model follow the next steps: + +#. Create a custom model that inherits from the abstract base class `django_celery_results.models.abstract.AbstractTaskResult`: + + .. code-block:: python + + from django_celery_results.models.abstract import AbstractTaskResult + + class TaskResult(AbstractTaskResult): + tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, null=True) + +#. Tell Django to use the custom `TaskResult` model by setting the `CELERY_RESULTS_TASKRESULT_MODEL` constant to the path of the custom model. + + .. code-block:: python + + CELERY_RESULTS_TASKRESULT_MODEL = 'myapp.TaskResult' + +#. Write a function that will consume a `request` and `task_properties` as positional arguments and will return a dictionary with the additional information that you want to store in your custom `TaskResult` model. The keys of this dictionary will be the fields of the custom model and the values the data you can retrieve from the `request` and/or `task_properties`. + + .. code-block:: python + + def extend_task_props_callback(request, task_properties): + """Extend task props with custom data from task_kwargs.""" + task_kwargs = getattr(request, "kwargs", None) + + return {"tenant_id": task_kwargs.get("tenant_id", None)} + +#. To let :pypi:`django-celery-results` call this function, you'll have to set the `CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK` constant in your Django project's :file:`settings.py`. + + .. code-block:: python + + CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK = extend_task_props_callback + +#. Finally make sure that you're passing the additional information to the celery task when you're calling it. + + .. code-block:: python + + task.apply_async(kwargs={"tenant_id": tenant.id}) \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index ad00baf2..65441516 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -11,6 +11,7 @@ Contents :maxdepth: 1 getting_started + extending_task_results injecting_metadata copyright diff --git a/setup.cfg b/setup.cfg index 26f1ac65..8086949d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,3 +18,8 @@ match-dir = [^migrations] [isort] profile=black +line_length=79 +multi_line_output=3 + +[black] +line-length = 79