Skip to content

Commit e964366

Browse files
committed
Updated
1 parent 1cd1687 commit e964366

File tree

8 files changed

+92
-21
lines changed

8 files changed

+92
-21
lines changed

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr
3232
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
3333
github.com/djthorpe/go-errors v1.0.3 h1:GZeMPkC1mx2vteXLI/gvxZS0Ee9zxzwD1mcYyKU5jD0=
3434
github.com/djthorpe/go-errors v1.0.3/go.mod h1:HtfrZnMd6HsX75Mtbv9Qcnn0BqOrrFArvCaj3RMnZhY=
35-
github.com/djthorpe/go-pg v1.0.4 h1:yw/qsNV4s5deiHjBh/5u0qAR50uzyN+xR7sNRxRVIlo=
36-
github.com/djthorpe/go-pg v1.0.4/go.mod h1:XHl/w8+66Hs746nOYd+gdjqPImNuLVZ5UsXLI47rb4c=
35+
github.com/djthorpe/go-pg v1.0.5 h1:UYCV5fSXOJEFTafem1wB57RK2J0V7Nr9nCquC5sO+ZE=
36+
github.com/djthorpe/go-pg v1.0.5/go.mod h1:XHl/w8+66Hs746nOYd+gdjqPImNuLVZ5UsXLI47rb4c=
3737
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
3838
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
3939
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=

pkg/pgqueue/client/queue.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,16 @@ func (c *Client) UpdateQueue(ctx context.Context, name string, meta schema.Queue
7979
// Return the response
8080
return &response, nil
8181
}
82+
83+
func (c *Client) CleanQueue(ctx context.Context, name string) ([]schema.Task, error) {
84+
req := client.NewRequest()
85+
86+
// Perform request
87+
var response []schema.Task
88+
if err := c.DoWithContext(ctx, req, &response, client.OptPath("queue", name, "clean")); err != nil {
89+
return nil, err
90+
}
91+
92+
// Return the response
93+
return response, nil
94+
}

pkg/pgqueue/cmd/queue.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type QueueCommands struct {
1919
CreateQueue QueueCreateCommand `cmd:"create" group:"QUEUE" help:"Create a new queue"`
2020
UpdateQueue QueueUpdateCommand `cmd:"update" group:"QUEUE" help:"Update queue"`
2121
DeleteQueue QueueDeleteCommand `cmd:"delete" group:"QUEUE" help:"Delete queue"`
22+
CleanQueue QueueCleanCommand `cmd:"clean" group:"QUEUE" help:"Clean queue"`
2223
}
2324

2425
type QueueListCommand struct {
@@ -42,6 +43,10 @@ type QueueDeleteCommand struct {
4243
QueueGetCommand
4344
}
4445

46+
type QueueCleanCommand struct {
47+
QueueGetCommand
48+
}
49+
4550
///////////////////////////////////////////////////////////////////////////////
4651
// PUBLIC METHODS
4752

@@ -106,3 +111,16 @@ func (cmd QueueUpdateCommand) Run(ctx server.Cmd) error {
106111
return nil
107112
})
108113
}
114+
115+
func (cmd QueueCleanCommand) Run(ctx server.Cmd) error {
116+
return run(ctx, func(ctx context.Context, provider *client.Client) error {
117+
tasks, err := provider.CleanQueue(ctx, cmd.Queue)
118+
if err != nil {
119+
return err
120+
}
121+
122+
// Print tasks that were cleaned
123+
fmt.Println(tasks)
124+
return nil
125+
})
126+
}

pkg/pgqueue/config/task.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,19 @@ func (t *task) tryTicker(ctx context.Context, taskpool *pgqueue.TaskPool, ticker
174174
}
175175

176176
func (t *task) tryTask(ctx context.Context, taskpool *pgqueue.TaskPool, task *schema.Task) {
177+
now := time.Now()
177178
taskpool.RunTask(ctx, task, taskFunc, task.Payload, func(err error) {
178-
// Task succeeded
179-
if err == nil {
180-
t.manager.ReleaseTask(ctx, task.Id, true, nil, nil)
181-
return
179+
var status string
180+
delta := time.Since(now).Truncate(time.Millisecond)
181+
if _, err_ := t.manager.ReleaseTask(context.TODO(), task.Id, err == nil, err, &status); err_ != nil {
182+
err = errors.Join(err, err_)
183+
}
184+
switch {
185+
case err == nil:
186+
ref.Log(ctx).With("task", task, "delta_ms", delta.Milliseconds(), "status", "success").Print(ctx, "Task completed in ", delta)
187+
default:
188+
ref.Log(ctx).With("task", task, "delta_ms", delta.Milliseconds(), "status", status, "error", err.Error()).Printf(ctx, "Failed (with %s) after %v: %v", status, delta, err)
182189
}
183-
// Fail the task
184-
t.manager.ReleaseTask(ctx, task.Id, false, err.Error(), nil)
185190
})
186191
}
187192

@@ -190,23 +195,15 @@ func (t *task) tryTask(ctx context.Context, taskpool *pgqueue.TaskPool, task *sc
190195

191196
func taskFunc(ctx context.Context, payload any) error {
192197
var err error
193-
if rand.Intn(2) == 0 {
198+
if rand.Intn(2) == 1 {
194199
err = errors.New("random error")
195200
}
196-
197-
ref.Log(ctx).With("task", ref.Task(ctx)).Print(ctx, "Running task with payload ", payload)
198201
select {
199202
case <-ctx.Done():
200-
ref.Log(ctx).With("task", ref.Task(ctx)).Print(ctx, "Task deadline exceeded")
201203
return ctx.Err()
202204
case <-time.After(time.Second * time.Duration(rand.Intn(10))):
203-
if err != nil {
204-
ref.Log(ctx).With("task", ref.Task(ctx)).Print(ctx, "Task failed: ", err)
205-
} else {
206-
ref.Log(ctx).With("task", ref.Task(ctx)).Print(ctx, "Task succeeded")
207-
}
205+
return err
208206
}
209-
return err
210207
}
211208

212209
func tickerFunc(ctx context.Context, payload any) error {

pkg/pgqueue/handler/queue.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,20 @@ func registerQueue(ctx context.Context, router server.HTTPRouter, prefix string,
5151
_ = httpresponse.Error(w, httpresponse.Err(http.StatusMethodNotAllowed), r.Method)
5252
}
5353
})
54+
55+
router.HandleFunc(ctx, types.JoinPath(prefix, "queue/{name}/clean"), func(w http.ResponseWriter, r *http.Request) {
56+
defer r.Body.Close()
57+
httpresponse.Cors(w, r, router.Origin(), http.MethodGet)
58+
59+
switch r.Method {
60+
case http.MethodOptions:
61+
_ = httpresponse.Empty(w, http.StatusOK)
62+
case http.MethodGet:
63+
_ = queueClean(w, r, manager, r.PathValue("name"))
64+
default:
65+
_ = httpresponse.Error(w, httpresponse.Err(http.StatusMethodNotAllowed), r.Method)
66+
}
67+
})
5468
}
5569

5670
///////////////////////////////////////////////////////////////////////////////
@@ -73,6 +87,17 @@ func queueList(w http.ResponseWriter, r *http.Request, manager *pgqueue.Manager)
7387
return httpresponse.JSON(w, http.StatusOK, httprequest.Indent(r), response)
7488
}
7589

90+
func queueClean(w http.ResponseWriter, r *http.Request, manager *pgqueue.Manager, name string) error {
91+
// Get the dead tasks
92+
response, err := manager.CleanQueue(r.Context(), name)
93+
if err != nil {
94+
return httpresponse.Error(w, err)
95+
}
96+
97+
// Return success
98+
return httpresponse.JSON(w, http.StatusOK, httprequest.Indent(r), response)
99+
}
100+
76101
func queueCreate(w http.ResponseWriter, r *http.Request, manager *pgqueue.Manager) error {
77102
// Parse request
78103
var req schema.QueueMeta

pkg/pgqueue/manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,15 @@ func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schem
319319
return &queue, nil
320320
}
321321

322+
// CleanQueue removes stale tasks from a queue, and returns the tasks removed
323+
func (manager *Manager) CleanQueue(ctx context.Context, name string) ([]schema.Task, error) {
324+
var resp schema.QueueCleanResponse
325+
if err := manager.conn.List(ctx, &resp, schema.QueueCleanRequest{Queue: name}); err != nil {
326+
return nil, httperr(err)
327+
}
328+
return resp.Body, nil
329+
}
330+
322331
////////////////////////////////////////////////////////////////////////////////
323332
// PUBLIC METHODS - TASK
324333

pkg/pgqueue/schema/queue.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ type QueueList struct {
3939
Body []Queue `json:"body,omitempty"`
4040
}
4141

42-
type QueueCleanRequest struct{}
42+
type QueueCleanRequest struct {
43+
Queue string `json:"queue,omitempty" arg:"" help:"Queue name"`
44+
}
4345

4446
type QueueCleanResponse struct {
4547
Body []Task `json:"body,omitempty"`
@@ -164,6 +166,13 @@ func (q QueueName) Select(bind *pg.Bind, op pg.Op) (string, error) {
164166
}
165167

166168
func (q QueueCleanRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
169+
// Set queue name
170+
if name, err := QueueName(q.Queue).queueName(); err != nil {
171+
return "", err
172+
} else {
173+
bind.Set("id", name)
174+
}
175+
167176
switch op {
168177
case pg.List:
169178
return queueClean, nil

pkg/pgqueue/taskpool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ func runTask(parent context.Context, deadline time.Duration, fn server.PGCallbac
121121
errs = errors.Join(errs, err)
122122
}
123123

124-
// Concatenate any errors from the deadline
125-
if ctx.Err() != nil {
124+
// Concatenate any errors from the context if not already present
125+
if ctx.Err() != nil && !errors.Is(errs, ctx.Err()) {
126126
errs = errors.Join(errs, ctx.Err())
127127
}
128128

0 commit comments

Comments
 (0)