1
- import { Cluster , Redis , RedisOptions } from 'ioredis' ;
2
- import { PubSubEngine } from 'graphql-subscriptions' ;
3
- import { PubSubAsyncIterator } from './pubsub-async-iterator' ;
4
-
5
- type RedisClient = Redis | Cluster ;
6
- type OnMessage < T > = ( message : T ) => void ;
7
- type DeserializerContext = { channel : string , pattern ?: string } ;
8
-
9
- export interface PubSubRedisOptions {
10
- connection ?: RedisOptions | string ;
11
- triggerTransform ?: TriggerTransform ;
12
- connectionListener ?: ( err : Error ) => void ;
13
- publisher ?: RedisClient ;
14
- subscriber ?: RedisClient ;
15
- reviver ?: Reviver ;
16
- serializer ?: Serializer ;
17
- deserializer ?: Deserializer ;
1
+ import { OnMessage , PubSubRedisBaseOptions , RedisPubSubBase } from "./redis-pubsub-base" ;
2
+
3
+ interface PubSubRedisSubscribeOptions {
18
4
messageEventName ?: string ;
19
5
pmessageEventName ?: string ;
20
6
}
21
7
22
- export class RedisPubSub implements PubSubEngine {
8
+ export type PubSubRedisOptions = PubSubRedisBaseOptions & PubSubRedisSubscribeOptions ;
23
9
10
+ /**
11
+ * Redis PubSub implementation that uses `subscribe` or `psubscribe` redis commands
12
+ * as the communication method.
13
+ */
14
+ export class RedisPubSub extends RedisPubSubBase {
24
15
constructor ( options : PubSubRedisOptions = { } ) {
16
+ super ( options ) ;
17
+
25
18
const {
26
- triggerTransform,
27
- connection,
28
- connectionListener,
29
- subscriber,
30
- publisher,
31
- reviver,
32
- serializer,
33
- deserializer,
34
19
messageEventName = 'message' ,
35
20
pmessageEventName = 'pmessage' ,
36
21
} = options ;
37
22
38
- this . triggerTransform = triggerTransform || ( trigger => trigger as string ) ;
39
-
40
- if ( reviver && deserializer ) {
41
- throw new Error ( "Reviver and deserializer can't be used together" ) ;
42
- }
43
-
44
- this . reviver = reviver ;
45
- this . serializer = serializer ;
46
- this . deserializer = deserializer ;
47
-
48
- if ( subscriber && publisher ) {
49
- this . redisPublisher = publisher ;
50
- this . redisSubscriber = subscriber ;
51
- } else {
52
- try {
53
- // eslint-disable-next-line @typescript-eslint/no-var-requires
54
- const IORedis = require ( 'ioredis' ) ;
55
- this . redisPublisher = new IORedis ( connection ) ;
56
- this . redisSubscriber = new IORedis ( connection ) ;
57
-
58
- if ( connectionListener ) {
59
- this . redisPublisher
60
- . on ( 'connect' , connectionListener )
61
- . on ( 'error' , connectionListener ) ;
62
- this . redisSubscriber
63
- . on ( 'connect' , connectionListener )
64
- . on ( 'error' , connectionListener ) ;
65
- } else {
66
- this . redisPublisher . on ( 'error' , console . error ) ;
67
- this . redisSubscriber . on ( 'error' , console . error ) ;
68
- }
69
- } catch ( error ) {
70
- console . error (
71
- `No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.` ,
72
- ) ;
73
- }
74
- }
75
-
76
23
// handle messages received via psubscribe and subscribe
77
24
this . redisSubscriber . on ( pmessageEventName , this . onMessage . bind ( this ) ) ;
78
25
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
79
26
this . redisSubscriber . on ( messageEventName , this . onMessage . bind ( this , undefined ) ) ;
80
27
81
28
this . subscriptionMap = { } ;
82
29
this . subsRefsMap = new Map < string , Set < number > > ( ) ;
83
- this . currentSubscriptionId = 0 ;
84
30
}
85
31
86
32
public async publish < T > ( trigger : string , payload : T ) : Promise < void > {
@@ -138,36 +84,8 @@ export class RedisPubSub implements PubSubEngine {
138
84
}
139
85
delete this . subscriptionMap [ subId ] ;
140
86
}
141
-
142
- public asyncIterator < T > ( triggers : string | string [ ] , options ?: unknown ) : AsyncIterator < T > {
143
- return new PubSubAsyncIterator < T > ( this , triggers , options ) ;
144
- }
145
-
146
- public getSubscriber ( ) : RedisClient {
147
- return this . redisSubscriber ;
148
- }
149
-
150
- public getPublisher ( ) : RedisClient {
151
- return this . redisPublisher ;
152
- }
153
-
154
- public close ( ) : Promise < 'OK' [ ] > {
155
- return Promise . all ( [
156
- this . redisPublisher . quit ( ) ,
157
- this . redisSubscriber . quit ( ) ,
158
- ] ) ;
159
- }
160
-
161
- private readonly serializer ?: Serializer ;
162
- private readonly deserializer ?: Deserializer ;
163
- private readonly triggerTransform : TriggerTransform ;
164
- private readonly redisSubscriber : RedisClient ;
165
- private readonly redisPublisher : RedisClient ;
166
- private readonly reviver : Reviver ;
167
-
168
87
private readonly subscriptionMap : { [ subId : number ] : [ string , OnMessage < unknown > ] } ;
169
88
private readonly subsRefsMap : Map < string , Set < number > > ;
170
- private currentSubscriptionId : number ;
171
89
172
90
private onMessage ( pattern : string , channel : string , message : string ) {
173
91
const subscribers = this . subsRefsMap . get ( pattern || channel ) ;
@@ -189,14 +107,4 @@ export class RedisPubSub implements PubSubEngine {
189
107
listener ( parsedMessage ) ;
190
108
} ) ;
191
109
}
192
- }
193
-
194
- export type Path = Array < string | number > ;
195
- export type Trigger = string | Path ;
196
- export type TriggerTransform = (
197
- trigger : Trigger ,
198
- channelOptions ?: unknown ,
199
- ) => string ;
200
- export type Reviver = ( key : any , value : any ) => any ;
201
- export type Serializer = ( source : any ) => string ;
202
- export type Deserializer = ( source : string | Buffer , context : DeserializerContext ) => any ;
110
+ }
0 commit comments