Skip to content

Move job serialisation to driver #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 89 additions & 74 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async {
self.client = client
self.configuration = configuration
self.jobRegistry = .init()
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
Expand Down Expand Up @@ -162,35 +163,43 @@
}
}

/// Register job
/// - Parameters:
/// - job: Job Definition
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
self.jobRegistry.registerJob(job)
}

/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID {
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
let jobID = JobID()
try await self.client.withTransaction(logger: self.logger) { connection in
let queuedJob = QueuedJob<JobID>(id: .init(), jobBuffer: buffer)
try await self.add(queuedJob, connection: connection)
try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil)
return queuedJob.id
try await self.add(jobID: jobID, jobBuffer: buffer, connection: connection)
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: options.delayUntil)
}
return jobID
}

/// Retry a job
/// - Returns: Bool
@discardableResult public func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) async throws -> Bool {
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
try await self.client.withTransaction(logger: self.logger) { connection in
try await self.updateJob(id: id, buffer: buffer, connection: connection)
try await self.addToQueue(jobId: id, connection: connection, delayUntil: options.delayUntil)
try await self.addToQueue(jobID: id, connection: connection, delayUntil: options.delayUntil)
}
return true
}

/// This is called to say job has finished processing and it can be deleted
public func finished(jobId: JobID) async throws {
try await self.delete(jobId: jobId)
public func finished(jobID: JobID) async throws {
try await self.delete(jobID: jobID)
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobId: JobID, error: Error) async throws {
try await self.setStatus(jobId: jobId, status: .failed)
public func failed(jobID: JobID, error: Error) async throws {
try await self.setStatus(jobID: jobID, status: .failed)
}

/// stop serving jobs
Expand Down Expand Up @@ -223,60 +232,63 @@
)
}

func popFirst() async throws -> QueuedJob<JobID>? {
func popFirst() async throws -> JobQueueResult<JobID>? {
do {
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<QueuedJob<JobID>?, Error> in
while true {
try Task.checkCancellation()

let stream = try await connection.query(
"""
WITH next_job AS (
SELECT
job_id
FROM _hb_pg_job_queue
WHERE delayed_until <= NOW()
ORDER BY createdAt, delayed_until ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM
_hb_pg_job_queue
WHERE job_id = (SELECT job_id FROM next_job)
RETURNING job_id
""",
logger: self.logger
)
// return nil if nothing in queue
guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
return Result.success(nil)
}
// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
logger: self.logger
// The withTransaction closure returns a Result<(ByteBuffer, JobID)?, Error> because
// we want to be able to exit the closure without cancelling the transaction
let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result<(ByteBuffer, JobID)?, Error> in
try Task.checkCancellation()

let stream = try await connection.query(
"""
WITH next_job AS (
SELECT
job_id
FROM _hb_pg_job_queue
WHERE delayed_until <= NOW()
ORDER BY createdAt, delayed_until ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
DELETE FROM
_hb_pg_job_queue
WHERE job_id = (SELECT job_id FROM next_job)
RETURNING job_id
""",
logger: self.logger
)
// return nil if nothing in queue
guard let jobID = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
return Result.success(nil)
}
// set job status to processing
try await self.setStatus(jobID: jobID, status: .processing, connection: connection)

do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
// if failed to find a job in the job table try getting another index
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
continue
}
return Result.success(QueuedJob(id: jobId, jobBuffer: buffer))
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
return Result.failure(
JobQueueError(
code: .decodeJobFailed,
jobName: nil,
details: "\(String(reflecting: error))"
)
)
}
// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobID)",
logger: self.logger
)
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
logger.error(
"Failed to find job with id",
metadata: [
"JobID": "\(jobID)"
]
)
// if failed to find the job in the job table return nil
return .success(nil)

Check warning on line 280 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L273-L280

Added lines #L273 - L280 were not covered by tests
}
return .success((buffer, jobID))

}
guard let (buffer, jobID) = try result.get() else { return nil }
do {
let jobInstance = try self.jobRegistry.decode(buffer)
return JobQueueResult(id: jobID, result: .success(jobInstance))
} catch let error as JobQueueError {
return JobQueueResult(id: jobID, result: .failure(error))
}
return try result.get()
} catch let error as PSQLError {
logger.error(
"Failed to get job from queue",
Expand All @@ -296,11 +308,11 @@
}
}

func add(_ job: QueuedJob<JobID>, connection: PostgresConnection) async throws {
func add(jobID: JobID, jobBuffer: ByteBuffer, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO _hb_pg_jobs (id, job, status)
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
VALUES (\(jobID), \(jobBuffer), \(Status.pending))
""",
logger: self.logger
)
Expand All @@ -319,35 +331,36 @@
)
}

func delete(jobId: JobID) async throws {
func delete(jobID: JobID) async throws {
try await self.client.query(
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
"DELETE FROM _hb_pg_jobs WHERE id = \(jobID)",
logger: self.logger
)
}

func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
func addToQueue(jobID: JobID, connection: PostgresConnection, delayUntil: Date) async throws {
// TODO: assign Date.now in swift-jobs options?
try await connection.query(
"""
INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until)
VALUES (\(jobId), \(Date.now), \(delayUntil))
VALUES (\(jobID), \(Date.now), \(delayUntil))
-- We have found an existing job with the same id, SKIP this INSERT
ON CONFLICT (job_id) DO NOTHING
""",
logger: self.logger
)
}

func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
func setStatus(jobID: JobID, status: Status, connection: PostgresConnection) async throws {
try await connection.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}

func setStatus(jobId: JobID, status: Status) async throws {
func setStatus(jobID: JobID, status: Status) async throws {
try await self.client.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobID)",
logger: self.logger
)
}
Expand Down Expand Up @@ -375,20 +388,22 @@
case .rerun:
let jobs = try await getJobs(withStatus: status)
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
for jobId in jobs {
try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: Date.now)
for jobID in jobs {
try await self.addToQueue(jobID: jobID, connection: connection, delayUntil: Date.now)
}

case .doNothing:
break
}
}

let jobRegistry: JobRegistry
}

/// extend PostgresJobQueue to conform to AsyncSequence
extension PostgresJobQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = QueuedJob<JobID>
public typealias Element = JobQueueResult<JobID>

let queue: PostgresJobQueue

Expand Down
Loading