1
1
import { Cluster , Ok , Redis , RedisOptions } from 'ioredis' ;
2
-
3
- import { PubSubAsyncIterator } from './pubsub-async-iterator' ;
4
2
import { PubSubEngine } from 'graphql-subscriptions' ;
3
+ import { PubSubAsyncIterator } from './pubsub-async-iterator' ;
5
4
6
5
type RedisClient = Redis | Cluster ;
7
6
type OnMessage < T > = ( message : T ) => void ;
@@ -93,9 +92,13 @@ export class RedisPubSub implements PubSubEngine {
93
92
const id = this . currentSubscriptionId ++ ;
94
93
this . subscriptionMap [ id ] = [ triggerName , onMessage ] ;
95
94
95
+ if ( ! this . subsRefsMap . has ( triggerName ) ) {
96
+ this . subsRefsMap . set ( triggerName , new Set ( ) ) ;
97
+ }
98
+
96
99
const refs = this . subsRefsMap . get ( triggerName ) ;
97
- if ( refs ? .size > 0 ) {
98
- refs . add ( id )
100
+ if ( refs . size > 0 ) {
101
+ refs . add ( id ) ;
99
102
return Promise . resolve ( id ) ;
100
103
} else {
101
104
return new Promise < number > ( ( resolve , reject ) => {
@@ -105,7 +108,7 @@ export class RedisPubSub implements PubSubEngine {
105
108
if ( err ) {
106
109
reject ( err ) ;
107
110
} else {
108
- refs . add ( id )
111
+ refs . add ( id ) ;
109
112
resolve ( id ) ;
110
113
}
111
114
} ) ;
@@ -126,7 +129,7 @@ export class RedisPubSub implements PubSubEngine {
126
129
127
130
this . subsRefsMap . delete ( triggerName ) ;
128
131
} else {
129
- refs . delete ( subId )
132
+ refs . delete ( subId ) ;
130
133
}
131
134
delete this . subscriptionMap [ subId ] ;
132
135
}
@@ -174,10 +177,10 @@ export class RedisPubSub implements PubSubEngine {
174
177
parsedMessage = message ;
175
178
}
176
179
177
- for ( const subId in subscribers ) {
180
+ subscribers . forEach ( subId => {
178
181
const [ , listener ] = this . subscriptionMap [ subId ] ;
179
182
listener ( parsedMessage ) ;
180
- }
183
+ } ) ;
181
184
}
182
185
}
183
186
0 commit comments