1
1
"""Result Task Admin interface."""
2
2
3
+ from celery import current_app as celery_app
3
4
from django .conf import settings
4
- from django .contrib import admin
5
+ from django .contrib import admin , messages
5
6
from django .utils .translation import gettext_lazy as _
6
7
7
8
try :
@@ -58,6 +59,7 @@ class TaskResultAdmin(admin.ModelAdmin):
58
59
'classes' : ('extrapretty' , 'wide' )
59
60
}),
60
61
)
62
+ actions = ['terminate_task' ]
61
63
62
64
def get_readonly_fields (self , request , obj = None ):
63
65
if ALLOW_EDITS :
@@ -67,6 +69,25 @@ def get_readonly_fields(self, request, obj=None):
67
69
field .name for field in self .opts .local_fields
68
70
})
69
71
72
+ def terminate_task (self , request , queryset ):
73
+ """Terminate selected tasks."""
74
+ task_ids = list (queryset .values_list ('task_id' , flat = True ))
75
+ try :
76
+ celery_app .control .terminate (task_ids )
77
+ self .message_user (
78
+ request ,
79
+ f"{ len (task_ids )} Task was terminated successfully." ,
80
+ messages .SUCCESS ,
81
+ )
82
+ except Exception as e :
83
+ self .message_user (
84
+ request ,
85
+ f"Error while terminating tasks: { e } " ,
86
+ messages .ERROR ,
87
+ )
88
+
89
+ terminate_task .short_description = "Terminate selected tasks"
90
+
70
91
71
92
admin .site .register (TaskResult , TaskResultAdmin )
72
93
0 commit comments