Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
strategy:
matrix:
image:
- 'swift:5.10'
- 'swift:6.0'
- 'swift:6.1'
- 'swiftlang/swift:nightly-6.2-noble'
postgres-image:
- 'postgres:17'
- 'postgres:16'
Expand Down
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// swift-tools-version: 5.10
// swift-tools-version: 6.0

import PackageDescription

let package = Package(
name: "swift-jobs-postgres",
platforms: [.macOS(.v14), .iOS(.v17), .tvOS(.v17)],
platforms: [.macOS(.v15), .iOS(.v18), .tvOS(.v18)],
products: [
.library(name: "JobsPostgres", targets: ["JobsPostgres"])
],
Expand Down
1 change: 0 additions & 1 deletion Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import Foundation
import Jobs
import Logging
import NIOConcurrencyHelpers
import NIOCore
import PostgresMigrations
import PostgresNIO
Expand Down
16 changes: 10 additions & 6 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import Foundation
import Jobs
import Logging
import NIOConcurrencyHelpers
import NIOCore
import PostgresMigrations
import PostgresNIO
import Synchronization

/// Postgres Job queue implementation
///
Expand Down Expand Up @@ -147,7 +147,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
public let logger: Logger
let migrations: DatabaseMigrations
@usableFromInline
let isStopped: NIOLockedValueBox<Bool>
let isStopped: Mutex<Bool>

/// Initialize a PostgresJobQueue
public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async {
Expand All @@ -157,9 +157,8 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true)
self.registerCleanupJob()
await Self.addMigrations(to: self.migrations)
}

public func waitUntilReady() async throws {
Expand All @@ -168,6 +167,11 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma
try await self.migrations.waitUntilCompleted()
}

package static func addMigrations(to migrations: DatabaseMigrations) async {
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true)
}

/// Cancel job
///
/// This function is used to cancel a job. Job cancellation is not gaurenteed howerever.
Expand Down Expand Up @@ -285,7 +289,7 @@ public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, Resuma

/// stop serving jobs
public func stop() async {
self.isStopped.withLockedValue { $0 = true }
self.isStopped.withLock { $0 = true }
}

/// shutdown queue once all active jobs have been processed
Expand Down Expand Up @@ -533,7 +537,7 @@ extension PostgresJobQueue {
@inlinable
public func next() async throws -> Element? {
while true {
if self.queue.isStopped.withLockedValue({ $0 }) {
if self.queue.isStopped.withLock({ $0 }) {
return nil
}

Expand Down
Loading
Loading