Skip to content

Commit 2bb2eaf

Browse files
authored
Merge pull request #225 from yacovm/createPoolWithTimeoutHandler
Create pool with timeout handler
2 parents b94942b + 84d0f9b commit 2bb2eaf

File tree

4 files changed

+36
-39
lines changed

4 files changed

+36
-39
lines changed

internal/bft/batcher_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,21 @@ import (
1111
"time"
1212

1313
"github.com/SmartBFT-Go/consensus/internal/bft"
14+
"github.com/SmartBFT-Go/consensus/internal/bft/mocks"
1415
"github.com/SmartBFT-Go/consensus/pkg/types"
1516
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/mock"
1618
"go.uber.org/zap"
1719
)
1820

21+
var (
22+
noopTimeoutHandler = &mocks.RequestTimeoutHandler{}
23+
)
24+
25+
func init() {
26+
noopTimeoutHandler.On("OnRequestTimeout", mock.Anything, mock.Anything)
27+
}
28+
1929
func TestBatcherBasic(t *testing.T) {
2030
basicLog, err := zap.NewDevelopment()
2131
assert.NoError(t, err)
@@ -25,7 +35,7 @@ func TestBatcherBasic(t *testing.T) {
2535
byteReq1 := makeTestRequest("1", "1", "foo")
2636
byteReq2 := makeTestRequest("2", "2", "foo")
2737
byteReq3 := makeTestRequest("3", "3", "foo")
28-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
38+
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
2939
err = pool.Submit(byteReq1)
3040
assert.NoError(t, err)
3141

@@ -95,7 +105,7 @@ func TestBatcherWhileSubmitting(t *testing.T) {
95105
assert.NoError(t, err)
96106
log := basicLog.Sugar()
97107
insp := &testRequestInspector{}
98-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 200})
108+
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 200})
99109

100110
batcher := bft.NewBatchBuilder(pool, 100, 100*time.Second) // long time
101111

@@ -136,7 +146,7 @@ func TestBatcherClose(t *testing.T) {
136146
insp := &testRequestInspector{}
137147

138148
byteReq := makeTestRequest("1", "1", "foo")
139-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
149+
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
140150
err = pool.Submit(byteReq)
141151
assert.NoError(t, err)
142152

@@ -170,7 +180,7 @@ func TestBatcherReset(t *testing.T) {
170180
insp := &testRequestInspector{}
171181

172182
byteReq1 := makeTestRequest("1", "1", "foo")
173-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
183+
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
174184
err = pool.Submit(byteReq1)
175185
assert.NoError(t, err)
176186

internal/bft/requestpool.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
)
2020

2121
const (
22-
DefaultRequestTimeout = 60000 * time.Millisecond
22+
DefaultRequestTimeout = 10 * time.Second
2323
)
2424

2525
//go:generate mockery -dir . -name RequestTimeoutHandler -case underscore -output ./mocks/
@@ -68,7 +68,7 @@ type PoolOptions struct {
6868
}
6969

7070
// NewPool constructs new requests pool
71-
func NewPool(log api.Logger, inspector api.RequestInspector, options PoolOptions) *Pool {
71+
func NewPool(log api.Logger, inspector api.RequestInspector, th RequestTimeoutHandler, options PoolOptions) *Pool {
7272
if options.RequestTimeout == 0 {
7373
options.RequestTimeout = DefaultRequestTimeout
7474
}
@@ -80,19 +80,16 @@ func NewPool(log api.Logger, inspector api.RequestInspector, options PoolOptions
8080
}
8181

8282
return &Pool{
83-
logger: log,
84-
inspector: inspector,
85-
fifo: list.New(),
86-
semaphore: semaphore.NewWeighted(options.QueueSize),
87-
existMap: make(map[types.RequestInfo]*list.Element),
88-
options: options,
83+
timeoutHandler: th,
84+
logger: log,
85+
inspector: inspector,
86+
fifo: list.New(),
87+
semaphore: semaphore.NewWeighted(options.QueueSize),
88+
existMap: make(map[types.RequestInfo]*list.Element),
89+
options: options,
8990
}
9091
}
9192

92-
func (rp *Pool) SetTimeoutHandler(handler RequestTimeoutHandler) {
93-
rp.timeoutHandler = handler
94-
}
95-
9693
func (rp *Pool) isStopped() bool {
9794
rp.lock.Lock()
9895
defer rp.lock.Unlock()

internal/bft/requestpool_test.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ func TestReqPoolBasic(t *testing.T) {
3636
t.Run("create close", func(t *testing.T) {
3737
timeoutHandler := &mocks.RequestTimeoutHandler{}
3838

39-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
40-
pool.SetTimeoutHandler(timeoutHandler)
39+
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
4140

4241
assert.Equal(t, 0, pool.Size())
4342
err = pool.Submit(byteReq1)
@@ -53,9 +52,8 @@ func TestReqPoolBasic(t *testing.T) {
5352
t.Run("submit remove next", func(t *testing.T) {
5453
timeoutHandler := &mocks.RequestTimeoutHandler{}
5554

56-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
55+
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
5756
defer pool.Close()
58-
pool.SetTimeoutHandler(timeoutHandler)
5957

6058
assert.Equal(t, 0, pool.Size())
6159
err = pool.Submit(byteReq1)
@@ -152,9 +150,8 @@ func TestReqPoolCapacity(t *testing.T) {
152150

153151
t.Run("eventually submit", func(t *testing.T) {
154152
timeoutHandler := &mocks.RequestTimeoutHandler{}
155-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
153+
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
156154
defer pool.Close()
157-
pool.SetTimeoutHandler(timeoutHandler)
158155

159156
wg := sync.WaitGroup{}
160157
wg.Add(2 * numReq)
@@ -188,9 +185,8 @@ func TestReqPoolCapacity(t *testing.T) {
188185

189186
t.Run("submit storm", func(t *testing.T) {
190187
timeoutHandler := &mocks.RequestTimeoutHandler{}
191-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
188+
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
192189
defer pool.Close()
193-
pool.SetTimeoutHandler(timeoutHandler)
194190

195191
wg := sync.WaitGroup{}
196192
wg.Add(2 * numReq)
@@ -237,9 +233,8 @@ func TestReqPoolPrune(t *testing.T) {
237233

238234
byteReq1 := makeTestRequest("1", "1", "foo")
239235
byteReq2 := makeTestRequest("2", "2", "bar")
240-
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
236+
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
241237
defer pool.Close()
242-
pool.SetTimeoutHandler(timeoutHandler)
243238

244239
assert.Equal(t, 0, pool.Size())
245240

@@ -291,7 +286,7 @@ func TestReqPoolTimeout(t *testing.T) {
291286
assert.Fail(t, "called OnAutoRemoveTimeout")
292287
}).Return()
293288

294-
pool := bft.NewPool(log, insp,
289+
pool := bft.NewPool(log, insp, timeoutHandler,
295290
bft.PoolOptions{
296291
QueueSize: 3,
297292
RequestTimeout: 10 * time.Millisecond,
@@ -300,7 +295,6 @@ func TestReqPoolTimeout(t *testing.T) {
300295
},
301296
)
302297
defer pool.Close()
303-
pool.SetTimeoutHandler(timeoutHandler)
304298

305299
assert.Equal(t, 0, pool.Size())
306300
err = pool.Submit(byteReq1)
@@ -336,7 +330,7 @@ func TestReqPoolTimeout(t *testing.T) {
336330
assert.Fail(t, "called OnAutoRemoveTimeout")
337331
}).Return()
338332

339-
pool := bft.NewPool(log, insp,
333+
pool := bft.NewPool(log, insp, timeoutHandler,
340334
bft.PoolOptions{
341335
QueueSize: 3,
342336
RequestTimeout: 10 * time.Millisecond,
@@ -345,7 +339,6 @@ func TestReqPoolTimeout(t *testing.T) {
345339
},
346340
)
347341
defer pool.Close()
348-
pool.SetTimeoutHandler(timeoutHandler)
349342

350343
assert.Equal(t, 0, pool.Size())
351344
err = pool.Submit(byteReq1)
@@ -383,7 +376,7 @@ func TestReqPoolTimeout(t *testing.T) {
383376
to3WG.Done()
384377
}).Return()
385378

386-
pool := bft.NewPool(log, insp,
379+
pool := bft.NewPool(log, insp, timeoutHandler,
387380
bft.PoolOptions{
388381
QueueSize: 3,
389382
RequestTimeout: 10 * time.Millisecond,
@@ -392,7 +385,6 @@ func TestReqPoolTimeout(t *testing.T) {
392385
},
393386
)
394387
defer pool.Close()
395-
pool.SetTimeoutHandler(timeoutHandler)
396388

397389
assert.Equal(t, 0, pool.Size())
398390
err = pool.Submit(byteReq1)
@@ -429,7 +421,7 @@ func TestReqPoolTimeout(t *testing.T) {
429421
assert.Fail(t, "called OnAutoRemoveTimeout")
430422
}).Return()
431423

432-
pool := bft.NewPool(log, insp,
424+
pool := bft.NewPool(log, insp, timeoutHandler,
433425
bft.PoolOptions{
434426
QueueSize: 3,
435427
RequestTimeout: 100 * time.Millisecond,
@@ -438,7 +430,6 @@ func TestReqPoolTimeout(t *testing.T) {
438430
},
439431
)
440432
defer pool.Close()
441-
pool.SetTimeoutHandler(timeoutHandler)
442433
assert.Equal(t, 0, pool.Size())
443434
err = pool.Submit(byteReq1)
444435
assert.NoError(t, err)

pkg/consensus/consensus.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,12 @@ func (c *Consensus) Start() {
6464
LeaderFwdTimeout: requestTimeout,
6565
AutoRemoveTimeout: requestTimeout,
6666
}
67-
pool := algorithm.NewPool(c.Logger, c.RequestInspector, opts)
68-
batchBuilder := algorithm.NewBatchBuilder(pool, c.BatchSize, c.BatchTimeout)
6967

7068
c.controller = &algorithm.Controller{
7169
ProposerBuilder: c,
7270
WAL: c.WAL,
7371
ID: c.SelfID,
7472
N: c.N,
75-
Batcher: batchBuilder,
76-
RequestPool: pool,
7773
RequestTimeout: requestTimeout,
7874
Verifier: c.Verifier,
7975
Logger: c.Logger,
@@ -86,7 +82,10 @@ func (c *Consensus) Start() {
8682
RequestInspector: c.RequestInspector,
8783
}
8884

89-
pool.SetTimeoutHandler(c.controller)
85+
pool := algorithm.NewPool(c.Logger, c.RequestInspector, c.controller, opts)
86+
batchBuilder := algorithm.NewBatchBuilder(pool, c.BatchSize, c.BatchTimeout)
87+
c.controller.RequestPool = pool
88+
c.controller.Batcher = batchBuilder
9089

9190
// If we delivered to the application proposal with sequence i,
9291
// then we are expecting to be proposed a proposal with sequence i+1.

0 commit comments

Comments
 (0)