Skip to content

Commit b94942b

Browse files
authored
Merge pull request #222 from tock-ibm/hb-dispatch
Heartbeat dispatch & skeleton
2 parents d36bf21 + 0e24a7d commit b94942b

File tree

6 files changed

+143
-11
lines changed

6 files changed

+143
-11
lines changed

internal/bft/controller.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,25 @@ func (c *Controller) OnAutoRemoveTimeout(requestInfo types.RequestInfo) {
213213

214214
// ProcessMessages dispatches the incoming message to the required component
215215
func (c *Controller) ProcessMessages(sender uint64, m *protos.Message) {
216-
if IsViewMessage(m) {
216+
switch m.GetContent().(type) {
217+
case *protos.Message_PrePrepare, *protos.Message_Prepare, *protos.Message_Commit:
217218
c.currView.HandleMessage(sender, m)
219+
c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m))
220+
221+
case *protos.Message_ViewChange, *protos.Message_ViewData, *protos.Message_NewView:
222+
// TODO view change
223+
c.Logger.Debugf("View change not yet implemented, ignoring message: %v, from %d", m, sender)
224+
225+
case *protos.Message_HeartBeat:
226+
//TODO heartbeat monitor
227+
c.Logger.Debugf("Heartbeat monitor not yet implemented, ignoring message: %v, from %d", m, sender)
228+
229+
case *protos.Message_Error:
230+
c.Logger.Debugf("Error message handling not yet implemented, ignoring message: %v, from %d", m, sender)
231+
232+
default:
233+
c.Logger.Warnf("Unexpected message type, ignoring")
218234
}
219-
c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m))
220-
// TODO the msg can be a view change message or a tx req coming from a node after a timeout
221235
}
222236

223237
func (c *Controller) startView(proposalSequence uint64) {

internal/bft/heartbeatmonitor.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
package bft
7+
8+
import (
9+
"sync"
10+
"time"
11+
12+
"github.com/SmartBFT-Go/consensus/pkg/api"
13+
"github.com/SmartBFT-Go/consensus/smartbftprotos"
14+
)
15+
16+
const (
17+
DefaultHeartbeatTimeout = 60 * time.Second
18+
)
19+
20+
//go:generate mockery -dir . -name HeartbeatTimeoutHandler -case underscore -output ./mocks/
21+
22+
// HeartbeatTimeoutHandler defines who to call when a heartbeat timeout expires.
23+
type HeartbeatTimeoutHandler interface {
24+
OnHeartbeatTimeout(view uint64, leaderID uint64)
25+
}
26+
27+
type HeartbeatMonitor struct {
28+
logger api.Logger
29+
hbTimeout time.Duration
30+
hbInterval time.Duration
31+
comm Comm
32+
handler HeartbeatTimeoutHandler
33+
mutex sync.Mutex
34+
timer *time.Timer
35+
view uint64
36+
leaderID uint64
37+
follower bool
38+
}
39+
40+
func NewHeartbeatMonitor(
41+
logger api.Logger,
42+
heartbeatTimeout time.Duration,
43+
comm Comm,
44+
) *HeartbeatMonitor {
45+
if heartbeatTimeout/10 < time.Nanosecond {
46+
return nil
47+
}
48+
49+
hm := &HeartbeatMonitor{
50+
logger: logger,
51+
hbTimeout: heartbeatTimeout,
52+
hbInterval: heartbeatTimeout / 10,
53+
comm: comm,
54+
}
55+
return hm
56+
}
57+
58+
func (hm *HeartbeatMonitor) SetTimeoutHandler(handler HeartbeatTimeoutHandler) {
59+
// TODO
60+
}
61+
62+
// StartFollower will start following the heartbeats of the leader of the view.
63+
func (hm *HeartbeatMonitor) StartFollower(view uint64, leaderID uint64) {
64+
// TODO
65+
}
66+
67+
// StartLeader will start sending heartbeats to all followers.
68+
func (hm *HeartbeatMonitor) StartLeader(view uint64, leaderID uint64) {
69+
// TODO
70+
}
71+
72+
// ProcessMsg handles an incoming heartbeat.
73+
func (hm *HeartbeatMonitor) ProcessMsg(sender uint64, msg *smartbftprotos.HeartBeat) {
74+
// TODO
75+
}
76+
77+
// Close stops following or sending heartbeats.
78+
func (hm *HeartbeatMonitor) Close() {
79+
// TODO
80+
}

internal/bft/heartbeatmonitor_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
package bft_test
7+
8+
import (
9+
"testing"
10+
"time"
11+
12+
"github.com/SmartBFT-Go/consensus/internal/bft"
13+
"github.com/SmartBFT-Go/consensus/internal/bft/mocks"
14+
"github.com/stretchr/testify/assert"
15+
"go.uber.org/zap"
16+
)
17+
18+
func TestHeartbeatMonitor_New(t *testing.T) {
19+
basicLog, err := zap.NewDevelopment()
20+
assert.NoError(t, err)
21+
log := basicLog.Sugar()
22+
23+
comm := &mocks.CommMock{}
24+
handler := &mocks.HeartbeatTimeoutHandler{}
25+
26+
hm := bft.NewHeartbeatMonitor(log, time.Hour, comm)
27+
assert.NotNil(t, hm)
28+
hm.SetTimeoutHandler(handler)
29+
hm.Close()
30+
}

internal/bft/mocks/heartbeat_timeout_handler.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/bft/util.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ import (
1313
"github.com/golang/protobuf/proto"
1414
)
1515

16-
func IsViewMessage(m *protos.Message) bool {
17-
return m.GetCommit() != nil || m.GetPrepare() != nil || m.GetPrePrepare() != nil
18-
}
19-
2016
func viewNumber(m *protos.Message) uint64 {
2117
if pp := m.GetPrePrepare(); pp != nil {
2218
return pp.GetView()

pkg/consensus/consensus.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,7 @@ func (c *Consensus) Stop() {
9898
}
9999

100100
func (c *Consensus) HandleMessage(sender uint64, m *protos.Message) {
101-
if algorithm.IsViewMessage(m) {
102-
c.controller.ProcessMessages(sender, m)
103-
}
104-
101+
c.controller.ProcessMessages(sender, m)
105102
}
106103

107104
func (c *Consensus) HandleRequest(sender uint64, req []byte) {

0 commit comments

Comments
 (0)