Skip to content

Commit e226fef

Browse files
committed
Updated
1 parent 30ac1a4 commit e226fef

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

cmd/server/service.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ func (cmd *ServiceRunCommand) Run(app server.Cmd) error {
9292
config.Pool = pool
9393
}
9494

95+
// Set the queue
96+
if queue, ok := ref.Provider(ctx).Task(ctx, "pgqueue").(server.PGQueue); !ok || queue == nil {
97+
return nil, httpresponse.ErrInternalError.Withf("Invalid connection pool %q", "pgqueue")
98+
} else {
99+
config.Queue = queue
100+
}
101+
95102
return config, nil
96103

97104
case "httpserver":

pkg/cert/config/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
certhandler "github.com/mutablelogic/go-server/pkg/cert/handler"
1111
schema "github.com/mutablelogic/go-server/pkg/cert/schema"
1212
httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
13+
pgqueue "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
14+
"github.com/mutablelogic/go-server/pkg/ref"
15+
"github.com/mutablelogic/go-server/pkg/types"
1316
)
1417

1518
////////////////////////////////////////////////////////////////////////////////
@@ -28,6 +31,7 @@ type Config struct {
2831
StreetAddress string `name:"street" help:"Street address"`
2932
PostalCode string `name:"postal" help:"Postal code"`
3033
} `embed:"" prefix:"root."`
34+
Queue server.PGQueue `kong:"-"` // Connection to queue
3135
}
3236

3337
////////////////////////////////////////////////////////////////////////////////
@@ -67,6 +71,17 @@ func (c Config) New(ctx context.Context) (server.Task, error) {
6771
certhandler.RegisterCert(ctx, c.Router, c.Prefix, certmanager)
6872
}
6973

74+
// Queue task to check for SSL expiry
75+
if c.Queue != nil {
76+
c.Queue.RegisterTicker(ctx, pgqueue.TickerMeta{
77+
Ticker: c.Name(),
78+
Interval: types.DurationPtr(time.Minute),
79+
}, func(ctx context.Context, _ any) error {
80+
ref.Log(ctx).Print(ctx, "Checking for SSL expiry...")
81+
return nil
82+
})
83+
}
84+
7085
// Return the task
7186
return newTaskWith(certmanager), nil
7287
}

pkg/pgqueue/config/task.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,11 @@ func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn se
130130
if err != nil {
131131
return nil, err
132132
}
133-
// TODO: Register the ticker callback
133+
134+
// Register the ticker callback
135+
t.registerCallback(ticker.Namespace, ticker.Ticker, fn)
136+
137+
// Return the ticker metadata
134138
return ticker, nil
135139
}
136140

@@ -141,14 +145,29 @@ func (t *task) RegisterQueue(ctx context.Context, meta schema.QueueMeta, fn serv
141145
if err != nil {
142146
return nil, err
143147
}
148+
144149
// Register a queue cleanup timer
150+
// TODO: queue name should include the namespace
145151
if _, err := t.manager.RegisterTickerNs(ctx, schema.CleanupNamespace, schema.TickerMeta{
146152
Ticker: queue.Queue,
147153
Interval: queue.TTL,
148154
}); err != nil {
149155
_, err_ := t.manager.DeleteQueue(ctx, meta.Queue)
150156
return nil, errors.Join(err, err_)
157+
} else {
158+
t.registerCallback(schema.CleanupNamespace, queue.Queue, func(ctx context.Context, _ any) error {
159+
// Cleanup the queue
160+
if _, err := t.manager.CleanQueue(ctx, queue.Queue); err != nil {
161+
return err
162+
}
163+
return nil
164+
})
151165
}
166+
167+
// Register the task callback
168+
t.registerCallback(queue.Namespace, queue.Queue, fn)
169+
170+
// Return the queue metadata
152171
return queue, nil
153172
}
154173

0 commit comments

Comments
 (0)