Skip to content

Commit f7152bf

Browse files
authored
Merge pull request #599 from mlaflamm/concurrent-sub-unsub-race
Fix concurrent subscribe race condition
2 parents f9f9d6c + 8f69547 commit f7152bf

File tree

2 files changed

+93
-2
lines changed

2 files changed

+93
-2
lines changed

src/redis-pubsub.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export class RedisPubSub implements PubSubEngine {
8080

8181
this.subscriptionMap = {};
8282
this.subsRefsMap = new Map<string, Set<number>>();
83+
this.subsPendingRefsMap = new Map<string, { refs: number[], pending: Promise<number> }>();
8384
this.currentSubscriptionId = 0;
8485
}
8586

@@ -108,22 +109,44 @@ export class RedisPubSub implements PubSubEngine {
108109
}
109110

110111
const refs = this.subsRefsMap.get(triggerName);
111-
if (refs.size > 0) {
112+
113+
const pendingRefs = this.subsPendingRefsMap.get(triggerName)
114+
if (pendingRefs != null) {
115+
// A pending remote subscribe call is currently in flight, piggyback on it
116+
pendingRefs.refs.push(id)
117+
return pendingRefs.pending.then(() => id)
118+
} else if (refs.size > 0) {
119+
// Already actively subscribed to redis
112120
refs.add(id);
113121
return Promise.resolve(id);
114122
} else {
115-
return new Promise<number>((resolve, reject) => {
123+
// New subscription.
124+
// Keep a pending state until the remote subscribe call is completed
125+
const pending = new Deferred()
126+
const subsPendingRefsMap = this.subsPendingRefsMap
127+
subsPendingRefsMap.set(triggerName, { refs: [], pending });
128+
129+
const sub = new Promise<number>((resolve, reject) => {
116130
const subscribeFn = options['pattern'] ? this.redisSubscriber.psubscribe : this.redisSubscriber.subscribe;
117131

118132
subscribeFn.call(this.redisSubscriber, triggerName, err => {
119133
if (err) {
134+
subsPendingRefsMap.delete(triggerName)
120135
reject(err);
121136
} else {
137+
// Add ids of subscribe calls initiated when waiting for the remote call response
138+
const pendingRefs = subsPendingRefsMap.get(triggerName)
139+
pendingRefs.refs.forEach((id) => refs.add(id))
140+
subsPendingRefsMap.delete(triggerName)
141+
122142
refs.add(id);
123143
resolve(id);
124144
}
125145
});
126146
});
147+
// Ensure waiting subscribe will complete
148+
sub.then(pending.resolve).catch(pending.reject)
149+
return sub;
127150
}
128151
}
129152

@@ -173,6 +196,7 @@ export class RedisPubSub implements PubSubEngine {
173196

174197
private readonly subscriptionMap: { [subId: number]: [string, OnMessage<unknown>] };
175198
private readonly subsRefsMap: Map<string, Set<number>>;
199+
private readonly subsPendingRefsMap: Map<string, { refs: number[], pending: Promise<number> }>;
176200
private currentSubscriptionId: number;
177201

178202
private onMessage(pattern: string, channel: string | Buffer, message: string | Buffer) {
@@ -203,6 +227,19 @@ export class RedisPubSub implements PubSubEngine {
203227
}
204228
}
205229

230+
// Unexported deferrable promise used to complete waiting subscribe calls
231+
function Deferred() {
232+
const p = this.promise = new Promise((resolve, reject) => {
233+
this.resolve = resolve;
234+
this.reject = reject;
235+
});
236+
this.then = p.then.bind(p);
237+
this.catch = p.catch.bind(p);
238+
if (p.finally) {
239+
this.finally = p.finally.bind(p);
240+
}
241+
}
242+
206243
export type Path = Array<string | number>;
207244
export type Trigger = string | Path;
208245
export type TriggerTransform = (

src/test/tests.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,60 @@ describe('RedisPubSub', () => {
140140
});
141141
});
142142

143+
it('concurrent subscribe, unsubscribe first sub before second sub complete', done => {
144+
const promises = {
145+
firstSub: null as Promise<number>,
146+
secondSub: null as Promise<number>,
147+
}
148+
149+
let firstCb, secondCb
150+
const redisSubCallback = (channel, cb) => {
151+
process.nextTick(() => {
152+
if (!firstCb) {
153+
firstCb = () => cb(null, channel)
154+
// Handling first call, init second sub
155+
promises.secondSub = pubSub.subscribe('Posts', () => null)
156+
// Continue first sub callback
157+
firstCb()
158+
} else {
159+
secondCb = () => cb(null, channel)
160+
}
161+
})
162+
}
163+
const subscribeStub = stub().callFn(redisSubCallback);
164+
const mockRedisClientWithSubStub = {...mockRedisClient, ...{subscribe: subscribeStub}};
165+
const mockOptionsWithSubStub = {...mockOptions, ...{subscriber: (mockRedisClientWithSubStub as any)}}
166+
const pubSub = new RedisPubSub(mockOptionsWithSubStub);
167+
168+
// First leg of the test, init first sub and immediately unsubscribe. The second sub is triggered in the redis cb
169+
// before the first promise sub complete
170+
promises.firstSub = pubSub.subscribe('Posts', () => null)
171+
.then(subId => {
172+
// This assertion is done against a private member, if you change the internals, you may want to change that
173+
expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined');
174+
pubSub.unsubscribe(subId);
175+
176+
// Continue second sub callback
177+
promises.firstSub.then(() => secondCb())
178+
return subId;
179+
});
180+
181+
// Second leg of the test, here we have unsubscribed from the first sub. We try unsubbing from the second sub
182+
// as soon it is ready
183+
promises.firstSub
184+
.then((subId) => {
185+
// This assertion is done against a private member, if you change the internals, you may want to change that
186+
expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined');
187+
expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`);
188+
189+
return promises.secondSub.then(secondSubId => {
190+
pubSub.unsubscribe(secondSubId);
191+
})
192+
.then(done)
193+
.catch(done)
194+
});
195+
});
196+
143197
it('will not unsubscribe from the redis channel if there is another subscriber on it\'s subscriber list', done => {
144198
const pubSub = new RedisPubSub(mockOptions);
145199
const subscriptionPromises = [

0 commit comments

Comments
 (0)