Skip to content

Commit a6d59f7

Browse files
committed
Added conn to pgqueue
1 parent 2b6c6d5 commit a6d59f7

File tree

3 files changed

+13
-0
lines changed

3 files changed

+13
-0
lines changed

pkg/pgqueue/config/task.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type task struct {
2929
}
3030

3131
var _ server.Task = (*task)(nil)
32+
var _ server.PGQueue = (*task)(nil)
3233

3334
////////////////////////////////////////////////////////////////////////////////
3435
// LIFECYCLE
@@ -154,6 +155,11 @@ FOR_LOOP:
154155
////////////////////////////////////////////////////////////////////////////////
155156
// PUBLIC METHODS
156157

158+
// Conn returns the underlying connection pool object.
159+
func (t *task) Conn() pg.PoolConn {
160+
return t.manager.Conn()
161+
}
162+
157163
// RegisterTicker registers a periodic task (ticker) with a callback function.
158164
// It returns the metadata of the registered ticker.
159165
func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn server.PGCallback) (*schema.Ticker, error) {

pkg/pgqueue/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func (manager *Manager) Worker() string {
9090
return manager.worker
9191
}
9292

93+
func (manager *Manager) Conn() pg.PoolConn {
94+
return manager.conn
95+
}
96+
9397
////////////////////////////////////////////////////////////////////////////////
9498
// PUBLIC METHODS - TICKER
9599

plugin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ type PGCallback func(context.Context, any) error
116116

117117
// PGQueue defines methods for interacting with a PostgreSQL-backed task queue.
118118
type PGQueue interface {
119+
// Conn returns the underlying connection pool object.
120+
Conn() pg.PoolConn
121+
119122
// RegisterTicker registers a periodic task (ticker) with a callback function.
120123
// It returns the metadata of the registered ticker.
121124
RegisterTicker(context.Context, pgschema.TickerMeta, PGCallback) (*pgschema.Ticker, error)

0 commit comments

Comments
 (0)