Open
Labels: bug (Something isn't working)
Description
Jobs are sometimes executed multiple times.
Issue
I have found a concurrency safety issue.
A dispatched job can be dequeued twice with the same job ID and payload.
Reproducing
This is a reproducing repository: https://github.com/sidepelican/QueuesFluentDriverMultipleExecution
This repository dispatches a simple job several times and automatically detects when the same job is launched more than once.
```
$ swift run
...
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
p_id=51675C99-B581-4555-9072-A376D1E95770 is multiple executed!
```
Cause
1. Queues calls `Queue.set` and `Queue.push` in `Queue.dispatch`.
2. `FluentQueue.set` saves a `JobModel`. `JobModel.state` has `.pending` as its initial state.
```swift
public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
    let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage))
    // If the job must run at a later time, ensure it won't be picked earlier since
    // we sort pending jobs by date when querying
    jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date()
    return jobModel.save(on: db).map { metadata in
        return
    }
}
```
3. `FluentQueue.push` writes the job's state to `pending`. The default value of `state` is `.pending`, so this operation is seemingly meaningless.
```swift
public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
    guard let sqlDb = db as? SQLDatabase else {
        return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
    }
    return sqlDb
        .update(JobModel.schema)
        .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
        .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string))
        .run()
}
```
4. The job saved in step 2 is immediately available for workers to dequeue. What happens if a worker dequeues it between steps 2 and 3? The worker sets `state` to `.processing`, and then step 3 overwrites it back to `.pending`.
5. Because the state is `.pending` again, another worker can dequeue the same job, and it is executed a second time.
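The interleaving described above can be replayed deterministically with a small in-memory model. This is only a sketch: `JobState`, `JobStore`, and the string job ID are hypothetical stand-ins for the jobs table, not the driver's real types, and the two "workers" are simply two sequential `dequeue()` calls placed on either side of `push`.

```swift
// Hypothetical in-memory stand-in for the jobs table (not the driver's types).
enum JobState { case pending, processing }

struct JobStore {
    var states: [String: JobState] = [:]

    // Mirrors FluentQueue.set: the saved row starts out as .pending.
    mutating func set(_ id: String) { states[id] = .pending }

    // Mirrors FluentQueue.push: unconditionally writes .pending.
    mutating func push(_ id: String) { states[id] = .pending }

    // A worker claims any pending job and marks it .processing.
    mutating func dequeue() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else {
            return nil
        }
        states[id] = .processing
        return id
    }
}

var store = JobStore()
store.set("job-1")
let firstClaim = store.dequeue()   // worker A runs between set and push
store.push("job-1")                // overwrites .processing with .pending
let secondClaim = store.dequeue()  // worker B claims the same job again

print(firstClaim == secondClaim)   // prints "true": both workers got job-1
```

In the real driver the window between `set` and `push` is short, which would be consistent with the duplicate execution showing up only occasionally.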
How to fix?
I think there are two ways.
One is to add `.initialized` to `QueuesFluentJobState` and use it as the initial value of `JobModel.state`.
The other is to do nothing in `FluentQueue.push`.
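As a rough illustration of the first option, the sketch below uses a hypothetical in-memory model (`FixedJobState` and `FixedJobStore` are made-up names, not the driver's API). The row starts as `.initialized`, so a worker polling for `.pending` rows cannot see the job until `push` has run, and `push` no longer has a `.processing` state to clobber.

```swift
// Hypothetical model of the proposed fix (not the driver's real types).
enum FixedJobState { case initialized, pending, processing }

struct FixedJobStore {
    var states: [String: FixedJobState] = [:]

    // set() now stores .initialized, which workers ignore.
    mutating func set(_ id: String) { states[id] = .initialized }

    // push() is the only step that makes the job visible to workers.
    mutating func push(_ id: String) { states[id] = .pending }

    // Workers still claim only .pending jobs.
    mutating func dequeue() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else {
            return nil
        }
        states[id] = .processing
        return id
    }
}

var fixedStore = FixedJobStore()
fixedStore.set("job-1")
let earlyClaim = fixedStore.dequeue()  // nil: job is not yet visible
fixedStore.push("job-1")
let onlyClaim = fixedStore.dequeue()   // "job-1"
let extraClaim = fixedStore.dequeue()  // nil: already .processing

print(earlyClaim == nil, onlyClaim == "job-1", extraClaim == nil)
```

The second option (making `push` a no-op) would remove the overwrite in the same way, at the cost of the job becoming visible to workers as soon as `set` saves it.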