Skip to content

Commit b4563a8

Browse files
committed
add function to process the messages
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 8b3c4a1 commit b4563a8

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

pkg/stream/blocking_queue.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
5555
}
5656
}
5757

58+
func (bq *BlockingQueue[T]) Process(f func(T)) {
59+
isActive := true
60+
for isActive {
61+
select {
62+
case item, ok := <-bq.queue:
63+
if !ok {
64+
isActive = false
65+
return
66+
}
67+
f(item)
68+
}
69+
}
70+
}
71+
5872
func (bq *BlockingQueue[T]) Size() int {
5973
return len(bq.queue)
6074
}

pkg/stream/producer.go

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -289,20 +289,11 @@ func (producer *Producer) processPendingSequencesQueue() {
289289
maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
290290
var avarage = 0
291291
iterations := 0
292-
293-
// the buffer is initialized with the size of the header
294-
sequenceToSend := make([]*messageSequence, 0)
295292
go func() {
293+
sequenceToSend := make([]*messageSequence, 0)
296294
totalBufferToSend := initBufferPublishSize
297-
for {
295+
producer.pendingSequencesQueue.Process(func(msg *messageSequence) {
298296
var lastError error
299-
// the dequeue is blocking with a timeout of 500ms
300-
// as soon as a message is available the Dequeue will be unblocked
301-
msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
302-
if producer.pendingSequencesQueue.IsStopped() {
303-
break
304-
}
305-
306297
if msg != nil {
307298
// There is something in the queue.Checks the buffer is still less than the maxFrame
308299
totalBufferToSend += msg.unCompressedSize
@@ -313,18 +304,14 @@ func (producer *Producer) processPendingSequencesQueue() {
313304
sequenceToSend = sequenceToSend[:0]
314305
totalBufferToSend = initBufferPublishSize
315306
}
316-
317307
sequenceToSend = append(sequenceToSend, msg)
318308
}
319-
320-
// if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
321-
// the messages during the checks of the buffer. In this case
322309
if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
323310
if len(sequenceToSend) > 0 {
324311
avarage += len(sequenceToSend)
325312
iterations++
326313
if iterations > 10000 {
327-
logs.LogInfo("producer %d avarage: %d", producer.id, avarage/iterations)
314+
logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
328315
avarage = 0
329316
iterations = 0
330317
}
@@ -337,9 +324,58 @@ func (producer *Producer) processPendingSequencesQueue() {
337324
if lastError != nil {
338325
logs.LogError("error during sending messages: %s", lastError)
339326
}
340-
}
341-
logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
327+
328+
})
342329
}()
330+
logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
331+
// the buffer is initialized with the size of the header
332+
333+
// for {
334+
// var lastError error
335+
// // the dequeue is blocking with a timeout of 500ms
336+
// // as soon as a message is available the Dequeue will be unblocked
337+
// msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500)
338+
// if producer.pendingSequencesQueue.IsStopped() {
339+
// break
340+
// }
341+
//
342+
// if msg != nil {
343+
// // There is something in the queue.Checks the buffer is still less than the maxFrame
344+
// totalBufferToSend += msg.unCompressedSize
345+
// if totalBufferToSend > maxFrame {
346+
// // if the totalBufferToSend is greater than the requestedMaxFrameSize
347+
// // the producer sends the messages and reset the buffer
348+
// lastError = producer.internalBatchSend(sequenceToSend)
349+
// sequenceToSend = sequenceToSend[:0]
350+
// totalBufferToSend = initBufferPublishSize
351+
// }
352+
//
353+
// sequenceToSend = append(sequenceToSend, msg)
354+
// }
355+
//
356+
// // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending
357+
// // the messages during the checks of the buffer. In this case
358+
// if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
359+
// if len(sequenceToSend) > 0 {
360+
// avarage += len(sequenceToSend)
361+
// iterations++
362+
// if iterations > 10000 {
363+
// logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations)
364+
// avarage = 0
365+
// iterations = 0
366+
// }
367+
//
368+
// lastError = producer.internalBatchSend(sequenceToSend)
369+
// sequenceToSend = sequenceToSend[:0]
370+
// totalBufferToSend += initBufferPublishSize
371+
// }
372+
// }
373+
// if lastError != nil {
374+
// logs.LogError("error during sending messages: %s", lastError)
375+
// }
376+
// }
377+
// logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
378+
//}()
343379
}
344380

345381
func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {

0 commit comments

Comments
 (0)