diff --git a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift b/Sources/JobsPostgres/Migrations/CreateJobDelay.swift deleted file mode 100644 index aff928c..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift +++ /dev/null @@ -1,38 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobDelay: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - ALTER TABLE _hb_pg_job_queue ADD COLUMN IF NOT EXISTS delayed_until TIMESTAMP WITH TIME ZONE - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "ALTER TABLE _hb_pg_job_queue DROP COLUMN delayed_until", - logger: logger - ) - } - - var name: String { "_Create_JobQueueDelay_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift b/Sources/JobsPostgres/Migrations/CreateJobQueue.swift deleted file mode 100644 index c64f958..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift +++ /dev/null @@ -1,48 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueue: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue ( - job_id uuid PRIMARY KEY, - createdAt timestamp with time zone - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_queueidx - ON _hb_pg_job_queue(createdAt ASC) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift b/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift deleted file mode 100644 index 7c7c2fe..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueueMetadata: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue_metadata ( - key text PRIMARY KEY, - value bytea - ) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue_metadata", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Metadata_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobs.swift b/Sources/JobsPostgres/Migrations/CreateJobs.swift deleted file mode 100644 index a900263..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobs.swift +++ /dev/null @@ -1,55 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobs: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_jobs ( - id uuid PRIMARY KEY, - job bytea, - status smallint, - lastModified TIMESTAMPTZ DEFAULT NOW() - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_status - ON _hb_pg_jobs(status) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_jobs", - logger: logger - ) - } - - var name: String { "_Create_Jobs_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} - -extension DatabaseMigrationGroup { - /// JobQueue migration group - public static var jobQueue: Self { .init("_hb_jobqueue") } -} diff --git a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift new file mode 100644 index 0000000..0192754 --- /dev/null +++ b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import PostgresMigrations +import PostgresNIO + +struct CreateSwiftJobsMigrations: DatabaseMigration { + + func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + + try await connection.query("CREATE SCHEMA IF NOT EXISTS swift_jobs;", logger: logger) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.jobs ( + id uuid PRIMARY KEY, + job bytea NOT NULL, + last_modified TIMESTAMPTZ NOT NULL DEFAULT now(), + queue_name TEXT NOT NULL DEFAULT 'default', + status smallint NOT NULL + ); + """, + logger: logger + ) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.queues( + job_id uuid PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL, + delayed_until TIMESTAMPTZ NOT NULL DEFAULT now(), + queue_name TEXT NOT NULL DEFAULT 'default' + ); + """, + logger: logger + ) + + try await connection.query( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_delayed_until_queue_name_idx + ON swift_jobs.queues(delayed_until, queue_name) + """, + logger: logger + ) + + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata( + key text PRIMARY KEY, + value bytea NOT NULL, + queue_name TEXT NOT NULL DEFAULT 'default' + ) + """, + logger: logger + ) + + try await connection.query( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS queues_metadata_key_queue_name_idx + ON swift_jobs.queues_metadata(key, queue_name) + """, + logger: logger + ) + } + + func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + try await connection.query( + """ + DROP SCHEMA swift_jobs CASCADE; + """, + logger: logger + ) + } + + var description: String { "__CreateSwiftJobsMigrations__" } + var group: DatabaseMigrationGroup { .jobQueue } +} diff --git a/Sources/JobsPostgres/Migrations/Database+Migrations.swift b/Sources/JobsPostgres/Migrations/Database+Migrations.swift new file mode 100644 index 0000000..11577d5 --- /dev/null +++ b/Sources/JobsPostgres/Migrations/Database+Migrations.swift @@ -0,0 +1,20 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import PostgresMigrations + +extension DatabaseMigrationGroup { + /// JobQueue migration group + public static var jobQueue: Self { .init("_hb_jobqueue") } +} diff --git a/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift b/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift deleted file mode 100644 index 9c8bde4..0000000 --- a/Sources/JobsPostgres/Migrations/UpdateJobDelay.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct UpdateJobDelay: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until TYPE TIMESTAMPTZ USING COALESCE(delayed_until, NOW()), - ALTER COLUMN delayed_until SET DEFAULT NOW(), - ALTER COLUMN delayed_until SET NOT NULL - - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "ALTER TABLE _hb_pg_job_queue ALTER COLUMN delayed_until DROP NOT NULL", - logger: logger - ) - } - - var name: String { "_Update_JobQueueDelay_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 869f06a..2b0ee28 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -44,7 +44,6 @@ import PostgresNIO /// ``` public final class PostgresJobQueue: JobQueueDriver { public typealias JobID = UUID - /// what to do with failed/processing jobs from last time queue was handled public enum JobCleanup: Sendable { case doNothing @@ -75,13 +74,17 @@ public final class PostgresJobQueue: JobQueueDriver { public struct Configuration: Sendable { /// Queue poll time to wait if queue empties let pollTime: Duration + /// Which Queue to push jobs into + let queueName: String /// Initialize configuration /// - Parameter pollTime: Queue poll time to wait if queue empties public init( - pollTime: Duration = .milliseconds(100) + pollTime: Duration = .milliseconds(100), + queueName: String = "default" ) { self.pollTime = pollTime + self.queueName = queueName } } @@ -103,11 +106,7 @@ public final class PostgresJobQueue: JobQueueDriver { self.logger = logger self.isStopped = .init(false) self.migrations = migrations - await migrations.add(CreateJobs()) - await migrations.add(CreateJobQueue()) - await migrations.add(CreateJobQueueMetadata()) - await migrations.add(CreateJobDelay()) - await migrations.add(UpdateJobDelay()) + await migrations.add(CreateSwiftJobsMigrations()) } public func onInit() async throws { @@ -156,7 +155,7 @@ public final class PostgresJobQueue: JobQueueDriver { logger.error( "JobQueue initialization failed", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))" ] ) throw error @@ -175,8 +174,8 @@ public final class PostgresJobQueue: JobQueueDriver { @discardableResult public func push(_ jobRequest: JobRequest, options: JobOptions) async throws -> JobID { let jobID = JobID() try await self.client.withTransaction(logger: self.logger) { connection in - try await self.add(jobID: jobID, jobRequest: jobRequest, connection: connection) - try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil) + try await self.add(jobID: jobID, jobRequest: jobRequest, queueName: configuration.queueName, connection: connection) + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection) } return jobID } @@ -187,7 +186,7 @@ public final class PostgresJobQueue: JobQueueDriver { let buffer = try self.jobRegistry.encode(jobRequest: jobRequest) try await self.client.withTransaction(logger: self.logger) { connection in try await self.updateJob(id: id, buffer: buffer, connection: connection) - try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil) + try await self.addToQueue(jobID: id, queueName: configuration.queueName, delayUntil: options.delayUntil, connection: connection) } } @@ -211,7 +210,7 @@ public final class PostgresJobQueue: JobQueueDriver { public func getMetadata(_ key: String) async throws -> ByteBuffer? { let stream = try await self.client.query( - "SELECT value FROM _hb_pg_job_queue_metadata WHERE key = \(key)", + "SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)", logger: self.logger ) for try await value in stream.decode(ByteBuffer.self) { @@ -223,7 +222,8 @@ public final class PostgresJobQueue: JobQueueDriver { public func setMetadata(key: String, value: ByteBuffer) async throws { try await self.client.query( """ - INSERT INTO _hb_pg_job_queue_metadata (key, value) VALUES (\(key), \(value)) + INSERT INTO swift_jobs.queues_metadata (key, value, queue_name) + VALUES (\(key), \(value), \(configuration.queueName)) ON CONFLICT (key) DO UPDATE SET value = \(value) """, @@ -248,14 +248,15 @@ public final class PostgresJobQueue: JobQueueDriver { WITH next_job AS ( SELECT job_id - FROM _hb_pg_job_queue + FROM swift_jobs.queues WHERE delayed_until <= NOW() - ORDER BY createdAt, delayed_until ASC + AND queue_name = \(configuration.queueName) + ORDER BY created_at, delayed_until ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) DELETE FROM - _hb_pg_job_queue + swift_jobs.queues WHERE job_id = (SELECT job_id FROM next_job) RETURNING job_id """, @@ -270,14 +271,20 @@ public final class PostgresJobQueue: JobQueueDriver { // select job from job table let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)", + """ + SELECT + job + FROM swift_jobs.jobs + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) guard let row = try await stream2.first(where: { _ in true }) else { logger.info( "Failed to find job with id", metadata: [ - "JobID": "\(jobID)" + "JobID": "\(jobID)", + "Queue": "\(configuration.queueName)", ] ) // if failed to find the job in the job table return error @@ -302,7 +309,8 @@ public final class PostgresJobQueue: JobQueueDriver { logger.info( "Failed to get job from queue", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))", + "Queue": "\(configuration.queueName)", ] ) throw error @@ -310,18 +318,19 @@ public final class PostgresJobQueue: JobQueueDriver { logger.info( "Job failed", metadata: [ - "error": "\(String(reflecting: error))" + "Error": "\(String(reflecting: error))", + "Queue": "\(configuration.queueName)", ] ) throw error } } - func add(jobID: JobID, jobRequest: JobRequest, connection: PostgresConnection) async throws { + func add(jobID: JobID, jobRequest: JobRequest, queueName: String, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO _hb_pg_jobs (id, job, status) - VALUES (\(jobID), \(jobRequest), \(Status.pending)) + INSERT INTO swift_jobs.jobs (id, job, status, queue_name) + VALUES (\(jobID), \(jobRequest), \(Status.pending), \(queueName)) """, logger: self.logger ) @@ -330,11 +339,11 @@ public final class PostgresJobQueue: JobQueueDriver { func updateJob(id: JobID, buffer: ByteBuffer, connection: PostgresConnection) async throws { try await connection.query( """ - UPDATE _hb_pg_jobs + UPDATE swift_jobs.jobs SET job = \(buffer), - lastModified = \(Date.now), + last_modified = \(Date.now), status = \(Status.failed) - WHERE id = \(id) + WHERE id = \(id) AND queue_name = \(configuration.queueName) """, logger: self.logger ) @@ -342,17 +351,19 @@ public final class PostgresJobQueue: JobQueueDriver { func delete(jobID: JobID) async throws { try await self.client.query( - "DELETE FROM _hb_pg_jobs WHERE id = \(jobID)", + """ + DELETE FROM swift_jobs.jobs + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } - func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws { - // TODO: assign Date.now in swift-jobs options? + func addToQueue(jobID: JobID, queueName: String, delayUntil: Date, connection: PostgresConnection) async throws { try await connection.query( """ - INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until) - VALUES (\(jobID), \(Date.now), \(delayUntil)) + INSERT INTO swift_jobs.queues (job_id, created_at, delayed_until, queue_name) + VALUES (\(jobID), \(Date.now), \(delayUntil), \(queueName)) -- We have found an existing job with the same id, SKIP this INSERT ON CONFLICT (job_id) DO NOTHING """, @@ -362,21 +373,36 @@ public final class PostgresJobQueue: JobQueueDriver { func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws { try await connection.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", + """ + UPDATE swift_jobs.jobs + SET status = \(status), + last_modified = \(Date.now) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } func setStatus(jobID: JobID, status: Status) async throws { try await self.client.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)", + """ + UPDATE swift_jobs.jobs + SET status = \(status), + last_modified = \(Date.now) + WHERE id = \(jobID) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) } func getJobs(withStatus status: Status) async throws -> [JobID] { let stream = try await self.client.query( - "SELECT id FROM _hb_pg_jobs WHERE status = \(status) FOR UPDATE SKIP LOCKED", + """ + SELECT + id + FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) var jobs: [JobID] = [] @@ -390,7 +416,10 @@ public final class PostgresJobQueue: JobQueueDriver { switch onInit { case .remove: try await connection.query( - "DELETE FROM _hb_pg_jobs WHERE status = \(status) ", + """ + DELETE FROM swift_jobs.jobs + WHERE status = \(status) AND queue_name = \(configuration.queueName) + """, logger: self.logger ) @@ -398,7 +427,7 @@ public final class PostgresJobQueue: JobQueueDriver { let jobs = try await getJobs(withStatus: status) self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") for jobID in jobs { - try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now) + try await self.addToQueue(jobID: jobID, queueName: configuration.queueName, delayUntil: Date.now, connection: connection) } case .doNothing: diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 1c4aeb5..2318a34 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -436,6 +436,9 @@ final class JobsTests: XCTestCase { .postgres( client: postgresClient, migrations: postgresMigrations2, + configuration: .init( + queueName: "job_queue_2" + ), logger: logger ), numWorkers: 2, @@ -460,7 +463,8 @@ final class JobsTests: XCTestCase { try await jobQueue.queue.cleanup(failedJobs: .remove, processingJobs: .remove) try await jobQueue2.queue.cleanup(failedJobs: .remove, processingJobs: .remove) do { - for i in 0..<200 { + for i in 0..<100 { + try await jobQueue2.push(id: jobIdentifer, parameters: i) try await jobQueue.push(id: jobIdentifer, parameters: i) } await self.wait(for: [expectation], timeout: 5)