Skip to content

Commit fe83545

Browse files
authored
Merge pull request #472 from yacovm/nonblockingSubmit
Fail fast if request already exists in pool
2 parents 6610591 + 8b9eb99 commit fe83545

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

internal/bft/requestpool.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ const (
2222
defaultRequestTimeout = 10 * time.Second // for unit tests only
2323
)
2424

25+
var (
26+
ErrReqAlreadyExists = fmt.Errorf("request already exists")
27+
)
28+
2529
//go:generate mockery -dir . -name RequestTimeoutHandler -case underscore -output ./mocks/
2630

2731
// RequestTimeoutHandler defines the methods called by request timeout timers created by time.AfterFunc.
@@ -46,7 +50,7 @@ type Pool struct {
4650
inspector api.RequestInspector
4751
options PoolOptions
4852

49-
lock sync.Mutex
53+
lock sync.RWMutex
5054
fifo *list.List
5155
semaphore *semaphore.Weighted
5256
existMap map[types.RequestInfo]*list.Element
@@ -138,6 +142,15 @@ func (rp *Pool) Submit(request []byte) error {
138142
return errors.Errorf("pool closed, request rejected: %s", reqInfo)
139143
}
140144

145+
rp.lock.RLock()
146+
_, alreadyExists := rp.existMap[reqInfo]
147+
rp.lock.RUnlock()
148+
149+
if alreadyExists {
150+
rp.logger.Debugf("request %s already exists in the pool", reqInfo)
151+
return ErrReqAlreadyExists
152+
}
153+
141154
// do not wait for a semaphore with a lock, as it will prevent draining the pool.
142155
if err := rp.semaphore.Acquire(context.Background(), 1); err != nil {
143156
return errors.Wrapf(err, "acquiring semaphore for request: %s", reqInfo)
@@ -150,9 +163,9 @@ func (rp *Pool) Submit(request []byte) error {
150163

151164
if _, exist := rp.existMap[reqInfo]; exist {
152165
rp.semaphore.Release(1)
153-
errStr := fmt.Sprintf("request %s already exists in the pool", reqInfo)
166+
errStr := fmt.Sprintf("request %s has been already added to the pool", reqInfo)
154167
rp.logger.Debugf(errStr)
155-
return errors.New(errStr)
168+
return ErrReqAlreadyExists
156169
}
157170

158171
to := time.AfterFunc(

internal/bft/requestpool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func TestReqPoolTimeout(t *testing.T) {
471471

472472
pool.RestartTimers()
473473
err = pool.Submit(byteReq2)
474-
assert.EqualError(t, err, "request {2 2} already exists in the pool")
474+
assert.Equal(t, bft.ErrReqAlreadyExists, err)
475475
pool.StopTimers()
476476
pool.RestartTimers()
477477

0 commit comments

Comments
 (0)