Skip to content

Adding queue name and change the database schema #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,51 @@ PostgreSQL is a powerful, open source object-relational database system.
## Documentation

Reference documentation for JobsPostgres can be found [here](https://docs.hummingbird.codes/2.0/documentation/jobspostgres)

## Breaking changes from 1.0.0-beta.1

The all jobs related tables have been moved to `swift_jobs` schema

The following tables have also been renamed

* _hb_pg_job_queue -> swift_jobs.queues
* _hb_pg_jobs -> swift_jobs.jobs
* _hb_pg_job_queue_metadata -> swift_jobs.queues_metadata

if you have jobs queues in the previous schema, you'll need to move these jobs over to the new schema as follow

```SQL
INSERT INTO swift_jobs.jobs(id, status, last_modified, job)
SELECT
id,
status,
last_modified,
job
FROM _hb_pg_jobs
```
should you have any pending jobs from the previous queue, you'll need to also run the following query

```SQL
INSERT INTO swift_jobs.queues(job_id, created_at, delayed_until, queue_name)
SELECT
job_id,
createdAt,
delayed_until,
'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here
FROM _hb_pg_job_queue
```
and finally
* `DROP TABLE _hb_pg_jobs`
* `DROP TABLE _hb_pg_job_queue`
* `DROP TABLE _hb_pg_job_queue_metadata`

Should you also want to preseve the job metadata, yoo'll need to run

```SQL
INSERT INTO swift_jobs.queues_metadata(key, value, queue_name)
SELECT
key,
value,
'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here
FROM _hb_pg_job_queue_metadata
```
88 changes: 88 additions & 0 deletions Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import PostgresMigrations
import PostgresNIO

struct CreateSwiftJobsMigrations: DatabaseMigration {

func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {

try await connection.query("CREATE SCHEMA IF NOT EXISTS swift_jobs;", logger: logger)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.jobs (
id uuid PRIMARY KEY,
job bytea NOT NULL,
status smallint NOT NULL,
last_modified TIMESTAMPTZ NOT NULL DEFAULT now()
);
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues(
job_id uuid PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(),
queue_name TEXT NOT NULL DEFAULT 'default'
);
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx
ON swift_jobs.queues(delayed_until, queue_name)
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
key text PRIMARY KEY,
value bytea NOT NULL,
queue_name TEXT NOT NULL DEFAULT 'default'
)
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx
ON swift_jobs.queues_metadata(key, queue_name)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
DROP SCHEMA swift_jobs CASCADE;
""",
logger: logger
)
}

var description: String { "__CreateSwiftJobsMigrations__" }
var group: DatabaseMigrationGroup { .jobQueue }
}
48 changes: 24 additions & 24 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import PostgresNIO
/// ```
public final class PostgresJobQueue: JobQueueDriver {
public typealias JobID = UUID

/// what to do with failed/processing jobs from last time queue was handled
public enum JobCleanup: Sendable {
case doNothing
Expand Down Expand Up @@ -75,13 +74,17 @@ public final class PostgresJobQueue: JobQueueDriver {
public struct Configuration: Sendable {
/// Queue poll time to wait if queue empties
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String

/// Initialize configuration
/// - Parameter pollTime: Queue poll time to wait if queue empties
public init(
pollTime: Duration = .milliseconds(100)
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
) {
self.pollTime = pollTime
self.queueName = queueName
}
}

Expand All @@ -103,11 +106,7 @@ public final class PostgresJobQueue: JobQueueDriver {
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateJobs())
await migrations.add(CreateJobQueue())
await migrations.add(CreateJobQueueMetadata())
await migrations.add(CreateJobDelay())
await migrations.add(UpdateJobDelay())
await migrations.add(CreateSwiftJobsMigrations())
}

public func onInit() async throws {
Expand Down Expand Up @@ -211,7 +210,7 @@ public final class PostgresJobQueue: JobQueueDriver {

public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM _hb_pg_job_queue_metadata WHERE key = \(key)",
"SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
Expand All @@ -223,7 +222,8 @@ public final class PostgresJobQueue: JobQueueDriver {
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO _hb_pg_job_queue_metadata (key, value) VALUES (\(key), \(value))
INSERT INTO swift_jobs.queues_metadata (key, value, queue_name)
VALUES (\(key), \(value), \(configuration.queueName))
ON CONFLICT (key)
DO UPDATE SET value = \(value)
""",
Expand All @@ -248,14 +248,15 @@ public final class PostgresJobQueue: JobQueueDriver {
WITH next_job AS (
SELECT
job_id
FROM _hb_pg_job_queue
FROM swift_jobs.queues
WHERE delayed_until <= NOW()
ORDER BY createdAt, delayed_until ASC
AND queue_name = \(configuration.queueName)
ORDER BY created_at, delayed_until ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM
_hb_pg_job_queue
swift_jobs.queues
WHERE job_id = (SELECT job_id FROM next_job)
RETURNING job_id
""",
Expand All @@ -270,7 +271,7 @@ public final class PostgresJobQueue: JobQueueDriver {

// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
"SELECT job FROM swift_jobs.jobs WHERE id = \(jobID)",
logger: self.logger
)
guard let row = try await stream2.first(where: { _ in true }) else {
Expand Down Expand Up @@ -320,7 +321,7 @@ public final class PostgresJobQueue: JobQueueDriver {
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO _hb_pg_jobs (id, job, status)
INSERT INTO swift_jobs.jobs (id, job, status)
VALUES (\(jobID), \(jobRequest), \(Status.pending))
""",
logger: self.logger
Expand All @@ -330,9 +331,9 @@ public final class PostgresJobQueue: JobQueueDriver {
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE _hb_pg_jobs
UPDATE swift_jobs.jobs
SET job = \(buffer),
lastModified = \(Date.now),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id)
""",
Expand All @@ -342,17 +343,16 @@ public final class PostgresJobQueue: JobQueueDriver {

func delete(jobID: JobID) async throws {
try await self.client.query(
"DELETE FROM _hb_pg_jobs WHERE id = \(jobID)",
"DELETE FROM swift_jobs.jobs WHERE id = \(jobID)",
logger: self.logger
)
}

func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
// TODO: assign Date.now in swift-jobs options?
try await connection.query(
"""
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
VALUES (\(jobID), \(Date.now), \(delayUntil))
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name)
VALUES (\(jobID), \(Date.now), \(delayUntil), \(configuration.queueName))
-- We have found an existing job with the same id, SKIP this INSERT
ON CONFLICT (job_id) DO NOTHING
""",
Expand All @@ -362,21 +362,21 @@ public final class PostgresJobQueue: JobQueueDriver {

func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws {
try await connection.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
"UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}

func setStatus(jobID: JobID, status: Status) async throws {
try await self.client.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
"UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
let stream = try await self.client.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status) FOR UPDATE SKIP LOCKED",
"SELECT id FROM swift_jobs.jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
Expand All @@ -390,7 +390,7 @@ public final class PostgresJobQueue: JobQueueDriver {
switch onInit {
case .remove:
try await connection.query(
"DELETE FROM _hb_pg_jobs WHERE status = \(status) ",
"DELETE FROM swift_jobs.jobs WHERE status = \(status) ",
logger: self.logger
)

Expand Down
Loading