Skip to content

Commit ea6dd78

Browse files
committed
utils: add subscription manager
1 parent d198bb8 commit ea6dd78

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

utils/subscription.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
// Subscription is an interface that allows to subscribe to events and handle
10+
// them.
11+
type Subscription[E any] interface {
12+
Subscribe(ctx context.Context) (eventChan <-chan E, errChan <-chan error, err error)
13+
HandleEvent(event E) error
14+
HandleError(err error)
15+
}
16+
17+
// SubscriptionManager is a manager that handles the subscription lifecycle.
18+
type SubscriptionManager[E any] struct {
19+
subscription Subscription[E]
20+
isSubscribed bool
21+
mutex sync.Mutex
22+
backoff time.Duration
23+
quitChan chan struct{}
24+
}
25+
26+
// NewSubscriptionManager creates a new subscription manager.
27+
func NewSubscriptionManager[E any](subscription Subscription[E],
28+
) *SubscriptionManager[E] {
29+
30+
return &SubscriptionManager[E]{
31+
subscription: subscription,
32+
backoff: time.Second * 2,
33+
quitChan: make(chan struct{}),
34+
}
35+
}
36+
37+
// IsSubscribed returns true if the subscription manager is currently
38+
// subscribed.
39+
func (sm *SubscriptionManager[E]) IsSubscribed() bool {
40+
sm.mutex.Lock()
41+
defer sm.mutex.Unlock()
42+
return sm.isSubscribed
43+
}
44+
45+
// Start starts the subscription manager.
46+
func (sm *SubscriptionManager[E]) Start(ctx context.Context) {
47+
sm.mutex.Lock()
48+
if sm.isSubscribed {
49+
sm.mutex.Unlock()
50+
return
51+
}
52+
sm.mutex.Unlock()
53+
54+
go sm.manageSubscription(ctx)
55+
}
56+
57+
// Stop stops the subscription manager.
58+
func (sm *SubscriptionManager[E]) Stop() {
59+
close(sm.quitChan)
60+
}
61+
62+
// manageSubscription manages the subscription lifecycle.
63+
func (sm *SubscriptionManager[E]) manageSubscription(ctx context.Context) {
64+
defer func() {
65+
sm.mutex.Lock()
66+
sm.isSubscribed = false
67+
sm.mutex.Unlock()
68+
}()
69+
70+
// The outer loop is used to retry the subscription. In case of an
71+
// error it will retry the subscription after a backoff, until the
72+
// context is done or the quit channel is closed.
73+
for {
74+
eventChan, errChan, err := sm.subscription.Subscribe(ctx)
75+
if err != nil {
76+
sm.subscription.HandleError(err)
77+
if !sm.shouldRetry(ctx) {
78+
return
79+
}
80+
continue
81+
}
82+
83+
sm.mutex.Lock()
84+
sm.isSubscribed = true
85+
sm.mutex.Unlock()
86+
87+
// The inner loop is used to handle events and errors. It will
88+
// retry the subscription in case of an error, until the context
89+
// is done or the quit channel is closed.
90+
handleLoop:
91+
for {
92+
select {
93+
case event, ok := <-eventChan:
94+
if !ok {
95+
if !sm.shouldRetry(ctx) {
96+
return
97+
}
98+
break handleLoop
99+
}
100+
if err := sm.subscription.HandleEvent(event); err != nil {
101+
sm.subscription.HandleError(err)
102+
}
103+
104+
case err, ok := <-errChan:
105+
if !ok || err == nil {
106+
if !sm.shouldRetry(ctx) {
107+
return
108+
}
109+
break handleLoop
110+
}
111+
sm.subscription.HandleError(err)
112+
if !sm.shouldRetry(ctx) {
113+
return
114+
}
115+
// If we receive an error we break out of the
116+
// handleLoop to retry the subscription.
117+
break handleLoop
118+
119+
case <-ctx.Done():
120+
return
121+
122+
case <-sm.quitChan:
123+
return
124+
}
125+
}
126+
}
127+
}
128+
129+
// shouldRetry determines if the subscription manager should retry the
130+
// subscription.
131+
func (sm *SubscriptionManager[E]) shouldRetry(ctx context.Context) bool {
132+
sm.mutex.Lock()
133+
sm.isSubscribed = false
134+
sm.mutex.Unlock()
135+
136+
select {
137+
case <-sm.quitChan:
138+
return false
139+
case <-ctx.Done():
140+
return false
141+
case <-time.After(sm.backoff):
142+
// Exponential backoff with cap
143+
if sm.backoff < time.Minute {
144+
sm.backoff *= 2
145+
}
146+
return true
147+
}
148+
}

0 commit comments

Comments
 (0)