Skip to content

Commit a667403

Browse files
authored
Merge pull request #16 from carlosms/memory-window
Add advertised window support for memory queue
2 parents 6214d25 + e25f9e5 commit a667403

File tree

2 files changed

+121
-7
lines changed

2 files changed

+121
-7
lines changed

memory/memory.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,34 +117,50 @@ func (q *Queue) Transaction(txcb queue.TxCallback) error {
117117
return nil
118118
}
119119

120-
// Consume implements Queue. MemoryQueues have infinite advertised window.
121-
func (q *Queue) Consume(_ int) (queue.JobIter, error) {
122-
return &JobIter{q: q, RWMutex: &q.RWMutex, finite: q.finite}, nil
120+
// Consume implements Queue. The advertisedWindow value is the maximum number of
121+
// unacknowledged jobs. Use 0 for an infinite window.
122+
func (q *Queue) Consume(advertisedWindow int) (queue.JobIter, error) {
123+
jobIter := JobIter{
124+
q: q,
125+
RWMutex: &q.RWMutex,
126+
finite: q.finite,
127+
}
128+
129+
if advertisedWindow > 0 {
130+
jobIter.chn = make(chan struct{}, advertisedWindow)
131+
}
132+
133+
return &jobIter, nil
123134
}
124135

125136
// JobIter implements a queue.JobIter interface.
126137
type JobIter struct {
127138
q *Queue
128139
closed bool
129140
finite bool
141+
chn chan struct{}
130142
*sync.RWMutex
131143
}
132144

133145
// Acknowledger implements a queue.Acknowledger interface.
134146
type Acknowledger struct {
135-
q *Queue
136-
j *queue.Job
147+
q *Queue
148+
j *queue.Job
149+
chn chan struct{}
137150
}
138151

139152
// Ack is called when the Job has finished.
140-
func (*Acknowledger) Ack() error {
153+
func (a *Acknowledger) Ack() error {
154+
a.release()
141155
return nil
142156
}
143157

144158
// Reject is called when the Job has errored. The argument indicates whether the Job
145159
// should be put back in queue or not. If requeue is false, the job will go to the buried
146160
// queue until Queue.RepublishBuried() is called.
147161
func (a *Acknowledger) Reject(requeue bool) error {
162+
defer a.release()
163+
148164
if !requeue {
149165
// Send to the buried queue for later republishing
150166
a.q.buriedJobs = append(a.q.buriedJobs, a.j)
@@ -154,6 +170,12 @@ func (a *Acknowledger) Reject(requeue bool) error {
154170
return a.q.Publish(a.j)
155171
}
156172

173+
func (a *Acknowledger) release() {
174+
if a.chn != nil {
175+
<-a.chn
176+
}
177+
}
178+
157179
func (i *JobIter) isClosed() bool {
158180
i.RLock()
159181
defer i.RUnlock()
@@ -162,8 +184,10 @@ func (i *JobIter) isClosed() bool {
162184

163185
// Next returns the next job in the iter.
164186
func (i *JobIter) Next() (*queue.Job, error) {
187+
i.acquire()
165188
for {
166189
if i.isClosed() {
190+
i.release()
167191
return nil, queue.ErrAlreadyClosed.New()
168192
}
169193

@@ -173,6 +197,7 @@ func (i *JobIter) Next() (*queue.Job, error) {
173197
}
174198

175199
if err == io.EOF && i.finite {
200+
i.release()
176201
return nil, err
177202
}
178203

@@ -188,7 +213,7 @@ func (i *JobIter) next() (*queue.Job, error) {
188213
}
189214

190215
j := i.q.jobs[i.q.idx]
191-
j.Acknowledger = &Acknowledger{j: j, q: i.q}
216+
j.Acknowledger = &Acknowledger{j: j, q: i.q, chn: i.chn}
192217
i.q.idx++
193218

194219
return j, nil
@@ -201,3 +226,15 @@ func (i *JobIter) Close() error {
201226
i.closed = true
202227
return nil
203228
}
229+
230+
func (i *JobIter) acquire() {
231+
if i.chn != nil {
232+
i.chn <- struct{}{}
233+
}
234+
}
235+
236+
func (i *JobIter) release() {
237+
if i.chn != nil {
238+
<-i.chn
239+
}
240+
}

test/suite.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"errors"
55
"fmt"
66
"math/rand"
7+
"strconv"
78
"sync"
9+
"sync/atomic"
10+
"testing"
811
"time"
912

1013
"gopkg.in/src-d/go-queue.v1"
@@ -542,6 +545,80 @@ func (s *QueueSuite) TestRetryQueue() {
542545
<-done
543546
}
544547

548+
func (s *QueueSuite) TestConcurrent() {
549+
testCases := []int{1, 2, 13, 150}
550+
551+
for _, advertisedWindow := range testCases {
552+
s.T().Run(strconv.Itoa(advertisedWindow), func(t *testing.T) {
553+
assert := assert.New(t)
554+
555+
qName := NewName()
556+
q, err := s.Broker.Queue(qName)
557+
assert.NoError(err)
558+
assert.NotNil(q)
559+
560+
var continueWG sync.WaitGroup
561+
continueWG.Add(1)
562+
563+
var calledWG sync.WaitGroup
564+
565+
var calls int32
566+
atomic.StoreInt32(&calls, 0)
567+
568+
iter, err := q.Consume(advertisedWindow)
569+
assert.NoError(err)
570+
571+
go func() {
572+
for {
573+
j, err := iter.Next()
574+
if queue.ErrAlreadyClosed.Is(err) {
575+
return
576+
}
577+
assert.NoError(err)
578+
if j == nil {
579+
time.Sleep(300 * time.Millisecond)
580+
continue
581+
}
582+
583+
go func() {
584+
// Removes 1 from calledWG, and gets locked
585+
// until continueWG is released
586+
atomic.AddInt32(&calls, 1)
587+
588+
calledWG.Done()
589+
continueWG.Wait()
590+
591+
assert.NoError(j.Ack())
592+
}()
593+
}
594+
}()
595+
596+
assert.EqualValues(0, atomic.LoadInt32(&calls))
597+
calledWG.Add(advertisedWindow)
598+
599+
// Enqueue some jobs, 3 * advertisedWindow
600+
for i := 0; i < advertisedWindow*3; i++ {
601+
j, err := queue.NewJob()
602+
assert.NoError(err)
603+
err = j.Encode(i)
604+
assert.NoError(err)
605+
err = q.Publish(j)
606+
assert.NoError(err)
607+
}
608+
609+
// The first batch of calls should be exactly advertisedWindow
610+
calledWG.Wait()
611+
assert.EqualValues(advertisedWindow, atomic.LoadInt32(&calls))
612+
613+
// Let the iterator go though all the jobs, should be 3*advertisedWindow
614+
calledWG.Add(2 * advertisedWindow)
615+
continueWG.Done()
616+
calledWG.Wait()
617+
assert.EqualValues(3*advertisedWindow, atomic.LoadInt32(&calls))
618+
})
619+
}
620+
}
621+
545622
func (s *QueueSuite) checkNextClosed(iter queue.JobIter) chan struct{} {
546623
assert := assert.New(s.T())
547624

0 commit comments

Comments
 (0)