Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let package = Package(
],
dependencies: [
// TODO: use a released version of swift-jobs
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "lock-schedule"),
.package(url: "https://github.com/hummingbird-project/postgres-migrations.git", from: "0.1.0"),
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.25.0"),
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import PostgresMigrations
import PostgresNIO

struct CreateJobMetadataMigration: DatabaseMigration {

func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.metadata(
key TEXT NOT NULL,
value BYTEA NOT NULL,
expires TIMESTAMPTZ NOT NULL DEFAULT 'infinity',
queue_name TEXT NOT NULL DEFAULT 'default',
CONSTRAINT key_queue_name PRIMARY KEY (key, queue_name)
)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
DROP TABLE swift_jobs.metadata
""",
logger: logger
)
}

var description: String { "__JobMetadataMigration__" }

Check warning on line 45 in Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift#L45

Added line #L45 was not covered by tests
var group: DatabaseMigrationGroup { .jobQueue }
}
19 changes: 0 additions & 19 deletions Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
key TEXT PRIMARY KEY,
value BYTEA NOT NULL,
queue_name TEXT NOT NULL DEFAULT 'default'
)
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX IF NOT EXISTS queues_metadata_key_queue_name_idx
ON swift_jobs.queues_metadata(key, queue_name)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
Expand Down
103 changes: 77 additions & 26 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
/// }
/// ```
public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, CancellableJobQueue, ResumableJobQueue {
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {

public typealias JobID = UUID

Expand Down Expand Up @@ -158,6 +158,7 @@
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true)
self.registerCleanupJob()
}

Expand Down Expand Up @@ -290,31 +291,6 @@
/// shutdown queue once all active jobs have been processed
public func shutdownGracefully() async {}

@inlinable
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value
}
return nil
}

@inlinable
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO swift_jobs.queues_metadata (key, value, queue_name)
VALUES (\(key), \(value), \(configuration.queueName))
ON CONFLICT (key)
DO UPDATE SET value = \(value)
""",
logger: self.logger
)
}

@usableFromInline
func popFirst() async throws -> JobQueueResult<JobID>? {
enum PopFirstResult {
Expand Down Expand Up @@ -575,6 +551,81 @@
}
}

extension PostgresJobQueue: JobMetadataDriver {
@inlinable
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM swift_jobs.metadata WHERE key = \(key) AND queue_name = \(self.configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value
}
return nil

Check warning on line 564 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L564

Added line #L564 was not covered by tests
}

@inlinable
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO swift_jobs.metadata (key, value, queue_name)
VALUES (\(key), \(value), \(self.configuration.queueName))
ON CONFLICT (key, queue_name)
DO UPDATE SET value = \(value)
""",
logger: self.logger
)
}

/// Acquire metadata lock
///
/// - Parameters:
/// - key: Metadata key
/// - id: Lock identifier
/// - expiresIn: When lock will expire
/// - Returns: If lock was acquired
@inlinable
public func acquireLock(key: String, id: ByteBuffer, expiresIn: TimeInterval) async throws -> Bool {
let expires = Date.now + expiresIn
// insert key, value, expiration into table. On conflict with key and queue_name only set value and
// expiration if expiration is out of date or value is the same
let stream = try await self.client.query(
"""
INSERT INTO swift_jobs.metadata (key, value, expires, queue_name)
VALUES (\(key), \(id), \(expires), \(self.configuration.queueName))
ON CONFLICT (key, queue_name)
DO UPDATE
SET value = \(id), expires = \(expires)
WHERE swift_jobs.metadata.expires <= now()
OR swift_jobs.metadata.value = \(id)
RETURNING value
""",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value == id
}
return false
}

/// Release metadata lock
///
/// - Parameters:
/// - key: Metadata key
/// - id: Lock identifier
@inlinable
public func releaseLock(key: String, id: ByteBuffer) async throws {
_ = try await self.client.query(
"""
DELETE FROM swift_jobs.metadata
WHERE key = \(key)
AND value = \(id)
AND queue_name = \(self.configuration.queueName)
"""
)
}
}

extension JobQueueDriver where Self == PostgresJobQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
Expand Down
64 changes: 64 additions & 0 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,19 @@ final class JobsTests: XCTestCase {
}
}

func testMultipleQueueMetadata() async throws {
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
try await jobQueue1.queue.setMetadata(key: "test", value: .init(string: "queue1"))
try await jobQueue2.queue.setMetadata(key: "test", value: .init(string: "queue2"))
let value1 = try await jobQueue1.queue.getMetadata("test")
let value2 = try await jobQueue2.queue.getMetadata("test")
XCTAssertEqual(value1.map { String(buffer: $0) }, "queue1")
XCTAssertEqual(value2.map { String(buffer: $0) }, "queue2")
}
}
}

func testResumableAndPausableJobs() async throws {
struct TestParameters: JobParameters {
static let jobName = "TestJob"
Expand Down Expand Up @@ -993,4 +1006,55 @@ final class JobsTests: XCTestCase {
}
}
}

func testMetadataLock() async throws {
try await self.testJobQueue(numWorkers: 1) { jobQueue in
// 1 - acquire lock
var result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 2 - check I can acquire lock once I already have the lock
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 3 - check I cannot acquire lock if a different identifer has it
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
XCTAssertFalse(result)
// 4 - release lock with identifier that doesn own it
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "two"))
// 5 - check I still cannot acquire lock
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
XCTAssertFalse(result)
// 6 - release lock
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
// 7 - check I can acquire lock after it has been released
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 1)
XCTAssertTrue(result)
// 8 - check I can acquire lock after it has expired
try await Task.sleep(for: .seconds(1.5))
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 9 - release lock
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
}
}

func testMultipleQueueMetadataLock() async throws {
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
let result1 = try await jobQueue1.queue.acquireLock(
key: "testMultipleQueueMetadataLock",
id: .init(string: "queue1"),
expiresIn: 60
)
let result2 = try await jobQueue2.queue.acquireLock(
key: "testMultipleQueueMetadataLock",
id: .init(string: "queue2"),
expiresIn: 60
)
XCTAssert(result1)
XCTAssert(result2)
try await jobQueue1.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue1"))
try await jobQueue2.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue2"))
}
}
}
}
Loading