Skip to content

Commit 948dc30

Browse files
committed
tmp buffer
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent d9ac9ac commit 948dc30

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

pkg/stream/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c
529529
// / the producer id is always the producer.GetID(). This function is needed only for testing
530530
// some condition, like simulate publish error.
531531
func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error {
532-
producer.options.client.socket.mutex.Lock()
533-
defer producer.options.client.socket.mutex.Unlock()
532+
//producer.options.client.socket.mutex.Lock()
533+
//defer producer.options.client.socket.mutex.Unlock()
534534
if producer.getStatus() == closed {
535535
return fmt.Errorf("producer id: %d closed", producer.id)
536536
}

pkg/stream/producer_unconfirmed.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@ import (
1515
// or due of timeout. The Timeout is configurable, and it is calculated client side.
1616
type unConfirmed struct {
1717
messages map[int64]*ConfirmationStatus
18+
tmp []*ConfirmationStatus
19+
tmpMutex sync.Mutex
1820
mutex sync.RWMutex
1921
}
2022

21-
const DefaultUnconfirmedSize = 10000
23+
const DefaultUnconfirmedSize = 10_000
2224

2325
func newUnConfirmed() *unConfirmed {
2426

2527
r := &unConfirmed{
2628
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
29+
tmp: []*ConfirmationStatus{},
30+
tmpMutex: sync.Mutex{},
2731
mutex: sync.RWMutex{},
2832
}
2933

@@ -32,15 +36,16 @@ func newUnConfirmed() *unConfirmed {
3236

3337
func (u *unConfirmed) addFromSequence(message *messageSequence, producerID uint8) {
3438

35-
u.mutex.Lock()
36-
u.messages[message.publishingId] = &ConfirmationStatus{
39+
u.tmpMutex.Lock()
40+
u.tmp = append(u.tmp, &ConfirmationStatus{
41+
//u.messages[message.publishingId] = &ConfirmationStatus{
3742
inserted: time.Now(),
3843
message: *message.refMessage,
3944
producerID: producerID,
4045
publishingId: message.publishingId,
4146
confirmed: false,
42-
}
43-
u.mutex.Unlock()
47+
})
48+
u.tmpMutex.Unlock()
4449
}
4550

4651
func (u *unConfirmed) link(from int64, to int64) {
@@ -52,11 +57,13 @@ func (u *unConfirmed) link(from int64, to int64) {
5257
}
5358
}
5459

55-
func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus {
60+
func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
5661
u.mutex.Lock()
5762
defer u.mutex.Unlock()
5863
var res []*ConfirmationStatus
59-
for _, v := range id {
64+
u.fromTmpToMap()
65+
66+
for _, v := range ids {
6067
m := u.extract(v, 0, true)
6168
if m != nil {
6269
res = append(res, m)
@@ -69,9 +76,19 @@ func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus {
6976

7077
}
7178

79+
func (u *unConfirmed) fromTmpToMap() {
80+
u.tmpMutex.Lock()
81+
defer u.tmpMutex.Unlock()
82+
for i := range u.tmp {
83+
u.messages[u.tmp[i].publishingId] = u.tmp[i]
84+
}
85+
u.tmp = u.tmp[:0]
86+
}
87+
7288
func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
7389
u.mutex.Lock()
7490
defer u.mutex.Unlock()
91+
u.fromTmpToMap()
7592
return u.extract(id, errorCode, false)
7693
}
7794

@@ -101,6 +118,7 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui
101118
func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus {
102119
u.mutex.Lock()
103120
defer u.mutex.Unlock()
121+
u.fromTmpToMap()
104122
var res []*ConfirmationStatus
105123
for _, v := range u.messages {
106124
if time.Since(v.inserted) > timeout {
@@ -114,11 +132,13 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
114132
func (u *unConfirmed) size() int {
115133
u.mutex.Lock()
116134
defer u.mutex.Unlock()
135+
u.fromTmpToMap()
117136
return len(u.messages)
118137
}
119138

120139
func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
121140
u.mutex.Lock()
122141
defer u.mutex.Unlock()
142+
u.fromTmpToMap()
123143
return u.messages
124144
}

0 commit comments

Comments
 (0)