PDOFlow

A Python project which manages job queues using PostgreSQL as the source of truth.

Usage

Running PDOFlow involves two entry points. The first is decorating the functions to which work will be assigned across the computing cluster. The second is actually running and maintaining an active worker pool on each machine in the cluster.

Decorating Functions

The use case is very simple. Decorate a function and push work to that function.

# File mymodule/*/foo.py
from pdoflow import cluster

@cluster.job()
def some_unit_of_work(a: int, b: float, c: str) -> None:
    result = a * b
    with open(c, "wt") as fout:
        fout.write(f"{result:.06f}")

# File mymodule/*/workload.py (this does not need to live in a separate file)

from pdoflow.registry import Registry
from mymodule.foo import some_unit_of_work


Registry[some_unit_of_work].post_work(
    (
        [1, 2.0, "just_twos.txt"],
        [10, -100, "big_numbers.txt"],
    ),
    []  # Work is passed via positional arguments; no keyword
        # parameters are used.
)
# Done from the client's point of view.

# You can also specify priorities for jobs
Registry[some_unit_of_work].post_work(
    posargs=[
        (1, 2.0, "urgent.txt"),
        (2, 3.0, "normal.txt"),
        (3, 4.0, "low_priority.txt"),
    ],
    kwargs=[],
    priority=[100, 0, -50]  # Higher values = higher priority
)

# Or use a single priority for all jobs
Registry[some_unit_of_work].post_work(
    posargs=[(i, float(i), f"file_{i}.txt") for i in range(10)],
    kwargs=[],
    priority=50  # All jobs get priority 50
)

Running Clusters

You may instantiate clusters using the provided CLI.

pdoflow pool --max-workers 16

This will create a pool of at most 16 concurrent Python processes that execute posted workloads.

CLI Commands

PDOFlow provides several CLI commands for managing and monitoring jobs:

# Start a worker pool
pdoflow pool --max-workers 16 --batchsize 10

# Check posting status (use --show-jobs to see individual job priorities)
pdoflow posting-status <uuid> --show-jobs

# List all postings
pdoflow list-postings

# View priority statistics for waiting jobs
pdoflow priority-stats

# Manually execute a specific job
pdoflow execute-job <uuid>

# Set posting status
pdoflow set-posting-status <uuid> executing

Or you may run a pool from a Python program:

from pdoflow import cluster
from time import sleep


worker_pool = cluster.ClusterPool(max_workers=16)

with worker_pool:
    while True:
        worker_pool.upkeep()
        sleep(1.0)

Drawbacks

Work is posted to a PostgreSQL database using JSON-serializable fields for work parameters. The biggest drawback currently is the inability to pass NaN and inf values.
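
One workaround is to encode non-finite floats on the client and decode them inside the job. The sketch below assumes you control both sides of the serialization; the helper and function names are hypothetical.

import math

from pdoflow import cluster


def encode_float(x):
    # Hypothetical helper: JSON cannot represent NaN/inf, so ship them as strings.
    return str(x) if isinstance(x, float) and not math.isfinite(x) else x


def decode_float(x):
    # float("nan"), float("inf"), and float("-inf") all parse back from strings.
    return float(x) if isinstance(x, str) else x


@cluster.job()
def scaled_write(a: int, b, c: str) -> None:
    result = a * decode_float(b)
    with open(c, "wt") as fout:
        fout.write(f"{result:.6f}")

The client would then post encode_float(value) in place of the raw float.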

When functions are executed, any return values are dropped and ignored, so these functions should be impure and produce some non-volatile side effect.
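
Since return values never reach the caller, a job typically persists its own output. Below is a minimal sketch that records results in SQLite; any durable store works, and the function and table names here are hypothetical.

import sqlite3

from pdoflow import cluster


@cluster.job()
def record_product(a: int, b: float, db_path: str) -> None:
    # The return value would be discarded, so store the result durably instead.
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS results (a INTEGER, b REAL, product REAL)"
        )
        conn.execute("INSERT INTO results VALUES (?, ?, ?)", (a, b, a * b))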

Advantages

Unlike other cluster packages, which require OS/kernel-level libraries or an additional message queue such as RabbitMQ or Redis (whose open-source standing has been shaky recently), this package only requires PostgreSQL (or another database with transaction-level UPDATE locks, such as Oracle).

So long as code is importable, this package can run on any computing machine.
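
The broker-free design rests on that transaction-level locking: each worker claims a distinct waiting job inside its own transaction. The query below is a conceptual illustration of the SELECT ... FOR UPDATE SKIP LOCKED pattern, not PDOFlow's actual implementation; the table and column names are hypothetical.

from sqlalchemy import text

from pdoflow.io import Session

# Conceptual sketch only; "jobs", "status", "priority", and "created_at" are hypothetical names.
CLAIM_ONE_JOB = text(
    """
    UPDATE jobs
       SET status = 'executing'
     WHERE id = (
           SELECT id
             FROM jobs
            WHERE status = 'waiting'
            ORDER BY priority DESC, created_at ASC
            LIMIT 1
              FOR UPDATE SKIP LOCKED
     )
    RETURNING id
    """
)

with Session() as session:
    claimed_id = session.execute(CLAIM_ONE_JOB).scalar_one_or_none()
    session.commit()

Because SKIP LOCKED ignores rows locked by other transactions, many workers can poll the same table without a broker handing out messages.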

Priority Queue Support

PDOFlow includes built-in priority queue support (see the short example after this list):

  • Jobs with higher priority values are executed first
  • Jobs with the same priority are executed in FIFO order (preventing starvation)
  • Priority values range from -2,147,483,648 to 2,147,483,647 (PostgreSQL INT)
  • Default priority is 0 for backward compatibility
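
As a small illustration of the documented ordering (the file names below are placeholders):

from pdoflow.registry import Registry
from mymodule.foo import some_unit_of_work

Registry[some_unit_of_work].post_work(
    posargs=[
        (1, 1.0, "first_default.txt"),   # priority 0, posted first
        (2, 2.0, "first_urgent.txt"),    # priority 100
        (3, 3.0, "second_default.txt"),  # priority 0, posted second
        (4, 4.0, "second_urgent.txt"),   # priority 100
    ],
    kwargs=[],
    priority=[0, 100, 0, 100],
)
# Expected execution order: first_urgent, second_urgent (higher priority first),
# then first_default, second_default (equal priority, so FIFO by posting order).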

Shared Variables (Experimental)

PDOFlow provides shared variables for coordination between workers processing the same JobPosting:

  • Key-value storage associated with each JobPosting
  • Atomic operations using PostgreSQL row-level locking
  • JSON-serializable values for flexibility
  • Automatic cleanup when JobPosting is deleted

The shared variables API is available for direct use with database sessions:

from pdoflow.io import Session
from pdoflow.shared_vars import get_shared_variable, set_shared_variable, update_shared_variable

# In your application code
with Session() as session:
    # Set a variable
    set_shared_variable(session, posting_id, "config", {"timeout": 30})

    # Atomically increment a counter
    count = update_shared_variable(
        session, posting_id, "counter",
        lambda x: (x or 0) + 1, default=0
    )

    # Read a variable with optional locking
    value = get_shared_variable(session, posting_id, "config", lock=True)

    session.commit()

Note: Integration with the @job decorator for automatic session/posting_id injection is planned for a future release.
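
Until that integration lands, a job can open its own session and coordinate through a posting's shared variables. The sketch below assumes the posting's identifier is passed to the job as an ordinary (JSON-serializable) argument; the function and variable names are hypothetical.

from pdoflow import cluster
from pdoflow.io import Session
from pdoflow.shared_vars import update_shared_variable


@cluster.job()
def process_chunk(posting_id: str, chunk: int) -> None:
    # ... do the actual work on this chunk ...

    with Session() as session:
        # Atomically count how many chunks of this posting have finished.
        update_shared_variable(
            session, posting_id, "completed_chunks",
            lambda x: (x or 0) + 1, default=0,
        )
        session.commit()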

Security Implications

This package runs arbitrary code passed from clients. Some mitigations are in place: all posted parameters must be JSON serializable, and the code must be importable in some fashion.

However, anyone can point to a file on disk, so the user owning the cluster worker pool should be given the smallest set of privileges possible, and the users authorized to SELECT, UPDATE, and INSERT into the job queue should be limited.

TODO

  • Implement a hardware decorator to mark functions that require specific hardware which might not be available on an arbitrary compute cluster. These requirements should somehow be passed/identified when creating a worker pool server.
  • Implement priority queues while negating starvation problems.
  • Figure out a mechanism for passing NaN and inf values, since this package is used in a scientific computing context.
  • See how complicated it would be to orchestrate worker pools so that they can coordinate and pass workloads to each other using PostgreSQL.

Changelog

0.1.1 Worker pools now only pull jobs from their user.
