Skip to content

Commit 524e702

Browse files
committed
feat: Add redis streams support.
Closes #340.
1 parent 1775b7c commit 524e702

File tree

6 files changed

+798
-105
lines changed

6 files changed

+798
-105
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"lint": "eslint src --ext ts",
3232
"watch": "tsc -w",
3333
"testonly": "mocha --reporter spec src/test/tests.ts",
34-
"integration": "mocha --reporter spec src/test/integration-tests.ts",
34+
"integration": "mocha --reporter spec src/test/integration-tests.ts src/test/stream-tests.ts",
3535
"coverage": "nyc --reporter=html --reporter=text mocha src/test/**/*.ts",
3636
"prepublish": "tsc",
3737
"prepublishOnly": "npm run test"

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export { RedisPubSub } from './redis-pubsub';
2+
export { RedisStreamPubSub } from './redis-stream-pubsub';

src/redis-pubsub-base.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import { Cluster, Redis, RedisOptions } from 'ioredis';
2+
import { PubSubEngine } from 'graphql-subscriptions';
3+
import {PubSubAsyncIterator} from './pubsub-async-iterator';
4+
5+
type DeserializerContext = { channel: string, pattern?: string };
6+
7+
export type RedisClient = Redis | Cluster;
8+
export type OnMessage<T> = (message: T) => void;
9+
10+
export interface PubSubRedisBaseOptions {
11+
connection?: RedisOptions | string;
12+
triggerTransform?: TriggerTransform;
13+
connectionListener?: (err: Error) => void;
14+
publisher?: RedisClient;
15+
subscriber?: RedisClient;
16+
reviver?: Reviver;
17+
serializer?: Serializer;
18+
deserializer?: Deserializer;
19+
}
20+
21+
export abstract class RedisPubSubBase implements PubSubEngine {
22+
23+
constructor(options: PubSubRedisBaseOptions = {}) {
24+
const {
25+
triggerTransform,
26+
connection,
27+
connectionListener,
28+
subscriber,
29+
publisher,
30+
reviver,
31+
serializer,
32+
deserializer,
33+
} = options;
34+
35+
this.triggerTransform = triggerTransform || (trigger => trigger as string);
36+
37+
if (reviver && deserializer) {
38+
throw new Error("Reviver and deserializer can't be used together");
39+
}
40+
41+
this.reviver = reviver;
42+
this.serializer = serializer;
43+
this.deserializer = deserializer;
44+
45+
if (subscriber && publisher) {
46+
this.redisPublisher = publisher;
47+
this.redisSubscriber = subscriber;
48+
} else {
49+
try {
50+
// eslint-disable-next-line @typescript-eslint/no-var-requires
51+
const IORedis = require('ioredis');
52+
this.redisPublisher = new IORedis(connection);
53+
this.redisSubscriber = new IORedis(connection);
54+
55+
if (connectionListener) {
56+
this.redisPublisher
57+
.on('connect', connectionListener)
58+
.on('error', connectionListener);
59+
this.redisSubscriber
60+
.on('connect', connectionListener)
61+
.on('error', connectionListener);
62+
} else {
63+
this.redisPublisher.on('error', console.error);
64+
this.redisSubscriber.on('error', console.error);
65+
}
66+
} catch (error) {
67+
console.error(
68+
`No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`,
69+
);
70+
}
71+
}
72+
73+
this.currentSubscriptionId = 0;
74+
}
75+
76+
public abstract publish<T>(trigger: string, payload: T): Promise<void>;
77+
78+
public abstract subscribe<T = any>(
79+
trigger: string,
80+
onMessage: OnMessage<T>,
81+
options: unknown,
82+
): Promise<number>;
83+
84+
public abstract unsubscribe(subId: number): void;
85+
86+
public asyncIterator<T>(triggers: string | string[], options?: unknown): AsyncIterator<T> {
87+
return new PubSubAsyncIterator<T>(this, triggers, options);
88+
}
89+
90+
public getSubscriber(): RedisClient {
91+
return this.redisSubscriber;
92+
}
93+
94+
public getPublisher(): RedisClient {
95+
return this.redisPublisher;
96+
}
97+
98+
public close(): Promise<'OK'[]> {
99+
return Promise.all([
100+
this.redisPublisher.quit(),
101+
this.redisSubscriber.quit(),
102+
]);
103+
}
104+
105+
protected readonly serializer?: Serializer;
106+
protected readonly deserializer?: Deserializer;
107+
protected readonly triggerTransform: TriggerTransform;
108+
protected readonly redisSubscriber: RedisClient;
109+
protected readonly redisPublisher: RedisClient;
110+
protected readonly reviver: Reviver;
111+
112+
protected currentSubscriptionId: number;
113+
}
114+
115+
export type Path = Array<string | number>;
116+
export type Trigger = string | Path;
117+
export type TriggerTransform = (
118+
trigger: Trigger,
119+
channelOptions?: unknown,
120+
) => string;
121+
export type Reviver = (key: any, value: any) => any;
122+
export type Serializer = (source: any) => string;
123+
export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any;

src/redis-pubsub.ts

Lines changed: 12 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,32 @@
1-
import {Cluster, Redis, RedisOptions} from 'ioredis';
2-
import {PubSubEngine} from 'graphql-subscriptions';
3-
import {PubSubAsyncIterator} from './pubsub-async-iterator';
4-
5-
type RedisClient = Redis | Cluster;
6-
type OnMessage<T> = (message: T) => void;
7-
type DeserializerContext = { channel: string, pattern?: string };
8-
9-
export interface PubSubRedisOptions {
10-
connection?: RedisOptions | string;
11-
triggerTransform?: TriggerTransform;
12-
connectionListener?: (err: Error) => void;
13-
publisher?: RedisClient;
14-
subscriber?: RedisClient;
15-
reviver?: Reviver;
16-
serializer?: Serializer;
17-
deserializer?: Deserializer;
1+
import { OnMessage, PubSubRedisBaseOptions, RedisPubSubBase } from "./redis-pubsub-base";
2+
3+
interface PubSubRedisSubscribeOptions {
184
messageEventName?: string;
195
pmessageEventName?: string;
206
}
217

22-
export class RedisPubSub implements PubSubEngine {
8+
export type PubSubRedisOptions = PubSubRedisBaseOptions & PubSubRedisSubscribeOptions;
239

10+
/**
11+
* Redis PubSub implementation that uses `subscribe` or `psubscribe` redis commands
12+
* as the communication method.
13+
*/
14+
export class RedisPubSub extends RedisPubSubBase {
2415
constructor(options: PubSubRedisOptions = {}) {
16+
super(options);
17+
2518
const {
26-
triggerTransform,
27-
connection,
28-
connectionListener,
29-
subscriber,
30-
publisher,
31-
reviver,
32-
serializer,
33-
deserializer,
3419
messageEventName = 'message',
3520
pmessageEventName = 'pmessage',
3621
} = options;
3722

38-
this.triggerTransform = triggerTransform || (trigger => trigger as string);
39-
40-
if (reviver && deserializer) {
41-
throw new Error("Reviver and deserializer can't be used together");
42-
}
43-
44-
this.reviver = reviver;
45-
this.serializer = serializer;
46-
this.deserializer = deserializer;
47-
48-
if (subscriber && publisher) {
49-
this.redisPublisher = publisher;
50-
this.redisSubscriber = subscriber;
51-
} else {
52-
try {
53-
// eslint-disable-next-line @typescript-eslint/no-var-requires
54-
const IORedis = require('ioredis');
55-
this.redisPublisher = new IORedis(connection);
56-
this.redisSubscriber = new IORedis(connection);
57-
58-
if (connectionListener) {
59-
this.redisPublisher
60-
.on('connect', connectionListener)
61-
.on('error', connectionListener);
62-
this.redisSubscriber
63-
.on('connect', connectionListener)
64-
.on('error', connectionListener);
65-
} else {
66-
this.redisPublisher.on('error', console.error);
67-
this.redisSubscriber.on('error', console.error);
68-
}
69-
} catch (error) {
70-
console.error(
71-
`No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`,
72-
);
73-
}
74-
}
75-
7623
// handle messages received via psubscribe and subscribe
7724
this.redisSubscriber.on(pmessageEventName, this.onMessage.bind(this));
7825
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
7926
this.redisSubscriber.on(messageEventName, this.onMessage.bind(this, undefined));
8027

8128
this.subscriptionMap = {};
8229
this.subsRefsMap = new Map<string, Set<number>>();
83-
this.currentSubscriptionId = 0;
8430
}
8531

8632
public async publish<T>(trigger: string, payload: T): Promise<void> {
@@ -138,36 +84,8 @@ export class RedisPubSub implements PubSubEngine {
13884
}
13985
delete this.subscriptionMap[subId];
14086
}
141-
142-
public asyncIterator<T>(triggers: string | string[], options?: unknown): AsyncIterator<T> {
143-
return new PubSubAsyncIterator<T>(this, triggers, options);
144-
}
145-
146-
public getSubscriber(): RedisClient {
147-
return this.redisSubscriber;
148-
}
149-
150-
public getPublisher(): RedisClient {
151-
return this.redisPublisher;
152-
}
153-
154-
public close(): Promise<'OK'[]> {
155-
return Promise.all([
156-
this.redisPublisher.quit(),
157-
this.redisSubscriber.quit(),
158-
]);
159-
}
160-
161-
private readonly serializer?: Serializer;
162-
private readonly deserializer?: Deserializer;
163-
private readonly triggerTransform: TriggerTransform;
164-
private readonly redisSubscriber: RedisClient;
165-
private readonly redisPublisher: RedisClient;
166-
private readonly reviver: Reviver;
167-
16887
private readonly subscriptionMap: { [subId: number]: [string, OnMessage<unknown>] };
16988
private readonly subsRefsMap: Map<string, Set<number>>;
170-
private currentSubscriptionId: number;
17189

17290
private onMessage(pattern: string, channel: string, message: string) {
17391
const subscribers = this.subsRefsMap.get(pattern || channel);
@@ -189,14 +107,4 @@ export class RedisPubSub implements PubSubEngine {
189107
listener(parsedMessage);
190108
});
191109
}
192-
}
193-
194-
export type Path = Array<string | number>;
195-
export type Trigger = string | Path;
196-
export type TriggerTransform = (
197-
trigger: Trigger,
198-
channelOptions?: unknown,
199-
) => string;
200-
export type Reviver = (key: any, value: any) => any;
201-
export type Serializer = (source: any) => string;
202-
export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any;
110+
}

0 commit comments

Comments
 (0)