
Commit 4ccb412

export RQ statistics as prometheus metrics (#666)
* fix assertion error checking retry backoff
* export RQ status as prometheus metrics (fixes: #503)
* move stats views to ``django_rq.stats_views`` module
* move metrics_collector to contrib.prometheus & other minor suggestions
1 parent 1c23862 commit 4ccb412

File tree

12 files changed: +340 -33 lines changed


.github/workflows/test.yml

Lines changed: 9 additions & 1 deletion
@@ -37,7 +37,15 @@ jobs:
 
       - name: Run Test
         run: |
-          `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
+          `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2
+
+      - name: Install optional dependencies
+        run: |
+          pip install prometheus_client
+
+      - name: Run Test with optional dependencies
+        run: |
+          `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2
 
   mypy:
     runs-on: ubuntu-latest

README.rst

Lines changed: 27 additions & 0 deletions
@@ -368,6 +368,33 @@ Additionally, these statistics are also accessible from the command line.
 
 .. image:: demo-django-rq-cli-dashboard.gif
 
+Configuring Prometheus
+----------------------
+
+``django_rq`` also provides a Prometheus compatible view, which can be enabled
+by installing ``prometheus_client`` or installing the extra "prometheus-metrics"
+(``pip install django-rq[prometheus]``). The metrics are exposed at
+``/django-rq/metrics/`` and the following is an example of the metrics that
+are exported::
+
+    # HELP rq_workers RQ workers
+    # TYPE rq_workers gauge
+    # HELP rq_job_successful_total RQ successful job count
+    # TYPE rq_job_successful_total counter
+    # HELP rq_job_failed_total RQ failed job count
+    # TYPE rq_job_failed_total counter
+    # HELP rq_working_seconds_total RQ total working time
+    # TYPE rq_working_seconds_total counter
+    # HELP rq_jobs RQ jobs by status
+    # TYPE rq_jobs gauge
+    rq_jobs{queue="default",status="queued"} 0.0
+    rq_jobs{queue="default",status="started"} 0.0
+    rq_jobs{queue="default",status="finished"} 0.0
+    rq_jobs{queue="default",status="failed"} 0.0
+    rq_jobs{queue="default",status="deferred"} 0.0
+    rq_jobs{queue="default",status="scheduled"} 0.0
+
+
 Configuring Sentry
 -------------------
 Sentry
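
The README addition above quotes ``/django-rq/metrics/`` as the metrics path, which presumes the app's URLconf is mounted under ``django-rq/``. As a minimal sketch (the project module name and the URL prefix are assumptions, not part of this commit), a project-level ``urls.py`` wiring that up could look like:

    # project urls.py -- only the include() of django_rq.urls is required;
    # the 'django-rq/' prefix is what makes the view appear at /django-rq/metrics/.
    from django.contrib import admin
    from django.urls import include, path

    urlpatterns = [
        path('admin/', admin.site.urls),
        path('django-rq/', include('django_rq.urls')),
    ]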

django_rq/admin.py

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@
 from django.http.request import HttpRequest
 from django.http.response import HttpResponse
 
-from . import views, settings, models
+from . import settings, stats_views, models
 
 
 class QueueAdmin(admin.ModelAdmin):
@@ -32,7 +32,7 @@ def has_module_permission(self, request: HttpRequest):
     def changelist_view(self, request: HttpRequest, extra_context: Optional[Dict[str, Any]] = None) -> HttpResponse:
         """The 'change list' admin view for this model."""
         # proxy request to stats view
-        return views.stats(request)
+        return stats_views.stats(request)
 
 
 if settings.SHOW_ADMIN_LINK:

django_rq/contrib/prometheus.py

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+from rq.job import JobStatus
+
+from ..queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs
+from ..workers import get_worker_class
+
+try:
+    from prometheus_client import Summary
+    from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
+
+    class RQCollector:
+        """RQ stats collector"""
+
+        summary = Summary('rq_request_processing_seconds_total', 'Time spent collecting RQ data')
+
+        def collect(self):
+            from ..settings import QUEUES
+
+            with self.summary.time():
+                rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
+                rq_job_successful_total = CounterMetricFamily('rq_job_successful_total', 'RQ successful job count', labels=['name', 'queues'])
+                rq_job_failed_total = CounterMetricFamily('rq_job_failed_total', 'RQ failed job count', labels=['name', 'queues'])
+                rq_working_seconds_total = CounterMetricFamily('rq_working_seconds_total', 'RQ total working time', labels=['name', 'queues'])
+
+                rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by status', labels=['queue', 'status'])
+
+                worker_class = get_worker_class()
+                unique_configs = get_unique_connection_configs()
+                connections = {}
+                for queue_name, config in QUEUES.items():
+                    index = unique_configs.index(filter_connection_params(config))
+                    if index not in connections:
+                        connections[index] = connection = get_connection(queue_name)
+
+                        for worker in worker_class.all(connection):
+                            name = worker.name
+                            label_queues = ','.join(worker.queue_names())
+                            rq_workers.add_metric([name, worker.get_state(), label_queues], 1)
+                            rq_job_successful_total.add_metric([name, label_queues], worker.successful_job_count)
+                            rq_job_failed_total.add_metric([name, label_queues], worker.failed_job_count)
+                            rq_working_seconds_total.add_metric([name, label_queues], worker.total_working_time)
+                    else:
+                        connection = connections[index]
+
+                    queue = get_queue(queue_name, connection=connection)
+                    rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count)
+
+                yield rq_workers
+                yield rq_job_successful_total
+                yield rq_job_failed_total
+                yield rq_working_seconds_total
+                yield rq_jobs
+
+except ImportError:
+    RQCollector = None  # type: ignore[assignment, misc]
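
``RQCollector`` follows the standard ``prometheus_client`` custom-collector convention: ``collect()`` is a generator yielding metric families, so it can be registered on any registry, not only the one used by the bundled view. A hedged sketch of standalone use, assuming ``DJANGO_SETTINGS_MODULE`` points at a project with ``RQ_QUEUES`` configured:

    # Register the collector on a private registry and render the text
    # exposition format, roughly what the /metrics/ view returns.
    import django
    from prometheus_client import CollectorRegistry, generate_latest

    django.setup()  # assumption: DJANGO_SETTINGS_MODULE is already set

    from django_rq.contrib.prometheus import RQCollector

    registry = CollectorRegistry(auto_describe=True)
    registry.register(RQCollector())  # collect() runs on every scrape
    print(generate_latest(registry).decode('utf-8'))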

django_rq/stats_views.py

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+from django.contrib import admin
+from django.contrib.admin.views.decorators import staff_member_required
+from django.http import Http404, HttpResponse, JsonResponse
+from django.shortcuts import render
+from django.views.decorators.cache import never_cache
+
+from .settings import API_TOKEN
+from .utils import get_scheduler_statistics, get_statistics
+
+try:
+    import prometheus_client
+
+    from .contrib.prometheus import RQCollector
+except ImportError:
+    prometheus_client = RQCollector = None  # type: ignore[assignment, misc]
+
+registry = None
+
+
+@never_cache
+@staff_member_required
+def prometheus_metrics(request):
+    global registry
+
+    if not RQCollector:  # type: ignore[truthy-function]
+        raise Http404('prometheus_client has not been installed; install using extra "django-rq[prometheus]"')
+
+    if not registry:
+        registry = prometheus_client.CollectorRegistry(auto_describe=True)
+        registry.register(RQCollector())
+
+    encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))
+    if 'name[]' in request.GET:
+        registry = registry.restricted_registry(request.GET.getlist('name[]'))
+
+    return HttpResponse(encoder(registry), headers={'Content-Type': content_type})
+
+
+@never_cache
+@staff_member_required
+def stats(request):
+    context_data = {
+        **admin.site.each_context(request),
+        **get_statistics(run_maintenance_tasks=True),
+        **get_scheduler_statistics(),
+    }
+    return render(request, 'django_rq/stats.html', context_data)
+
+
+def stats_json(request, token=None):
+    if request.user.is_staff or (token and token == API_TOKEN):
+        return JsonResponse(get_statistics())
+
+    return JsonResponse(
+        {"error": True, "description": "Please configure API_TOKEN in settings.py before accessing this view."}
+    )
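
The ``name[]`` query parameter feeds ``prometheus_client``'s ``restricted_registry()``, so a scrape can be narrowed to selected metric families. A rough sketch of exercising the view from a test shell (the user credentials are placeholders, and the route only exists when ``prometheus_client`` is installed):

    # Fetch only the rq_jobs family through the staff-protected metrics view.
    from django.contrib.auth.models import User
    from django.test import Client
    from django.urls import reverse

    staff = User.objects.create_user('metrics-viewer', password='pass', is_staff=True)
    client = Client()
    client.force_login(staff)  # prometheus_metrics is wrapped in @staff_member_required

    # Equivalent to GET /django-rq/metrics/?name[]=rq_jobs
    response = client.get(reverse('rq_metrics'), {'name[]': 'rq_jobs'})
    assert response.status_code == 200
    assert b'rq_jobs' in response.content
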
django_rq/tests/test_prometheus.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+import os
+from unittest import skipIf
+from unittest.mock import patch
+
+from django.contrib.auth.models import User
+from django.test import TestCase, override_settings
+from django.test.client import Client
+from django.urls import NoReverseMatch, reverse
+
+from django_rq import get_queue
+from django_rq.workers import get_worker
+
+from .fixtures import access_self, failing_job
+
+try:
+    import prometheus_client
+except ImportError:
+    prometheus_client = None
+
+RQ_QUEUES = {
+    'default': {
+        'HOST': os.environ.get('REDIS_HOST', 'localhost'),
+        'PORT': 6379,
+        'DB': 0,
+    },
+}
+
+
+@skipIf(prometheus_client is None, 'prometheus_client is required')
+@override_settings(RQ={'AUTOCOMMIT': True})
+class PrometheusTest(TestCase):
+    def setUp(self):
+        self.user = User.objects.create_user('foo', password='pass')
+        self.user.is_staff = True
+        self.user.is_active = True
+        self.user.save()
+        self.client = Client()
+        self.client.force_login(self.user)
+        get_queue('default').connection.flushall()
+
+    def assertMetricsContain(self, lines):
+        response = self.client.get(reverse('rq_metrics'))
+        self.assertEqual(response.status_code, 200)
+        self.assertLessEqual(
+            lines, set(response.content.decode('utf-8').splitlines())
+        )
+
+    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
+    def test_metrics_default(self):
+        self.assertMetricsContain(
+            {
+                '# HELP rq_jobs RQ jobs by status',
+                'rq_jobs{queue="default",status="queued"} 0.0',
+                'rq_jobs{queue="default",status="started"} 0.0',
+                'rq_jobs{queue="default",status="finished"} 0.0',
+                'rq_jobs{queue="default",status="failed"} 0.0',
+                'rq_jobs{queue="default",status="deferred"} 0.0',
+                'rq_jobs{queue="default",status="scheduled"} 0.0',
+            }
+        )
+
+    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
+    def test_metrics_with_jobs(self):
+        queue = get_queue('default')
+        queue.enqueue(failing_job)
+
+        for _ in range(10):
+            queue.enqueue(access_self)
+
+        worker = get_worker('default', name='test_worker')
+        worker.register_birth()
+
+        # override worker registration to effectively simulate non burst mode
+        register_death = worker.register_death
+        worker.register_birth = worker.register_death = lambda: None  # type: ignore[method-assign]
+
+        try:
+            self.assertMetricsContain(
+                {
+                    # job information
+                    '# HELP rq_jobs RQ jobs by status',
+                    'rq_jobs{queue="default",status="queued"} 11.0',
+                    'rq_jobs{queue="default",status="started"} 0.0',
+                    'rq_jobs{queue="default",status="finished"} 0.0',
+                    'rq_jobs{queue="default",status="failed"} 0.0',
+                    'rq_jobs{queue="default",status="deferred"} 0.0',
+                    'rq_jobs{queue="default",status="scheduled"} 0.0',
+                    # worker information
+                    '# HELP rq_workers RQ workers',
+                    'rq_workers{name="test_worker",queues="default",state="?"} 1.0',
+                    '# HELP rq_job_successful_total RQ successful job count',
+                    'rq_job_successful_total{name="test_worker",queues="default"} 0.0',
+                    '# HELP rq_job_failed_total RQ failed job count',
+                    'rq_job_failed_total{name="test_worker",queues="default"} 0.0',
+                    '# HELP rq_working_seconds_total RQ total working time',
+                    'rq_working_seconds_total{name="test_worker",queues="default"} 0.0',
+                }
+            )
+
+            worker.work(burst=True, max_jobs=4)
+            self.assertMetricsContain(
+                {
+                    # job information
+                    'rq_jobs{queue="default",status="queued"} 7.0',
+                    'rq_jobs{queue="default",status="finished"} 3.0',
+                    'rq_jobs{queue="default",status="failed"} 1.0',
+                    # worker information
+                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
+                    'rq_job_successful_total{name="test_worker",queues="default"} 3.0',
+                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
+                }
+            )
+
+            worker.work(burst=True)
+            self.assertMetricsContain(
+                {
+                    # job information
+                    'rq_jobs{queue="default",status="queued"} 0.0',
+                    'rq_jobs{queue="default",status="finished"} 10.0',
+                    'rq_jobs{queue="default",status="failed"} 1.0',
+                    # worker information
+                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
+                    'rq_job_successful_total{name="test_worker",queues="default"} 10.0',
+                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
+                }
+            )
+        finally:
+            register_death()
+
+
+@skipIf(prometheus_client is not None, 'prometheus_client is installed')
+class NoPrometheusTest(TestCase):
+    def test_no_metrics_without_prometheus_client(self):
+        with self.assertRaises(NoReverseMatch):
+            reverse('rq_metrics')

django_rq/tests/test_views.py

Lines changed: 1 addition & 1 deletion
@@ -367,7 +367,7 @@ def test_statistics_json_view(self):
 
         # With token,
         token = '12345abcde'
-        with patch('django_rq.views.API_TOKEN', new_callable=PropertyMock(return_value=token)):
+        with patch('django_rq.stats_views.API_TOKEN', new_callable=PropertyMock(return_value=token)):
             response = self.client.get(reverse('rq_home_json', args=[token]))
             self.assertEqual(response.status_code, 200)
             self.assertIn("name", response.content.decode('utf-8'))

django_rq/tests/utils.py

Lines changed: 35 additions & 1 deletion
@@ -1,5 +1,24 @@
+from typing import Any, Dict
+from unittest.mock import patch
+
 from django_rq.queues import get_connection, get_queue_by_index
 
+try:
+    from redis.backoff import ExponentialWithJitterBackoff, NoBackoff  # type: ignore[attr-defined]
+    from redis.retry import Retry
+except ImportError:
+    ExponentialWithJitterBackoff = None
+    Retry = None  # type: ignore[misc, assignment]
+
+
+def _is_buggy_retry(kwargs: Dict[str, Any]) -> bool:
+    return (
+        Retry is not None
+        and (retry := kwargs.get('retry')) is not None
+        and isinstance(retry, Retry)
+        and isinstance(retry._backoff, ExponentialWithJitterBackoff)  # type: ignore[attr-defined]
+    )
+
 
 def get_queue_index(name='default'):
     """
@@ -15,7 +34,22 @@ def get_queue_index(name='default'):
             continue
         if q.name == name:
             # assert that the connection is correct
-            assert q.connection.connection_pool.connection_kwargs == connection_kwargs
+            pool_kwargs = q.connection.connection_pool.connection_kwargs
+            if not _is_buggy_retry(pool_kwargs) or not _is_buggy_retry(connection_kwargs):
+                assert pool_kwargs == connection_kwargs
+            else:
+                # patch the retry backoff since there is a bug in the default
+                # backoff strategy
+                #
+                # fixed in https://github.com/redis/redis-py/pull/3668
+                with patch.object(
+                    pool_kwargs['retry'], '_backoff', NoBackoff()
+                ), patch.object(
+                    connection_kwargs['retry'], '_backoff', NoBackoff()
+                ):
+                    assert pool_kwargs == connection_kwargs
+
+                assert pool_kwargs['retry']._backoff.__dict__ == connection_kwargs['retry']._backoff.__dict__
 
             return i
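
For context, ``_is_buggy_retry`` only decides whether the backoff object has to be patched out before comparing connection kwargs; the classes come from newer redis-py releases, which is why the import above is wrapped in try/except. A hedged sketch of what the helper reports (the retry values are made up for illustration):

    # Classify connection kwargs the way get_queue_index() does before choosing
    # between plain dict equality and the patched comparison.
    from redis.backoff import ExponentialWithJitterBackoff
    from redis.retry import Retry

    from django_rq.tests.utils import _is_buggy_retry

    with_retry = {'retry': Retry(ExponentialWithJitterBackoff(), retries=3)}
    print(_is_buggy_retry(with_retry))  # True  -> backoff patched to NoBackoff for the assert
    print(_is_buggy_retry({}))          # False -> plain equality assert is used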

django_rq/urls.py

Lines changed: 9 additions & 3 deletions
@@ -1,10 +1,16 @@
 from django.urls import re_path
 
-from . import views
+from . import stats_views, views
+from .contrib.prometheus import RQCollector
+
+metrics_view = [
+    re_path(r'^metrics/?$', stats_views.prometheus_metrics, name='rq_metrics'),
+] if RQCollector else []  # type: ignore[truthy-function]
 
 urlpatterns = [
-    re_path(r'^$', views.stats, name='rq_home'),
-    re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'),
+    re_path(r'^$', stats_views.stats, name='rq_home'),
+    re_path(r'^stats.json/(?P<token>[\w]+)?/?$', stats_views.stats_json, name='rq_home_json'),
+    *metrics_view,
     re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'),
     re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'),
     re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'),
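
Because the ``rq_metrics`` route is only registered when ``RQCollector`` could be imported, code that links to it should tolerate its absence (this mirrors the ``NoReverseMatch`` expectation in ``NoPrometheusTest``). A small sketch; the helper name is hypothetical:

    from typing import Optional

    from django.urls import NoReverseMatch, reverse


    def rq_metrics_url() -> Optional[str]:
        """Return the URL of the metrics view, or None when the optional route is absent."""
        try:
            return reverse('rq_metrics')
        except NoReverseMatch:  # prometheus_client not installed
            return None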
