Commit 8d843a0

Authored by jshimkus-rh, mkanoor, and Dostonbek1
AAP-27923: implement EDA HA Cluster Redis support. (#1000)
Co-authored-by: Madhu Kanoor <mkanoor@redhat.com>
Co-authored-by: Doston <31990136+Dostonbek1@users.noreply.github.com>
1 parent: ca215ca

File tree: 11 files changed, +259 −48 lines

poetry.lock

Lines changed: 2 additions & 2 deletions
(Generated file; diff not rendered by default.)

src/aap_eda/core/tasking/__init__.py

Lines changed: 181 additions & 15 deletions
@@ -4,11 +4,19 @@
 import logging
 from datetime import datetime, timedelta
 from types import MethodType
-from typing import Any, Callable, Iterable, Optional, Protocol, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Iterable,
+    List,
+    Optional,
+    Protocol,
+    Type,
+    Union,
+)
 
 import redis
 import rq
-import rq_scheduler
 from ansible_base.lib.redis.client import (
     DABRedis,
     DABRedisCluster,
@@ -24,7 +32,9 @@
     DEFAULT_WORKER_TTL,
 )
 from rq.job import Job as _Job, JobStatus
+from rq.registry import StartedJobRegistry
 from rq.serializers import JSONSerializer
+from rq_scheduler import Scheduler as _Scheduler
 
 from aap_eda.settings import default
 
@@ -73,9 +83,47 @@ def get_redis_client(**kwargs):
     DAB will return an appropriate client for HA based on the passed
     parameters.
     """
+    # HA cluster does not support an alternate redis db and will generate an
+    # exception if we pass a value (even the default). If we're in that
+    # situation we drop the db and, if the db is anything other than the
+    # default, log an informational message.
+    db = kwargs.get("db", None)
+    if (db is not None) and (kwargs.get("mode", "") == "cluster"):
+        del kwargs["db"]
+        if db != default.DEFAULT_REDIS_DB:
+            logger.info(
+                f"clustered redis supports only the default db"
+                f"; db specified: {db}"
+            )
+
     return _get_redis_client(_create_url_from_parameters(**kwargs), **kwargs)
 
 
+class Scheduler(_Scheduler):
+    """Custom scheduler class."""
+
+    def __init__(
+        self,
+        queue_name="default",
+        queue=None,
+        interval=60,
+        connection=None,
+        job_class=None,
+        queue_class=None,
+        name=None,
+    ):
+        connection = _get_necessary_client_connection(connection)
+        super().__init__(
+            queue_name=queue_name,
+            queue=queue,
+            interval=interval,
+            connection=connection,
+            job_class=job_class,
+            queue_class=queue_class,
+            name=name,
+        )
+
+
 def enable_redis_prefix():
     redis_prefix = settings.RQ_REDIS_PREFIX
 
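
For illustration, a minimal sketch of how the new db guard behaves. The call shapes below are assumptions for illustration only; get_redis_client, the mode and db kwargs, redis_hosts, and DEFAULT_REDIS_DB are the pieces this commit actually touches:

    from aap_eda.core.tasking import get_redis_client

    # Standalone client: the db kwarg is passed through unchanged.
    client = get_redis_client(host="localhost", port=6379, db=2)

    # Cluster client: "db" is dropped before DAB sees it, and because
    # 2 != default.DEFAULT_REDIS_DB an informational message is logged.
    client = get_redis_client(
        mode="cluster",
        redis_hosts="redis-1:6379,redis-2:6379",  # hypothetical hosts
        db=2,
    )
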
@@ -102,16 +150,12 @@ def enable_redis_prefix():
         f"{redis_prefix}:canceled:{0}"
     )
 
-    rq_scheduler.Scheduler.redis_scheduler_namespace_prefix = (
+    Scheduler.redis_scheduler_namespace_prefix = (
         f"{redis_prefix}:scheduler_instance:"
     )
-    rq_scheduler.Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
-    rq_scheduler.Scheduler.scheduler_lock_key = (
-        f"{redis_prefix}:scheduler_lock"
-    )
-    rq_scheduler.Scheduler.scheduled_jobs_key = (
-        f"{redis_prefix}:scheduler:scheduled_jobs"
-    )
+    Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
+    Scheduler.scheduler_lock_key = f"{redis_prefix}:scheduler_lock"
+    Scheduler.scheduled_jobs_key = f"{redis_prefix}:scheduler:scheduled_jobs"
 
     def eda_get_key(job_id):
         return f"{redis_prefix}:results:{job_id}"
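
With the default RQ_REDIS_PREFIX of "eda-rq" (see src/aap_eda/settings/default.py below), the rebinding above resolves to the following keys; a sketch that follows directly from the f-strings:

    from aap_eda.core.tasking import Scheduler, enable_redis_prefix

    enable_redis_prefix()
    assert Scheduler.scheduler_key == "eda-rq:scheduler"
    assert Scheduler.scheduler_lock_key == "eda-rq:scheduler_lock"
    assert Scheduler.scheduled_jobs_key == "eda-rq:scheduler:scheduled_jobs"
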
@@ -168,7 +212,7 @@ def __init__(
         super().__init__(
             name=name,
             default_timeout=default_timeout,
-            connection=connection,
+            connection=_get_necessary_client_connection(connection),
             is_async=is_async,
             job_class=job_class,
             serializer=serializer,
@@ -190,6 +234,7 @@ def __init__(
     ):
         if serializer is None:
             serializer = JSONSerializer
+        connection = _get_necessary_client_connection(connection)
 
         super().__init__(id, connection, serializer)
 
@@ -207,7 +252,130 @@ def _get_necessary_client_connection(connection: Connection) -> Connection:
     return connection
 
 
-class DefaultWorker(_Worker):
+class Worker(_Worker):
+    """Custom worker class.
+
+    Provides establishment of DAB Redis client and workarounds for various
+    DABRedisCluster issues.
+    """
+
+    def __init__(
+        self,
+        queues: Iterable[Union[Queue, str]],
+        name: Optional[str] = None,
+        default_result_ttl: int = DEFAULT_RESULT_TTL,
+        connection: Optional[Connection] = None,
+        exc_handler: Any = None,
+        exception_handlers: _ErrorHandlersArgType = None,
+        default_worker_ttl: int = DEFAULT_WORKER_TTL,
+        job_class: Type[_Job] = None,
+        queue_class: Type[_Queue] = None,
+        log_job_description: bool = True,
+        job_monitoring_interval: int = DEFAULT_JOB_MONITORING_INTERVAL,
+        disable_default_exception_handler: bool = False,
+        prepare_for_work: bool = True,
+        serializer: Optional[SerializerProtocol] = None,
+    ):
+        connection = _get_necessary_client_connection(connection)
+        super().__init__(
+            queues=queues,
+            name=name,
+            default_result_ttl=default_result_ttl,
+            connection=connection,
+            exc_handler=exc_handler,
+            exception_handlers=exception_handlers,
+            default_worker_ttl=default_worker_ttl,
+            job_class=job_class,
+            queue_class=queue_class,
+            log_job_description=log_job_description,
+            job_monitoring_interval=job_monitoring_interval,
+            disable_default_exception_handler=disable_default_exception_handler,  # noqa: E501
+            prepare_for_work=prepare_for_work,
+            serializer=JSONSerializer,
+        )
+
+    def _set_connection(
+        self,
+        connection: Union[DABRedis, DABRedisCluster],
+    ) -> Union[DABRedis, DABRedisCluster]:
+        # A DABRedis connection doesn't need intervention.
+        if isinstance(connection, DABRedis):
+            return super()._set_connection(connection)
+
+        try:
+            connection_pool = connection.connection_pool
+            current_socket_timeout = connection_pool.connection_kwargs.get(
+                "socket_timeout"
+            )
+            if current_socket_timeout is None:
+                timeout_config = {"socket_timeout": self.connection_timeout}
+                connection_pool.connection_kwargs.update(timeout_config)
+        except AttributeError:
+            nodes = connection.get_nodes()
+            for node in nodes:
+                connection_pool = node.redis_connection.connection_pool
+                current_socket_timeout = connection_pool.connection_kwargs.get(
+                    "socket_timeout"
+                )
+                if current_socket_timeout is None:
+                    timeout_config = {
+                        "socket_timeout": self.connection_timeout
+                    }
+                    connection_pool.connection_kwargs.update(timeout_config)
+        return connection
+
+    @classmethod
+    def all(
+        cls,
+        connection: Optional[Union[DABRedis, DABRedisCluster]] = None,
+        job_class: Optional[Type[Job]] = None,
+        queue_class: Optional[Type[Queue]] = None,
+        queue: Optional[Queue] = None,
+        serializer=None,
+    ) -> List[Worker]:
+        # If we don't have a queue (whose connection would be used) make
+        # certain that we have an appropriate connection and pass it
+        # to the superclass.
+        if queue is None:
+            connection = _get_necessary_client_connection(connection)
+        return super().all(
+            connection,
+            job_class,
+            queue_class,
+            queue,
+            serializer,
+        )
+
+    def handle_job_success(
+        self, job: Job, queue: Queue, started_job_registry: StartedJobRegistry
+    ):
+        # A DABRedis connection doesn't need intervention.
+        if isinstance(self.connection, DABRedis):
+            return super().handle_job_success(job, queue, started_job_registry)
+
+        # For DABRedisCluster perform success handling.
+        # DABRedisCluster doesn't provide the watch, multi, etc. methods
+        # necessary for the superclass implementation, but we don't need
+        # them as there are no dependencies in how we use the jobs.
+        with self.connection.pipeline() as pipeline:
+            self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_successful_job_count(pipeline=pipeline)
+            self.increment_total_working_time(
+                job.ended_at - job.started_at,
+                pipeline,
+            )
+
+            result_ttl = job.get_result_ttl(self.default_result_ttl)
+            if result_ttl != 0:
+                job._handle_success(result_ttl, pipeline=pipeline)
+
+            job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
+            started_job_registry.remove(job, pipeline=pipeline)
+
+            pipeline.execute()
+
+
+class DefaultWorker(Worker):
     """Custom default worker class used for non-activation tasks.
 
     Uses JSONSerializer as a default one.
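
For context, a hedged sketch of starting one of these workers. This is standard RQ usage; the queue name is a placeholder, and the assumption that _get_necessary_client_connection can build a client from a None connection is not shown in this diff:

    from aap_eda.core.tasking import DefaultWorker

    # The overridden __init__ routes the connection through
    # _get_necessary_client_connection, so the worker gets a DABRedis or
    # DABRedisCluster client and the cluster workarounds above
    # (_set_connection, handle_job_success) apply automatically.
    worker = DefaultWorker(queues=["default"], name="eda-default-worker")
    worker.work()
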
@@ -234,7 +402,6 @@ def __init__(
             job_class = Job
         if queue_class is None:
             queue_class = Queue
-        connection = _get_necessary_client_connection(connection)
 
         super().__init__(
             queues=queues,
@@ -254,7 +421,7 @@ def __init__(
         )
 
 
-class ActivationWorker(_Worker):
+class ActivationWorker(Worker):
    """Custom worker class used for activation related tasks.
 
     Uses JSONSerializer as a default one.
@@ -281,7 +448,6 @@ def __init__(
             job_class = Job
         if queue_class is None:
             queue_class = Queue
-        connection = _get_necessary_client_connection(connection)
         queue_name = settings.RULEBOOK_QUEUE_NAME
 
         super().__init__(

src/aap_eda/settings/default.py

Lines changed: 25 additions & 3 deletions
@@ -358,8 +358,10 @@ def _get_databases_settings() -> dict:
 # TASKING SETTINGS
 # ---------------------------------------------------------
 RQ = {
-    "QUEUE_CLASS": "aap_eda.core.tasking.Queue",
     "JOB_CLASS": "aap_eda.core.tasking.Job",
+    "QUEUE_CLASS": "aap_eda.core.tasking.Queue",
+    "SCHEDULER_CLASS": "aap_eda.core.tasking.Scheduler",
+    "WORKER_CLASS": "aap_eda.core.tasking.Worker",
 }
 
 REDIS_UNIX_SOCKET_PATH = settings.get("MQ_UNIX_SOCKET_PATH", None)
@@ -370,9 +372,18 @@
 REDIS_CLIENT_CACERT_PATH = settings.get("MQ_CLIENT_CACERT_PATH", None)
 REDIS_CLIENT_CERT_PATH = settings.get("MQ_CLIENT_CERT_PATH", None)
 REDIS_CLIENT_KEY_PATH = settings.get("MQ_CLIENT_KEY_PATH", None)
-REDIS_DB = settings.get("MQ_DB", 0)
+DEFAULT_REDIS_DB = 0
+REDIS_DB = settings.get("MQ_DB", DEFAULT_REDIS_DB)
 RQ_REDIS_PREFIX = settings.get("RQ_REDIS_PREFIX", "eda-rq")
 
+# The HA cluster hosts value is a string of <host>:<port>[,<host>:<port>]+
+# and is exhaustive; i.e., not in addition to REDIS_HOST:REDIS_PORT.
+# EDA does not validate the content, but relies on DAB to do so.
+#
+# In establishing an HA Cluster Redis client connection DAB ignores
+# the host and port kwargs.
+REDIS_HA_CLUSTER_HOSTS = settings.get("MQ_REDIS_HA_CLUSTER_HOSTS", "").strip()
+
 
 def _rq_common_parameters():
     params = {
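
For example, a deployment might point EDA at a three-node cluster like this; the hostnames are placeholders, while the variable name and format come from the comment above:

    MQ_REDIS_HA_CLUSTER_HOSTS="redis-1:6379,redis-2:6379,redis-3:6379"
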
@@ -403,14 +414,25 @@ def _rq_redis_client_additional_parameters():
     return params
 
 
-def rq_redis_client_instantiation_parameters():
+def rq_standalone_redis_client_instantiation_parameters():
     params = _rq_common_parameters() | _rq_redis_client_additional_parameters()
 
     # Convert to lowercase for use in instantiating a redis client.
     params = {k.lower(): v for (k, v) in params.items()}
     return params
 
 
+def rq_redis_client_instantiation_parameters():
+    params = rq_standalone_redis_client_instantiation_parameters()
+
+    # Include the HA cluster parameters.
+    if REDIS_HA_CLUSTER_HOSTS:
+        params["mode"] = "cluster"
+        params["redis_hosts"] = REDIS_HA_CLUSTER_HOSTS
+
+    return params
+
+
 # A list of queues to be used in multinode mode
 # If the list is empty, use the default singlenode queue name
 RULEBOOK_WORKER_QUEUES = settings.get("RULEBOOK_WORKER_QUEUES", [])
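
A sketch of what rq_redis_client_instantiation_parameters() would then return with the example hosts above; the values are hypothetical, and keys other than mode and redis_hosts come from the MQ_* settings and are mostly elided:

    {
        "host": "localhost",  # ignored by DAB in cluster mode (see comment above)
        "port": 6379,         # likewise ignored
        ...
        "mode": "cluster",
        "redis_hosts": "redis-1:6379,redis-2:6379,redis-3:6379",
    }
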

src/aap_eda/tasks/orchestrator.py

Lines changed: 1 addition & 2 deletions
@@ -21,7 +21,6 @@
 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist
 from django_rq import get_queue
-from rq import Worker
 
 import aap_eda.tasks.activation_request_queue as requests_queue
 from aap_eda.core import models
@@ -31,7 +30,7 @@
     ProcessParentType,
 )
 from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream
-from aap_eda.core.tasking import unique_enqueue
+from aap_eda.core.tasking import Worker, unique_enqueue
 from aap_eda.services.activation import exceptions
 from aap_eda.services.activation.activation_manager import (
     ActivationManager,
