Skip to content

Adding queue name and change the database schema #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import PostgresMigrations
import PostgresNIO

struct CreateSwiftJobsMigrations: DatabaseMigration {

func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {

try await connection.query("CREATE SCHEMA IF NOT EXISTS swift_jobs;", logger: logger)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.jobs (
id uuid PRIMARY KEY,
job bytea NOT NULL,
last_modified TIMESTAMPTZ NOT NULL DEFAULT now(),
queue_name TEXT NOT NULL DEFAULT 'default',
status smallint NOT NULL
);
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues(
job_id uuid PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(),
queue_name TEXT NOT NULL DEFAULT 'default'
);
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx
ON swift_jobs.queues(delayed_until, queue_name)
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
key text PRIMARY KEY,
value bytea NOT NULL,
queue_name TEXT NOT NULL DEFAULT 'default'
)
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx
ON swift_jobs.queues_metadata(key, queue_name)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
DROP SCHEMA swift_jobs CASCADE;
""",
logger: logger
)
}

var description: String { "__CreateSwiftJobsMigrations__" }
var group: DatabaseMigrationGroup { .jobQueue }
}
72 changes: 36 additions & 36 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import PostgresNIO
/// ```
public final class PostgresJobQueue: JobQueueDriver {
public typealias JobID = UUID

/// what to do with failed/processing jobs from last time queue was handled
public enum JobCleanup: Sendable {
case doNothing
Expand Down Expand Up @@ -75,13 +74,17 @@ public final class PostgresJobQueue: JobQueueDriver {
public struct Configuration: Sendable {
/// Queue poll time to wait if queue empties
let pollTime: Duration
/// Which Queue to push jobs into
let queueName: String

/// Initialize configuration
/// - Parameter pollTime: Queue poll time to wait if queue empties
public init(
pollTime: Duration = .milliseconds(100)
pollTime: Duration = .milliseconds(100),
queueName: String = "default"
) {
self.pollTime = pollTime
self.queueName = queueName
}
}

Expand All @@ -103,11 +106,7 @@ public final class PostgresJobQueue: JobQueueDriver {
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateJobs())
await migrations.add(CreateJobQueue())
await migrations.add(CreateJobQueueMetadata())
await migrations.add(CreateJobDelay())
await migrations.add(UpdateJobDelay())
await migrations.add(CreateSwiftJobsMigrations())
}

public func onInit() async throws {
Expand Down Expand Up @@ -175,8 +174,8 @@ public final class PostgresJobQueue: JobQueueDriver {
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
let jobID = JobID()
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection)
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection)
try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
}
return jobID
}
Expand All @@ -187,7 +186,7 @@ public final class PostgresJobQueue: JobQueueDriver {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil)
try await self.addToQueue(jobID: id, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection)
}
}

Expand All @@ -211,7 +210,7 @@ public final class PostgresJobQueue: JobQueueDriver {

public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM _hb_pg_job_queue_metadata WHERE key = \(key)",
"SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
Expand All @@ -223,7 +222,8 @@ public final class PostgresJobQueue: JobQueueDriver {
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO _hb_pg_job_queue_metadata (key, value) VALUES (\(key), \(value))
INSERT INTO swift_jobs.queues_metadata (key, value, queue_name)
VALUES (\(key), \(value), \(configuration.queueName))
ON CONFLICT (key)
DO UPDATE SET value = \(value)
""",
Expand All @@ -248,14 +248,15 @@ public final class PostgresJobQueue: JobQueueDriver {
WITH next_job AS (
SELECT
job_id
FROM _hb_pg_job_queue
FROM swift_jobs.queues
WHERE delayed_until <= NOW()
ORDER BY createdAt, delayed_until ASC
AND queue_name = \(configuration.queueName)
ORDER BY created_at, delayed_until ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM
_hb_pg_job_queue
swift_jobs.queues
WHERE job_id = (SELECT job_id FROM next_job)
RETURNING job_id
""",
Expand All @@ -270,7 +271,7 @@ public final class PostgresJobQueue: JobQueueDriver {

// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
"SELECT job FROM swift_jobs.jobs WHERE id = \(jobID)",
logger: self.logger
)
guard let row = try await stream2.first(where: { _ in true }) else {
Expand Down Expand Up @@ -317,11 +318,11 @@ public final class PostgresJobQueue: JobQueueDriver {
}
}

func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, connection: PostgresConnection) async throws {
func add<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, queueName: String, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO _hb_pg_jobs (id, job, status)
VALUES (\(jobID), \(jobRequest), \(Status.pending))
INSERT INTO swift_jobs.jobs (id, job, status, queue_name)
VALUES (\(jobID), \(jobRequest), \(Status.pending), \(queueName))
""",
logger: self.logger
)
Expand All @@ -330,9 +331,9 @@ public final class PostgresJobQueue: JobQueueDriver {
func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
UPDATE _hb_pg_jobs
UPDATE swift_jobs.jobs
SET job = \(buffer),
lastModified = \(Date.now),
last_modified = \(Date.now),
status = \(Status.failed)
WHERE id = \(id)
""",
Expand All @@ -342,17 +343,16 @@ public final class PostgresJobQueue: JobQueueDriver {

func delete(jobID: JobID) async throws {
try await self.client.query(
"DELETE FROM _hb_pg_jobs WHERE id = \(jobID)",
"DELETE FROM swift_jobs.jobs WHERE id = \(jobID)",
logger: self.logger
)
}

func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
// TODO: assign Date.now in swift-jobs options?
func addToQueue(jobID: JobID, queueName: String, delayUntil: Date, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
VALUES (\(jobID), \(Date.now), \(delayUntil))
INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name)
VALUES (\(jobID), \(Date.now), \(delayUntil), \(queueName))
-- We have found an existing job with the same id, SKIP this INSERT
ON CONFLICT (job_id) DO NOTHING
""",
Expand All @@ -362,26 +362,26 @@ public final class PostgresJobQueue: JobQueueDriver {

func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws {
try await connection.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
"UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}

func setStatus(jobID: JobID, status: Status) async throws {
try await self.client.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
"UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
func getJobs(withStatus status: Status) async throws -> [(id: JobID, queue: String)] {
let stream = try await self.client.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status) FOR UPDATE SKIP LOCKED",
"SELECT id, queue_name FROM swift_jobs.jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
for try await id in stream.decode(JobID.self, context: .default) {
jobs.append(id)
var jobs: [(id: JobID, queue: String)] = []
for try await (id, queue) in stream.decode((JobID, String).self, context: .default) {
jobs.append((id: id, queue: queue))
}
return jobs
}
Expand All @@ -390,15 +390,15 @@ public final class PostgresJobQueue: JobQueueDriver {
switch onInit {
case .remove:
try await connection.query(
"DELETE FROM _hb_pg_jobs WHERE status = \(status) ",
"DELETE FROM swift_jobs.jobs WHERE status = \(status) ",
logger: self.logger
)

case .rerun:
let jobs = try await getJobs(withStatus: status)
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
for jobID in jobs {
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now)
for job in jobs {
try await self.addToQueue(jobID: job.id, queueName: job.queue, delayUntil: Date.now, connection: connection)
}

case .doNothing:
Expand Down
Loading