Skip to content

Commit dc171cd

Browse files
authored
Merge pull request #19 from jfontan/fix/read-retries
Support int16 and int64 in retries header
2 parents a667403 + c01d65a commit dc171cd

File tree

2 files changed

+109
-8
lines changed

2 files changed

+109
-8
lines changed

amqp/amqp.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package queue
1+
package amqp
22

33
import (
44
"fmt"
5+
"math"
56
"os"
67
"strings"
78
"sync"
@@ -626,18 +627,53 @@ func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
626627
j.Raw = d.Body
627628

628629
if retries, ok := d.Headers[DefaultConfiguration.RetriesHeader]; ok {
629-
retries, ok := retries.(int32)
630-
if !ok {
631-
return nil, ErrRetrievingHeader.New(DefaultConfiguration.RetriesHeader, d.MessageId)
632-
}
630+
switch r := retries.(type) {
631+
case int16:
632+
j.Retries = int32(r)
633+
634+
case int32:
635+
j.Retries = int32(r)
636+
637+
case int64:
638+
if r <= math.MaxInt32 {
639+
j.Retries = int32(r)
640+
} else {
641+
j.Retries = 0
642+
}
633643

634-
j.Retries = retries
644+
default:
645+
err = d.Reject(false)
646+
if err != nil {
647+
return nil, ErrRetrievingHeader.Wrap(
648+
err,
649+
DefaultConfiguration.RetriesHeader,
650+
d.MessageId,
651+
)
652+
}
653+
654+
return nil, ErrRetrievingHeader.New(
655+
DefaultConfiguration.RetriesHeader,
656+
d.MessageId,
657+
)
658+
}
635659
}
636660

637661
if errorType, ok := d.Headers[DefaultConfiguration.ErrorHeader]; ok {
638662
errorType, ok := errorType.(string)
639663
if !ok {
640-
return nil, ErrRetrievingHeader.New(DefaultConfiguration.ErrorHeader, d.MessageId)
664+
err = d.Reject(false)
665+
if err != nil {
666+
return nil, ErrRetrievingHeader.Wrap(
667+
err,
668+
DefaultConfiguration.ErrorHeader,
669+
d.MessageId,
670+
)
671+
}
672+
673+
return nil, ErrRetrievingHeader.New(
674+
DefaultConfiguration.ErrorHeader,
675+
d.MessageId,
676+
)
641677
}
642678

643679
j.ErrorType = errorType

amqp/amqp_test.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package queue
1+
package amqp
22

33
import (
44
"context"
@@ -11,6 +11,7 @@ import (
1111
"gopkg.in/src-d/go-queue.v1"
1212
"gopkg.in/src-d/go-queue.v1/test"
1313

14+
"github.com/streadway/amqp"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"github.com/stretchr/testify/suite"
@@ -177,6 +178,70 @@ func TestAMQPHeaders(t *testing.T) {
177178
}
178179
}
179180

181+
func TestAMQPHeaderRetriesType(t *testing.T) {
182+
broker, err := queue.NewBroker(testAMQPURI)
183+
require.NoError(t, err)
184+
defer func() { require.NoError(t, broker.Close()) }()
185+
186+
q, err := broker.Queue(test.NewName())
187+
require.NoError(t, err)
188+
189+
qa, ok := q.(*Queue)
190+
require.True(t, ok)
191+
192+
tests := []struct {
193+
name string
194+
retries interface{}
195+
}{
196+
{
197+
name: "int16",
198+
retries: int16(42),
199+
},
200+
{
201+
name: "int32",
202+
retries: int32(42),
203+
},
204+
{
205+
name: "int64",
206+
retries: int64(42),
207+
},
208+
}
209+
210+
for _, test := range tests {
211+
headers := amqp.Table{}
212+
headers[DefaultConfiguration.RetriesHeader] = test.retries
213+
err := qa.conn.channel().Publish(
214+
"", // exchange
215+
qa.queue.Name, // routing key
216+
false, // mandatory
217+
false,
218+
amqp.Publishing{
219+
DeliveryMode: amqp.Persistent,
220+
MessageId: "id",
221+
Priority: uint8(queue.PriorityNormal),
222+
Timestamp: time.Now(),
223+
ContentType: "application/msgpack",
224+
Body: []byte("gaxSZXBvc2l0b3J5SUTEEAFmXSlGxxOsFGMLs/gl7Qw="),
225+
Headers: headers,
226+
},
227+
)
228+
require.NoError(t, err)
229+
}
230+
231+
jobIter, err := q.Consume(len(tests))
232+
require.NoError(t, err)
233+
234+
for _, test := range tests {
235+
t.Run(test.name, func(t *testing.T) {
236+
job, err := jobIter.Next()
237+
require.NoError(t, err)
238+
require.NotNil(t, job)
239+
240+
require.Equal(t, int32(42), job.Retries)
241+
})
242+
}
243+
}
244+
180245
func TestAMQPRepublishBuried(t *testing.T) {
181246
broker, err := queue.NewBroker(testAMQPURI)
182247
require.NoError(t, err)

0 commit comments

Comments
 (0)