@@ -3,13 +3,11 @@ package pgqueue
3
3
import (
4
4
"context"
5
5
"errors"
6
- "time"
7
6
8
7
// Packages
9
8
pg "github.com/djthorpe/go-pg"
10
9
httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
11
10
schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
12
- "github.com/mutablelogic/go-server/pkg/types"
13
11
)
14
12
15
13
////////////////////////////////////////////////////////////////////////////////
@@ -79,214 +77,3 @@ func (client *Client) Close(ctx context.Context) error {
79
77
func (client * Client ) Worker () string {
80
78
return client .worker
81
79
}
82
-
83
- // RegisterQueue creates a new queue, or updates an existing queue, and returns it.
84
- func (client * Client ) RegisterQueue (ctx context.Context , meta schema.Queue ) (* schema.Queue , error ) {
85
- var queue schema.Queue
86
- if err := client .conn .Tx (ctx , func (conn pg.Conn ) error {
87
- // Get a queue
88
- if err := conn .Get (ctx , & queue , schema .QueueName (meta .Queue )); err != nil && ! errors .Is (err , pg .ErrNotFound ) {
89
- return err
90
- } else if errors .Is (err , pg .ErrNotFound ) {
91
- // If the queue does not exist, then create it
92
- return conn .Insert (ctx , & queue , meta )
93
- } else {
94
- // If the queue exists, then update it
95
- return conn .Update (ctx , & queue , schema .QueueName (meta .Queue ), meta )
96
- }
97
- }); err != nil {
98
- return nil , err
99
- }
100
- return & queue , nil
101
- }
102
-
103
- // CreateQueue creates a new queue, and returns it.
104
- func (client * Client ) CreateQueue (ctx context.Context , meta schema.Queue ) (* schema.Queue , error ) {
105
- var queue schema.Queue
106
- if err := client .conn .Tx (ctx , func (conn pg.Conn ) error {
107
- if err := client .conn .Insert (ctx , & queue , meta ); err != nil {
108
- return err
109
- } else if err := conn .Update (ctx , & queue , schema .QueueName (queue .Queue ), meta ); err != nil {
110
- return err
111
- }
112
- // Commit the transaction
113
- return nil
114
- }); err != nil {
115
- return nil , err
116
- }
117
- return & queue , nil
118
- }
119
-
120
- // GetQueue returns a queue with the given name.
121
- func (client * Client ) GetQueue (ctx context.Context , name string ) (* schema.Queue , error ) {
122
- var queue schema.Queue
123
- if err := client .conn .Get (ctx , & queue , schema .QueueName (name )); err != nil {
124
- if errors .Is (err , pg .ErrNotFound ) {
125
- return nil , httpresponse .ErrNotFound .Withf ("Queue %q not found" , name )
126
- }
127
- return nil , err
128
- }
129
- return & queue , nil
130
- }
131
-
132
- // DeleteQueue deletes a queue with the given name, and returns the deleted queue.
133
- func (client * Client ) DeleteQueue (ctx context.Context , name string ) (* schema.Queue , error ) {
134
- var queue schema.Queue
135
- if err := client .conn .Delete (ctx , & queue , schema .QueueName (name )); err != nil {
136
- if errors .Is (err , pg .ErrNotFound ) {
137
- return nil , httpresponse .ErrNotFound .Withf ("Queue %q not found" , name )
138
- }
139
- return nil , err
140
- }
141
- return & queue , nil
142
- }
143
-
144
- // UpdateQueue updates an existing queue with the given name, and returns the queue.
145
- func (client * Client ) UpdateQueue (ctx context.Context , name string , meta schema.Queue ) (* schema.Queue , error ) {
146
- var queue schema.Queue
147
- if err := client .conn .Update (ctx , & queue , schema .QueueName (name ), meta ); err != nil {
148
- if errors .Is (err , pg .ErrNotFound ) {
149
- return nil , httpresponse .ErrNotFound .Withf ("Queue %q not found" , name )
150
- }
151
- return nil , err
152
- }
153
- return & queue , nil
154
- }
155
-
156
- // ListQueues returns all queues as a list
157
- func (client * Client ) ListQueues (ctx context.Context , req schema.QueueListRequest ) (* schema.QueueList , error ) {
158
- var list schema.QueueList
159
-
160
- // Perform list
161
- list .Body = make ([]schema.Queue , 0 , 10 )
162
- if err := client .conn .List (ctx , & list , req ); err != nil {
163
- return nil , err
164
- }
165
- return & list , nil
166
- }
167
-
168
- // RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
169
- func (client * Client ) RegisterTicker (ctx context.Context , meta schema.TickerMeta ) (* schema.Ticker , error ) {
170
- var ticker schema.Ticker
171
- if err := client .conn .Tx (ctx , func (conn pg.Conn ) error {
172
- // Get a ticker
173
- if err := conn .Get (ctx , & ticker , schema .TickerName (meta .Ticker )); err != nil && ! errors .Is (err , pg .ErrNotFound ) {
174
- return err
175
- } else if errors .Is (err , pg .ErrNotFound ) {
176
- // If the ticker does not exist, then create it
177
- return conn .Insert (ctx , & ticker , meta )
178
- } else {
179
- // If the ticker exists, then update it
180
- return conn .Update (ctx , & ticker , schema .TickerName (meta .Ticker ), meta )
181
- }
182
- }); err != nil {
183
- return nil , err
184
- }
185
- return & ticker , nil
186
- }
187
-
188
- // CreateTicker creates a new ticker, and returns it.
189
- func (client * Client ) CreateTicker (ctx context.Context , meta schema.TickerMeta ) (* schema.Ticker , error ) {
190
- var ticker schema.Ticker
191
- if err := client .conn .Tx (ctx , func (conn pg.Conn ) error {
192
- return client .conn .Insert (ctx , & ticker , meta )
193
- }); err != nil {
194
- return nil , err
195
- }
196
- return & ticker , nil
197
- }
198
-
199
- // GetTicker returns a ticker with the given name.
200
- func (client * Client ) GetTicker (ctx context.Context , name string ) (* schema.Ticker , error ) {
201
- var ticker schema.Ticker
202
- if err := client .conn .Get (ctx , & ticker , schema .TickerName (name )); err != nil {
203
- if errors .Is (err , pg .ErrNotFound ) {
204
- return nil , httpresponse .ErrNotFound .Withf ("Ticker %q not found" , name )
205
- }
206
- return nil , err
207
- }
208
- return & ticker , nil
209
- }
210
-
211
- // UpdateTicker updates a ticker with the given name.
212
- func (client * Client ) UpdateTicker (ctx context.Context , name string , meta schema.TickerMeta ) (* schema.Ticker , error ) {
213
- var ticker schema.Ticker
214
- if err := client .conn .Update (ctx , & ticker , schema .TickerName (name ), meta ); err != nil {
215
- if errors .Is (err , pg .ErrNotFound ) {
216
- return nil , httpresponse .ErrNotFound .Withf ("Ticker %q not found" , name )
217
- }
218
- return nil , err
219
- }
220
- return & ticker , nil
221
- }
222
-
223
- // DeleteTicker deletes an existing ticker, and returns the deleted ticker.
224
- func (client * Client ) DeleteTicker (ctx context.Context , name string ) (* schema.Ticker , error ) {
225
- var ticker schema.Ticker
226
- if err := client .conn .Delete (ctx , & ticker , schema .TickerName (name )); err != nil {
227
- if errors .Is (err , pg .ErrNotFound ) {
228
- return nil , httpresponse .ErrNotFound .Withf ("Ticker %q not found" , name )
229
- }
230
- return nil , err
231
- }
232
- return & ticker , nil
233
- }
234
-
235
- // ListTickers returns all tickers in a namespace as a list
236
- func (client * Client ) ListTickers (ctx context.Context , req schema.TickerListRequest ) (* schema.TickerList , error ) {
237
- var list schema.TickerList
238
-
239
- // Perform list
240
- list .Body = make ([]schema.Ticker , 0 , 10 )
241
- if err := client .conn .List (ctx , & list , req ); err != nil {
242
- return nil , err
243
- }
244
- return & list , nil
245
- }
246
-
247
- // NextTicker returns the next matured ticker, or nil
248
- func (client * Client ) NextTicker (ctx context.Context ) (* schema.Ticker , error ) {
249
- var ticker schema.Ticker
250
- if err := client .conn .Get (ctx , & ticker , schema.TickerNext {}); errors .Is (err , pg .ErrNotFound ) {
251
- // No matured ticker
252
- return nil , nil
253
- } else if err != nil {
254
- return nil , err
255
- }
256
-
257
- // Return matured ticker
258
- return & ticker , nil
259
- }
260
-
261
- // RunTickerLoop runs a loop to process matured tickers, or NextTicker returns an error
262
- func (client * Client ) RunTickerLoop (ctx context.Context , ch chan <- * schema.Ticker ) error {
263
- delta := schema .TickerPeriod
264
- timer := time .NewTimer (100 * time .Millisecond )
265
- defer timer .Stop ()
266
-
267
- // Loop until context is cancelled
268
- for {
269
- select {
270
- case <- ctx .Done ():
271
- return nil
272
- case <- timer .C :
273
- // Check for matured tickers
274
- ticker , err := client .NextTicker (ctx )
275
- if err != nil {
276
- return err
277
- }
278
-
279
- if ticker != nil {
280
- ch <- ticker
281
-
282
- // Reset timer to minimum period
283
- if dur := types .PtrDuration (ticker .Interval ); dur >= time .Second && dur < delta {
284
- delta = dur
285
- }
286
- }
287
-
288
- // Next loop
289
- timer .Reset (delta )
290
- }
291
- }
292
- }
0 commit comments