Skip to content

Commit 3268957

Browse files
committed
Fix concurrent sub + quick unsub race condition
1 parent 1775b7c commit 3268957

File tree

2 files changed

+93
-2
lines changed

2 files changed

+93
-2
lines changed

src/redis-pubsub.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export class RedisPubSub implements PubSubEngine {
8080

8181
this.subscriptionMap = {};
8282
this.subsRefsMap = new Map<string, Set<number>>();
83+
this.subsPendingRefsMap = new Map<string, { refs: number[], pending: Promise<number> }>();
8384
this.currentSubscriptionId = 0;
8485
}
8586

@@ -102,22 +103,44 @@ export class RedisPubSub implements PubSubEngine {
102103
}
103104

104105
const refs = this.subsRefsMap.get(triggerName);
105-
if (refs.size > 0) {
106+
107+
const pendingRefs = this.subsPendingRefsMap.get(triggerName)
108+
if (pendingRefs != null) {
109+
// A pending remote subscribe call is currently in flight, piggyback on it
110+
pendingRefs.refs.push(id)
111+
return pendingRefs.pending.then(() => id)
112+
} else if (refs.size > 0) {
113+
// Already actively subscribed to redis
106114
refs.add(id);
107115
return Promise.resolve(id);
108116
} else {
109-
return new Promise<number>((resolve, reject) => {
117+
// New subscription.
118+
// Keep a pending state until the remote subscribe call is completed
119+
const pending = new Deferred()
120+
const subsPendingRefsMap = this.subsPendingRefsMap
121+
subsPendingRefsMap.set(triggerName, { refs: [], pending });
122+
123+
const sub = new Promise<number>((resolve, reject) => {
110124
const subscribeFn = options['pattern'] ? this.redisSubscriber.psubscribe : this.redisSubscriber.subscribe;
111125

112126
subscribeFn.call(this.redisSubscriber, triggerName, err => {
113127
if (err) {
128+
subsPendingRefsMap.delete(triggerName)
114129
reject(err);
115130
} else {
131+
// Add ids of subscribe calls initiated when waiting for the remote call response
132+
const pendingRefs = subsPendingRefsMap.get(triggerName)
133+
pendingRefs.refs.forEach((id) => refs.add(id))
134+
subsPendingRefsMap.delete(triggerName)
135+
116136
refs.add(id);
117137
resolve(id);
118138
}
119139
});
120140
});
141+
// Ensure waiting subscribe will complete
142+
sub.then(pending.resolve).catch(pending.reject)
143+
return sub;
121144
}
122145
}
123146

@@ -167,6 +190,7 @@ export class RedisPubSub implements PubSubEngine {
167190

168191
private readonly subscriptionMap: { [subId: number]: [string, OnMessage<unknown>] };
169192
private readonly subsRefsMap: Map<string, Set<number>>;
193+
private readonly subsPendingRefsMap: Map<string, { refs: number[], pending: Promise<number> }>;
170194
private currentSubscriptionId: number;
171195

172196
private onMessage(pattern: string, channel: string, message: string) {
@@ -191,6 +215,19 @@ export class RedisPubSub implements PubSubEngine {
191215
}
192216
}
193217

218+
// Unexported deferrable promise used to complete waiting subscribe calls
219+
function Deferred() {
220+
const p = this.promise = new Promise((resolve, reject) => {
221+
this.resolve = resolve;
222+
this.reject = reject;
223+
});
224+
this.then = p.then.bind(p);
225+
this.catch = p.catch.bind(p);
226+
if (p.finally) {
227+
this.finally = p.finally.bind(p);
228+
}
229+
}
230+
194231
export type Path = Array<string | number>;
195232
export type Trigger = string | Path;
196233
export type TriggerTransform = (

src/test/tests.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,60 @@ describe('RedisPubSub', () => {
140140
});
141141
});
142142

143+
it('concurrent subscribe, unsubscribe first sub before second sub complete', done => {
144+
const subs = {
145+
first: null as Promise<number>,
146+
second: null as Promise<number>,
147+
}
148+
149+
let firstCb, secondCb
150+
const redisSubCallback = (channel, cb) => {
151+
process.nextTick(() => {
152+
if (!firstCb) {
153+
firstCb = () => cb(null, channel)
154+
// Handling first call, init second sub
155+
subs.second = pubSub.subscribe('Posts', () => null)
156+
// Continue first sub callback
157+
firstCb()
158+
} else {
159+
secondCb = () => cb(null, channel)
160+
}
161+
})
162+
}
163+
const subscribeStub = stub().callFn(redisSubCallback);
164+
const mockRedisClientWithSubStub = {...mockRedisClient, ...{subscribe: subscribeStub}};
165+
const mockOptionsWithSubStub = {...mockOptions, ...{subscriber: (mockRedisClientWithSubStub as any)}}
166+
const pubSub = new RedisPubSub(mockOptionsWithSubStub);
167+
168+
// First leg of the test, init first sub and immediately unsubscribe. The second sub is triggered in the redis cb
169+
// before the first promise sub complete
170+
subs.first = pubSub.subscribe('Posts', () => null)
171+
.then(subId => {
172+
// This assertion is done against a private member, if you change the internals, you may want to change that
173+
expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined');
174+
pubSub.unsubscribe(subId);
175+
176+
// Continue second sub callback
177+
subs.first.then(() => secondCb())
178+
return subId;
179+
});
180+
181+
// Second leg of the test, here we have unsubscribed from the first sub. We try unsubbing from the second sub
182+
// as soon it is ready
183+
subs.first
184+
.then((subId) => {
185+
// This assertion is done against a private member, if you change the internals, you may want to change that
186+
expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined');
187+
expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`);
188+
189+
return subs.second.then(secondSubId => {
190+
pubSub.unsubscribe(secondSubId);
191+
})
192+
.then(done)
193+
.catch(done)
194+
});
195+
});
196+
143197
it('will not unsubscribe from the redis channel if there is another subscriber on it\'s subscriber list', done => {
144198
const pubSub = new RedisPubSub(mockOptions);
145199
const subscriptionPromises = [

0 commit comments

Comments
 (0)