Dynamic send and batch send splits for milestone 1.5 #367

Merged
merged 26 commits into from
Jan 8, 2025
Changes from 2 commits
Commits
26 commits
b8ad4c0
work in progress
Gsantomaggio Nov 29, 2024
f8153b5
work in progress
Gsantomaggio Nov 29, 2024
d744d95
bump to 1.22 golang
Gsantomaggio Dec 1, 2024
88ce242
Merge branch 'main' into dynamic_send
Gsantomaggio Dec 3, 2024
4dc144d
change the batchSend
Gsantomaggio Dec 3, 2024
cee445a
Merge branch 'dynamic_send' of github.com:rabbitmq/rabbitmq-stream-go…
Gsantomaggio Dec 3, 2024
b1001e1
change the batchSend
Gsantomaggio Dec 3, 2024
fb0a6c9
adding tests
Gsantomaggio Dec 3, 2024
3930f60
send
Gsantomaggio Dec 9, 2024
16018dd
code refactor
Gsantomaggio Dec 10, 2024
31f86c5
unconfirmed
Gsantomaggio Dec 10, 2024
1642636
documentation
Gsantomaggio Dec 11, 2024
5aa81ed
Update pkg/stream/producer.go
Gsantomaggio Dec 18, 2024
2163a7f
Update pkg/stream/producer_unconfirmed.go
Gsantomaggio Dec 18, 2024
2d39c3c
Update pkg/stream/producer_unconfirmed.go
Gsantomaggio Dec 18, 2024
967fe44
Update README.md
Gsantomaggio Dec 18, 2024
3de11e0
replace the channel with a blocking queue
Gsantomaggio Dec 20, 2024
fc5fe5e
make the unconfirmed operations atomics
Gsantomaggio Dec 27, 2024
57b8473
rename variable
Gsantomaggio Dec 27, 2024
37ff466
Improve producer edge cases
Gsantomaggio Jan 7, 2025
5218fc3
Improve producer edge cases
Gsantomaggio Jan 7, 2025
978a07e
bump windows version to rabbitmq 4.0.5 and erlang 27.2
Gsantomaggio Jan 7, 2025
4f3075f
udpate dependencies
Gsantomaggio Jan 7, 2025
5293c48
more logs for windows CI
Gsantomaggio Jan 7, 2025
3bbc832
restore old timeout
Gsantomaggio Jan 8, 2025
d1db816
temporany remove windows test due of https://github.com/actions/chec…
Gsantomaggio Jan 8, 2025
1 change: 1 addition & 0 deletions pkg/ha/ha_consumer.go
@@ -32,6 +32,7 @@ func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
c.bootstrap = false
err, reconnected := retry(0, c)
3 changes: 2 additions & 1 deletion pkg/ha/ha_publisher.go
@@ -26,10 +26,11 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
p.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo())
err, reconnected := retry(0, p)
if err != nil {
logs.LogInfo(""+
logs.LogInfo(
"[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
}
if reconnected {
1 change: 0 additions & 1 deletion pkg/ha/reliable_common.go
@@ -41,7 +41,6 @@ type IReliable interface {
// In both cases it retries the reconnection

func retry(backoff int, reliable IReliable) (error, bool) {
reliable.setStatus(StatusReconnecting)
sleepValue := rand.Intn(int((reliable.getTimeOut().Seconds()-2+1)+2)*1000) + backoff*1000
logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), sleepValue)
time.Sleep(time.Duration(sleepValue) * time.Millisecond)
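For context on the delay computed in retry above: the expression yields a random sleep between backoff seconds and roughly backoff + timeout + 1 seconds. A small self-contained sketch of that calculation (the 10-second timeout and backoff = 1 are assumptions for illustration, not values taken from the diff):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	timeOut := 10 * time.Second // assumed reliable timeout, not from the diff
	backoff := 1                // second reconnection attempt

	// Same expression as in retry(): a random value in
	// [0, (timeout-2+1)+2) seconds, in milliseconds, shifted by backoff seconds.
	sleepValue := rand.Intn(int((timeOut.Seconds()-2+1)+2)*1000) + backoff*1000

	fmt.Printf("would sleep between %d and %d ms, got %d ms\n",
		backoff*1000, backoff*1000+int(timeOut.Seconds()+1)*1000-1, sleepValue)
}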
40 changes: 38 additions & 2 deletions pkg/stream/blocking_queue.go
@@ -1,27 +1,43 @@
package stream

import "time"
import (
"errors"
"sync/atomic"
"time"
)

var ErrBlockingQueueStopped = errors.New("blocking queue stopped")

type BlockingQueue[T any] struct {
queue chan T
capacity int
status int32
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] {
return &BlockingQueue[T]{
queue: make(chan T, capacity),
capacity: capacity,
status: 0,
}
}

// Enqueue adds an item to the queue, blocking if the queue is full
func (bq *BlockingQueue[T]) Enqueue(item T) {
func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
bq.queue <- item // This will block if the queue is full
return nil
}

// Dequeue removes an item from the queue with a timeout
func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
if bq.IsStopped() {
var zeroValue T // Zero value of type T
return zeroValue
}
select {
case item := <-bq.queue:
return item
@@ -38,3 +54,23 @@ func (bq *BlockingQueue[T]) Size() int {
func (bq *BlockingQueue[T]) IsEmpty() bool {
return len(bq.queue) == 0
}

// Stop stops the queue from accepting new items
// but allows the existing items to be processed
// Stop is different from Close in that it allows the
// existing items to be processed.
// That avoids the need to drain the queue before closing it.
func (bq *BlockingQueue[T]) Stop() {
atomic.StoreInt32(&bq.status, 1)
}

func (bq *BlockingQueue[T]) Close() {
if bq.IsStopped() {
atomic.StoreInt32(&bq.status, 2)
close(bq.queue)
}
}

func (bq *BlockingQueue[T]) IsStopped() bool {
return atomic.LoadInt32(&bq.status) == 1 || atomic.LoadInt32(&bq.status) == 2
}
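A minimal usage sketch of the new BlockingQueue API (illustrative only, assuming it sits alongside the type in pkg/stream): after Stop, Enqueue returns ErrBlockingQueueStopped, Dequeue short-circuits to the zero value of T in the version shown here, and Close then marks the queue closed and closes the underlying channel.

package stream

import (
	"fmt"
	"time"
)

// demoBlockingQueue is an illustrative helper, not part of the PR.
func demoBlockingQueue() {
	bq := NewBlockingQueue[int](2)

	// Before Stop: Enqueue returns nil and only blocks when the queue is full.
	_ = bq.Enqueue(1)
	fmt.Println(bq.Dequeue(time.Second)) // 1

	bq.Stop() // mark the queue as stopped; no new items are accepted

	if err := bq.Enqueue(2); err != nil {
		fmt.Println(err) // blocking queue stopped
	}

	// In the code shown above, Dequeue returns the zero value once the queue is stopped.
	fmt.Println(bq.Dequeue(time.Second)) // 0

	// Close only takes effect after Stop (IsStopped must already report true);
	// it then sets the closed status and closes the underlying channel.
	bq.Close()
}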
2 changes: 1 addition & 1 deletion pkg/stream/client.go
@@ -591,7 +591,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
res := c.internalDeclarePublisher(streamName, producer)
if res.Err == nil {
producer.startUnconfirmedMessagesTimeOutTask()
producer.processSendingMessages()
producer.processPendingSequencesQueue()
}
return producer, res.Err
}
2 changes: 2 additions & 0 deletions pkg/stream/constants.go
@@ -189,6 +189,8 @@ func lookErrorCode(errorCode uint16) error {
return InternalError
case responseCodeAuthenticationFailureLoopback:
return AuthenticationFailureLoopbackError
case timeoutError:
return ConfirmationTimoutError
default:
{
logs.LogWarn("Error not handled %d", errorCode)
55 changes: 37 additions & 18 deletions pkg/stream/coordinator.go
@@ -66,21 +66,20 @@ func (coordinator *Coordinator) NewProducer(
return nil, err
}
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
timeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}),
status: open,
//dynamicSendCh: make(chan *messageSequence, dynSize),
pendingMessagesQueue: NewBlockingQueue[*messageSequence](dynSize),
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
confirmationTimeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}, 1),
status: open,
pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize),
}
coordinator.producers[lastId] = producer
return producer, err
}

func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error {
consumer, err := coordinator.GetConsumerById(id)
consumer, err := coordinator.ExtractConsumerById(id)
if err != nil {
return err
}
@@ -92,27 +91,23 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
close(closeHandler)
}

return coordinator.removeById(id, coordinator.consumers)
return nil
}
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {

producer, err := coordinator.GetProducerById(id)
producer, err := coordinator.ExtractProducerById(id)
if err != nil {
return err
}
reason.StreamName = producer.GetStreamName()
reason.Name = producer.GetName()
tentatives := 0
for producer.lenUnConfirmed() > 0 && tentatives < 3 {
time.Sleep(200 * time.Millisecond)
tentatives++
}

if producer.closeHandler != nil {
producer.closeHandler <- reason
}

return coordinator.removeById(id, coordinator.producers)
producer.stopAndWaitPendingSequencesQueue()

return nil
}

func (coordinator *Coordinator) RemoveResponseById(id interface{}) error {
@@ -221,6 +216,18 @@ func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, erro
return v.(*Consumer), err
}

func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
if coordinator.consumers[id] == nil {
return nil, errors.New("item #{id} not found ")
}
consumer := coordinator.consumers[id].(*Consumer)
coordinator.consumers[id] = nil
delete(coordinator.consumers, id)
return consumer, nil
}

func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) {
v, err := coordinator.getById(fmt.Sprintf("%d", id), coordinator.responses)
if err != nil {
@@ -241,6 +248,18 @@ func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, erro
return v.(*Producer), err
}

func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
if coordinator.producers[id] == nil {
return nil, errors.New("item #{id} not found ")
}
producer := coordinator.producers[id].(*Producer)
coordinator.producers[id] = nil
delete(coordinator.producers, id)
return producer, nil
}

// general functions

func (coordinator *Coordinator) getById(id interface{}, refmap map[interface{}]interface{}) (interface{}, error) {
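The new ExtractConsumerById / ExtractProducerById helpers fold the previous lookup-then-remove into a single critical section, so an entry cannot be removed or reused by another goroutine between the two steps. A generic sketch of that pattern (not code from the PR):

package main

import (
	"fmt"
	"sync"
)

// registry is a generic stand-in for the coordinator's map plus mutex.
type registry[K comparable, V any] struct {
	mutex sync.Mutex
	items map[K]V
}

// extract looks up and removes an entry in one critical section,
// mirroring the ExtractProducerById / ExtractConsumerById helpers above.
func (r *registry[K, V]) extract(id K) (V, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	v, ok := r.items[id]
	if !ok {
		var zero V
		return zero, fmt.Errorf("item %v not found", id)
	}
	delete(r.items, id)
	return v, nil
}

func main() {
	r := &registry[uint8, string]{items: map[uint8]string{1: "producer-1"}}

	p, err := r.extract(1)
	fmt.Println(p, err) // producer-1 <nil>

	_, err = r.extract(1)
	fmt.Println(err) // item 1 not found
}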