Skip to content

Commit 424bb25

Browse files
authored
fix panic on endpoint connection error (#160)
* fix panic on endpoint connection error * fix logging statements * remove pointers to interfaces
1 parent ebeab44 commit 424bb25

File tree

8 files changed

+34
-33
lines changed

8 files changed

+34
-33
lines changed

cmd/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func addServerCommand(a *app) *cobra.Command {
3838

3939
srv := server.New(cfg, a.messageRepo, a.applicationRepo, a.groupRepo)
4040

41-
worker.NewCleaner(&a.queue, &a.messageRepo).Start()
42-
worker.NewScheduler(&a.queue, &a.messageRepo).Start()
43-
worker.NewProducer(&a.queue, &a.groupRepo, &a.applicationRepo, &a.messageRepo, cfg.Signature, cfg.SMTP).Start()
41+
worker.NewCleaner(a.queue, a.messageRepo).Start()
42+
worker.NewScheduler(a.queue, a.messageRepo).Start()
43+
worker.NewProducer(a.queue, a.groupRepo, a.applicationRepo, a.messageRepo, cfg.Signature, cfg.SMTP).Start()
4444

4545
log.Infof("Started convoy server in %s", time.Since(start))
4646
return srv.ListenAndServe()

datastore/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func EnsureIndex(db *mongo.Database, collectionName string, field string, unique
4646

4747
_, err := collection.Indexes().CreateOne(ctx, mod)
4848
if err != nil {
49-
log.Errorf("failed to create index on field %s in %s - %+v\n", field, collectionName, err)
49+
log.WithError(err).Errorf("failed to create index on field %s in %s", field, collectionName)
5050
return false
5151
}
5252

datastore/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,12 @@ func (db *messageRepo) LoadMessageIntervals(ctx context.Context, groupID string,
116116

117117
data, err := db.inner.Aggregate(ctx, mongo.Pipeline{matchStage, groupStage, sortStage})
118118
if err != nil {
119-
log.Errorln("aggregate error - ", err)
119+
log.WithError(err).Errorln("aggregate error")
120120
return nil, err
121121
}
122122
var messagesIntervals []models.MessageInterval
123123
if err = data.All(ctx, &messagesIntervals); err != nil {
124-
log.Errorln("marshal error - ", err)
124+
log.WithError(err).Error("marshal error")
125125
return nil, err
126126
}
127127
if messagesIntervals == nil {

net/dispatcher.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (d *Dispatcher) SendRequest(endpoint, method string, jsonData json.RawMessa
3434

3535
req, err := http.NewRequest(method, endpoint, bytes.NewBuffer(jsonData))
3636
if err != nil {
37-
log.Errorf("error occurred while creating request - %+v\n", err)
37+
log.WithError(err).Error("error occurred while creating request")
3838
return r, err
3939
}
4040
if !util.IsStringEmpty(signatureHeader) {
@@ -44,18 +44,22 @@ func (d *Dispatcher) SendRequest(endpoint, method string, jsonData json.RawMessa
4444
req.Header.Add("Content-Type", "application/json")
4545
req.Header.Add("User-Agent", string(DefaultUserAgent))
4646

47+
r.RequestHeader = req.Header
48+
r.URL = req.URL
49+
r.Method = req.Method
50+
4751
trace := &httptrace.ClientTrace{
4852
GotConn: func(connInfo httptrace.GotConnInfo) {
4953
r.IP = connInfo.Conn.RemoteAddr().String()
50-
log.Debugf("IP address resolved to: %s\n", connInfo.Conn.RemoteAddr())
54+
log.Infof("IP address resolved to: %s", connInfo.Conn.RemoteAddr())
5155
},
5256
}
5357

5458
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
5559

5660
response, err := d.client.Do(req)
5761
if err != nil {
58-
log.Debugf("error sending request to API endpoint - %+v\n", err)
62+
log.WithError(err).Error("error sending request to API endpoint")
5963
r.Error = err.Error()
6064
return r, err
6165
}
@@ -64,12 +68,12 @@ func (d *Dispatcher) SendRequest(endpoint, method string, jsonData json.RawMessa
6468
body, err := ioutil.ReadAll(response.Body)
6569
r.Body = body
6670
if err != nil {
67-
log.Errorf("Couldn't parse Response Body. %+v\n", err)
71+
log.WithError(err).Error("couldn't parse response body")
6872
return r, err
6973
}
7074
err = response.Body.Close()
7175
if err != nil {
72-
log.Errorf("error while closing connection - %+v\n", err)
76+
log.WithError(err).Error("error while closing connection")
7377
return r, err
7478
}
7579

@@ -91,8 +95,5 @@ type Response struct {
9195
func updateDispatchHeaders(r *Response, res *http.Response) {
9296
r.Status = res.Status
9397
r.StatusCode = res.StatusCode
94-
r.URL = res.Request.URL
95-
r.Method = res.Request.Method
96-
r.RequestHeader = res.Request.Header
9798
r.ResponseHeader = res.Header
9899
}

server/route.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func reactRootHandler(rw http.ResponseWriter, req *http.Request) {
3333
f := fs.FS(reactFS)
3434
static, err := fs.Sub(f, "ui/build")
3535
if err != nil {
36-
log.Errorf("an error has occurred with the react app - %+v\n", err)
36+
log.WithError(err).Error("an error has occurred with the react app")
3737
return
3838
}
3939
if _, err := static.Open(strings.TrimLeft(p, "/")); err != nil { // If file not found server index/html from root

worker/cleaner.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88
)
99

1010
type Cleaner struct {
11-
queue *queue.Queuer
12-
msgRepo *convoy.MessageRepository
11+
queue queue.Queuer
12+
msgRepo convoy.MessageRepository
1313
}
1414

15-
func NewCleaner(queuer *queue.Queuer, msgRepo *convoy.MessageRepository) *Cleaner {
15+
func NewCleaner(queuer queue.Queuer, msgRepo convoy.MessageRepository) *Cleaner {
1616
return &Cleaner{
1717
queue: queuer,
1818
msgRepo: msgRepo,
@@ -21,7 +21,7 @@ func NewCleaner(queuer *queue.Queuer, msgRepo *convoy.MessageRepository) *Cleane
2121

2222
func (c *Cleaner) Start() {
2323
go func() {
24-
log.Debugln("Running cleanup tasks")
25-
task.RetryAbandonedMessages(*c.queue, *c.msgRepo)
24+
log.Infoln("Running cleanup tasks")
25+
task.RetryAbandonedMessages(c.queue, c.msgRepo)
2626
}()
2727
}

worker/producer.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@ import (
1818

1919
type Producer struct {
2020
Data chan queue.Message
21-
groupRepo *convoy.GroupRepository
22-
appRepo *convoy.ApplicationRepository
23-
msgRepo *convoy.MessageRepository
21+
groupRepo convoy.GroupRepository
22+
appRepo convoy.ApplicationRepository
23+
msgRepo convoy.MessageRepository
2424
dispatch *net.Dispatcher
2525
signatureConfig config.SignatureConfiguration
2626
smtpConfig config.SMTPConfiguration
2727
quit chan chan error
2828
}
2929

30-
func NewProducer(queuer *queue.Queuer, groupRepo *convoy.GroupRepository, appRepo *convoy.ApplicationRepository, msgRepo *convoy.MessageRepository, signatureConfig config.SignatureConfiguration, smtpConfig config.SMTPConfiguration) *Producer {
30+
func NewProducer(queuer queue.Queuer, groupRepo convoy.GroupRepository, appRepo convoy.ApplicationRepository, msgRepo convoy.MessageRepository, signatureConfig config.SignatureConfiguration, smtpConfig config.SMTPConfiguration) *Producer {
3131
return &Producer{
32-
Data: (*queuer).Read(),
32+
Data: queuer.Read(),
3333
groupRepo: groupRepo,
3434
appRepo: appRepo,
3535
msgRepo: msgRepo,
@@ -46,7 +46,7 @@ func (p *Producer) Start() {
4646
select {
4747
case data := <-p.Data:
4848
go func() {
49-
p.postMessages(*p.msgRepo, *p.appRepo, data.Data)
49+
p.postMessages(p.msgRepo, p.appRepo, data.Data)
5050
}()
5151
case ch := <-p.quit:
5252
close(p.Data)
@@ -67,7 +67,7 @@ func (p *Producer) postMessages(msgRepo convoy.MessageRepository, appRepo convoy
6767

6868
e := &m.AppMetadata.Endpoints[i]
6969
if e.Sent {
70-
log.Debugf("endpoint %s already merged with message %s\n", e.TargetURL, m.UID)
70+
log.Infof("endpoint %s already merged with message %s", e.TargetURL, m.UID)
7171
continue
7272
}
7373

@@ -223,10 +223,10 @@ func (p *Producer) postMessages(msgRepo convoy.MessageRepository, appRepo convoy
223223
}
224224
}
225225

226-
func sendEmailNotification(m *convoy.AppMetadata, o *convoy.GroupRepository, s *smtp.SmtpClient, status convoy.EndpointStatus) error {
226+
func sendEmailNotification(m *convoy.AppMetadata, o convoy.GroupRepository, s *smtp.SmtpClient, status convoy.EndpointStatus) error {
227227
email := m.SupportEmail
228228

229-
org, err := (*o).FetchGroupByID(context.Background(), m.GroupID)
229+
org, err := o.FetchGroupByID(context.Background(), m.GroupID)
230230
if err != nil {
231231
return err
232232
}

worker/scheduler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ import (
1212

1313
type Scheduler struct {
1414
inner *gocron.Scheduler
15-
queue *queue.Queuer
16-
msgRepo *convoy.MessageRepository
15+
queue queue.Queuer
16+
msgRepo convoy.MessageRepository
1717
}
1818

19-
func NewScheduler(queue *queue.Queuer, msgRepo *convoy.MessageRepository) *Scheduler {
19+
func NewScheduler(queue queue.Queuer, msgRepo convoy.MessageRepository) *Scheduler {
2020
return &Scheduler{
2121
inner: gocron.NewScheduler(time.UTC),
2222
queue: queue,
@@ -33,7 +33,7 @@ func (s *Scheduler) Start() {
3333

3434
func (s *Scheduler) addTask(name string, secs int, task func(queue.Queuer, convoy.MessageRepository)) {
3535
_, err := s.inner.Every(secs).Seconds().Do(func() {
36-
task(*s.queue, *s.msgRepo)
36+
task(s.queue, s.msgRepo)
3737
})
3838
if err != nil {
3939
log.Fatalf("Failed to add %s scheduler task", name)

0 commit comments

Comments
 (0)