Skip to content

Commit 8d38e74

Browse files
committed
Updated
1 parent 5cc733b commit 8d38e74

File tree

1 file changed

+26
-83
lines changed

1 file changed

+26
-83
lines changed

plugin/pgqueue/task.go

Lines changed: 26 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type task struct {
2222
sync.WaitGroup
2323
client *pgqueue.Client
2424
tickers map[string]exec
25+
queues map[string]exec
2526
}
2627

2728
type exec struct {
@@ -39,6 +40,7 @@ func taskWith(queue *pgqueue.Client) *task {
3940
return &task{
4041
client: queue,
4142
tickers: make(map[string]exec, 10),
43+
queues: make(map[string]exec, 10),
4244
}
4345
}
4446

@@ -102,8 +104,7 @@ FOR_LOOP:
102104
case ticker := <-tickerch:
103105
t.execTicker(ctx, ticker, errch)
104106
case task := <-taskch:
105-
// TODO: Handle tasks, for now, just log them
106-
log.Println("TASK:", task)
107+
t.execTask(ctx, task, errch)
107108
case err := <-errch:
108109
// TODO: Handle errors, for now, just log them
109110
log.Println("ERROR:", err)
@@ -157,6 +158,12 @@ func (t *task) RegisterQueue(ctx context.Context, meta schema.Queue, fn server.P
157158
return nil, err
158159
}
159160

161+
// Add the queue to the map
162+
t.queues[queue.Queue] = exec{
163+
deadline: types.PtrDuration(queue.TTL),
164+
fn: fn,
165+
}
166+
160167
// Return success
161168
return queue, nil
162169
}
@@ -196,7 +203,7 @@ func (t *task) exec(ctx context.Context, fn exec, in any) (result error) {
196203
}
197204
}()
198205

199-
// Run the ticker handler
206+
// Run the ticker or task handler
200207
err := fn.fn(deadline, in)
201208
if err != nil {
202209
result = errors.Join(result, err)
@@ -222,92 +229,28 @@ func (t *task) execTicker(ctx context.Context, ticker *schema.Ticker, errch chan
222229
t.Add(1)
223230
go func(ctx context.Context) {
224231
defer t.Done()
225-
if err := t.exec(ctx, fn, ticker); err != nil {
232+
if err := t.exec(ctx, fn, nil); err != nil {
226233
errch <- fmt.Errorf("TICKER %q: %w", ticker.Ticker, err)
227234
}
228235
}(contextWithTicker(ctx, ticker))
229236
}
230237

231-
/*
232-
233-
// Defer closing the listener
234-
defer func() {
235-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
236-
defer cancel()
237-
t.listener.Close(ctx)
238-
}()
239-
240-
// Subscribe to pg topics
241-
for _, topic := range t.topics {
242-
if err := t.listener.Listen(ctx, topic); err != nil {
243-
return err
244-
}
238+
// Execute a task callback
239+
func (t *task) execTask(ctx context.Context, task *schema.Task, errch chan<- error) {
240+
fn, exists := t.queues[task.Queue]
241+
if !exists {
242+
return
245243
}
246244

247-
// Create channel for notifications, which we close when we're done
248-
evt := make(chan *pg.Notification)
249-
defer close(evt)
250-
251-
// Notifications
252-
t.wg.Add(1)
245+
// Execute the callback function in a goroutine - and then release or fail the task
246+
t.Add(1)
253247
go func(ctx context.Context) {
254-
defer t.wg.Done()
255-
runNotificationsForListener(ctx, t.listener, evt)
256-
}(ctx)
257-
258-
// Ticker namespaces
259-
tickerch := make(chan *schema.Ticker)
260-
defer close(tickerch)
261-
for namespace := range t.ticker {
262-
t.wg.Add(1)
263-
go func(ctx context.Context) {
264-
defer t.wg.Done()
265-
t.runTickerForNamespace(ctx, namespace, tickerch)
266-
}(ctx)
267-
}
268-
269-
// Queue processing
270-
defer close(t.taskch)
271-
272-
// Process notifications and tickers in the main loop
273-
FOR_LOOP:
274-
for {
275-
select {
276-
case <-ctx.Done():
277-
break FOR_LOOP
278-
case notification := <-evt:
279-
// Notification that a queue task has been inserted
280-
now := time.Now()
281-
provider.Log(ctx).With("channel", notification.Channel, "payload", string(notification.Payload)).Print(ctx, "NOTIFICATION")
282-
if err := t.processNotificationTopic(ctx, notification.Channel, string(notification.Payload)); err != nil {
283-
provider.Log(ctx).With("channel", notification.Channel, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", err)
284-
} else {
285-
provider.Log(ctx).With("channel", notification.Channel, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", "COMPLETED")
286-
}
287-
case ticker := <-tickerch:
288-
// Notification that a ticker has fired
289-
now := time.Now()
290-
provider.Log(ctx).With("namespace", ticker.Namespace, "ticker", ticker.Ticker).Print(ctx, "TICKER")
291-
if err := t.processTicker(ctx, ticker); err != nil {
292-
provider.Log(ctx).With("namespace", ticker.Namespace, "ticker", ticker.Ticker, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", err)
293-
} else {
294-
provider.Log(ctx).With("namespace", ticker.Namespace, "ticker", ticker.Ticker, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", "COMPLETED")
295-
}
296-
case task := <-t.taskch:
297-
// Notification that a task has been locked for processing
298-
now := time.Now()
299-
provider.Log(ctx).With("queue", task.Queue, "id", *task.Id).Print(ctx, "TASK")
300-
if result, err := t.processTask(ctx, task); err != nil {
301-
provider.Log(ctx).With("queue", task.Queue, "id", *task.Id, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", err)
302-
} else {
303-
provider.Log(ctx).With("queue", task.Queue, "id", *task.Id, "result", result, "duration_ms", time.Since(now).Milliseconds()).Print(ctx, " ", "COMPLETED")
304-
}
248+
defer t.Done()
249+
if err := t.exec(ctx, fn, task.Payload); err != nil {
250+
// TODO: Fail task
251+
errch <- fmt.Errorf("TASK %q_%d: %w", task.Queue, task.Id, err)
252+
} else {
253+
// TODO: Release task
305254
}
306-
}
307-
308-
// Wait for goroutines to finish
309-
t.wg.Wait()
310-
311-
// Return success
312-
return nil
313-
}*/
255+
}(contextWithTask(ctx, task))
256+
}

0 commit comments

Comments
 (0)