Skip to content

Commit df586f3

Browse files
committed
restore map
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 811d615 commit df586f3

File tree

2 files changed

+7
-23
lines changed

2 files changed

+7
-23
lines changed

pkg/stream/consumer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ func (consumer *Consumer) Close() error {
325325
func (consumer *Consumer) close(reason Event) error {
326326

327327
if consumer.options == nil {
328-
logs.LogWarn("consumer options is nil, the close will be ignored")
328+
// the config is usually set. this check is just to avoid panic and to make some test
329+
// easier to write
330+
logs.LogDebug("consumer options is nil, the close will be ignored")
329331
return nil
330332
}
331333
consumer.cacheStoreOffset()

pkg/stream/producer_unconfirmed.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616
// or due of timeout. The Timeout is configurable, and it is calculated client side.
1717
type unConfirmed struct {
1818
messages map[int64]*ConfirmationStatus
19-
tmp []*ConfirmationStatus
20-
tmpMutex sync.Mutex
2119
mutexMessageMap sync.RWMutex
2220
}
2321

@@ -27,8 +25,6 @@ func newUnConfirmed() *unConfirmed {
2725

2826
r := &unConfirmed{
2927
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
30-
tmp: []*ConfirmationStatus{},
31-
tmpMutex: sync.Mutex{},
3228
mutexMessageMap: sync.RWMutex{},
3329
}
3430

@@ -37,15 +33,15 @@ func newUnConfirmed() *unConfirmed {
3733

3834
func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) {
3935

40-
u.tmpMutex.Lock()
41-
u.tmp = append(u.tmp, &ConfirmationStatus{
36+
u.mutexMessageMap.Lock()
37+
u.messages[message.publishingId] = &ConfirmationStatus{
4238
inserted: time.Now(),
4339
message: *source,
4440
producerID: producerID,
4541
publishingId: message.publishingId,
4642
confirmed: false,
47-
})
48-
u.tmpMutex.Unlock()
43+
}
44+
u.mutexMessageMap.Unlock()
4945
}
5046

5147
func (u *unConfirmed) link(from int64, to int64) {
@@ -61,7 +57,6 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
6157
u.mutexMessageMap.Lock()
6258
defer u.mutexMessageMap.Unlock()
6359
var res []*ConfirmationStatus
64-
u.fromTmpToMap() ///
6560

6661
for _, v := range ids {
6762
m := u.extract(v, 0, true)
@@ -76,19 +71,9 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
7671

7772
}
7873

79-
func (u *unConfirmed) fromTmpToMap() {
80-
u.tmpMutex.Lock()
81-
defer u.tmpMutex.Unlock()
82-
for i := range u.tmp {
83-
u.messages[u.tmp[i].publishingId] = u.tmp[i]
84-
}
85-
u.tmp = u.tmp[:0]
86-
}
87-
8874
func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
8975
u.mutexMessageMap.Lock()
9076
defer u.mutexMessageMap.Unlock()
91-
u.fromTmpToMap()
9277
return u.extract(id, errorCode, false)
9378
}
9479

@@ -118,7 +103,6 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui
118103
func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus {
119104
u.mutexMessageMap.Lock()
120105
defer u.mutexMessageMap.Unlock()
121-
u.fromTmpToMap()
122106
var res []*ConfirmationStatus
123107
for _, v := range u.messages {
124108
if time.Since(v.inserted) > timeout {
@@ -132,13 +116,11 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
132116
func (u *unConfirmed) size() int {
133117
u.mutexMessageMap.Lock()
134118
defer u.mutexMessageMap.Unlock()
135-
u.fromTmpToMap()
136119
return len(u.messages)
137120
}
138121

139122
func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
140123
u.mutexMessageMap.Lock()
141124
defer u.mutexMessageMap.Unlock()
142-
u.fromTmpToMap()
143125
return u.messages
144126
}

0 commit comments

Comments
 (0)