Skip to content

Commit eb1b2b0

Browse files
authored
Merge pull request #6 from jfontan/improvement/use-blocking-call-for-republish
Use blocking call with timeout for republish
2 parents 100479d + 762e6f4 commit eb1b2b0

File tree

2 files changed

+15
-29
lines changed

2 files changed

+15
-29
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ The list of available variables is:
7373
- `AMQP_BACKOFF_MIN` (default: 20ms): Minimum time to wait for retry the connection or queue channel assignment.
7474
- `AMQP_BACKOFF_MAX` (default: 30s): Maximum time to wait for retry the connection or queue channel assignment.
7575
- `AMQP_BACKOFF_FACTOR` (default: 2): Multiplying factor for each increment step on the retry.
76+
- `AMQP_BURIED_QUEUE_SUFFIX` (default: `.buriedQueue`): Suffix for the buried queue name.
77+
- `AMQP_BURIED_EXCHANGE_SUFFIX` (default: `.buriedExchange`): Suffix for the exchange name.
78+
- `AMQP_BURIED_TIMEOUT` (default: 500): Time in milliseconds to wait for new jobs from the buried queue.
79+
- `AMQP_RETRIES_HEADER` (default: `x-retries`): Message header to set the number of retries.
80+
- `AMQP_ERROR_HEADER` (default: `x-error-type`): Message header to set the error type.
7681

7782
License
7883
-------

amqp/amqp.go

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ var DefaultConfiguration Configuration
3535
// Configuration AMQP configuration settings, this settings are set using the
3636
// envinroment varabiles.
3737
type Configuration struct {
38-
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
39-
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
40-
BuriedNonBlockingRetries int `envconfig:"BURIED_BLOCKING_RETRIES" default:"3"`
38+
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
39+
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
40+
BuriedTimeout int `envconfig:"BURIED_TIMEOUT" default:"500"`
4141

4242
RetriesHeader string `envconfig:"RETRIES_HEADER" default:"x-retries"`
4343
ErrorHeader string `envconfig:"ERROR_HEADER" default:"x-error-type"`
@@ -345,41 +345,22 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
345345

346346
defer iter.Close()
347347

348-
retries := 0
348+
timeout := time.Duration(DefaultConfiguration.BuriedTimeout) * time.Millisecond
349+
349350
var notComplying []*queue.Job
350351
var errorsPublishing []*jobErr
351352
for {
352-
j, err := iter.(*JobIter).nextNonBlocking()
353+
j, err := iter.(*JobIter).nextWithTimeout(timeout)
353354
if err != nil {
354355
return err
355356
}
356357

357358
if j == nil {
358-
log.With(log.Fields{
359-
"retries": retries,
360-
}).Debugf("received empty job")
361-
362-
// check (in non blocking mode) up to DefaultConfiguration.BuriedNonBlockingRetries
363-
// with a small delay between them just in case some job is
364-
// arriving, return if there is nothing after all the retries
365-
// (meaning: BuriedQueue is surely empty or any arriving jobs will
366-
// have to wait to the next call).
367-
if retries > DefaultConfiguration.BuriedNonBlockingRetries {
368-
log.With(log.Fields{
369-
"retries": retries,
370-
"max-retries": DefaultConfiguration.BuriedNonBlockingRetries,
371-
}).Debugf("maximum number of retries reached")
359+
log.Debugf("no more jobs in the buried queue")
372360

373-
break
374-
}
375-
376-
time.Sleep(50 * time.Millisecond)
377-
retries++
378-
continue
361+
break
379362
}
380363

381-
retries = 0
382-
383364
if err = j.Ack(); err != nil {
384365
return err
385366
}
@@ -541,15 +522,15 @@ func (i *JobIter) Next() (*queue.Job, error) {
541522
return fromDelivery(&d)
542523
}
543524

544-
func (i *JobIter) nextNonBlocking() (*queue.Job, error) {
525+
func (i *JobIter) nextWithTimeout(timeout time.Duration) (*queue.Job, error) {
545526
select {
546527
case d, ok := <-i.c:
547528
if !ok {
548529
return nil, queue.ErrAlreadyClosed.New()
549530
}
550531

551532
return fromDelivery(&d)
552-
default:
533+
case <-time.After(timeout):
553534
return nil, nil
554535
}
555536
}

0 commit comments

Comments
 (0)