Skip to content

Commit a3b6e51

Browse files
committed
Move rpc message type definitions into proto file (#741)
1 parent 4d6efae commit a3b6e51

File tree

8 files changed

+297
-319
lines changed

8 files changed

+297
-319
lines changed

dispatcher/dispatcher.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ type blockMsg struct {
6666
ctx context.Context
6767
chainID uint32
6868
block *iotextypes.Block
69-
blkType uint32
7069
}
7170

7271
func (m blockMsg) ChainID() uint32 {
@@ -101,7 +100,7 @@ type IotxDispatcher struct {
101100
started int32
102101
shutdown int32
103102
eventChan chan interface{}
104-
eventAudit map[uint32]int
103+
eventAudit map[iotexrpc.MessageType]int
105104
eventAuditLock sync.RWMutex
106105
wg sync.WaitGroup
107106
quit chan struct{}
@@ -114,7 +113,7 @@ type IotxDispatcher struct {
114113
func NewDispatcher(cfg config.Config) (Dispatcher, error) {
115114
d := &IotxDispatcher{
116115
eventChan: make(chan interface{}, cfg.Dispatcher.EventChanSize),
117-
eventAudit: make(map[uint32]int),
116+
eventAudit: make(map[iotexrpc.MessageType]int),
118117
quit: make(chan struct{}),
119118
subscribers: make(map[uint32]Subscriber),
120119
}
@@ -160,10 +159,10 @@ func (d *IotxDispatcher) EventChan() *chan interface{} {
160159
}
161160

162161
// EventAudit returns the event audit map
163-
func (d *IotxDispatcher) EventAudit() map[uint32]int {
162+
func (d *IotxDispatcher) EventAudit() map[iotexrpc.MessageType]int {
164163
d.eventAuditLock.RLock()
165164
defer d.eventAuditLock.RUnlock()
166-
snapshot := make(map[uint32]int)
165+
snapshot := make(map[iotexrpc.MessageType]int)
167166
for k, v := range d.eventAudit {
168167
snapshot[k] = v
169168
}
@@ -199,7 +198,7 @@ loop:
199198

200199
// handleActionMsg handles actionMsg from all peers.
201200
func (d *IotxDispatcher) handleActionMsg(m *actionMsg) {
202-
d.updateEventAudit(protogen.MsgActionType)
201+
d.updateEventAudit(iotexrpc.MessageType_ACTION)
203202
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
204203
if err := subscriber.HandleAction(m.ctx, m.action); err != nil {
205204
requestMtc.WithLabelValues("AddAction", "false").Inc()
@@ -215,7 +214,7 @@ func (d *IotxDispatcher) handleBlockMsg(m *blockMsg) {
215214
d.subscribersMU.RLock()
216215
defer d.subscribersMU.RUnlock()
217216
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
218-
d.updateEventAudit(protogen.MsgBlockProtoMsgType)
217+
d.updateEventAudit(iotexrpc.MessageType_BLOCK)
219218
if err := subscriber.HandleBlock(m.ctx, m.block); err != nil {
220219
log.L().Error("Fail to handle the block.", zap.Error(err))
221220
}
@@ -231,7 +230,7 @@ func (d *IotxDispatcher) handleBlockSyncMsg(m *blockSyncMsg) {
231230
zap.Uint64("start", m.sync.Start),
232231
zap.Uint64("end", m.sync.End))
233232

234-
d.updateEventAudit(protogen.MsgBlockSyncReqType)
233+
d.updateEventAudit(iotexrpc.MessageType_BLOCK_REQUEST)
235234
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
236235
// dispatch to block sync
237236
if err := subscriber.HandleSyncRequest(m.ctx, m.peer, m.sync); err != nil {
@@ -263,7 +262,6 @@ func (d *IotxDispatcher) dispatchBlockCommit(ctx context.Context, chainID uint32
263262
ctx: ctx,
264263
chainID: chainID,
265264
block: (msg).(*iotextypes.Block),
266-
blkType: protogen.MsgBlockProtoMsgType,
267265
})
268266
}
269267

@@ -282,7 +280,7 @@ func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint3
282280

283281
// HandleBroadcast handles incoming broadcast message
284282
func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, message proto.Message) {
285-
msgType, err := protogen.GetTypeFromProtoMsg(message)
283+
msgType, err := protogen.GetTypeFromRPCMsg(message)
286284
if err != nil {
287285
log.L().Warn("Unexpected message handled by HandleBroadcast.", zap.Error(err))
288286
}
@@ -296,33 +294,33 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, me
296294
d.subscribersMU.RUnlock()
297295

298296
switch msgType {
299-
case protogen.MsgConsensusType:
297+
case iotexrpc.MessageType_CONSENSUS:
300298
err := subscriber.HandleConsensusMsg(message.(*iotexrpc.Consensus))
301299
if err != nil {
302300
log.L().Error("Failed to handle block propose.", zap.Error(err))
303301
}
304-
case protogen.MsgActionType:
302+
case iotexrpc.MessageType_ACTION:
305303
d.dispatchAction(ctx, chainID, message)
306-
case protogen.MsgBlockProtoMsgType:
304+
case iotexrpc.MessageType_BLOCK:
307305
d.dispatchBlockCommit(ctx, chainID, message)
308306
default:
309-
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Uint32("msgType", msgType))
307+
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
310308
}
311309
}
312310

313311
// HandleTell handles incoming unicast message
314312
func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer peerstore.PeerInfo, message proto.Message) {
315-
msgType, err := protogen.GetTypeFromProtoMsg(message)
313+
msgType, err := protogen.GetTypeFromRPCMsg(message)
316314
if err != nil {
317315
log.L().Warn("Unexpected message handled by HandleTell.", zap.Error(err))
318316
}
319317
switch msgType {
320-
case protogen.MsgBlockSyncReqType:
318+
case iotexrpc.MessageType_BLOCK_REQUEST:
321319
d.dispatchBlockSyncReq(ctx, chainID, peer, message)
322-
case protogen.MsgBlockProtoMsgType:
320+
case iotexrpc.MessageType_BLOCK:
323321
d.dispatchBlockCommit(ctx, chainID, message)
324322
default:
325-
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Uint32("msgType", msgType))
323+
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Any("msgType", msgType))
326324
}
327325
}
328326

@@ -336,7 +334,7 @@ func (d *IotxDispatcher) enqueueEvent(event interface{}) {
336334
}()
337335
}
338336

339-
func (d *IotxDispatcher) updateEventAudit(t uint32) {
337+
func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
340338
d.eventAuditLock.Lock()
341339
defer d.eventAuditLock.Unlock()
342340
d.eventAudit[t]++

p2p/agent.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import (
2424
"go.uber.org/zap"
2525

2626
"github.com/iotexproject/iotex-core/config"
27-
p2ppb "github.com/iotexproject/iotex-core/p2p/pb"
2827
"github.com/iotexproject/iotex-core/pkg/log"
2928
"github.com/iotexproject/iotex-core/protogen"
29+
"github.com/iotexproject/iotex-core/protogen/iotexrpc"
3030
)
3131

3232
const (
@@ -115,7 +115,7 @@ func (p *Agent) Start(ctx context.Context) error {
115115
<-ready
116116
var (
117117
peerID string
118-
broadcast p2ppb.BroadcastMsg
118+
broadcast iotexrpc.BroadcastMsg
119119
latency int64
120120
)
121121
skip := false
@@ -150,7 +150,7 @@ func (p *Agent) Start(ctx context.Context) error {
150150
t, _ := ptypes.Timestamp(broadcast.GetTimestamp())
151151
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()
152152

153-
msg, err := protogen.TypifyProtoMsg(broadcast.MsgType, broadcast.MsgBody)
153+
msg, err := protogen.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
154154
if err != nil {
155155
err = errors.Wrap(err, "error when typifying broadcast message")
156156
return
@@ -165,7 +165,7 @@ func (p *Agent) Start(ctx context.Context) error {
165165
// Blocking handling the unicast message until the agent is started
166166
<-ready
167167
var (
168-
unicast p2ppb.UnicastMsg
168+
unicast iotexrpc.UnicastMsg
169169
peerID string
170170
latency int64
171171
)
@@ -181,7 +181,7 @@ func (p *Agent) Start(ctx context.Context) error {
181181
err = errors.Wrap(err, "error when marshaling unicast message")
182182
return
183183
}
184-
msg, err := protogen.TypifyProtoMsg(unicast.MsgType, unicast.MsgBody)
184+
msg, err := protogen.TypifyRPCMsg(unicast.MsgType, unicast.MsgBody)
185185
if err != nil {
186186
err = errors.Wrap(err, "error when typifying unicast message")
187187
return
@@ -277,7 +277,7 @@ func (p *Agent) Stop(ctx context.Context) error {
277277

278278
// BroadcastOutbound sends a broadcast message to the whole network
279279
func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) {
280-
var msgType uint32
280+
var msgType iotexrpc.MessageType
281281
var msgBody []byte
282282
defer func() {
283283
status := successStr
@@ -301,7 +301,7 @@ func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e
301301
err = errors.New("P2P context doesn't exist")
302302
return
303303
}
304-
broadcast := p2ppb.BroadcastMsg{
304+
broadcast := iotexrpc.BroadcastMsg{
305305
ChainId: p2pCtx.ChainID,
306306
PeerId: p.host.HostIdentity(),
307307
MsgType: msgType,
@@ -322,7 +322,7 @@ func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e
322322

323323
// UnicastOutbound sends a unicast message to the given address
324324
func (p *Agent) UnicastOutbound(ctx context.Context, peer peerstore.PeerInfo, msg proto.Message) (err error) {
325-
var msgType uint32
325+
var msgType iotexrpc.MessageType
326326
var msgBody []byte
327327
defer func() {
328328
status := successStr
@@ -340,7 +340,7 @@ func (p *Agent) UnicastOutbound(ctx context.Context, peer peerstore.PeerInfo, ms
340340
err = errors.New("P2P context doesn't exist")
341341
return
342342
}
343-
unicast := p2ppb.UnicastMsg{
343+
unicast := iotexrpc.UnicastMsg{
344344
ChainId: p2pCtx.ChainID,
345345
PeerId: p.host.HostIdentity(),
346346
MsgType: msgType,
@@ -370,8 +370,8 @@ func (p *Agent) Neighbors(ctx context.Context) ([]peerstore.PeerInfo, error) {
370370
return p.host.Neighbors(ctx)
371371
}
372372

373-
func convertAppMsg(msg proto.Message) (uint32, []byte, error) {
374-
msgType, err := protogen.GetTypeFromProtoMsg(msg)
373+
func convertAppMsg(msg proto.Message) (iotexrpc.MessageType, []byte, error) {
374+
msgType, err := protogen.GetTypeFromRPCMsg(msg)
375375
if err != nil {
376376
return 0, nil, errors.Wrap(err, "error when converting application message to proto")
377377
}

0 commit comments

Comments
 (0)