Job Status Automatically Changing to Completed #556
Hello, I'm new to this library and I'm encountering an issue. I'm not sure if it's due to my implementation or if this is how pg-boss inherently works, with no configuration option to change it.

The problem is that pg-boss changes job statuses from active to completed as soon as a handler receives the job. This isn't the behavior I want: I need jobs to remain active while the handler executes (these are large, asynchronous tasks that won't complete immediately). Currently, when users or I check the job state in the database, it shows as completed even though the handler is still processing the job.

Is there a way to configure this behavior, or is this just how pg-boss works? Any guidance would be appreciated. My implementation looks like this:

The port:

import {
AddWorkerToJobFailed,
CloseQueueFailed,
EmitJobFailed,
GetJobStateFailed,
UpdateJobStateFailed,
} from '../exceptions/queue.exceptions';
import { Result } from '@common/utils';
export type JobState = 'created' | 'retry' | 'active' | 'completed' | 'cancelled' | 'failed';
export type QueueOptions = {
retryLimit?: number;
delay?: number;
priority?: number;
};
export interface QueueServicePort {
/**
* Adds a job to the queue.
* @param jobName - The name of the job and queue.
* @param data - The payload for the job.
* @param options - Additional options for the job (e.g., retry limit, delay).
* @returns - The id of the job
*/
emitJob<T extends object>(jobName: string, data: T, options?: QueueOptions): Promise<Result<string, EmitJobFailed>>;
/**
* Adds a worker to a job in the queue.
* @param jobName - The jobName and queue to be updated.
* @param handler - The handler function to process each job.
*/
addWorkerToJob<T extends object>(
jobName: string,
handler: (jobId: string, data: T) => Promise<void>, // Changed to return Promise<void>
): Promise<Result<string, AddWorkerToJobFailed>>;
/**
* Retrieves a specific job by its ID.
* @param jobName - The jobName and queue to be updated
* @param jobId - The jobId/instance to be updated
* @returns The job if found, or null if not found.
*/
getJobState(jobName: string, jobId: string): Promise<Result<JobState, GetJobStateFailed>>;
/**
* Change status of a job
* @param jobName - The jobName and queue to be updated
* @param jobId - The jobId/instance to be updated
* @returns The new JobState or null on failing cases
*/
updateJobState(jobName: string, jobId: string, newState: JobState): Promise<Result<JobState, UpdateJobStateFailed>>;
/**
* Closes the queue and releases resources.
*/
close(): Promise<Result<boolean, CloseQueueFailed>>;
}

The adapter:

import { Result } from '@common/utils';
import PgBoss from 'pg-boss';
import { ConnectionConfigService } from '../../connection-config.service';
import {
AddWorkerToJobFailed,
BossDontInitialized,
CloseQueueFailed,
EmitJobFailed,
GetJobStateFailed,
InvalidJobStateException,
JobCreationFailed,
JobNotFoundException,
UpdateJobStateFailed,
} from '../exceptions/queue.exceptions';
import { JobState, QueueOptions, QueueServicePort } from '../ports/queue.port';
export class PgBossQueueAdapter implements QueueServicePort {
private readonly boss: PgBoss;
private isInitialized = false;
constructor(boss: PgBoss) {
const config = ConnectionConfigService.getPgBossConfig();
console.log(config.connectionString);
this.boss = boss;
this.boss.on('error', (error) => {
console.error('PgBoss error:', error);
});
}
async initialize(): Promise<void> {
await this.boss.start();
this.isInitialized = true;
}
async emitJob<T extends object>(
jobName: string,
data: T,
options?: QueueOptions,
): Promise<Result<string, EmitJobFailed>> {
if (!this.isInitialized) return Result.makeFail(new EmitJobFailed(new BossDontInitialized()));
try {
const jobId = options ? await this.boss.send(jobName, data, options) : await this.boss.send(jobName, data);
if (!jobId) {
return Result.makeFail(new EmitJobFailed(new JobCreationFailed()));
}
return Result.makeOk(jobId);
} catch (error) {
console.error(`Error emitting job ${jobName}:`, error);
return Result.makeFail(new EmitJobFailed());
}
}
async addWorkerToJob<T extends object>(
jobName: string,
handler: (jobId: string, data: T) => Promise<void>,
): Promise<Result<string, AddWorkerToJobFailed>> {
try {
const queue = await this.boss.getQueue(jobName);
if (!queue) await this.boss.createQueue(jobName);
await this.boss.subscribe(jobName, jobName);
const result = await this.boss.work<T>(jobName, { includeMetadata: true }, async ([pgBossJob]) => {
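// Guard first: the batch may be empty, so do not touch the job before checking it.
if (!pgBossJob) return;
console.log(pgBossJob.state);
// Pass the job id (not its state) to match the handler's (jobId, data) signature.
await handler(pgBossJob.id, pgBossJob.data);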
});
if (!result || result.length < 1) {
return Result.makeFail(new AddWorkerToJobFailed());
}
return Result.makeOk(result);
} catch (error) {
console.log(`Error adding worker to job ${jobName}`);
return Result.makeFail(new AddWorkerToJobFailed());
}
}
async getJobState(jobName: string, jobId: string): Promise<Result<JobState, GetJobStateFailed>> {
try {
const job = await this.boss.getJobById(jobName, jobId, { includeArchive: true });
if (!job) return Result.makeFail(new GetJobStateFailed(new JobNotFoundException()));
return Result.makeOk(job.state);
} catch (error) {
console.error(`Error retrieving job ${jobId} from queue ${jobName}:`, error);
return Result.makeFail(new GetJobStateFailed());
}
}
async updateJobState(
jobName: string,
jobId: string,
newState: JobState,
): Promise<Result<JobState, UpdateJobStateFailed>> {
try {
switch (newState) {
case 'completed':
await this.boss.complete(jobName, jobId);
console.log(`Job ${jobId} in queue ${jobName} marked as 'completed'.`);
return Result.makeOk('completed');
case 'cancelled':
await this.boss.cancel(jobName, jobId);
console.log(`Job ${jobId} in queue ${jobName} cancelled.`);
return Result.makeOk('cancelled');
case 'failed':
await this.boss.fail(jobName, jobId);
console.log(`Job ${jobId} in queue ${jobName} marked as 'failed'.`);
return Result.makeOk('failed');
default:
console.error(`Invalid job state: ${newState}`);
return Result.makeFail(new UpdateJobStateFailed(new InvalidJobStateException()));
}
} catch (error) {
console.error(`Error updating job ${jobId} in queue ${jobName} to state ${newState}:`, error);
return Result.makeFail(new UpdateJobStateFailed());
}
}
async close(): Promise<Result<boolean, CloseQueueFailed>> {
try {
await this.boss.stop();
return Result.makeOk(true);
} catch (error) {
console.log('Error closing queue');
return Result.makeFail(new CloseQueueFailed());
}
}
}

Replies: 1 comment
Hello! I've identified the issue, and it was actually my mistake 😅. I was passing the following callback:

import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { UploadUgsPopulationJobName, UploadUgsPopulationPayload } from './job-payloads/save-ugs-population.job-payload';
import { EVENT_STORE, EventStorePort, QUEUE_SERVICE, QueueServicePort } from '@common-api/core';
import { LOGGER_SERVICE, LoggerPort } from '@common-api/logger-config';
@Injectable()
export class SaveUgsPopulationPgWorker implements OnModuleInit {
constructor(
@Inject(LOGGER_SERVICE)
private readonly logger: LoggerPort,
@Inject(QUEUE_SERVICE)
private readonly queueService: QueueServicePort,
@Inject(EVENT_STORE)
private readonly eventStore: EventStorePort,
) {}
async onModuleInit() {
await this.queueService.addWorkerToJob<UploadUgsPopulationPayload>(
'UploadUgsPopulationJobName',
async (jobId, data) => {
try {
await this.handle(jobId, data);
} catch (error) {
this.logger.error(`Error processing job: ${UploadUgsPopulationJobName} for ${jobId}:`, error);
}
},
);
}
async handle(jobId: string, data: UploadUgsPopulationPayload) {
const { electoralEventId, population } = data;
setTimeout(async () => {
... worker real execution
}, 60000);
}
}

I thought that using setTimeout would delay the callback execution, allowing me to test how clients requested job states. However, after reviewing the library's source code and seeing that jobs are marked as "completed" once the callback finishes successfully, I realized that setTimeout breaks the promise chain: handle() returns immediately while the timer runs detached, so the job is completed before the real work even starts.
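For anyone hitting the same thing, here is a minimal sketch of the difference. It does not use pg-boss at all: `runJob` is a hypothetical stand-in that awaits the handler and only then records "completed", which is the behaviour described above, and `sleep` is an assumed helper that wraps setTimeout in a promise so the delay can be awaited.

```typescript
type DemoState = 'active' | 'completed' | 'failed';

// Hypothetical stand-in for a queue worker (NOT pg-boss code): the job is
// reported as completed as soon as the handler's promise resolves.
async function runJob(
  handler: () => Promise<void>,
  onState: (state: DemoState) => void,
): Promise<void> {
  onState('active');
  try {
    await handler();
    onState('completed');
  } catch {
    onState('failed');
  }
}

// Assumed helper: a delay that can be awaited.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Broken: the timer is scheduled but never awaited, so the returned promise
// resolves immediately and the work runs detached from the job.
async function detachedHandler(log: string[]): Promise<void> {
  setTimeout(() => {
    log.push('work done');
  }, 50);
}

// Fixed: the delay is part of the promise chain, so the caller waits for it.
async function awaitedHandler(log: string[]): Promise<void> {
  await sleep(50);
  log.push('work done');
}

async function demo(): Promise<{ detached: string[]; awaited: string[] }> {
  const detached: string[] = [];
  await runJob(() => detachedHandler(detached), (s) => detached.push(s));
  await sleep(100); // give the detached timer time to fire

  const awaited: string[] = [];
  await runJob(() => awaitedHandler(awaited), (s) => awaited.push(s));

  return { detached, awaited };
}
```

With the detached version the log reads active, completed, work done; with the awaited version it reads active, work done, completed. In the worker above, that would mean awaiting a promise-based delay inside handle() instead of calling setTimeout directly.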