Skip to content

Commit 1510458

Browse files
authored
Merge pull request #92 from mutablelogic/v5
Marshaler fixes
2 parents 384a108 + daf42bf commit 1510458

File tree

5 files changed

+84
-5
lines changed

5 files changed

+84
-5
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.5
44

55
require (
66
github.com/alecthomas/kong v1.10.0
7-
github.com/djthorpe/go-marshaler v0.0.15
7+
github.com/djthorpe/go-marshaler v1.0.0
88
github.com/djthorpe/go-pg v1.0.5
99
github.com/golang-jwt/jwt/v5 v5.2.2
1010
github.com/mutablelogic/go-client v1.0.12

pkg/pgqueue/config/task.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"context"
55
"errors"
6+
"reflect"
67
"strings"
78
"sync"
89
"time"
@@ -29,6 +30,7 @@ type task struct {
2930
}
3031

3132
var _ server.Task = (*task)(nil)
33+
var _ server.PGQueue = (*task)(nil)
3234

3335
////////////////////////////////////////////////////////////////////////////////
3436
// LIFECYCLE
@@ -38,7 +40,13 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
3840
self.manager = manager
3941
self.taskpool = pgqueue.NewTaskPool(threads)
4042
self.callbacks = make(map[string]server.PGCallback, 100)
41-
self.decoder = marshaler.NewDecoder("json", marshaler.ConvertTime, marshaler.ConvertDuration, marshaler.ConvertIntUint)
43+
self.decoder = marshaler.NewDecoder("json",
44+
convertPtr,
45+
convertFloatToIntUint,
46+
marshaler.ConvertTime,
47+
marshaler.ConvertDuration,
48+
marshaler.ConvertIntUint,
49+
)
4250
return self, nil
4351
}
4452

@@ -154,6 +162,11 @@ FOR_LOOP:
154162
////////////////////////////////////////////////////////////////////////////////
155163
// PUBLIC METHODS
156164

165+
// Conn returns the underlying connection pool object.
166+
func (t *task) Conn() pg.PoolConn {
167+
return t.manager.Conn()
168+
}
169+
157170
// RegisterTicker registers a periodic task (ticker) with a callback function.
158171
// It returns the metadata of the registered ticker.
159172
func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn server.PGCallback) (*schema.Ticker, error) {
@@ -283,3 +296,53 @@ func joinName(parts ...string) string {
283296
func splitName(name string, n int) []string {
284297
return strings.SplitN(name, namespaceSeparator, n)
285298
}
299+
300+
// //////////////////////////////////////////////////////////////////////////////
301+
// PRIVATE METHODS
302+
var (
303+
nilValue = reflect.ValueOf(nil)
304+
)
305+
306+
// convertPtr returns value if pointer
307+
func convertPtr(src reflect.Value, dest reflect.Type) (reflect.Value, error) {
308+
// Pass value through
309+
if src.Type() == dest {
310+
return src, nil
311+
}
312+
313+
// Convert src to elem
314+
if dest.Kind() == reflect.Ptr {
315+
if dest.Elem() == src.Type() {
316+
if src.CanAddr() {
317+
return src.Addr(), nil
318+
} else {
319+
src_ := reflect.New(dest.Elem())
320+
src_.Elem().Set(src)
321+
return src_, nil
322+
}
323+
}
324+
}
325+
326+
// Skip
327+
return nilValue, nil
328+
}
329+
330+
// convert float types to int or uint
331+
func convertFloatToIntUint(src reflect.Value, dest reflect.Type) (reflect.Value, error) {
332+
// Pass value through
333+
if src.Type() == dest {
334+
return src, nil
335+
}
336+
switch dest.Kind() {
337+
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
338+
if src.Kind() == reflect.Float64 || src.Kind() == reflect.Float32 {
339+
return reflect.ValueOf(int64(src.Float())), nil
340+
}
341+
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
342+
if src.Kind() == reflect.Float64 || src.Kind() == reflect.Float32 {
343+
return reflect.ValueOf(uint64(src.Float())), nil
344+
}
345+
}
346+
// Skip
347+
return nilValue, nil
348+
}

pkg/pgqueue/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func (manager *Manager) Worker() string {
9090
return manager.worker
9191
}
9292

93+
func (manager *Manager) Conn() pg.PoolConn {
94+
return manager.conn
95+
}
96+
9397
////////////////////////////////////////////////////////////////////////////////
9498
// PUBLIC METHODS - TICKER
9599

pkg/provider/meta/meta.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func newMetaField(rf reflect.StructField) (*Meta, error) {
168168

169169
var (
170170
timeType = reflect.TypeOf(time.Time{})
171-
urlType = reflect.TypeOf((*url.URL)(nil)).Elem()
171+
urlType = reflect.TypeOf((*url.URL)(nil))
172172
durationType = reflect.TypeOf(time.Duration(0))
173173
)
174174

@@ -219,8 +219,16 @@ func setValue(rv reflect.Value, str string) error {
219219
case reflect.String:
220220
// String
221221
rv.SetString(str)
222+
case reflect.Ptr:
223+
if rv.Type() == urlType {
224+
if v, err := url.Parse(str); err == nil {
225+
rv.Set(reflect.ValueOf(v))
226+
return nil
227+
}
228+
}
229+
fallthrough
222230
default:
223-
// TODO URL and Datetime
231+
// TODO: Datetime
224232
return httpresponse.ErrBadRequest.Withf("invalid value for %s: %q", rv.Type(), str)
225233
}
226234

@@ -254,10 +262,11 @@ func typeName(rt reflect.Type) string {
254262
if subtype := typeName(rt.Elem()); subtype != "" && rt.Key().Kind() == reflect.String {
255263
return "map(" + subtype + ")"
256264
}
257-
case reflect.Struct:
265+
case reflect.Ptr:
258266
if rt == urlType {
259267
return "url"
260268
}
269+
case reflect.Struct:
261270
if rt == timeType {
262271
return "datetime"
263272
}

plugin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ type PGCallback func(context.Context, any) error
116116

117117
// PGQueue defines methods for interacting with a PostgreSQL-backed task queue.
118118
type PGQueue interface {
119+
// Conn returns the underlying connection pool object.
120+
Conn() pg.PoolConn
121+
119122
// RegisterTicker registers a periodic task (ticker) with a callback function.
120123
// It returns the metadata of the registered ticker.
121124
RegisterTicker(context.Context, pgschema.TickerMeta, PGCallback) (*pgschema.Ticker, error)

0 commit comments

Comments
 (0)