1
1
import { Cluster , Ok , Redis , RedisOptions } from 'ioredis' ;
2
- import { PubSubEngine } from 'graphql-subscriptions' ;
2
+
3
3
import { PubSubAsyncIterator } from './pubsub-async-iterator' ;
4
+ import { PubSubEngine } from 'graphql-subscriptions' ;
4
5
5
6
type RedisClient = Redis | Cluster ;
6
7
type OnMessage < T > = ( message : T ) => void ;
@@ -74,7 +75,7 @@ export class RedisPubSub implements PubSubEngine {
74
75
this . redisSubscriber . on ( 'message' , this . onMessage . bind ( this , undefined ) ) ;
75
76
76
77
this . subscriptionMap = { } ;
77
- this . subsRefsMap = { } ;
78
+ this . subsRefsMap = new Map < string , Set < number > > ( ) ;
78
79
this . currentSubscriptionId = 0 ;
79
80
}
80
81
@@ -92,9 +93,9 @@ export class RedisPubSub implements PubSubEngine {
92
93
const id = this . currentSubscriptionId ++ ;
93
94
this . subscriptionMap [ id ] = [ triggerName , onMessage ] ;
94
95
95
- const refs = this . subsRefsMap [ triggerName ] ;
96
- if ( refs && refs . length > 0 ) {
97
- this . subsRefsMap [ triggerName ] = [ ... refs , id ] ;
96
+ const refs = this . subsRefsMap . get ( triggerName ) ;
97
+ if ( refs ?. size > 0 ) {
98
+ refs . add ( id )
98
99
return Promise . resolve ( id ) ;
99
100
} else {
100
101
return new Promise < number > ( ( resolve , reject ) => {
@@ -104,10 +105,7 @@ export class RedisPubSub implements PubSubEngine {
104
105
if ( err ) {
105
106
reject ( err ) ;
106
107
} else {
107
- this . subsRefsMap [ triggerName ] = [
108
- ...( this . subsRefsMap [ triggerName ] || [ ] ) ,
109
- id ,
110
- ] ;
108
+ refs . add ( id )
111
109
resolve ( id ) ;
112
110
}
113
111
} ) ;
@@ -117,21 +115,18 @@ export class RedisPubSub implements PubSubEngine {
117
115
118
116
public unsubscribe ( subId : number ) : void {
119
117
const [ triggerName = null ] = this . subscriptionMap [ subId ] || [ ] ;
120
- const refs = this . subsRefsMap [ triggerName ] ;
118
+ const refs = this . subsRefsMap . get ( triggerName ) ;
121
119
122
120
if ( ! refs ) throw new Error ( `There is no subscription of id "${ subId } "` ) ;
123
121
124
- if ( refs . length === 1 ) {
122
+ if ( refs . size === 1 ) {
125
123
// unsubscribe from specific channel and pattern match
126
124
this . redisSubscriber . unsubscribe ( triggerName ) ;
127
125
this . redisSubscriber . punsubscribe ( triggerName ) ;
128
126
129
- delete this . subsRefsMap [ triggerName ] ;
127
+ this . subsRefsMap . delete ( triggerName ) ;
130
128
} else {
131
- const index = refs . indexOf ( subId ) ;
132
- this . subsRefsMap [ triggerName ] = index === - 1
133
- ? refs
134
- : [ ...refs . slice ( 0 , index ) , ...refs . slice ( index + 1 ) ] ;
129
+ refs . delete ( subId )
135
130
}
136
131
delete this . subscriptionMap [ subId ] ;
137
132
}
@@ -163,14 +158,14 @@ export class RedisPubSub implements PubSubEngine {
163
158
private readonly reviver : Reviver ;
164
159
165
160
private readonly subscriptionMap : { [ subId : number ] : [ string , OnMessage < unknown > ] } ;
166
- private readonly subsRefsMap : { [ trigger : string ] : Array < number > } ;
161
+ private readonly subsRefsMap : Map < string , Set < number > > ;
167
162
private currentSubscriptionId : number ;
168
163
169
164
private onMessage ( pattern : string , channel : string , message : string ) {
170
- const subscribers = this . subsRefsMap [ pattern || channel ] ;
165
+ const subscribers = this . subsRefsMap . get ( pattern || channel ) ;
171
166
172
167
// Don't work for nothing..
173
- if ( ! subscribers || ! subscribers . length ) return ;
168
+ if ( ! subscribers ?. size ) return ;
174
169
175
170
let parsedMessage ;
176
171
try {
@@ -179,7 +174,7 @@ export class RedisPubSub implements PubSubEngine {
179
174
parsedMessage = message ;
180
175
}
181
176
182
- for ( const subId of subscribers ) {
177
+ for ( const subId in subscribers ) {
183
178
const [ , listener ] = this . subscriptionMap [ subId ] ;
184
179
listener ( parsedMessage ) ;
185
180
}
0 commit comments