Skip to content

Commit 9ff963f

Browse files
committed
rename
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 62d6125 commit 9ff963f

File tree

4 files changed

+28
-29
lines changed

4 files changed

+28
-29
lines changed

pkg/stream/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,14 @@ func newClient(connectionName string, broker *Broker,
112112
}
113113

114114
func (c *Client) getSocket() *socket {
115-
//c.mutex.Lock()
116-
//defer c.mutex.Unlock()
115+
//c.mutexMessageMap.Lock()
116+
//defer c.mutexMessageMap.Unlock()
117117
return &c.socket
118118
}
119119

120120
func (c *Client) setSocketConnection(connection net.Conn) {
121-
//c.mutex.Lock()
122-
//defer c.mutex.Unlock()
121+
//c.mutexMessageMap.Lock()
122+
//defer c.mutexMessageMap.Unlock()
123123
c.socket.connection = connection
124124
c.socket.writer = bufio.NewWriter(connection)
125125
}

pkg/stream/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ func (consumer *Consumer) cacheStoreOffset() {
378378
consumer.mutex.Lock()
379379
consumer.lastAutoCommitStored = time.Now()
380380
consumer.messageCountBeforeStorage = 0
381-
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock
381+
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutexMessageMap, so not using defer for unlock
382382

383383
err := consumer.internalStoreOffset()
384384
if err != nil {

pkg/stream/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -535,8 +535,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c
535535
// / the producer id is always the producer.GetID(). This function is needed only for testing
536536
// some condition, like simulate publish error.
537537
func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error {
538-
//producer.options.client.socket.mutex.Lock()
539-
//defer producer.options.client.socket.mutex.Unlock()
538+
//producer.options.client.socket.mutexMessageMap.Lock()
539+
//defer producer.options.client.socket.mutexMessageMap.Unlock()
540540
if producer.getStatus() == closed {
541541
return fmt.Errorf("producer id: %d closed", producer.id)
542542
}

pkg/stream/producer_unconfirmed.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@ import (
1515
// The confirmation status is updated when the confirmation is received from the broker (see server_frame.go)
1616
// or due of timeout. The Timeout is configurable, and it is calculated client side.
1717
type unConfirmed struct {
18-
messages map[int64]*ConfirmationStatus
19-
tmp []*ConfirmationStatus
20-
tmpMutex sync.Mutex
21-
mutex sync.RWMutex
18+
messages map[int64]*ConfirmationStatus
19+
tmp []*ConfirmationStatus
20+
tmpMutex sync.Mutex
21+
mutexMessageMap sync.RWMutex
2222
}
2323

2424
const DefaultUnconfirmedSize = 10_000
2525

2626
func newUnConfirmed() *unConfirmed {
2727

2828
r := &unConfirmed{
29-
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
30-
tmp: []*ConfirmationStatus{},
31-
tmpMutex: sync.Mutex{},
32-
mutex: sync.RWMutex{},
29+
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
30+
tmp: []*ConfirmationStatus{},
31+
tmpMutex: sync.Mutex{},
32+
mutexMessageMap: sync.RWMutex{},
3333
}
3434

3535
return r
@@ -39,7 +39,6 @@ func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.
3939

4040
u.tmpMutex.Lock()
4141
u.tmp = append(u.tmp, &ConfirmationStatus{
42-
//u.messages[message.publishingId] = &ConfirmationStatus{
4342
inserted: time.Now(),
4443
message: *source,
4544
producerID: producerID,
@@ -50,19 +49,19 @@ func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.
5049
}
5150

5251
func (u *unConfirmed) link(from int64, to int64) {
53-
u.mutex.Lock()
54-
defer u.mutex.Unlock()
52+
u.mutexMessageMap.Lock()
53+
defer u.mutexMessageMap.Unlock()
5554
r := u.messages[from]
5655
if r != nil {
5756
r.linkedTo = append(r.linkedTo, u.messages[to])
5857
}
5958
}
6059

6160
func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
62-
u.mutex.Lock()
63-
defer u.mutex.Unlock()
61+
u.mutexMessageMap.Lock()
62+
defer u.mutexMessageMap.Unlock()
6463
var res []*ConfirmationStatus
65-
u.fromTmpToMap()
64+
u.fromTmpToMap() ///
6665

6766
for _, v := range ids {
6867
m := u.extract(v, 0, true)
@@ -87,8 +86,8 @@ func (u *unConfirmed) fromTmpToMap() {
8786
}
8887

8988
func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
90-
u.mutex.Lock()
91-
defer u.mutex.Unlock()
89+
u.mutexMessageMap.Lock()
90+
defer u.mutexMessageMap.Unlock()
9291
u.fromTmpToMap()
9392
return u.extract(id, errorCode, false)
9493
}
@@ -117,8 +116,8 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui
117116
}
118117

119118
func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus {
120-
u.mutex.Lock()
121-
defer u.mutex.Unlock()
119+
u.mutexMessageMap.Lock()
120+
defer u.mutexMessageMap.Unlock()
122121
u.fromTmpToMap()
123122
var res []*ConfirmationStatus
124123
for _, v := range u.messages {
@@ -131,15 +130,15 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
131130
}
132131

133132
func (u *unConfirmed) size() int {
134-
u.mutex.Lock()
135-
defer u.mutex.Unlock()
133+
u.mutexMessageMap.Lock()
134+
defer u.mutexMessageMap.Unlock()
136135
u.fromTmpToMap()
137136
return len(u.messages)
138137
}
139138

140139
func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
141-
u.mutex.Lock()
142-
defer u.mutex.Unlock()
140+
u.mutexMessageMap.Lock()
141+
defer u.mutexMessageMap.Unlock()
143142
u.fromTmpToMap()
144143
return u.messages
145144
}

0 commit comments

Comments
 (0)