Skip to content

Commit 5cc733b

Browse files
committed
Updated task loop
1 parent a4a6cea commit 5cc733b

File tree

8 files changed

+356
-58
lines changed

8 files changed

+356
-58
lines changed

pkg/pgqueue/opt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func OptNamespace(v string) Opt {
8686
if v = strings.TrimSpace(v); !types.IsIdentifier(v) {
8787
return httpresponse.ErrBadRequest.With("invalid namespacename ")
8888
} else {
89-
o.namespace = v
89+
o.namespace = strings.ToLower(v)
9090
}
9191
return nil
9292
}

pkg/pgqueue/pgqueue.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
type Client struct {
1717
conn pg.PoolConn
1818
listener pg.Listener
19-
topics []string
2019
worker string
2120
}
2221

@@ -37,11 +36,19 @@ func New(ctx context.Context, conn pg.PoolConn, opt ...Opt) (*Client, error) {
3736
return nil, httpresponse.ErrInternalError.Withf("Cannot create listener")
3837
} else {
3938
self.listener = listener
40-
self.conn = conn.With(
41-
"schema", schema.SchemaName,
42-
"ns", opts.namespace,
43-
).(pg.PoolConn)
44-
self.topics = []string{schema.TopicQueueInsert}
39+
}
40+
41+
// Set the connection
42+
self.conn = conn.With(
43+
"schema", schema.SchemaName,
44+
"ns", opts.namespace,
45+
).(pg.PoolConn)
46+
47+
// Listen for topics
48+
for _, topic := range []string{opts.namespace + "_" + schema.TopicQueueInsert} {
49+
if err := self.listener.Listen(ctx, topic); err != nil {
50+
return nil, httpresponse.ErrInternalError.Withf("Cannot listen to topic %q: %v", topic, err)
51+
}
4552
}
4653

4754
// If the schema does not exist, then bootstrap it

pkg/pgqueue/schema/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const (
2020
TickerListLimit = 100
2121
TaskListLimit = 100
2222
TickerPeriod = 15 * time.Second
23+
TaskPeriod = 15 * time.Second
2324
)
2425

2526
////////////////////////////////////////////////////////////////////////////////

pkg/pgqueue/schema/task.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,22 @@ import (
1010
// Packages
1111
pg "github.com/djthorpe/go-pg"
1212
httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
13-
"github.com/mutablelogic/go-server/pkg/types"
13+
types "github.com/mutablelogic/go-server/pkg/types"
1414
)
1515

1616
////////////////////////////////////////////////////////////////////////////////
1717
// TYPES
1818

19-
type TaskId struct {
20-
Id *uint64 `json:"id,omitempty"`
21-
}
19+
type TaskId uint64
2220

2321
type TaskRetain struct {
24-
Queue string `json:"queue,omitempty"`
2522
Worker string `json:"worker,omitempty"`
2623
}
2724

2825
type TaskRelease struct {
29-
TaskId
30-
Fail bool `json:"fail,omitempty"`
31-
Result any `json:"result,omitempty"`
26+
Id uint64 `json:"id,omitempty"`
27+
Fail bool `json:"fail,omitempty"`
28+
Result any `json:"result,omitempty"`
3229
}
3330

3431
type TaskMeta struct {
@@ -37,7 +34,7 @@ type TaskMeta struct {
3734
}
3835

3936
type Task struct {
40-
TaskId
37+
Id uint64 `json:"id,omitempty"`
4138
TaskMeta
4239
Worker *string `json:"worker,omitempty"`
4340
Queue string `json:"queue,omitempty"`
@@ -96,7 +93,13 @@ func (t TaskList) String() string {
9693
// READER
9794

9895
func (t *TaskId) Scan(row pg.Row) error {
99-
return row.Scan(&t.Id)
96+
var id *uint64
97+
if err := row.Scan(&id); err != nil {
98+
return err
99+
} else {
100+
*t = TaskId(types.PtrUint64(id))
101+
}
102+
return nil
100103
}
101104

102105
func (t *Task) Scan(row pg.Row) error {
@@ -184,7 +187,7 @@ func (t TaskMeta) Update(bind *pg.Bind) error {
184187
// SELECTOR
185188

186189
func (t TaskId) Select(bind *pg.Bind, op pg.Op) (string, error) {
187-
bind.Set("tid", t.Id)
190+
bind.Set("tid", t)
188191
switch op {
189192
case pg.Get:
190193
return taskGet, nil
@@ -215,13 +218,6 @@ func (l TaskListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
215218
}
216219

217220
func (t TaskRetain) Select(bind *pg.Bind, op pg.Op) (string, error) {
218-
// Queue is required
219-
if queue := strings.TrimSpace(t.Queue); queue == "" {
220-
return "", httpresponse.ErrBadRequest.Withf("Missing queue")
221-
} else {
222-
bind.Set("id", queue)
223-
}
224-
225221
// Worker is required
226222
if worker := strings.TrimSpace(t.Worker); worker == "" {
227223
return "", httpresponse.ErrBadRequest.Withf("Missing worker")
@@ -239,10 +235,10 @@ func (t TaskRetain) Select(bind *pg.Bind, op pg.Op) (string, error) {
239235
}
240236

241237
func (t TaskRelease) Select(bind *pg.Bind, op pg.Op) (string, error) {
242-
if t.Id == nil || *t.Id == 0 {
238+
if t.Id == 0 {
243239
return "", httpresponse.ErrBadRequest.Withf("Missing task id")
244240
} else {
245-
bind.Set("tid", types.PtrUint64(t.Id))
241+
bind.Set("tid", t.Id)
246242
}
247243

248244
// Result of the task
@@ -357,7 +353,7 @@ const (
357353
taskCreateNotifyFunc = `
358354
CREATE OR REPLACE FUNCTION ${"schema"}.queue_notify() RETURNS TRIGGER AS $$
359355
BEGIN
360-
PERFORM pg_notify('queue_insert', LOWER(NEW.queue));
356+
PERFORM pg_notify(LOWER(NEW.ns) || '_queue_insert', LOWER(NEW.queue));
361357
RETURN NEW;
362358
END;
363359
$$ LANGUAGE plpgsql
@@ -374,7 +370,7 @@ const (
374370
`
375371
taskRetainFunc = `
376372
-- A specific worker locks a task in a queue for processing
377-
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(ns TEXT, q TEXT, w TEXT) RETURNS BIGINT AS $$
373+
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(ns TEXT, w TEXT) RETURNS BIGINT AS $$
378374
UPDATE ${"schema"}."task" SET
379375
"started_at" = TIMEZONE('UTC', NOW()), "worker" = w, "result" = 'null'
380376
WHERE "id" = (
@@ -383,7 +379,7 @@ const (
383379
FROM
384380
${"schema"}."task"
385381
WHERE
386-
("queue" = q AND "ns" = ns)
382+
"ns" = ns
387383
AND
388384
("started_at" IS NULL AND "finished_at" IS NULL AND "dies_at" > TIMEZONE('UTC', NOW()))
389385
AND
@@ -461,7 +457,7 @@ const (
461457
WHERE
462458
"id" = tid AND "retries" > 0 AND ("started_at" IS NOT NULL AND "finished_at" IS NULL)
463459
RETURNING
464-
"id"
460+
"id"
465461
$$ LANGUAGE SQL
466462
`
467463
taskCleanFunc = `
@@ -499,7 +495,7 @@ const (
499495
`
500496
taskRetain = `
501497
-- Returns the id of the task which has been retained
502-
SELECT ${"schema"}.queue_lock(@ns, @id, @worker)
498+
SELECT ${"schema"}.queue_lock(@ns, @worker)
503499
`
504500
taskRelease = `
505501
-- Returns the id of the task which has been released

pkg/pgqueue/task.go

Lines changed: 117 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package pgqueue
22

33
import (
44
"context"
5-
"fmt"
5+
"errors"
6+
"sync"
7+
"time"
68

79
// Packages
8-
9-
"github.com/djthorpe/go-pg"
10+
pg "github.com/djthorpe/go-pg"
1011
schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
11-
"github.com/mutablelogic/go-server/pkg/types"
1212
)
1313

1414
////////////////////////////////////////////////////////////////////////////////
@@ -26,17 +26,26 @@ func (client *Client) CreateTask(ctx context.Context, queue string, meta schema.
2626
return &task.Task, nil
2727
}
2828

29-
// RetainTask retains a task from a queue, and returns it. Returns nil if there is no
30-
// task to retain
31-
func (client *Client) RetainTask(ctx context.Context, queue string) (*schema.Task, error) {
29+
// GetTask returns a task based on identifier, and optionally sets the task status.
30+
func (client *Client) GetTask(ctx context.Context, task uint64, status *string) (*schema.Task, error) {
31+
var taskObj schema.TaskWithStatus
32+
if err := client.conn.Get(ctx, &taskObj, schema.TaskId(task)); err != nil {
33+
return nil, err
34+
} else if status != nil {
35+
*status = taskObj.Status
36+
}
37+
return &taskObj.Task, nil
38+
}
39+
40+
// NextTask retains a task, and returns it. Returns nil if there is no task to retain
41+
func (client *Client) NextTask(ctx context.Context) (*schema.Task, error) {
3242
var taskId schema.TaskId
3343
var task schema.TaskWithStatus
3444
if err := client.conn.Get(ctx, &taskId, schema.TaskRetain{
35-
Queue: queue,
3645
Worker: client.worker,
3746
}); err != nil {
3847
return nil, err
39-
} else if taskId.Id == nil {
48+
} else if taskId == 0 {
4049
// No task to retain
4150
return nil, nil
4251
} else if err := client.conn.Get(ctx, &task, taskId); err != nil {
@@ -50,18 +59,114 @@ func (client *Client) ReleaseTask(ctx context.Context, task uint64, result any)
5059
var taskId schema.TaskId
5160
var taskObj schema.TaskWithStatus
5261
if err := client.conn.Get(ctx, &taskId, schema.TaskRelease{
53-
TaskId: schema.TaskId{Id: types.Uint64Ptr(task)},
62+
Id: task,
5463
Fail: false,
5564
Result: result,
5665
}); err != nil {
5766
return nil, err
58-
} else if taskId.Id == nil {
67+
} else if taskId == 0 {
5968
// No task found
6069
return nil, pg.ErrNotFound
6170
}
62-
fmt.Println("taskId", taskId)
6371
if err := client.conn.Get(ctx, &taskObj, taskId); err != nil {
6472
return nil, err
6573
}
6674
return &taskObj.Task, nil
6775
}
76+
77+
// FailTask fails a task, either for retry or permanent failure, and returns the task and status.
78+
func (client *Client) FailTask(ctx context.Context, task uint64, result any, status *string) (*schema.Task, error) {
79+
var taskId schema.TaskId
80+
var taskObj schema.TaskWithStatus
81+
if err := client.conn.Get(ctx, &taskId, schema.TaskRelease{
82+
Id: task,
83+
Fail: true,
84+
Result: result,
85+
}); err != nil {
86+
return nil, err
87+
} else if taskId == 0 {
88+
// No task found
89+
return nil, pg.ErrNotFound
90+
}
91+
if err := client.conn.Get(ctx, &taskObj, taskId); err != nil {
92+
return nil, err
93+
} else if status != nil {
94+
*status = taskObj.Status
95+
}
96+
return &taskObj.Task, nil
97+
}
98+
99+
// RunTaskLoop runs a loop to process matured tasks, until the context is cancelled.
100+
// It does not retain or release tasks, but simply returns them to the caller.
101+
func (client *Client) RunTaskLoop(ctx context.Context, taskch chan<- *schema.Task, errch chan<- error) error {
102+
var wg sync.WaitGroup
103+
104+
max_delta := schema.TaskPeriod
105+
min_delta := schema.TaskPeriod / 10
106+
timer := time.NewTimer(200 * time.Millisecond)
107+
defer timer.Stop()
108+
109+
// Make a channel for notifications
110+
notifych := make(chan *pg.Notification)
111+
defer close(notifych)
112+
113+
// Listen for notifications
114+
wg.Add(1)
115+
go func(ctx context.Context) {
116+
defer wg.Done()
117+
for {
118+
select {
119+
case <-ctx.Done():
120+
return
121+
default:
122+
notification, err := client.listener.WaitForNotification(ctx)
123+
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
124+
errch <- err
125+
} else {
126+
notifych <- notification
127+
}
128+
}
129+
}
130+
}(ctx)
131+
132+
nextTaskFn := func() error {
133+
// Check for matured tasks
134+
task, err := client.NextTask(ctx)
135+
if err != nil {
136+
return err
137+
}
138+
if task != nil {
139+
// Emit task to channel
140+
taskch <- task
141+
// Retain task, reset timer to minimum period
142+
timer.Reset(min_delta)
143+
} else {
144+
// No task to retain, reset timer to maximum period
145+
timer.Reset(max_delta)
146+
}
147+
return nil
148+
}
149+
150+
// Loop until context is cancelled
151+
FOR_LOOP:
152+
for {
153+
select {
154+
case <-ctx.Done():
155+
break FOR_LOOP
156+
case <-timer.C:
157+
if err := nextTaskFn(); err != nil {
158+
errch <- err
159+
}
160+
case <-notifych:
161+
if err := nextTaskFn(); err != nil {
162+
errch <- err
163+
}
164+
}
165+
}
166+
167+
// Wait for all goroutines to finish
168+
wg.Wait()
169+
170+
// Return success
171+
return nil
172+
}

0 commit comments

Comments
 (0)