@@ -42,6 +42,7 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
42
42
self .callbacks = make (map [string ]server.PGCallback , 100 )
43
43
self .decoder = marshaler .NewDecoder ("json" ,
44
44
convertPtr ,
45
+ convertPGTime ,
45
46
convertFloatToIntUint ,
46
47
marshaler .ConvertTime ,
47
48
marshaler .ConvertDuration ,
@@ -147,7 +148,9 @@ FOR_LOOP:
147
148
}
148
149
n += len (tasks )
149
150
}
150
- ref .Log (ctx ).With ("ticker" , evt ).Debug (parent , "removed " , n , " tasks from queue" )
151
+ if n > 0 {
152
+ ref .Log (ctx ).With ("ticker" , evt ).Debug (parent , "removed " , n , " tasks from queue" )
153
+ }
151
154
}
152
155
}
153
156
}
@@ -299,16 +302,32 @@ func joinName(parts ...string) string {
299
302
return strings .Join (parts , namespaceSeparator )
300
303
}
301
304
302
- func splitName (name string , n int ) []string {
303
- return strings .SplitN (name , namespaceSeparator , n )
304
- }
305
-
306
305
// //////////////////////////////////////////////////////////////////////////////
307
306
// PRIVATE METHODS
307
+
308
308
var (
309
309
nilValue = reflect .ValueOf (nil )
310
+ timeType = reflect .TypeOf (time.Time {})
310
311
)
311
312
313
+ // convertTime returns time in postgres format
314
+ func convertPGTime (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
315
+ // Pass value through
316
+ if src .Type () == dest {
317
+ return src , nil
318
+ }
319
+
320
+ if dest == timeType {
321
+ // Convert time 2025-05-03T17:29:32.329803 => time.Time
322
+ if t , err := time .Parse ("2006-01-02T15:04:05.999999999" , src .String ()); err == nil {
323
+ return reflect .ValueOf (t ), nil
324
+ }
325
+ }
326
+
327
+ // Skip
328
+ return nilValue , nil
329
+ }
330
+
312
331
// convertPtr returns value if pointer
313
332
func convertPtr (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
314
333
// Pass value through
0 commit comments