Skip to content

Commit 30ac1a4

Browse files
committed
Updates
1 parent dcffe0c commit 30ac1a4

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

pkg/pgqueue/config/task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (task *task) Run(parent context.Context) error {
8383

8484
// Task worker pool
8585
taskpool := pgqueue.NewTaskPool(task.workers)
86-
ref.Log(ctx).Debug(parent, "Created task pool with ", task.workers, " workers")
86+
ref.Log(ctx).Debug(parent, "Created task pool with ", task.workers, " threads")
8787

8888
FOR_LOOP:
8989
for {

pkg/pgqueue/schema/task.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ const (
330330
`
331331
taskCreateInsertFunc = `
332332
-- Insert a new payload into a queue
333-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_insert(ns TEXT, q TEXT, p JSONB, delayed_at TIMESTAMP) RETURNS BIGINT AS $$
333+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_insert(n TEXT, q TEXT, p JSONB, delayed_at TIMESTAMP) RETURNS BIGINT AS $$
334334
WITH defaults AS (
335335
-- Select the retries and ttl from the queue defaults
336336
SELECT
@@ -342,13 +342,13 @@ const (
342342
FROM
343343
${"schema"}."queue"
344344
WHERE
345-
"queue" = q AND "ns" = ns
345+
"queue" = q AND "ns" = n
346346
LIMIT
347347
1
348348
) INSERT INTO
349349
${"schema"}."task" ("ns", "queue", "payload", "delayed_at", "retries", "initial_retries", "dies_at")
350350
SELECT
351-
ns, q, p, CASE
351+
n, q, p, CASE
352352
WHEN "delayed_at" IS NULL THEN NULL
353353
WHEN "delayed_at" < TIMEZONE('UTC', NOW()) THEN (NOW() AT TIME ZONE 'UTC')
354354
ELSE "delayed_at"
@@ -379,7 +379,7 @@ const (
379379
`
380380
taskRetainFunc = `
381381
-- A specific worker locks a task in a queue for processing
382-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(ns TEXT, w TEXT) RETURNS BIGINT AS $$
382+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(n TEXT, w TEXT) RETURNS BIGINT AS $$
383383
UPDATE ${"schema"}."task" SET
384384
"started_at" = TIMEZONE('UTC', NOW()), "worker" = w, "result" = 'null'
385385
WHERE "id" = (
@@ -388,7 +388,7 @@ const (
388388
FROM
389389
${"schema"}."task"
390390
WHERE
391-
"ns" = ns
391+
"ns" = n
392392
AND
393393
("started_at" IS NULL AND "finished_at" IS NULL AND "dies_at" > TIMEZONE('UTC', NOW()))
394394
AND
@@ -471,7 +471,7 @@ const (
471471
`
472472
taskCleanFunc = `
473473
-- Cleanup tasks in a queue which are in an end state
474-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_clean(ns TEXT, q TEXT) RETURNS TABLE (
474+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_clean(n TEXT, q TEXT) RETURNS TABLE (
475475
"id" BIGINT, "queue" TEXT, "ns" TEXT, "payload" JSONB, "result" JSONB, "worker" TEXT, "created_at" TIMESTAMP, "delayed_at" TIMESTAMP, "started_at" TIMESTAMP, "finished_at" TIMESTAMP, "dies_at" TIMESTAMP, "retries" INTEGER
476476
) AS $$
477477
DELETE FROM
@@ -484,7 +484,7 @@ const (
484484
FROM
485485
${"schema"}."task"
486486
WHERE
487-
"ns" = ns AND "queue" = q
487+
"ns" = n AND "queue" = q
488488
AND
489489
(dies_at IS NULL OR dies_at < TIMEZONE('UTC', NOW()))
490490
) SELECT

0 commit comments

Comments
 (0)