From 8f6e0e38950571ca36e1b1345ad11a8e2bf7cbba Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sat, 22 Feb 2025 23:13:13 -0500 Subject: [PATCH 1/9] Conforming to JobQueueDriver * Conforming to JobQueueDriver with queueName implementation * Migrate the Postgres driver to `swift_jobs` schema --- Package.swift | 2 +- README.md | 48 ++++++++++ .../CreateSwiftJobsMigrations.swift | 88 +++++++++++++++++++ Sources/JobsPostgres/PostgresJobsQueue.swift | 50 ++++++----- 4 files changed, 164 insertions(+), 24 deletions(-) create mode 100644 Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift diff --git a/Package.swift b/Package.swift index 7f6a392..e451e01 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), + .package(url: "https://github.com/thoven87/swift-jobs.git", branch: "queue-name"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], diff --git a/README.md b/README.md index 6ac46f2..cd8a8e7 100644 --- a/README.md +++ b/README.md @@ -23,3 +23,51 @@ PostgreSQL is a powerful, open source object-relational database system. ## Documentation Reference documentation for JobsPostgres can be found [here](https://docs.hummingbird.codes/2.0/documentation/jobspostgres) + +## Breaking changes from 1.0.0-beta.1 + +The all jobs related tables have been moved to `swift_jobs` schema + +The following tables have also been renamed + +* _hb_pg_job_queue -> swift_jobs.queues +* _hb_pg_jobs -> swift_jobs.jobs +* _hb_pg_job_queue_metadata -> swift_jobs.queues_metadata + +if you have jobs queues in the previous schema, you'll need to move these jobs over to the new schema as follow + +```SQL +INSERT INTO swift_jobs.jobs(id, status, last_modified, job) +SELECT + id, + status, + last_modified, + job +FROM _hb_pg_jobs +``` +should you have any pending jobs from the previous queue, you'll need to also run the following query + +```SQL +INSERT INTO swift_jobs.queues(job_id, created_at, delayed_until, queue_name) +SELECT + job_id, + createdAt, + delayed_until, + 'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here +FROM _hb_pg_job_queue +``` +and finally +* `DROP TABLE _hb_pg_jobs` +* `DROP TABLE _hb_pg_job_queue` +* `DROP TABLE _hb_pg_job_queue_metadata` + +Should you also want to preseve the job metadata, yoo'll need to run + +```SQL +INSERT INTO swift_jobs.queues_metadata(key, value, queue_name) +SELECT + key, + value, + 'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here +FROM _hb_pg_job_queue_metadata +``` diff --git a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift new file mode 100644 index 0000000..57b083b --- /dev/null +++ b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift @@ -0,0 +1,88 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import PostgresMigrations +import PostgresNIO + +struct CreateSwiftJobsMigrations: DatabaseMigration { + + func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + + try await connection.query("CREATE SCHEMA IF NOT EXISTS swift_jobs;", logger: logger) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.jobs ( + id uuid PRIMARY KEY, + job bytea NOT NULL, + status smallint NOT NULL, + last_modified TIMESTAMPTZ NOT NULL DEFAULT now() + ); + """, + logger: logger + ) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.queues( + job_id uuid PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL, + delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(), + queue_name TEXT NOT NULL DEFAULT 'default' + ); + """, + logger: logger + ) + + try await connection.query( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx + ON swift_jobs.queues(delayed_until, queue_name) + """, + logger: logger + ) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata( + key text PRIMARY KEY, + value bytea NOT NULL, + queue_name TEXT NOT NULL DEFAULT 'default' + ) + """, + logger: logger + ) + + try await connection.query( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx + ON swift_jobs.queues_metadata(key, queue_name) + """, + logger: logger + ) + } + + func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + try await connection.query( + """ + DROP SCHEMA swift_jobs CASCADE; + """, + logger: logger + ) + } + + var description: String { "__CreateSwiftJobsMigrations__" } + var group: DatabaseMigrationGroup { .jobQueue } +} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 869f06a..748bbc0 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -44,6 +44,8 @@ import PostgresNIO /// ``` public final class PostgresJobQueue: JobQueueDriver { public typealias JobID = UUID + /// Queue to push jobs into + public let queueName: String /// what to do with failed/processing jobs from last time queue was handled public enum JobCleanup: Sendable { @@ -75,13 +77,17 @@ public final class PostgresJobQueue: JobQueueDriver { public struct Configuration: Sendable { /// Queue poll time to wait if queue empties let pollTime: Duration + /// Which Queue to push jobs into + let queueName: String /// Initialize configuration /// - Parameter pollTime: Queue poll time to wait if queue empties public init( - pollTime: Duration = .milliseconds(100) + pollTime: Duration = .milliseconds(100), + queueName: String = "default" ) { self.pollTime = pollTime + self.queueName = queueName } } @@ -103,11 +109,8 @@ public final class PostgresJobQueue: JobQueueDriver { self.logger = logger self.isStopped = .init(false) self.migrations = migrations - await migrations.add(CreateJobs()) - await migrations.add(CreateJobQueue()) - await migrations.add(CreateJobQueueMetadata()) - await migrations.add(CreateJobDelay()) - await migrations.add(UpdateJobDelay()) + self.queueName = configuration.queueName + await migrations.add(CreateSwiftJobsMigrations()) } public func onInit() async throws { @@ -211,7 +214,7 @@ public final class PostgresJobQueue: JobQueueDriver { public func getMetadata(_ key: String) async throws -> ByteBuffer? { let stream = try await self.client.query( - "SELECT value FROM _hb_pg_job_queue_metadata WHERE key = \(key)", + "SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)", logger: self.logger ) for try await value in stream.decode(ByteBuffer.self) { @@ -223,7 +226,8 @@ public final class PostgresJobQueue: JobQueueDriver { public func setMetadata(key: String, value: ByteBuffer) async throws { try await self.client.query( """ - INSERT INTO _hb_pg_job_queue_metadata (key, value) VALUES (\(key), \(value)) + INSERT INTO swift_jobs.queues_metadata (key, value, queue_name) + VALUES (\(key), \(value), \(configuration.queueName)) ON CONFLICT (key) DO UPDATE SET value = \(value) """, @@ -248,14 +252,15 @@ public final class PostgresJobQueue: JobQueueDriver { WITH next_job AS ( SELECT job_id - FROM _hb_pg_job_queue + FROM swift_jobs.queues WHERE delayed_until <= NOW() - ORDER BY createdAt, delayed_until ASC + AND queue_name = \(configuration.queueName) + ORDER BY created_at, delayed_until ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) DELETE FROM - _hb_pg_job_queue + swift_jobs.queues WHERE job_id = (SELECT job_id FROM next_job) RETURNING job_id """, @@ -270,7 +275,7 @@ public final class PostgresJobQueue: JobQueueDriver { // select job from job table let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", + "SELECT job FROM swift_jobs.jobs WHERE id = \(jobID)", logger: self.logger ) guard let row = try await stream2.first(where: { _ in true }) else { @@ -320,7 +325,7 @@ public final class PostgresJobQueue: JobQueueDriver { func add(jobID: JobID, jobRequest: JobRequest, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO _hb_pg_jobs (id, job, status) + INSERT INTO swift_jobs.jobs (id, job, status) VALUES (\(jobID), \(jobRequest), \(Status.pending)) """, logger: self.logger @@ -330,9 +335,9 @@ public final class PostgresJobQueue: JobQueueDriver { func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ - UPDATE _hb_pg_jobs + UPDATE swift_jobs.jobs SET job = \(buffer), - lastModified = \(Date.now), + last_modified = \(Date.now), status = \(Status.failed) WHERE id = \(id) """, @@ -342,17 +347,16 @@ public final class PostgresJobQueue: JobQueueDriver { func delete(jobID: JobID) async throws { try await self.client.query( - "DELETE FROM _hb_pg_jobs WHERE id = \(jobID)", + "DELETE FROM swift_jobs.jobs WHERE id = \(jobID)", logger: self.logger ) } func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws { - // TODO: assign Date.now in swift-jobs options? try await connection.query( """ - INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until) - VALUES (\(jobID), \(Date.now), \(delayUntil)) + INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name) + VALUES (\(jobID), \(Date.now), \(delayUntil), \(configuration.queueName)) -- We have found an existing job with the same id, SKIP this INSERT ON CONFLICT (job_id) DO NOTHING """, @@ -362,21 +366,21 @@ public final class PostgresJobQueue: JobQueueDriver { func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", + "UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } func setStatus(jobID: JobID, status: Status) async throws { try await self.client.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", + "UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)", logger: self.logger ) } func getJobs(withStatus status: Status) async throws -> [JobID] { let stream = try await self.client.query( - "SELECT id FROM _hb_pg_jobs WHERE status = \(status) FOR UPDATE SKIP LOCKED", + "SELECT id FROM swift_jobs.jobs WHERE status = \(status)", logger: self.logger ) var jobs: [JobID] = [] @@ -390,7 +394,7 @@ public final class PostgresJobQueue: JobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM _hb_pg_jobs WHERE status = \(status) ", + "DELETE FROM swift_jobs.jobs WHERE status = \(status) ", logger: self.logger ) From a0fe65602ba9c63afcd829ffe0f9f97b738ed85e Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sat, 22 Feb 2025 23:15:00 -0500 Subject: [PATCH 2/9] Swift format --- .../JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift index 57b083b..47e2f9d 100644 --- a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift +++ b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift @@ -45,7 +45,7 @@ struct CreateSwiftJobsMigrations: DatabaseMigration { """, logger: logger ) - + try await connection.query( """ CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx @@ -64,7 +64,7 @@ struct CreateSwiftJobsMigrations: DatabaseMigration { """, logger: logger ) - + try await connection.query( """ CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx From efcac8aff0f3e4e7643eb1b696163ff4532da421 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sun, 23 Feb 2025 10:55:16 -0500 Subject: [PATCH 3/9] use the queue name from config instead --- Package.swift | 2 +- Sources/JobsPostgres/PostgresJobsQueue.swift | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/Package.swift b/Package.swift index e451e01..7f6a392 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( ], dependencies: [ // TODO: use a released version of swift-jobs - .package(url: "https://github.com/thoven87/swift-jobs.git", branch: "queue-name"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/hummingbird-project/hummingbird-postgres.git", from: "0.5.0"), .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"), ], diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 748bbc0..ea9e917 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -44,9 +44,6 @@ import PostgresNIO /// ``` public final class PostgresJobQueue: JobQueueDriver { public typealias JobID = UUID - /// Queue to push jobs into - public let queueName: String - /// what to do with failed/processing jobs from last time queue was handled public enum JobCleanup: Sendable { case doNothing @@ -109,7 +106,6 @@ public final class PostgresJobQueue: JobQueueDriver { self.logger = logger self.isStopped = .init(false) self.migrations = migrations - self.queueName = configuration.queueName await migrations.add(CreateSwiftJobsMigrations()) } From 0d11fbe2563cac21b79ea7ef6046abe71f4dedec Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 24 Feb 2025 10:12:04 -0500 Subject: [PATCH 4/9] Removed breaking changes notes from README, keep track of which queue a job belongs to --- README.md | 48 ------------------- .../CreateSwiftJobsMigrations.swift | 9 ++-- Sources/JobsPostgres/PostgresJobsQueue.swift | 30 ++++++------ 3 files changed, 20 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index cd8a8e7..6ac46f2 100644 --- a/README.md +++ b/README.md @@ -23,51 +23,3 @@ PostgreSQL is a powerful, open source object-relational database system. ## Documentation Reference documentation for JobsPostgres can be found [here](https://docs.hummingbird.codes/2.0/documentation/jobspostgres) - -## Breaking changes from 1.0.0-beta.1 - -The all jobs related tables have been moved to `swift_jobs` schema - -The following tables have also been renamed - -* _hb_pg_job_queue -> swift_jobs.queues -* _hb_pg_jobs -> swift_jobs.jobs -* _hb_pg_job_queue_metadata -> swift_jobs.queues_metadata - -if you have jobs queues in the previous schema, you'll need to move these jobs over to the new schema as follow - -```SQL -INSERT INTO swift_jobs.jobs(id, status, last_modified, job) -SELECT - id, - status, - last_modified, - job -FROM _hb_pg_jobs -``` -should you have any pending jobs from the previous queue, you'll need to also run the following query - -```SQL -INSERT INTO swift_jobs.queues(job_id, created_at, delayed_until, queue_name) -SELECT - job_id, - createdAt, - delayed_until, - 'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here -FROM _hb_pg_job_queue -``` -and finally -* `DROP TABLE _hb_pg_jobs` -* `DROP TABLE _hb_pg_job_queue` -* `DROP TABLE _hb_pg_job_queue_metadata` - -Should you also want to preseve the job metadata, yoo'll need to run - -```SQL -INSERT INTO swift_jobs.queues_metadata(key, value, queue_name) -SELECT - key, - value, - 'default' -- UNLESS queueName was changed, you'll need to match the name of the queue here -FROM _hb_pg_job_queue_metadata -``` diff --git a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift index 47e2f9d..0192754 100644 --- a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift +++ b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift @@ -25,10 +25,11 @@ struct CreateSwiftJobsMigrations: DatabaseMigration { try await connection.query( """ CREATE TABLE IF NOT EXISTS swift_jobs.jobs ( - id uuid PRIMARY KEY, - job bytea NOT NULL, - status smallint NOT NULL, - last_modified TIMESTAMPTZ NOT NULL DEFAULT now() + id uuid PRIMARY KEY, + job bytea NOT NULL, + last_modified TIMESTAMPTZ NOT NULL DEFAULT now(), + queue_name TEXT NOT NULL DEFAULT 'default', + status smallint NOT NULL ); """, logger: logger diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index ea9e917..631e760 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -174,8 +174,8 @@ public final class PostgresJobQueue: JobQueueDriver { @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in - try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection) - try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil) + try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection) + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection) } return jobID } @@ -186,7 +186,7 @@ public final class PostgresJobQueue: JobQueueDriver { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) - try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil) + try await self.addToQueue(jobID: id, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection) } } @@ -318,11 +318,11 @@ public final class PostgresJobQueue: JobQueueDriver { } } - func add(jobID: JobID, jobRequest: JobRequest, connection: PostgresConnection) async throws { + func add(jobID: JobID, jobRequest: JobRequest, queueName: String, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO swift_jobs.jobs (id, job, status) - VALUES (\(jobID), \(jobRequest), \(Status.pending)) + INSERT INTO swift_jobs.jobs (id, job, status, queue_name) + VALUES (\(jobID), \(jobRequest), \(Status.pending), \(queueName)) """, logger: self.logger ) @@ -348,11 +348,11 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws { + func addToQueue(jobID: JobID, queueName: String, delayUntil: Date, connection: PostgresConnection) async throws { try await connection.query( """ INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name) - VALUES (\(jobID), \(Date.now), \(delayUntil), \(configuration.queueName)) + VALUES (\(jobID), \(Date.now), \(delayUntil), \(queueName)) -- We have found an existing job with the same id, SKIP this INSERT ON CONFLICT (job_id) DO NOTHING """, @@ -374,14 +374,14 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func getJobs(withStatus status: Status) async throws -> [JobID] { + func getJobs(withStatus status: Status) async throws -> [(id: JobID, queue: String)] { let stream = try await self.client.query( - "SELECT id FROM swift_jobs.jobs WHERE status = \(status)", + "SELECT id, queue_name FROM swift_jobs.jobs WHERE status = \(status)", logger: self.logger ) - var jobs: [JobID] = [] - for try await id in stream.decode(JobID.self, context: .default) { - jobs.append(id) + var jobs: [(id: JobID, queue: String)] = [] + for try await (id, queue) in stream.decode((JobID, String).self, context: .default) { + jobs.append((id: id, queue: queue)) } return jobs } @@ -397,8 +397,8 @@ public final class PostgresJobQueue: JobQueueDriver { case .rerun: let jobs = try await getJobs(withStatus: status) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobID in jobs { - try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now) + for job in jobs { + try await self.addToQueue(jobID: job.id, queueName: job.queue, delayUntil: Date.now, connection: connection) } case .doNothing: From 74e259297dc69c63f4fce1f3c56ac21f20fafdda Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 24 Feb 2025 11:21:01 -0500 Subject: [PATCH 5/9] Update Sources/JobsPostgres/PostgresJobsQueue.swift Co-authored-by: Adam Fowler --- Sources/JobsPostgres/PostgresJobsQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 631e760..65c9d7a 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -376,7 +376,7 @@ public final class PostgresJobQueue: JobQueueDriver { func getJobs(withStatus status: Status) async throws -> [(id: JobID, queue: String)] { let stream = try await self.client.query( - "SELECT id, queue_name FROM swift_jobs.jobs WHERE status = \(status)", + "SELECT id FROM swift_jobs.jobs WHERE status = \(status) AND queue_name = \(configuration.queueName)", logger: self.logger ) var jobs: [(id: JobID, queue: String)] = [] From b64d911fa7a8ead9f4db8bb06e65fcb9aa3a4ac7 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 24 Feb 2025 13:10:06 -0500 Subject: [PATCH 6/9] make select queue_name implicit and updated the test multiple job queue to split tasks --- Sources/JobsPostgres/PostgresJobsQueue.swift | 52 +++++++++++++++----- Tests/JobsPostgresTests/JobsTests.swift | 6 ++- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 65c9d7a..cc309da 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -155,7 +155,7 @@ public final class PostgresJobQueue: JobQueueDriver { logger.error( "JobQueue initialization failed", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))" ] ) throw error @@ -271,14 +271,20 @@ public final class PostgresJobQueue: JobQueueDriver { // select job from job table let stream2 = try await connection.query( - "SELECT job FROM swift_jobs.jobs WHERE id = \(jobID)", + """ + SELECT + job + FROM swift_jobs.jobs + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) guard let row = try await stream2.first(where: { _ in true }) else { logger.info( "Failed to find job with id", metadata: [ - "JobID": "\(jobID)" + "JobID": "\(jobID)", + "Queue": "\(configuration.queueName)", ] ) // if failed to find the job in the job table return error @@ -303,7 +309,8 @@ public final class PostgresJobQueue: JobQueueDriver { logger.info( "Failed to get job from queue", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))", + "Queue": "\(configuration.queueName)", ] ) throw error @@ -311,7 +318,8 @@ public final class PostgresJobQueue: JobQueueDriver { logger.info( "Job failed", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))", + "Queue": "\(configuration.queueName)", ] ) throw error @@ -335,7 +343,7 @@ public final class PostgresJobQueue: JobQueueDriver { SET job = \(buffer), last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) + WHERE id = \(id) AND queue_name = \(configuration.queueName) """, logger: self.logger ) @@ -343,7 +351,10 @@ public final class PostgresJobQueue: JobQueueDriver { func delete(jobID: JobID) async throws { try await self.client.query( - "DELETE FROM swift_jobs.jobs WHERE id = \(jobID)", + """ + DELETE FROM swift_jobs.jobs + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } @@ -362,21 +373,37 @@ public final class PostgresJobQueue: JobQueueDriver { func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)", + """ + UPDATE swift_jobs.jobs + SET status = \(status), + last_modified = \(Date.now) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } func setStatus(jobID: JobID, status: Status) async throws { try await self.client.query( - "UPDATE swift_jobs.jobs SET status = \(status), last_modified = \(Date.now) WHERE id = \(jobID)", + """ + UPDATE swift_jobs.jobs + SET status = \(status), + last_modified = \(Date.now) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } func getJobs(withStatus status: Status) async throws -> [(id: JobID, queue: String)] { let stream = try await self.client.query( - "SELECT id FROM swift_jobs.jobs WHERE status = \(status) AND queue_name = \(configuration.queueName)", + """ + SELECT + id, + queue_name + FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) var jobs: [(id: JobID, queue: String)] = [] @@ -390,7 +417,10 @@ public final class PostgresJobQueue: JobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM swift_jobs.jobs WHERE status = \(status) ", + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 1c4aeb5..2318a34 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -436,6 +436,9 @@ final class JobsTests: XCTestCase { .postgres( client: postgresClient, migrations: postgresMigrations2, + configuration: .init( + queueName: "job_queue_2" + ), logger: logger ), numWorkers: 2, @@ -460,7 +463,8 @@ final class JobsTests: XCTestCase { try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove) try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove) do { - for i in 0..<200 { + for i in 0..<100 { + try await jobQueue2.push(id: jobIdentifer, parameters: i) try await jobQueue.push(id: jobIdentifer, parameters: i) } await self.wait(for: [expectation], timeout: 5) From 18ae25e0906d53f3b7b1c04bab95f1843e7164bc Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 24 Feb 2025 16:01:46 -0500 Subject: [PATCH 7/9] Deleting unused migrations --- .../Migrations/CreateJobDelay.swift | 38 ------------- .../Migrations/CreateJobQueue.swift | 48 ---------------- .../Migrations/CreateJobQueueMetadata.swift | 41 -------------- .../JobsPostgres/Migrations/CreateJobs.swift | 55 ------------------- .../Migrations/UpdateJobDelay.swift | 41 -------------- 5 files changed, 223 deletions(-) delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobDelay.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobQueue.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobs.swift delete mode 100644 Sources/JobsPostgres/Migrations/UpdateJobDelay.swift diff --git a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift b/Sources/JobsPostgres/Migrations/CreateJobDelay.swift deleted file mode 100644 index aff928c..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift +++ /dev/null @@ -1,38 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobDelay: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - ALTER TABLE _hb_pg_job_queue ADD COLUMN IF NOT EXISTS delayed_until TIMESTAMP WITH TIME ZONE - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "ALTER TABLE _hb_pg_job_queue DROP COLUMN delayed_until", - logger: logger - ) - } - - var name: String { "_Create_JobQueueDelay_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift b/Sources/JobsPostgres/Migrations/CreateJobQueue.swift deleted file mode 100644 index c64f958..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift +++ /dev/null @@ -1,48 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueue: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue ( - job_id uuid PRIMARY KEY, - createdAt timestamp with time zone - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_queueidx - ON _hb_pg_job_queue(createdAt ASC) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift b/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift deleted file mode 100644 index 7c7c2fe..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueueMetadata: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue_metadata ( - key text PRIMARY KEY, - value bytea - ) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue_metadata", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Metadata_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobs.swift b/Sources/JobsPostgres/Migrations/CreateJobs.swift deleted file mode 100644 index a900263..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobs.swift +++ /dev/null @@ -1,55 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobs: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_jobs ( - id uuid PRIMARY KEY, - job bytea, - status smallint, - lastModified TIMESTAMPTZ DEFAULT NOW() - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_status - ON _hb_pg_jobs(status) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_jobs", - logger: logger - ) - } - - var name: String { "_Create_Jobs_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} - -extension DatabaseMigrationGroup { - /// JobQueue migration group - public static var jobQueue: Self { .init("_hb_jobqueue") } -} diff --git a/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift b/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift deleted file mode 100644 index 9c8bde4..0000000 --- a/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct UpdateJobDelay: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until TYPE TIMESTAMPTZ USING COALESCE(delayed_until, NOW()), - ALTER COLUMN delayed_until SET DEFAULT NOW(), - ALTER COLUMN delayed_until SET NOT NULL - - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until DROP NOT NULL", - logger: logger - ) - } - - var name: String { "_Update_JobQueueDelay_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} From 34a68447ce4ee667c001d7c010fc47f5e4921c54 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Mon, 24 Feb 2025 16:02:13 -0500 Subject: [PATCH 8/9] adding migration group --- .../Migrations/Database+Migrations.swift | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 Sources/JobsPostgres/Migrations/Database+Migrations.swift diff --git a/Sources/JobsPostgres/Migrations/Database+Migrations.swift b/Sources/JobsPostgres/Migrations/Database+Migrations.swift new file mode 100644 index 0000000..11577d5 --- /dev/null +++ b/Sources/JobsPostgres/Migrations/Database+Migrations.swift @@ -0,0 +1,20 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import PostgresMigrations + +extension DatabaseMigrationGroup { + /// JobQueue migration group + public static var jobQueue: Self { .init("_hb_jobqueue") } +} From 74e59893327bc8ae6bf7e1c658c52f8786017461 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Tue, 25 Feb 2025 07:02:01 -0500 Subject: [PATCH 9/9] PR feedback --- Sources/JobsPostgres/PostgresJobsQueue.swift | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index cc309da..2b0ee28 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -395,20 +395,19 @@ public final class PostgresJobQueue: JobQueueDriver { ) } - func getJobs(withStatus status: Status) async throws -> [(id: JobID, queue: String)] { + func getJobs(withStatus status: Status) async throws -> [JobID] { let stream = try await self.client.query( """ SELECT - id, - queue_name + id FROM swift_jobs.jobs WHERE status = \(status) AND queue_name = \(configuration.queueName) """, logger: self.logger ) - var jobs: [(id: JobID, queue: String)] = [] - for try await (id, queue) in stream.decode((JobID, String).self, context: .default) { - jobs.append((id: id, queue: queue)) + var jobs: [JobID] = [] + for try await id in stream.decode(JobID.self, context: .default) { + jobs.append(id) } return jobs } @@ -427,8 +426,8 @@ public final class PostgresJobQueue: JobQueueDriver { case .rerun: let jobs = try await getJobs(withStatus: status) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for job in jobs { - try await self.addToQueue(jobID: job.id, queueName: job.queue, delayUntil: Date.now, connection: connection) + for jobID in jobs { + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: Date.now, connection: connection) } case .doNothing: