Skip to content

Commit 5ae652f

Browse files
committed
Updated pgqueue
1 parent 16efb8c commit 5ae652f

File tree

7 files changed

+143
-27
lines changed

7 files changed

+143
-27
lines changed

pkg/pgqueue/pgqueue.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ func (client *Client) Close(ctx context.Context) error {
7575
////////////////////////////////////////////////////////////////////////////////
7676
// PUBLIC METHODS
7777

78+
// Worker returns the worker name
79+
func (client *Client) Worker() string {
80+
return client.worker
81+
}
82+
7883
// RegisterQueue creates a new queue, or updates an existing queue, and returns it.
7984
func (client *Client) RegisterQueue(ctx context.Context, meta schema.Queue) (*schema.Queue, error) {
8085
var queue schema.Queue
@@ -98,7 +103,15 @@ func (client *Client) RegisterQueue(ctx context.Context, meta schema.Queue) (*sc
98103
// CreateQueue creates a new queue, and returns it.
99104
func (client *Client) CreateQueue(ctx context.Context, meta schema.Queue) (*schema.Queue, error) {
100105
var queue schema.Queue
101-
if err := client.conn.Insert(ctx, &queue, meta); err != nil {
106+
if err := client.conn.Tx(ctx, func(conn pg.Conn) error {
107+
if err := client.conn.Insert(ctx, &queue, meta); err != nil {
108+
return err
109+
} else if err := conn.Update(ctx, &queue, schema.QueueName(queue.Queue), meta); err != nil {
110+
return err
111+
}
112+
// Commit the transaction
113+
return nil
114+
}); err != nil {
102115
return nil, err
103116
}
104117
return &queue, nil

pkg/pgqueue/schema/queue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ func (q Queue) Insert(bind *pg.Bind) (string, error) {
207207
bind.Set("queue", queue)
208208
}
209209

210+
// Note: Inserts default values for ttl, retries, retry_delay
211+
// A subsequent update is required to set these values
212+
210213
// Return the insert query
211214
return queueInsert, nil
212215
}

pkg/pgqueue/schema/task.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,7 @@ const (
336336
FROM
337337
${"schema"}."queue"
338338
WHERE
339-
"queue" = q
340-
AND
341-
"ns" = ns
339+
"queue" = q AND "ns" = ns
342340
LIMIT
343341
1
344342
) INSERT INTO
@@ -375,36 +373,36 @@ const (
375373
`
376374
taskRetainFunc = `
377375
-- A specific worker locks a task in a queue for processing
378-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(q TEXT, w TEXT) RETURNS BIGINT AS $$
376+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(ns TEXT, q TEXT, w TEXT) RETURNS BIGINT AS $$
379377
UPDATE ${"schema"}."task" SET
380378
"started_at" = TIMEZONE('UTC', NOW()), "worker" = w, "result" = 'null'
381379
WHERE "id" = (
382380
SELECT
383-
"id"
381+
"id"
384382
FROM
385-
${"schema"}."task"
383+
${"schema"}."task"
386384
WHERE
387-
"queue" = q
385+
("queue" = q AND "ns" = ns)
388386
AND
389-
("started_at" IS NULL AND "finished_at" IS NULL AND "dies_at" > TIMEZONE('UTC', NOW()))
387+
("started_at" IS NULL AND "finished_at" IS NULL AND "dies_at" > TIMEZONE('UTC', NOW()))
390388
AND
391-
("delayed_at" IS NULL OR "delayed_at" <= TIMEZONE('UTC', NOW()))
389+
("delayed_at" IS NULL OR "delayed_at" <= TIMEZONE('UTC', NOW()))
392390
AND
393-
("retries" > 0)
391+
("retries" > 0)
394392
ORDER BY
395-
"created_at"
393+
"created_at"
396394
FOR UPDATE SKIP LOCKED LIMIT 1
397395
) RETURNING
398-
"id"
396+
"id"
399397
$$ LANGUAGE SQL
400398
`
401399
taskReleaseFunc = `
402400
-- Unlock a task in a queue with successful result
403-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_unlock(q TEXT, tid BIGINT, r JSONB) RETURNS BIGINT AS $$
401+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_unlock(ns TEXT, q TEXT, tid BIGINT, r JSONB) RETURNS BIGINT AS $$
404402
UPDATE ${"schema"}."task" SET
405403
"finished_at" = TIMEZONE('UTC', NOW()), "dies_at" = NULL, "result" = r
406404
WHERE
407-
("id" = tid) AND ("queue" = q)
405+
("id" = tid) AND ("ns" = ns) AND ("queue" = q)
408406
AND
409407
("started_at" IS NOT NULL AND "finished_at" IS NULL AND "dies_at" > TIMEZONE('UTC', NOW()))
410408
RETURNING
@@ -449,25 +447,25 @@ const (
449447
JOIN
450448
${"schema"}."queue" Q
451449
ON
452-
T.queue = Q.queue
450+
T."queue" = Q."queue" AND T."ns" = Q."ns"
453451
WHERE
454452
T."id" = tid
455453
$$ LANGUAGE SQL
456454
`
457455
taskFailFunc = `
458456
-- Unlock a task in a queue with fail result
459-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_fail(q TEXT, tid BIGINT, r JSONB) RETURNS BIGINT AS $$
457+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_fail(ns TEXT, q TEXT, tid BIGINT, r JSONB) RETURNS BIGINT AS $$
460458
UPDATE ${"schema"}."task" SET
461459
"retries" = "retries" - 1, "result" = r, "started_at" = NULL, "finished_at" = NULL, "delayed_at" = ${"schema"}.queue_backoff(tid)
462460
WHERE
463-
"queue" = q AND "id" = tid AND "retries" > 0 AND ("started_at" IS NOT NULL AND "finished_at" IS NULL)
461+
"ns" = ns AND "queue" = q AND "id" = tid AND "retries" > 0 AND ("started_at" IS NOT NULL AND "finished_at" IS NULL)
464462
RETURNING
465463
"id"
466464
$$ LANGUAGE SQL
467465
`
468466
taskCleanFunc = `
469467
-- Cleanup tasks in a queue which are in an end state
470-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_clean(q TEXT) RETURNS TABLE (
468+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_clean(ns TEXT, q TEXT) RETURNS TABLE (
471469
"id" BIGINT, "queue" TEXT, "payload" JSONB, "result" JSONB, "worker" TEXT, "created_at" TIMESTAMP, "delayed_at" TIMESTAMP, "started_at" TIMESTAMP, "finished_at" TIMESTAMP, "dies_at" TIMESTAMP, "retries" INTEGER
472470
) AS $$
473471
DELETE FROM
@@ -480,7 +478,7 @@ const (
480478
FROM
481479
${"schema"}."task"
482480
WHERE
483-
"queue" = q
481+
"ns" = ns AND "queue" = q
484482
AND
485483
(dies_at IS NULL OR dies_at < TIMEZONE('UTC', NOW()))
486484
) SELECT
@@ -500,15 +498,15 @@ const (
500498
`
501499
taskRetain = `
502500
-- Returns the id of the task which has been retained
503-
SELECT ${"schema"}.queue_lock(@id, @worker)
501+
SELECT ${"schema"}.queue_lock(@ns, @id, @worker)
504502
`
505503
taskRelease = `
506504
-- Returns the id of the task which has been released
507-
SELECT ${"schema"}.queue_unlock(@id, @task, @result)
505+
SELECT ${"schema"}.queue_unlock(@ns, @id, @task, @result)
508506
`
509507
taskFail = `
510508
-- Returns the id of the task which has been failed
511-
SELECT ${"schema"}.queue_fail(@id, @task, @result)
509+
SELECT ${"schema"}.queue_fail(@ns, @id, @task, @result)
512510
`
513511
taskSelect = `
514512
SELECT

pkg/pgqueue/task.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"context"
55

66
// Packages
7+
8+
"github.com/djthorpe/go-pg"
79
schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
10+
"github.com/mutablelogic/go-server/pkg/types"
811
)
912

1013
////////////////////////////////////////////////////////////////////////////////
@@ -21,3 +24,41 @@ func (client *Client) CreateTask(ctx context.Context, queue string, meta schema.
2124
}
2225
return &task.Task, nil
2326
}
27+
28+
// RetainTask retains a task from a queue, and returns it. Returns nil if there is no
29+
// task to retain
30+
func (client *Client) RetainTask(ctx context.Context, queue string) (*schema.Task, error) {
31+
var taskId schema.TaskId
32+
var task schema.TaskWithStatus
33+
if err := client.conn.Get(ctx, &taskId, schema.TaskRetain{
34+
Queue: queue,
35+
Worker: client.worker,
36+
}); err != nil {
37+
return nil, err
38+
} else if taskId.Id == nil {
39+
// No task to retain
40+
return nil, nil
41+
} else if err := client.conn.Get(ctx, &task, taskId); err != nil {
42+
return nil, err
43+
}
44+
return &task.Task, nil
45+
}
46+
47+
// ReleaseTask releases a task from a queue, and returns it.
48+
func (client *Client) ReleaseTask(ctx context.Context, task uint64, result any) (*schema.Task, error) {
49+
var taskId schema.TaskId
50+
var taskObj schema.TaskWithStatus
51+
if err := client.conn.Get(ctx, &taskId, schema.TaskRelease{
52+
TaskId: schema.TaskId{Id: types.Uint64Ptr(task)},
53+
Fail: false,
54+
Result: result,
55+
}); err != nil {
56+
return nil, err
57+
} else if taskId.Id == nil {
58+
// No task found
59+
return nil, pg.ErrNotFound
60+
} else if err := client.conn.Get(ctx, &taskObj, taskId); err != nil {
61+
return nil, err
62+
}
63+
return &taskObj.Task, nil
64+
}

pkg/pgqueue/task_test.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,71 @@ func Test_Task_001(t *testing.T) {
2828
})
2929
assert.NoError(err)
3030
assert.NotNil(queue)
31-
assert.Equal(types.PtrUint64(queue.Retries), 10)
31+
assert.Equal(types.PtrUint64(queue.Retries), uint64(10))
3232

33-
// Create a task
34-
t.Run("CreateTask_1", func(t *testing.T) {
33+
t.Run("CreateTask", func(t *testing.T) {
34+
// Create a task
3535
task, err := client.CreateTask(context.TODO(), queue.Queue, schema.TaskMeta{
3636
Payload: "task_payload_1",
3737
})
3838
if assert.NoError(err) {
3939
assert.NotNil(task)
4040
assert.Equal(task.Queue, queue.Queue)
4141
assert.Equal(task.Payload, "task_payload_1")
42-
assert.Equal(types.PtrUint64(task.Retries), 10)
42+
assert.Equal(types.PtrUint64(task.Retries), uint64(10))
4343
t.Log("Task created", task)
4444
}
45+
46+
// Retain the task
47+
task2, err := client.RetainTask(context.TODO(), queue.Queue)
48+
if assert.NoError(err) {
49+
assert.NotNil(task2)
50+
assert.Equal(task2.Queue, queue.Queue)
51+
assert.Equal(task2.Payload, "task_payload_1")
52+
assert.Equal(types.PtrString(task2.Worker), client.Worker())
53+
assert.Equal(types.PtrUint64(task2.Retries), uint64(10))
54+
assert.NotNil(task2.CreatedAt)
55+
assert.NotNil(task2.StartedAt)
56+
assert.NotNil(task2.DiesAt)
57+
t.Log("Task retained", task2)
58+
}
59+
})
60+
61+
t.Run("RetainTask", func(t *testing.T) {
62+
// Retain a task which does not exist
63+
task2, err := client.RetainTask(context.TODO(), queue.Queue)
64+
if assert.NoError(err) {
65+
assert.Nil(task2)
66+
}
67+
})
68+
69+
t.Run("ReleaseTask", func(t *testing.T) {
70+
// Create a task
71+
task, err := client.CreateTask(context.TODO(), queue.Queue, schema.TaskMeta{
72+
Payload: "task_payload_2",
73+
})
74+
if !assert.NoError(err) {
75+
t.FailNow()
76+
}
77+
assert.NotNil(task)
78+
t.Log("Task created", task)
79+
80+
// Retain the task
81+
task2, err := client.RetainTask(context.TODO(), queue.Queue)
82+
if !assert.NoError(err) {
83+
t.FailNow()
84+
}
85+
assert.NotNil(task2)
86+
assert.Equal(task2.Payload, "task_payload_2")
87+
t.Log("Task retained", task2)
88+
89+
// Release the task
90+
task3, err := client.ReleaseTask(context.TODO(), types.PtrUint64(task2.Id), "completed")
91+
if !assert.NoError(err) {
92+
t.FailNow()
93+
}
94+
assert.NotNil(task3)
95+
assert.Equal(task3.Payload, "completed")
96+
t.Log("Task released", task3)
4597
})
4698
}

plugin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ type PGCallback func(context.Context, any) error
9494
type PGQueue interface {
9595
Task
9696

97+
// Return the worker name
98+
Worker() string
99+
97100
// Register a ticker with a callback, and return the registered ticker
98101
RegisterTicker(context.Context, schema.TickerMeta, PGCallback) (*schema.Ticker, error)
99102

plugin/pgqueue/task.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type exec struct {
3030
}
3131

3232
var _ server.Task = (*task)(nil)
33+
var _ server.PGQueue = (*task)(nil)
3334

3435
////////////////////////////////////////////////////////////////////////////////
3536
// LIFECYCLE
@@ -92,6 +93,11 @@ FOR_LOOP:
9293
return result
9394
}
9495

96+
// Return the worker name
97+
func (t *task) Worker() string {
98+
return t.client.Worker()
99+
}
100+
95101
// Register a ticker with a callback, and return the registered ticker
96102
func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn server.PGCallback) (*schema.Ticker, error) {
97103
ticker, err := t.client.RegisterTicker(ctx, meta)

0 commit comments

Comments
 (0)