Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ jobs:
strategy:
matrix:
include:
- python-version: '3.10'
- python-version: '3.11'
- python-version: '3.12'
- python-version: '3.13'
- python-version: '3.13'
install-extra: '[sentry]'
# With the Redis client
- { python-version: '3.10', install-extra: '[redis]' }
- { python-version: '3.11', install-extra: '[redis]' }
- { python-version: '3.12', install-extra: '[redis]' }
- { python-version: '3.13', install-extra: '[redis]' }
- { python-version: '3.13', install-extra: '[redis,sentry]'}
# With the Valkey server
- { python-version: '3.13', install-extra: '[redis]', redis-image: 'valkey/valkey:8'}
# With the Valkey client and Valkey server
- { python-version: '3.13', install-extra: '[valkey,sentry]', redis-image: 'valkey/valkey:8'}
services:
redis:
image: redis
image: ${{ matrix.redis-image || 'redis' }}
ports:
- 6379:6379
env:
Expand Down
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
# minique /miniːk/

A minimal Redis 4.0+ job queue for Python 3.10 and above.
A minimal Redis 4.0+/Valkey job queue for Python 3.10 and above.

## Requirements

### Client

* Python 3.10+
* Redis 4.0+
* Either redis-py 2.10+, or valkey 6.0

### Server

* A Redis 4.0+ or Valkey server

## Usage

- Have a Redis 4.0+ server running.
- Have a Redis 4.0+ or Valkey server running.

### Client

```python
from redis import StrictRedis
from redis import Redis # (or from valkey import Valkey as Redis)
from minique.api import enqueue, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')
redis = Redis.from_url('redis://localhost:6379/4')

job = enqueue(
redis=redis,
Expand Down Expand Up @@ -62,11 +68,11 @@ processed.
Priority queue requires Lua scripting permissions from the Redis queue service.

```python
from redis import StrictRedis
from redis import Redis # (or from valkey import Valkey as Redis)
from minique.api import enqueue_priority, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')
redis = Redis.from_url('redis://localhost:6379/4')

job = enqueue_priority(
redis=redis,
Expand Down
14 changes: 7 additions & 7 deletions minique/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from minique.utils import get_random_pronounceable_string

if TYPE_CHECKING:
from redis import Redis
from minique.types import RedisClient


def enqueue(
redis: Redis[bytes],
redis: RedisClient,
queue_name: str,
callable: Callable[..., Any] | str,
kwargs: dict[str, Any] | None = None,
Expand Down Expand Up @@ -55,7 +55,7 @@ def enqueue(


def enqueue_priority(
redis: Redis[bytes],
redis: RedisClient,
queue_name: str,
callable: Callable[..., Any] | str,
kwargs: dict[str, Any] | None = None,
Expand Down Expand Up @@ -96,7 +96,7 @@ def enqueue_priority(


def store(
redis: Redis[bytes],
redis: RedisClient,
callable: Callable[..., Any] | str,
kwargs: dict[str, Any] | None = None,
job_id: str | None = None,
Expand Down Expand Up @@ -129,7 +129,7 @@ def store(


def get_job(
redis: Redis[bytes],
redis: RedisClient,
job_id: str,
) -> Job:
job = Job(redis, job_id)
Expand All @@ -138,7 +138,7 @@ def get_job(


def cancel_job(
redis: Redis[bytes],
redis: RedisClient,
job_id: str,
expire_time: int | None = None,
) -> bool:
Expand Down Expand Up @@ -173,7 +173,7 @@ def cancel_job(

def _define_and_store_job(
*,
redis: Redis[bytes],
redis: RedisClient,
callable: Callable[..., Any] | str,
kwargs: dict[str, Any] | None = None,
job_id: str | None = None,
Expand Down
19 changes: 16 additions & 3 deletions minique/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import argparse
import logging

from redis import StrictRedis
from typing import TYPE_CHECKING

from minique.compat import sentry_sdk
from minique.work.worker import Worker

if TYPE_CHECKING:
from minique.types import RedisClient


def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
Expand All @@ -18,11 +20,22 @@ def get_parser() -> argparse.ArgumentParser:
return parser


def get_redis(url: str) -> RedisClient:
try:
from valkey import Valkey

return Valkey.from_url(url) # type: ignore[return-value]
except ImportError:
from redis import Redis

return Redis.from_url(url)


def main(argv: list[str] | None = None) -> None:
parser = get_parser()
args = parser.parse_args(argv)
logging.basicConfig(datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
redis = StrictRedis.from_url(args.redis_url)
redis = get_redis(args.redis_url)
worker = Worker.for_queue_names(redis=redis, queue_names=args.queues)
worker.allowed_callable_patterns = set(args.allow_callable)
worker.log.info("Worker initialized")
Expand Down
5 changes: 2 additions & 3 deletions minique/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
from minique.utils import cached_property

if TYPE_CHECKING:
from redis import Redis

from minique.models.priority_queue import PriorityQueue
from minique.models.queue import Queue
from minique.types import RedisClient


class Job:
# Used for testing:
replacement_callable: Callable[..., Any] | None = None
replacement_kwargs: dict[str, Any] | None = None

def __init__(self, redis: Redis[bytes], id: str) -> None:
def __init__(self, redis: RedisClient, id: str) -> None:
self.redis = redis
self.id = str(id)

Expand Down
5 changes: 2 additions & 3 deletions minique/models/priority_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from minique.models.queue import Queue

if TYPE_CHECKING:
from redis import Redis

from minique.models.job import Job
from minique.types import RedisClient


ADD_JOB_SCRIPT = """
Expand Down Expand Up @@ -100,7 +99,7 @@ class PriorityQueue(Queue):
remove stale keys from the priority lookup hash.
"""

def __init__(self, redis: Redis[bytes], name: str):
def __init__(self, redis: RedisClient, name: str):
super().__init__(redis, name)
self.add_job_script = redis.register_script(ADD_JOB_SCRIPT)
self.hash_clean_script = redis.register_script(PRIO_HASH_CLEANER_SCRIPT)
Expand Down
5 changes: 2 additions & 3 deletions minique/models/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
from minique.utils.redis_list import read_list

if TYPE_CHECKING:
from redis import Redis

from minique.models.job import Job
from minique.types import RedisClient


class Queue:
def __init__(self, redis: Redis[bytes], name: str) -> None:
def __init__(self, redis: RedisClient, name: str) -> None:
self.redis = redis
self.name = str(name)

Expand Down
13 changes: 12 additions & 1 deletion minique/types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
from __future__ import annotations

from types import TracebackType
from typing import Any, TypeAlias
from typing import TYPE_CHECKING, Any, TypeAlias

ExcInfo: TypeAlias = (
tuple[type[BaseException], BaseException, TracebackType] | tuple[None, None, None]
)
ContextDict: TypeAlias = dict[str, Any]

if TYPE_CHECKING:
import redis

RedisClient: TypeAlias = redis.Redis[bytes]

# TODO: the types for `valkey` are suboptimal (since they are so in the original
# pre-fork `redis` package), so adding `| valkey.Valkey` to the above type alias
# practically breaks everything. The stub types from `types-redis` are OK, but
# naturally won't apply to `valkey`.
# See https://github.com/valkey-io/valkey-py/issues/164.
4 changes: 2 additions & 2 deletions minique/utils/redis_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from redis import Redis
from minique.types import RedisClient


def read_list(
redis_conn: Redis[bytes],
redis_conn: RedisClient,
key: str,
*,
chunk_size: int = 4096,
Expand Down
4 changes: 1 addition & 3 deletions minique/work/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from redis import Redis

from minique.enums import JobStatus
from minique.excs import AlreadyAcquired, AlreadyResulted, InvalidJob
from minique.models.job import Job
Expand All @@ -27,7 +25,7 @@ def __init__(self, worker: Worker, job: Job) -> None:
self.worker = worker
self.job = job
self.redis = job.redis
assert isinstance(self.redis, Redis)
assert hasattr(self.redis, "hset") # Smells redisy enough for us.
self.log: logging.Logger = logging.getLogger(
f"{__name__}.{str(self.job.id).replace('.', '_')}"
)
Expand Down
8 changes: 3 additions & 5 deletions minique/work/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from minique.work.job_runner import JobRunner

if TYPE_CHECKING:
from redis import Redis

from minique.types import ContextDict, ExcInfo
from minique.types import ContextDict, ExcInfo, RedisClient


class Worker:
Expand All @@ -27,7 +25,7 @@ class Worker:

def __init__(
self,
redis: Redis[bytes],
redis: RedisClient,
queues: list[Queue],
) -> None:
self.id = self.compute_id()
Expand All @@ -39,7 +37,7 @@ def __init__(
@classmethod
def for_queue_names(
cls,
redis: Redis[bytes],
redis: RedisClient,
queue_names: list[str] | str,
**kwargs: Any,
) -> Worker:
Expand Down
11 changes: 8 additions & 3 deletions minique_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import logging
import os
from typing import TYPE_CHECKING
from unittest.mock import Mock

import pytest
from redis import Redis

from minique.compat import sentry_sdk
from minique.utils import get_random_pronounceable_string

if TYPE_CHECKING:
from minique.types import RedisClient


def pytest_configure() -> None:
logging.basicConfig(datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
Expand All @@ -24,8 +27,10 @@ def redis_url() -> str:


@pytest.fixture()
def redis(redis_url: str) -> Redis:
return Redis.from_url(redis_url)
def redis(redis_url: str) -> RedisClient:
from minique.cli import get_redis

return get_redis(redis_url)


@pytest.fixture()
Expand Down
12 changes: 9 additions & 3 deletions minique_tests/test_customization.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
from redis import Redis

from minique.api import enqueue
from minique.encoding import JSONEncoding, register_encoding
from minique.testing import run_synchronously
from minique.work.job_runner import JobRunner
from minique_tests.worker import TestWorker

if TYPE_CHECKING:
from minique.types import RedisClient


def special_dump(o):
if isinstance(o, set):
Expand Down Expand Up @@ -40,7 +46,7 @@ class HonkWorker(TestWorker):

@pytest.mark.parametrize("problem", (False, True))
def test_job_runner_override(
redis: Redis, random_queue_name: str, capsys, problem: bool
redis: RedisClient, random_queue_name: str, capsys, problem: bool
):
args = {"a": "err", "b": -8} if problem else {"a": 10, "b": 15}
job = enqueue(
Expand All @@ -56,7 +62,7 @@ def test_job_runner_override(
assert ("Alarmed honk!" in output) == problem


def test_custom_encoding(redis: Redis, random_queue_name: str):
def test_custom_encoding(redis: RedisClient, random_queue_name: str):
job = enqueue(
redis,
random_queue_name,
Expand Down
Loading