
AAP-27923: implement EDA HA Cluster Redis support. #1000


Merged · 11 commits · Aug 13, 2024

4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

196 changes: 181 additions & 15 deletions src/aap_eda/core/tasking/__init__.py
@@ -4,11 +4,19 @@
import logging
from datetime import datetime, timedelta
from types import MethodType
-from typing import Any, Callable, Iterable, Optional, Protocol, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Iterable,
+    List,
+    Optional,
+    Protocol,
+    Type,
+    Union,
+)

import redis
import rq
-import rq_scheduler
from ansible_base.lib.redis.client import (
DABRedis,
DABRedisCluster,
@@ -24,7 +32,9 @@
DEFAULT_WORKER_TTL,
)
from rq.job import Job as _Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.serializers import JSONSerializer
from rq_scheduler import Scheduler as _Scheduler

from aap_eda.settings import default

@@ -73,9 +83,47 @@ def get_redis_client(**kwargs):
DAB will return an appropriate client for HA based on the passed
parameters.
"""
    # HA cluster does not support an alternate Redis db and will raise
    # an exception if we pass a value (even the default). In that case
    # we drop the db and, if it is anything other than the default,
    # log an informational message.
    db = kwargs.get("db", None)
    if (db is not None) and (kwargs.get("mode", "") == "cluster"):
        del kwargs["db"]
        if db != default.DEFAULT_REDIS_DB:
            logger.info(
                f"clustered redis supports only the default db"
                f"; db specified: {db}"
            )

return _get_redis_client(_create_url_from_parameters(**kwargs), **kwargs)
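
For illustration, a minimal sketch of the behavior above; the host names are hypothetical, and the kwargs mirror those produced by rq_redis_client_instantiation_parameters() in default.py below:

# Hypothetical call: with mode="cluster" the db kwarg is dropped before
# DAB is invoked, and a message is logged because 5 != DEFAULT_REDIS_DB.
client = get_redis_client(
    host="redis-0.example.com",
    port=6379,
    db=5,
    mode="cluster",
    redis_hosts="redis-0.example.com:6379,redis-1.example.com:6379",
)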


class Scheduler(_Scheduler):
"""Custom scheduler class."""

def __init__(
self,
queue_name="default",
queue=None,
interval=60,
connection=None,
job_class=None,
queue_class=None,
name=None,
):
connection = _get_necessary_client_connection(connection)
super().__init__(
queue_name=queue_name,
queue=queue,
interval=interval,
connection=connection,
job_class=job_class,
queue_class=queue_class,
name=name,
)
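
A usage sketch: the wrapper's only change from rq_scheduler.Scheduler is coercing connection to a DAB client before delegating, so a caller may omit the connection entirely (assuming _get_necessary_client_connection() builds one from settings when given None; see the sketch following that function below):

# Hypothetical usage; the DAB client is supplied by the wrapper.
scheduler = Scheduler(queue_name="default", interval=60)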


def enable_redis_prefix():
redis_prefix = settings.RQ_REDIS_PREFIX

@@ -102,16 +150,12 @@ def enable_redis_prefix():
f"{redis_prefix}:canceled:{0}"
)

-    rq_scheduler.Scheduler.redis_scheduler_namespace_prefix = (
+    Scheduler.redis_scheduler_namespace_prefix = (
        f"{redis_prefix}:scheduler_instance:"
    )
-    rq_scheduler.Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
-    rq_scheduler.Scheduler.scheduler_lock_key = (
-        f"{redis_prefix}:scheduler_lock"
-    )
-    rq_scheduler.Scheduler.scheduled_jobs_key = (
-        f"{redis_prefix}:scheduler:scheduled_jobs"
-    )
+    Scheduler.scheduler_key = f"{redis_prefix}:scheduler"
+    Scheduler.scheduler_lock_key = f"{redis_prefix}:scheduler_lock"
+    Scheduler.scheduled_jobs_key = f"{redis_prefix}:scheduler:scheduled_jobs"

def eda_get_key(job_id):
return f"{redis_prefix}:results:{job_id}"
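
With the default prefix "eda-rq" (RQ_REDIS_PREFIX in default.py below), the assignments above yield, for example:

# Illustrative key names, assuming RQ_REDIS_PREFIX = "eda-rq":
#   eda-rq:scheduler                    Scheduler.scheduler_key
#   eda-rq:scheduler_lock               Scheduler.scheduler_lock_key
#   eda-rq:scheduler:scheduled_jobs     Scheduler.scheduled_jobs_key
#   eda-rq:results:<job_id>             eda_get_key(job_id)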
@@ -168,7 +212,7 @@ def __init__(
super().__init__(
name=name,
default_timeout=default_timeout,
-            connection=connection,
+            connection=_get_necessary_client_connection(connection),
is_async=is_async,
job_class=job_class,
serializer=serializer,
@@ -190,6 +234,7 @@ def __init__(
):
if serializer is None:
serializer = JSONSerializer
connection = _get_necessary_client_connection(connection)

super().__init__(id, connection, serializer)

@@ -207,7 +252,130 @@ def _get_necessary_client_connection(connection: Connection) -> Connection:
return connection
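
Only the tail of _get_necessary_client_connection() is visible in this hunk. Judging from its call sites in this diff, a coercion helper of roughly this shape is what the wrappers rely on; this is a hypothetical reconstruction, not the PR's verbatim body:

# Hypothetical sketch: coerce anything that is not already a DAB client
# into one built from the configured Redis parameters.
def _get_necessary_client_connection_sketch(connection):
    if not isinstance(connection, (DABRedis, DABRedisCluster)):
        connection = get_redis_client(
            **default.rq_redis_client_instantiation_parameters()
        )
    return connection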


-class DefaultWorker(_Worker):
+class Worker(_Worker):
    """Custom worker class.

    Establishes a DAB Redis client connection and provides workarounds
    for various DABRedisCluster issues.
    """

def __init__(
self,
queues: Iterable[Union[Queue, str]],
name: Optional[str] = None,
default_result_ttl: int = DEFAULT_RESULT_TTL,
connection: Optional[Connection] = None,
exc_handler: Any = None,
exception_handlers: _ErrorHandlersArgType = None,
default_worker_ttl: int = DEFAULT_WORKER_TTL,
job_class: Type[_Job] = None,
queue_class: Type[_Queue] = None,
log_job_description: bool = True,
job_monitoring_interval: int = DEFAULT_JOB_MONITORING_INTERVAL,
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer: Optional[SerializerProtocol] = None,
):
connection = _get_necessary_client_connection(connection)
super().__init__(
queues=queues,
name=name,
default_result_ttl=default_result_ttl,
connection=connection,
exc_handler=exc_handler,
exception_handlers=exception_handlers,
default_worker_ttl=default_worker_ttl,
job_class=job_class,
queue_class=queue_class,
log_job_description=log_job_description,
job_monitoring_interval=job_monitoring_interval,
disable_default_exception_handler=disable_default_exception_handler, # noqa: E501
prepare_for_work=prepare_for_work,
serializer=JSONSerializer,
)
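        # Note: JSONSerializer is passed to the superclass above, so the
        # serializer argument of this __init__ is effectively ignored.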

def _set_connection(
self,
connection: Union[DABRedis, DABRedisCluster],
) -> Union[DABRedis, DABRedisCluster]:
# A DABRedis connection doesn't need intervention.
if isinstance(connection, DABRedis):
return super()._set_connection(connection)

try:
connection_pool = connection.connection_pool
current_socket_timeout = connection_pool.connection_kwargs.get(
"socket_timeout"
)
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
connection_pool.connection_kwargs.update(timeout_config)
except AttributeError:
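            # DABRedisCluster exposes no single connection_pool; fall
            # back to applying the timeout to each node's pool.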
nodes = connection.get_nodes()
for node in nodes:
connection_pool = node.redis_connection.connection_pool
current_socket_timeout = connection_pool.connection_kwargs.get(
"socket_timeout"
)
if current_socket_timeout is None:
timeout_config = {
"socket_timeout": self.connection_timeout
}
connection_pool.connection_kwargs.update(timeout_config)
return connection

@classmethod
def all(
cls,
connection: Optional[Union[DABRedis, DABRedisCluster]] = None,
job_class: Optional[Type[Job]] = None,
queue_class: Optional[Type[Queue]] = None,
queue: Optional[Queue] = None,
serializer=None,
) -> List[Worker]:
        # If we don't have a queue (whose connection would be used),
        # make certain that we have an appropriate connection and pass
        # it to the superclass.
if queue is None:
connection = _get_necessary_client_connection(connection)
return super().all(
connection,
job_class,
queue_class,
queue,
serializer,
)

def handle_job_success(
self, job: Job, queue: Queue, started_job_registry: StartedJobRegistry
):
# A DABRedis connection doesn't need intervention.
if isinstance(self.connection, DABRedis):
return super().handle_job_success(job, queue, started_job_registry)

        # For DABRedisCluster, perform success handling ourselves.
        # DABRedisCluster doesn't provide the watch, multi, etc. methods
        # necessary for the superclass implementation, but we don't need
        # them as there are no dependencies in how we use the jobs.
with self.connection.pipeline() as pipeline:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
self.increment_total_working_time(
job.ended_at - job.started_at,
pipeline,
)

result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job._handle_success(result_ttl, pipeline=pipeline)

job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)

pipeline.execute()


class DefaultWorker(Worker):
"""Custom default worker class used for non-activation tasks.

    Uses JSONSerializer as the default serializer.
@@ -234,7 +402,6 @@ def __init__(
job_class = Job
if queue_class is None:
queue_class = Queue
-        connection = _get_necessary_client_connection(connection)

super().__init__(
queues=queues,
@@ -254,7 +421,7 @@ def __init__(
)


-class ActivationWorker(_Worker):
+class ActivationWorker(Worker):
"""Custom worker class used for activation related tasks.

    Uses JSONSerializer as the default serializer.
@@ -281,7 +448,6 @@ def __init__(
job_class = Job
if queue_class is None:
queue_class = Queue
-        connection = _get_necessary_client_connection(connection)
queue_name = settings.RULEBOOK_QUEUE_NAME

super().__init__(
28 changes: 25 additions & 3 deletions src/aap_eda/settings/default.py
@@ -358,8 +358,10 @@ def _get_databases_settings() -> dict:
# TASKING SETTINGS
# ---------------------------------------------------------
RQ = {
-    "QUEUE_CLASS": "aap_eda.core.tasking.Queue",
    "JOB_CLASS": "aap_eda.core.tasking.Job",
+    "QUEUE_CLASS": "aap_eda.core.tasking.Queue",
+    "SCHEDULER_CLASS": "aap_eda.core.tasking.Scheduler",
+    "WORKER_CLASS": "aap_eda.core.tasking.Worker",
}
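
If django-rq resolves these dotted paths as its class settings (the usual behavior for the RQ dict), the stock management commands pick up EDA's wrappers automatically:

# Hypothetical invocation:
#   python manage.py rqworker default
# now instantiates aap_eda.core.tasking.Worker rather than rq.Worker.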

REDIS_UNIX_SOCKET_PATH = settings.get("MQ_UNIX_SOCKET_PATH", None)
@@ -370,9 +372,18 @@ def _get_databases_settings() -> dict:
REDIS_CLIENT_CACERT_PATH = settings.get("MQ_CLIENT_CACERT_PATH", None)
REDIS_CLIENT_CERT_PATH = settings.get("MQ_CLIENT_CERT_PATH", None)
REDIS_CLIENT_KEY_PATH = settings.get("MQ_CLIENT_KEY_PATH", None)
-REDIS_DB = settings.get("MQ_DB", 0)
+DEFAULT_REDIS_DB = 0
+REDIS_DB = settings.get("MQ_DB", DEFAULT_REDIS_DB)
RQ_REDIS_PREFIX = settings.get("RQ_REDIS_PREFIX", "eda-rq")

# The HA cluster hosts value is a string of <host>:<port>[,<host>:<port>]+
# and is exhaustive; i.e., it is not in addition to REDIS_HOST:REDIS_PORT.
# EDA does not validate the content, but relies on DAB to do so.
#
# In establishing an HA Cluster Redis client connection, DAB ignores
# the host and port kwargs.
REDIS_HA_CLUSTER_HOSTS = settings.get("MQ_REDIS_HA_CLUSTER_HOSTS", "").strip()
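
For example, with hypothetical host names:

# MQ_REDIS_HA_CLUSTER_HOSTS="redis-0.example.com:6379,redis-1.example.com:6379,redis-2.example.com:6379"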


def _rq_common_parameters():
params = {
@@ -403,14 +414,25 @@ def _rq_redis_client_additional_parameters():
return params


-def rq_redis_client_instantiation_parameters():
+def rq_standalone_redis_client_instantiation_parameters():
params = _rq_common_parameters() | _rq_redis_client_additional_parameters()

# Convert to lowercase for use in instantiating a redis client.
params = {k.lower(): v for (k, v) in params.items()}
return params


def rq_redis_client_instantiation_parameters():
params = rq_standalone_redis_client_instantiation_parameters()

# Include the HA cluster parameters.
if REDIS_HA_CLUSTER_HOSTS:
params["mode"] = "cluster"
params["redis_hosts"] = REDIS_HA_CLUSTER_HOSTS

return params
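
A sketch of the outcome when the cluster hosts are configured; the values are illustrative:

# rq_redis_client_instantiation_parameters() would then return roughly:
# {
#     ...standalone parameters (host, port, db, TLS options)...,
#     "mode": "cluster",
#     "redis_hosts": "redis-0:6379,redis-1:6379",
# }
# get_redis_client() later drops "db", and DAB ignores host/port in
# cluster mode, per the comments above.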


# A list of queues to be used in multinode mode
# If the list is empty, use the default singlenode queue name
RULEBOOK_WORKER_QUEUES = settings.get("RULEBOOK_WORKER_QUEUES", [])
3 changes: 1 addition & 2 deletions src/aap_eda/tasks/orchestrator.py
@@ -21,7 +21,6 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django_rq import get_queue
-from rq import Worker

import aap_eda.tasks.activation_request_queue as requests_queue
from aap_eda.core import models
@@ -31,7 +30,7 @@
ProcessParentType,
)
from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream
-from aap_eda.core.tasking import unique_enqueue
+from aap_eda.core.tasking import Worker, unique_enqueue
from aap_eda.services.activation import exceptions
from aap_eda.services.activation.activation_manager import (
ActivationManager,