Skip to content

Commit 61393b4

Browse files
committed
Updated
1 parent 5ae652f commit 61393b4

File tree

3 files changed

+25
-0
lines changed

3 files changed

+25
-0
lines changed

pkg/types/ptr.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ func PtrBool(v *bool) bool {
6767
return *v
6868
}
6969

70+
// PtrTime returns a pointer to a time.Time
71+
func TimePtr(t time.Time) *time.Time {
72+
if t.IsZero() {
73+
return nil
74+
}
75+
return &t
76+
}
77+
7078
// PtrTime returns a time.Time from a pointer
7179
func PtrTime(t *time.Time) time.Time {
7280
if t == nil {

plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package server
33
import (
44
"context"
55
"net/http"
6+
"time"
67

78
// Packages
89
pg "github.com/djthorpe/go-pg"
@@ -103,6 +104,9 @@ type PGQueue interface {
103104
// Register a queue with a callback, and return the registered queue
104105
RegisterQueue(context.Context, schema.Queue, PGCallback) (*schema.Queue, error)
105106

107+
// Create a task for a queue with a payload and optional delay, and return it
108+
CreateTask(context.Context, string, any, time.Duration) (*schema.Task, error)
109+
106110
// Delete a ticker by name
107111
DeleteTicker(context.Context, string) error
108112
}

plugin/pgqueue/task.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ FOR_LOOP:
8282
case ticker := <-tickerch:
8383
t.execTicker(ctx, ticker, errch)
8484
case err := <-errch:
85+
// TODO: Handle errors, for now, just log them
8586
log.Println("ERROR:", err)
8687
}
8788
}
@@ -140,6 +141,18 @@ func (t *task) RegisterQueue(ctx context.Context, meta schema.Queue, fn server.P
140141
return queue, nil
141142
}
142143

144+
// Push a task onto a queue, and return the task
145+
func (t *task) CreateTask(ctx context.Context, queue string, payload any, delay time.Duration) (*schema.Task, error) {
146+
var delayedAt *time.Time
147+
if delay > 0 {
148+
delayedAt = types.TimePtr(time.Now().Add(delay))
149+
}
150+
return t.client.CreateTask(ctx, queue, schema.TaskMeta{
151+
Payload: payload,
152+
DelayedAt: delayedAt,
153+
})
154+
}
155+
143156
////////////////////////////////////////////////////////////////////////////////
144157
// PRIVATE METHODS
145158

0 commit comments

Comments
 (0)