Skip to content

Commit cf1399d

Browse files
authored
Queue refactor. (#153)
* Queue refactor wip * still wip * rework workers * Queue refactor tests pass * Update process_message test * - Enabled Message Retry - Updated Golden files * Refactored Implementation * Clean up
1 parent 424bb25 commit cf1399d

32 files changed

+1365
-570
lines changed

cmd/main.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@ import (
77
"time"
88
_ "time/tzdata"
99

10+
convoyRedis "github.com/frain-dev/convoy/queue/redis"
1011
"github.com/getsentry/sentry-go"
1112
"github.com/google/uuid"
1213
"go.mongodb.org/mongo-driver/bson/primitive"
1314

1415
"github.com/frain-dev/convoy/util"
16+
"github.com/go-redis/redis/v8"
1517
log "github.com/sirupsen/logrus"
18+
"github.com/vmihailenco/taskq/v3"
1619
prefixed "github.com/x-cray/logrus-prefixed-formatter"
1720

1821
"github.com/frain-dev/convoy"
1922
"github.com/frain-dev/convoy/config"
2023
"github.com/frain-dev/convoy/datastore"
2124
"github.com/frain-dev/convoy/queue"
22-
"github.com/frain-dev/convoy/queue/redis"
2325
"github.com/spf13/cobra"
2426
"go.mongodb.org/mongo-driver/mongo"
2527
)
@@ -83,10 +85,11 @@ func main() {
8385
sentryHook := convoy.NewSentryHook(convoy.DefaultLevels)
8486
log.AddHook(sentryHook)
8587

86-
var queuer queue.Queuer
88+
var qFn taskq.Factory
89+
var rC *redis.Client
8790

8891
if cfg.Queue.Type == config.RedisQueueProvider {
89-
queuer, err = redis.New(cfg)
92+
rC, qFn, err = convoyRedis.NewClient(cfg)
9093
if err != nil {
9194
return err
9295
}
@@ -102,7 +105,8 @@ func main() {
102105
app.groupRepo = datastore.NewGroupRepo(conn)
103106
app.applicationRepo = datastore.NewApplicationRepo(conn)
104107
app.messageRepo = datastore.NewMessageRepository(conn)
105-
app.queue = queuer
108+
app.scheduleQueue = convoyRedis.NewQueue(rC, qFn, "ScheduleQueue")
109+
app.deadLetterQueue = convoyRedis.NewQueue(rC, qFn, "DeadLetterQueue")
106110

107111
ensureMongoIndices(conn)
108112
err = ensureDefaultGroup(context.Background(), app.groupRepo)
@@ -114,7 +118,12 @@ func main() {
114118
},
115119
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
116120
defer func() {
117-
err := app.queue.Close()
121+
err := app.scheduleQueue.Close()
122+
if err != nil {
123+
log.Errorln("failed to close app queue - ", err)
124+
}
125+
126+
err = app.deadLetterQueue.Close()
118127
if err != nil {
119128
log.Errorln("failed to close app queue - ", err)
120129
}
@@ -180,7 +189,8 @@ type app struct {
180189
groupRepo convoy.GroupRepository
181190
applicationRepo convoy.ApplicationRepository
182191
messageRepo convoy.MessageRepository
183-
queue queue.Queuer
192+
scheduleQueue queue.Queuer
193+
deadLetterQueue queue.Queuer
184194
}
185195

186196
func getCtx() (context.Context, context.CancelFunc) {

cmd/server.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
"errors"
55
"time"
66

7+
"github.com/frain-dev/convoy"
78
"github.com/frain-dev/convoy/config"
9+
convoy_queue "github.com/frain-dev/convoy/queue/redis"
810
"github.com/frain-dev/convoy/server"
911
"github.com/frain-dev/convoy/util"
1012
"github.com/frain-dev/convoy/worker"
13+
convoy_task "github.com/frain-dev/convoy/worker/task"
1114
log "github.com/sirupsen/logrus"
1215
"github.com/spf13/cobra"
1316
)
@@ -36,11 +39,20 @@ func addServerCommand(a *app) *cobra.Command {
3639
return errors.New("please provide the HTTP port in the convoy.json file")
3740
}
3841

39-
srv := server.New(cfg, a.messageRepo, a.applicationRepo, a.groupRepo)
42+
srv := server.New(cfg, a.messageRepo, a.applicationRepo, a.groupRepo, a.scheduleQueue)
4043

41-
worker.NewCleaner(a.queue, a.messageRepo).Start()
42-
worker.NewScheduler(a.queue, a.messageRepo).Start()
43-
worker.NewProducer(a.queue, a.groupRepo, a.applicationRepo, a.messageRepo, cfg.Signature, cfg.SMTP).Start()
44+
// register workers.
45+
if queue, ok := a.scheduleQueue.(*convoy_queue.RedisQueue); ok {
46+
worker.NewProducer(queue).Start()
47+
}
48+
49+
if queue, ok := a.deadLetterQueue.(*convoy_queue.RedisQueue); ok {
50+
worker.NewCleaner(queue).Start()
51+
}
52+
53+
// register tasks.
54+
convoy_task.CreateTask(convoy.EventProcessor, cfg, convoy_task.ProcessMessages(a.applicationRepo, a.messageRepo, a.groupRepo))
55+
convoy_task.CreateTask(convoy.DeadLetterProcessor, cfg, convoy_task.ProcessDeadLetters)
4456

4557
log.Infof("Started convoy server in %s", time.Since(start))
4658
return srv.ListenAndServe()

convoy-docker.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"sentry": {
33
"dsn": "<insert-sentry-dsn>"
4-
},
4+
},
55
"database": {
66
"environment": "dev",
77
"dsn": "mongodb://root:rootpassword@mongodb:27017"

docs/docs.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Package docs GENERATED BY THE COMMAND ABOVE; DO NOT EDIT
22
// This file was generated by swaggo/swag at
3-
// 2021-10-15 12:55:36.391586 +0100 WAT m=+25.226284585
3+
// 2021-10-20 13:55:35.278383 +0100 WAT m=+41.113373043
44
package docs
55

66
import (
@@ -1028,6 +1028,14 @@ var doc = `{
10281028
"Group"
10291029
],
10301030
"summary": "Get groups",
1031+
"parameters": [
1032+
{
1033+
"type": "string",
1034+
"description": "group name",
1035+
"name": "name",
1036+
"in": "query"
1037+
}
1038+
],
10311039
"responses": {
10321040
"200": {
10331041
"description": "OK",

docs/swagger.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,14 @@
10171017
"Group"
10181018
],
10191019
"summary": "Get groups",
1020+
"parameters": [
1021+
{
1022+
"type": "string",
1023+
"description": "group name",
1024+
"name": "name",
1025+
"in": "query"
1026+
}
1027+
],
10201028
"responses": {
10211029
"200": {
10221030
"description": "OK",

docs/swagger.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,11 @@ paths:
702702
consumes:
703703
- application/json
704704
description: This endpoint fetches groups
705+
parameters:
706+
- description: group name
707+
in: query
708+
name: name
709+
type: string
705710
produces:
706711
- application/json
707712
responses:

docs/v3/openapi3.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,6 +1319,16 @@
13191319
"/groups": {
13201320
"get": {
13211321
"description": "This endpoint fetches groups",
1322+
"parameters": [
1323+
{
1324+
"description": "group name",
1325+
"in": "query",
1326+
"name": "name",
1327+
"schema": {
1328+
"type": "string"
1329+
}
1330+
}
1331+
],
13221332
"responses": {
13231333
"200": {
13241334
"content": {

docs/v3/openapi3.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,12 @@ paths:
805805
/groups:
806806
get:
807807
description: This endpoint fetches groups
808+
parameters:
809+
- description: group name
810+
in: query
811+
name: name
812+
schema:
813+
type: string
808814
responses:
809815
"200":
810816
content:

go.mod

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,16 @@ require (
66
github.com/dgrijalva/jwt-go v3.2.0+incompatible
77
github.com/felixge/httpsnoop v1.0.2
88
github.com/getkin/kin-openapi v0.78.0
9-
github.com/getsentry/sentry-go v0.11.0 // indirect
9+
github.com/getsentry/sentry-go v0.11.0
1010
github.com/ghodss/yaml v1.0.0
1111
github.com/go-chi/chi/v5 v5.0.3
1212
github.com/go-chi/render v1.0.1
13-
github.com/go-co-op/gocron v1.7.0
14-
github.com/go-redis/redis/v8 v8.8.2
13+
github.com/go-redis/redis/v8 v8.11.4
1514
github.com/gobeam/mongo-go-pagination v0.0.7
1615
github.com/golang/mock v1.4.4
1716
github.com/golang/snappy v0.0.4 // indirect
1817
github.com/google/uuid v1.2.0
19-
github.com/klauspost/compress v1.13.4 // indirect
18+
github.com/jarcoal/httpmock v1.0.8
2019
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
2120
github.com/olekukonko/tablewriter v0.0.5
2221
github.com/pkg/errors v0.9.1
@@ -27,12 +26,14 @@ require (
2726
github.com/spf13/cobra v1.1.3
2827
github.com/stretchr/testify v1.7.0
2928
github.com/swaggo/swag v1.7.3
29+
github.com/vmihailenco/taskq/v3 v3.2.6
3030
github.com/x-cray/logrus-prefixed-formatter v0.5.2
3131
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
3232
go.mongodb.org/mongo-driver v1.7.1
3333
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e
34-
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
34+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
3535
golang.org/x/text v0.3.7 // indirect
3636
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
3737
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
38+
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
3839
)

0 commit comments

Comments
 (0)