Skip to content

Commit 5fc04b3

Browse files
committed
utils: add subscription manager
1 parent 9ac9e2f commit 5fc04b3

File tree

1 file changed

+115
-0
lines changed

1 file changed

+115
-0
lines changed

utils/subscription.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
type Subscription[E any] interface {
10+
Subscribe(ctx context.Context) (eventChan <-chan E, errChan <-chan error, err error)
11+
HandleEvent(event E) error
12+
HandleError(err error)
13+
}
14+
15+
type SubscriptionManager[E any] struct {
16+
subscription Subscription[E]
17+
isSubscribed bool
18+
mutex sync.Mutex
19+
backoff time.Duration
20+
}
21+
22+
func NewSubscriptionManager[E any](subscription Subscription[E]) *SubscriptionManager[E] {
23+
return &SubscriptionManager[E]{
24+
subscription: subscription,
25+
backoff: time.Second,
26+
}
27+
}
28+
29+
func (sm *SubscriptionManager[E]) IsSubscribed() bool {
30+
sm.mutex.Lock()
31+
defer sm.mutex.Unlock()
32+
return sm.isSubscribed
33+
}
34+
35+
func (sm *SubscriptionManager[E]) Start(ctx context.Context) {
36+
sm.mutex.Lock()
37+
if sm.isSubscribed {
38+
sm.mutex.Unlock()
39+
return
40+
}
41+
sm.mutex.Unlock()
42+
43+
go sm.manageSubscription(ctx)
44+
}
45+
46+
func (sm *SubscriptionManager[E]) manageSubscription(ctx context.Context) {
47+
defer func() {
48+
sm.mutex.Lock()
49+
sm.isSubscribed = false
50+
sm.mutex.Unlock()
51+
}()
52+
53+
for {
54+
eventChan, errChan, err := sm.subscription.Subscribe(ctx)
55+
if err != nil {
56+
sm.subscription.HandleError(err)
57+
if !sm.shouldRetry(ctx) {
58+
return
59+
}
60+
continue
61+
}
62+
63+
sm.mutex.Lock()
64+
sm.isSubscribed = true
65+
sm.mutex.Unlock()
66+
67+
handleLoop:
68+
for {
69+
select {
70+
case event, ok := <-eventChan:
71+
if !ok {
72+
if !sm.shouldRetry(ctx) {
73+
return
74+
}
75+
break
76+
}
77+
if err := sm.subscription.HandleEvent(event); err != nil {
78+
sm.subscription.HandleError(err)
79+
}
80+
case err, ok := <-errChan:
81+
if !ok || err == nil {
82+
if !sm.shouldRetry(ctx) {
83+
return
84+
}
85+
break
86+
}
87+
sm.subscription.HandleError(err)
88+
if !sm.shouldRetry(ctx) {
89+
return
90+
}
91+
break handleLoop
92+
case <-ctx.Done():
93+
return
94+
}
95+
}
96+
}
97+
}
98+
99+
func (sm *SubscriptionManager[E]) shouldRetry(ctx context.Context) bool {
100+
sm.mutex.Lock()
101+
sm.isSubscribed = false
102+
sm.mutex.Unlock()
103+
104+
select {
105+
case <-ctx.Done():
106+
return false
107+
case <-time.After(sm.backoff):
108+
// Exponential backoff with cap
109+
if sm.backoff < time.Minute {
110+
sm.backoff *= 2
111+
}
112+
sm.Start(ctx)
113+
return true
114+
}
115+
}

0 commit comments

Comments
 (0)