Skip to content

Commit e2b4982

Browse files
committed
Updated
1 parent c8433f2 commit e2b4982

File tree

4 files changed

+146
-22
lines changed

4 files changed

+146
-22
lines changed

pkg/pgqueue/opt.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
type opt struct {
1414
worker string
15-
schema string
1615
}
1716

1817
// Opt represents a function that modifies the options
@@ -38,7 +37,7 @@ func applyOpts(opts ...Opt) (*opt, error) {
3837
////////////////////////////////////////////////////////////////////////////////
3938
// PUBLIC METHODS
4039

41-
func OptWorker(worker string) Opt {
40+
func OptWorker(v string) Opt {
4241
return func(o *opt) error {
4342
if v = strings.TrimSpace(v); v == "" {
4443
return httpresponse.ErrBadRequest.With("empty worker name")
@@ -47,13 +46,3 @@ func OptWorker(worker string) Opt {
4746
return nil
4847
}
4948
}
50-
51-
func OptSchema(v string) Opt {
52-
return func(o *opt) error {
53-
if v = strings.TrimSpace(v); v == "" {
54-
return httpresponse.ErrBadRequest.With("empty schema name")
55-
}
56-
o.schema = v
57-
return nil
58-
}
59-
}

pkg/pgqueue/pgqueue.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pgqueue
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"strings"
@@ -69,6 +70,7 @@ func New(ctx context.Context, conn pg.PoolConn, opt ...Opt) (*Client, error) {
6970
////////////////////////////////////////////////////////////////////////////////
7071
// PUBLIC METHODS
7172

73+
// CreateQueue creates a new queue, and returns it.
7274
func (client *Client) CreateQueue(ctx context.Context, meta schema.Queue) (*schema.Queue, error) {
7375
var queue schema.Queue
7476
if err := client.conn.Tx(ctx, func(conn pg.Conn) error {
@@ -78,3 +80,27 @@ func (client *Client) CreateQueue(ctx context.Context, meta schema.Queue) (*sche
7880
}
7981
return &queue, nil
8082
}
83+
84+
// GetQueue returns a queue with the given name.
85+
func (client *Client) GetQueue(ctx context.Context, name string) (*schema.Queue, error) {
86+
var queue schema.Queue
87+
if err := client.conn.Get(ctx, &queue, schema.QueueName(name)); err != nil {
88+
if errors.Is(err, pg.ErrNotFound) {
89+
return nil, httpresponse.ErrNotFound.Withf("Queue %q not found", name)
90+
}
91+
return nil, err
92+
}
93+
return &queue, nil
94+
}
95+
96+
// DeleteQueue deletes a queue with the given name, and returns the deleted queue.
97+
func (client *Client) DeleteQueue(ctx context.Context, name string) (*schema.Queue, error) {
98+
var queue schema.Queue
99+
if err := client.conn.Delete(ctx, &queue, schema.QueueName(name)); err != nil {
100+
if errors.Is(err, pg.ErrNotFound) {
101+
return nil, httpresponse.ErrNotFound.Withf("Queue %q not found", name)
102+
}
103+
return nil, err
104+
}
105+
return &queue, nil
106+
}

pkg/pgqueue/pgqueue_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package pgqueue_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
// Packages
9+
test "github.com/djthorpe/go-pg/pkg/test"
10+
pgqueue "github.com/mutablelogic/go-server/pkg/pgqueue"
11+
"github.com/mutablelogic/go-server/pkg/pgqueue/schema"
12+
"github.com/mutablelogic/go-service/pkg/types"
13+
assert "github.com/stretchr/testify/assert"
14+
)
15+
16+
// Global connection variable
17+
var conn test.Conn
18+
19+
// Start up a container and test the pool
20+
func TestMain(m *testing.M) {
21+
test.Main(m, &conn)
22+
}
23+
24+
func Test_Queue_001(t *testing.T) {
25+
assert := assert.New(t)
26+
conn := conn.Begin(t)
27+
defer conn.Close()
28+
29+
// Ping the database
30+
assert.NoError(conn.Ping(context.Background()))
31+
32+
// Create pgqueue
33+
client, err := pgqueue.New(context.TODO(), conn.PoolConn, pgqueue.OptWorker(t.Name()))
34+
assert.NoError(err)
35+
assert.NotNil(client)
36+
37+
// Create queue
38+
t.Run("CreateQueue_1", func(t *testing.T) {
39+
queue, err := client.CreateQueue(context.TODO(), schema.Queue{
40+
Queue: "queue_name_1",
41+
})
42+
assert.NoError(err)
43+
assert.NotNil(queue)
44+
})
45+
46+
// Create queue
47+
t.Run("CreateQueue_2", func(t *testing.T) {
48+
queue, err := client.CreateQueue(context.TODO(), schema.Queue{
49+
Queue: "queue_name_2",
50+
TTL: types.DurationPtr(5 * time.Hour),
51+
Retries: types.Uint64Ptr(10),
52+
RetryDelay: types.DurationPtr(5 * time.Minute),
53+
})
54+
assert.NoError(err)
55+
assert.NotNil(queue)
56+
})
57+
58+
// Create queue then get queue
59+
t.Run("GetQueue", func(t *testing.T) {
60+
queue, err := client.CreateQueue(context.TODO(), schema.Queue{
61+
Queue: "queue_name_3",
62+
TTL: types.DurationPtr(10 * time.Hour),
63+
Retries: types.Uint64Ptr(10),
64+
RetryDelay: types.DurationPtr(5 * time.Minute),
65+
})
66+
assert.NoError(err)
67+
assert.NotNil(queue)
68+
69+
queue2, err := client.GetQueue(context.TODO(), queue.Queue)
70+
assert.NoError(err)
71+
assert.NotNil(queue2)
72+
assert.Equal(queue.Queue, queue2.Queue)
73+
})
74+
75+
// Get queue
76+
t.Run("GetQueue_2", func(t *testing.T) {
77+
queue, err := client.GetQueue(context.TODO(), "non_extistent_queue")
78+
assert.NoError(err)
79+
assert.NotNil(queue)
80+
t.Log(queue)
81+
})
82+
/*
83+
// Create queue then delete queue
84+
85+
t.Run("DeleteQueue", func(t *testing.T) {
86+
queue, err := client.CreateQueue(context.TODO(), schema.Queue{
87+
Queue: "queue_name_4",
88+
})
89+
assert.NoError(err)
90+
assert.NotNil(queue)
91+
92+
queue2, err := client.DeleteQueue(context.TODO(), "q")
93+
assert.NoError(err)
94+
assert.NotNil(queue2)
95+
assert.Equal(queue.Queue, queue2.Queue)
96+
97+
_, err = client.GetQueue(context.TODO(), queue.Queue)
98+
assert.NoError(err)
99+
})
100+
*/
101+
}

pkg/pgqueue/schema/queue.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package schema
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"strings"
87
"time"
98

@@ -16,6 +15,8 @@ import (
1615
////////////////////////////////////////////////////////////////////////////////
1716
// TYPES
1817

18+
type QueueName string
19+
1920
type Queue struct {
2021
Queue string `json:"queue,omitempty"`
2122
TTL *time.Duration `json:"ttl,omitempty"`
@@ -129,7 +130,14 @@ func (l *QueueStatusResponse) Scan(row pg.Row) error {
129130
////////////////////////////////////////////////////////////////////////////////
130131
// SELECTOR
131132

132-
func (q Queue) Select(bind *pg.Bind, op pg.Op) (string, error) {
133+
func (q QueueName) Select(bind *pg.Bind, op pg.Op) (string, error) {
134+
// Set queue name
135+
if name, err := q.queueName(); err != nil {
136+
return "", err
137+
} else {
138+
bind.Set("id", name)
139+
}
140+
133141
switch op {
134142
case pg.Get:
135143
return queueGet, nil
@@ -138,7 +146,7 @@ func (q Queue) Select(bind *pg.Bind, op pg.Op) (string, error) {
138146
case pg.Delete:
139147
return queueDelete, nil
140148
default:
141-
return "", fmt.Errorf("Unsupported Queue operation %q", op)
149+
return "", httpresponse.ErrInternalError.Withf("Unsupported QueueName operation %q", op)
142150
}
143151
}
144152

@@ -147,7 +155,7 @@ func (q QueueCleanRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
147155
case pg.List:
148156
return queueClean, nil
149157
default:
150-
return "", fmt.Errorf("Unsupported QueueCleanRequest operation %q", op)
158+
return "", httpresponse.ErrInternalError.Withf("Unsupported QueueCleanRequest operation %q", op)
151159
}
152160
}
153161

@@ -160,7 +168,7 @@ func (l QueueListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
160168
case pg.List:
161169
return queueList, nil
162170
default:
163-
return "", fmt.Errorf("Unsupported QueueListRequest operation %q", op)
171+
return "", httpresponse.ErrInternalError.Withf("Unsupported QueueListRequest operation %q", op)
164172
}
165173
}
166174

@@ -183,7 +191,7 @@ func (l QueueStatusRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
183191
case pg.List:
184192
return queueStats, nil
185193
default:
186-
return "", fmt.Errorf("Unsupported QueueStatusRequest operation %q", op)
194+
return "", httpresponse.ErrInternalError.Withf("Unsupported QueueStatusRequest operation %q", op)
187195
}
188196
}
189197

@@ -193,7 +201,7 @@ func (l QueueStatusRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
193201
// Insert
194202
func (q Queue) Insert(bind *pg.Bind) (string, error) {
195203
// Queue name
196-
queue, err := q.queueName()
204+
queue, err := QueueName(q.Queue).queueName()
197205
if err != nil {
198206
return "", err
199207
} else {
@@ -209,7 +217,7 @@ func (q Queue) Update(bind *pg.Bind) error {
209217
var patch []string
210218

211219
// Queue name
212-
if queue, err := q.queueName(); err != nil {
220+
if queue, err := QueueName(q.Queue).queueName(); err != nil {
213221
return err
214222
} else {
215223
patch = append(patch, `queue=`+bind.Set("queue", queue))
@@ -238,8 +246,8 @@ func (q Queue) Update(bind *pg.Bind) error {
238246
}
239247

240248
// Normalize queue name
241-
func (q Queue) queueName() (string, error) {
242-
if queue := strings.ToLower(strings.TrimSpace(q.Queue)); queue == "" {
249+
func (q QueueName) queueName() (string, error) {
250+
if queue := strings.ToLower(strings.TrimSpace(string(q))); queue == "" {
243251
return "", httpresponse.ErrBadRequest.With("Missing queue name")
244252
} else if !types.IsIdentifier(queue) {
245253
return "", httpresponse.ErrBadRequest.With("Invalid queue name")

0 commit comments

Comments
 (0)