Skip to content

Commit 216aebb

Browse files
sputn1ckhieblmi
authored andcommitted
notifications: add support for new ntfn
This commit adds support for the new notification of type NotificationTypeStaticLoopInSweepRequest. This notification is sent when a sweep request is received from the server.
1 parent ed1585d commit 216aebb

File tree

1 file changed

+53
-7
lines changed

1 file changed

+53
-7
lines changed

notifications/manager.go

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ const (
2020
// NotificationTypeReservation is the notification type for reservation
2121
// notifications.
2222
NotificationTypeReservation
23+
24+
// NotificationTypeStaticLoopInSweepRequest is the notification type for
25+
// static loop in sweep requests.
26+
NotificationTypeStaticLoopInSweepRequest
2327
)
2428

2529
// Client is the interface that the notification manager needs to implement in
@@ -79,7 +83,8 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
7983

8084
m.addSubscriber(NotificationTypeReservation, sub)
8185

82-
// Start a goroutine to remove the subscriber when the context is canceled
86+
// Start a goroutine to remove the subscriber when the context is
87+
// canceled.
8388
go func() {
8489
<-ctx.Done()
8590
m.removeSubscriber(NotificationTypeReservation, sub)
@@ -89,6 +94,34 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
8994
return notifChan
9095
}
9196

97+
// SubscribeStaticLoopInSweepRequests subscribes to the static loop in sweep
98+
// requests.
99+
func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
100+
) <-chan *swapserverrpc.ServerStaticLoopInSweepNotification {
101+
102+
notifChan := make(
103+
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
104+
)
105+
sub := subscriber{
106+
subCtx: ctx,
107+
recvChan: notifChan,
108+
}
109+
110+
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
111+
112+
// Start a goroutine to remove the subscriber when the context is
113+
// canceled.
114+
go func() {
115+
<-ctx.Done()
116+
m.removeSubscriber(
117+
NotificationTypeStaticLoopInSweepRequest, sub,
118+
)
119+
close(notifChan)
120+
}()
121+
122+
return notifChan
123+
}
124+
92125
// Run starts the notification manager. It will keep on running until the
93126
// context is canceled. It will subscribe to notifications and forward them to
94127
// the subscribers. On a first successful connection to the server, it will
@@ -160,7 +193,7 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
160193
for {
161194
notification, err := notifStream.Recv()
162195
if err == nil && notification != nil {
163-
log.Debugf("Received notification: %v", notification)
196+
log.Tracef("Received notification: %v", notification)
164197
m.handleNotification(notification)
165198
continue
166199
}
@@ -173,13 +206,13 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
173206

174207
// handleNotification handles an incoming notification from the server,
175208
// forwarding it to the appropriate subscribers.
176-
func (m *Manager) handleNotification(notification *swapserverrpc.
209+
func (m *Manager) handleNotification(ntfn *swapserverrpc.
177210
SubscribeNotificationsResponse) {
178211

179-
switch notification.Notification.(type) {
180-
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
212+
switch ntfn.Notification.(type) {
213+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification: // nolint: lll
181214
// We'll forward the reservation notification to all subscribers.
182-
reservationNtfn := notification.GetReservationNotification()
215+
reservationNtfn := ntfn.GetReservationNotification()
183216
m.Lock()
184217
defer m.Unlock()
185218

@@ -189,10 +222,23 @@ func (m *Manager) handleNotification(notification *swapserverrpc.
189222

190223
recvChan <- reservationNtfn
191224
}
225+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep: // nolint: lll
226+
// We'll forward the static loop in sweep request to all
227+
// subscribers.
228+
staticLoopInSweepRequestNtfn := ntfn.GetStaticLoopInSweep()
229+
m.Lock()
230+
defer m.Unlock()
231+
232+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInSweepRequest] { // nolint: lll
233+
recvChan := sub.recvChan.(chan *swapserverrpc.
234+
ServerStaticLoopInSweepNotification)
235+
236+
recvChan <- staticLoopInSweepRequestNtfn
237+
}
192238

193239
default:
194240
log.Warnf("Received unknown notification type: %v",
195-
notification)
241+
ntfn)
196242
}
197243
}
198244

0 commit comments

Comments
 (0)