A Python project which manages job queues using PostgreSQL as the source of truth.
There are two entrypoints required to run PDOFlow. The first is decorating a function to which work will be assigned across the computing cluster. The other is running and maintaining an active worker pool on each machine in the cluster.
The use case is very simple. Decorate a function and push work to that function.
```python
# File mymodule/*/foo.py
from pdoflow import cluster

@cluster.job()
def some_unit_of_work(a: int, b: float, c: str) -> None:
    result = a * b
    with open(c, "wt") as fout:
        fout.write(f"{result:.06f}")
```
```python
# File mymodule/*/workload.py | doesn't necessarily need to be in another file
from pdoflow.registry import Registry
from mymodule.foo import some_unit_of_work

Registry[some_unit_of_work].post_work(
    (
        [1, 2.0, "just_twos.txt"],
        [10, -100, "big_numbers.txt"],
    ),
    [],  # Work is being done through positional arguments; no keyword
         # parameters are being used.
)
# Done from the client's point of view.

# You can also specify priorities for jobs
Registry[some_unit_of_work].post_work(
    posargs=[
        (1, 2.0, "urgent.txt"),
        (2, 3.0, "normal.txt"),
        (3, 4.0, "low_priority.txt"),
    ],
    kwargs=[],
    priority=[100, 0, -50],  # Higher values = higher priority
)

# Or use a single priority for all jobs
Registry[some_unit_of_work].post_work(
    posargs=[(i, float(i), f"file_{i}.txt") for i in range(10)],
    kwargs=[],
    priority=50,  # All jobs get priority 50
)
```
You may instantiate worker pools using the provided CLI.
```bash
pdoflow pool --max-workers 16
```
This will create a pool of at most 16 concurrent Python processes that execute posted workloads.
PDOFlow provides several CLI commands for managing and monitoring jobs:
```bash
# Start a worker pool
pdoflow pool --max-workers 16 --batchsize 10

# Check posting status (use --show-jobs to see individual job priorities)
pdoflow posting-status <uuid> --show-jobs

# List all postings
pdoflow list-postings

# View priority statistics for waiting jobs
pdoflow priority-stats

# Manually execute a specific job
pdoflow execute-job <uuid>

# Set posting status
pdoflow set-posting-status <uuid> executing
```
Or you may run a pool from within a Python program:
```python
from pdoflow import cluster
from time import sleep

worker_pool = cluster.ClusterPool(max_workers=16)
with worker_pool:
    while True:
        worker_pool.upkeep()
        sleep(1.0)
```
Work is posted to a PostgreSQL database utilizing JSON-serializable fields for work parameters. The biggest drawback currently is the inability to pass `NaN` and `inf` values.
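As a minimal illustration (standard-library only, not PDOFlow's own serialization code), strict JSON has no tokens for these values, so one workaround is to encode them as strings and decode them inside the job function:

```python
import json
import math

# Strict JSON (and PostgreSQL's json/jsonb types) cannot represent NaN or inf.
try:
    json.dumps({"x": float("nan")}, allow_nan=False)
except ValueError as exc:
    print(exc)  # Out of range float values are not JSON compliant

# Workaround sketch: post the special values as strings...
params = {"x": "nan", "y": "inf"}
# ...and decode them inside the job function before use.
decoded = {key: float(text) for key, text in params.items()}
assert math.isnan(decoded["x"]) and math.isinf(decoded["y"])
```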
While functions are executed, any return values are dropped and ignored, so these functions should be impure and have some non-volatile side effect (writing to disk, a database, etc.).
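For instance, a hypothetical job that mirrors the decorator usage above (the output path is purely illustrative) should persist its result itself rather than rely on the return value:

```python
from pathlib import Path

from pdoflow import cluster

@cluster.job()
def summarize(values: list[float], out_path: str) -> None:
    # Any return value would be discarded, so write the result somewhere
    # durable instead.
    Path(out_path).write_text(f"{sum(values):.6f}\n")
```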
Unlike other cluster packages, which require OS/kernel-level libraries or an additional message queue such as RabbitMQ or Redis (whose open-source standing has been shaky recently), this package only requires PostgreSQL (or another database with transaction-level UPDATE locks, such as Oracle).
So long as code is importable, this package can run on any computing machine.
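For context, here is a minimal sketch of that locking technique, written with SQLAlchemy against a hypothetical job table; it illustrates the general pattern for PostgreSQL-backed queues, not necessarily PDOFlow's exact query:

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Job(Base):
    """Hypothetical queue table, for illustration only."""
    __tablename__ = "job_queue"
    id = Column(Integer, primary_key=True)
    status = Column(String, default="waiting")

engine = create_engine("postgresql+psycopg2://user:password@localhost/jobs")

with Session(engine) as session:
    # FOR UPDATE SKIP LOCKED lets each worker claim a row inside its own
    # transaction while concurrent workers skip rows already claimed.
    job = session.execute(
        select(Job)
        .where(Job.status == "waiting")
        .with_for_update(skip_locked=True)
        .limit(1)
    ).scalars().first()
    if job is not None:
        job.status = "executing"
        session.commit()
```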
PDOFlow includes built-in priority queue support (see the sketch after this list):
- Jobs with higher priority values are executed first
- Jobs with the same priority are executed in FIFO order (preventing starvation)
- Priority values range from -2,147,483,648 to 2,147,483,647 (PostgreSQL INT)
- Default priority is 0 for backward compatibility
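A minimal sketch of the selection order these rules describe, in plain Python for illustration (in practice the ordering happens in the database query, and the tuples below are hypothetical):

```python
# (job_id, priority) pairs, listed in insertion order.
waiting = [(1, 0), (2, 100), (3, 0), (4, 100), (5, -50)]

# Higher priority runs first; ties fall back to insertion order (FIFO),
# which is what prevents starvation among equal-priority jobs.
execution_order = sorted(waiting, key=lambda job: (-job[1], job[0]))
print([job_id for job_id, _ in execution_order])  # [2, 4, 1, 3, 5]
```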
PDOFlow provides shared variables for coordination between workers processing the same JobPosting:
- Key-value storage associated with each JobPosting
- Atomic operations using PostgreSQL row-level locking
- JSON-serializable values for flexibility
- Automatic cleanup when JobPosting is deleted
The shared variables API is available for direct use with database sessions:
```python
from pdoflow.io import Session
from pdoflow.shared_vars import get_shared_variable, set_shared_variable, update_shared_variable

# In your application code
with Session() as session:
    # Set a variable
    set_shared_variable(session, posting_id, "config", {"timeout": 30})

    # Atomically increment a counter
    count = update_shared_variable(
        session, posting_id, "counter",
        lambda x: (x or 0) + 1, default=0
    )

    # Read a variable with optional locking
    value = get_shared_variable(session, posting_id, "config", lock=True)

    session.commit()
```
Note: Integration with the @job decorator for automatic session/posting_id injection is planned for a future release.
This package runs arbitrary code passed from clients. Some mitigations have been made. For example, all posted parameters must be JSON serializable and the code must be importable in some fashion.
However, anyone able to post work can point to a file on disk. So the user owning the cluster worker pool should be given the fewest privileges possible, and the set of users authorized to SELECT, UPDATE, and INSERT into the job queue should be kept small.
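As a sketch of that least-privilege idea (the role and table names are hypothetical; PDOFlow's actual schema may differ), the queue table can be restricted to a dedicated role:

```python
from sqlalchemy import create_engine, text

# Connect as an administrative user to set up a restricted role for queue clients.
engine = create_engine("postgresql+psycopg2://admin:password@localhost/jobs")

with engine.begin() as conn:
    # Hypothetical role and table names, for illustration only.
    conn.execute(text("CREATE ROLE pdoflow_client LOGIN PASSWORD 'change-me'"))
    conn.execute(text(
        "GRANT SELECT, INSERT, UPDATE ON TABLE job_queue TO pdoflow_client"
    ))
    # No DELETE, no DDL, and no access to unrelated tables.
```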
- Implement a `hardware` decorator which is added to functions that require specific hardware that might not be available on an arbitrary compute cluster. These requirements should somehow be passed/identified when creating a worker pool server.
- Implement priority queues while negating starvation problems.
- Figure out a mechanism for passing `NaN` and `inf` values, since this package is being used in a scientific compute context.
- See how complicated it would be to orchestrate worker pools so they can coordinate and pass workloads to each other using PostgreSQL.
0.1.1 Worker pools now only pull jobs from their user.