Skip to content

Commit 38a37b5

Browse files
refactor(brokers): re-design API to make groups a constructor option (discordjs#10297)
* fix(BaseRedis): remove listeners on destroy and stop pooling when no subscription * refactor(BaseRedis): group as constructor param and cleanup subscribers * fix(BaseRedis): remove listeners on destroy and stop pooling when no subscription * refactor(BaseRedis): group as constructor param and cleanup subscribers * chore(RPCRedis): group * Update packages/brokers/src/brokers/Broker.ts * Update packages/brokers/src/brokers/Broker.ts * Update packages/brokers/src/brokers/redis/BaseRedis.ts Removed `removeAllListeners` from destroy * chore(BaseRedis): destroy unsubscribe spread array --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
1 parent 29a50bb commit 38a37b5

File tree

3 files changed

+28
-18
lines changed

3 files changed

+28
-18
lines changed

packages/brokers/src/brokers/Broker.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ export type ToEventMap<
4242

4343
export interface IBaseBroker<TEvents extends Record<string, any>> {
4444
/**
45-
* Subscribes to the given events, grouping them by the given group name
45+
* Subscribes to the given events
4646
*/
47-
subscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
47+
subscribe(events: (keyof TEvents)[]): Promise<void>;
4848
/**
49-
* Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup
49+
* Unsubscribes from the given events
5050
*/
51-
unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
51+
unsubscribe(events: (keyof TEvents)[]): Promise<void>;
5252
}
5353

5454
export interface IPubSubBroker<TEvents extends Record<string, any>>

packages/brokers/src/brokers/redis/BaseRedis.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
2323
* How long to block for messages when polling
2424
*/
2525
blockTimeout?: number;
26+
27+
/**
28+
* Consumer group name to use for this broker
29+
*
30+
* @see {@link https://redis.io/commands/xreadgroup/}
31+
*/
32+
group: string;
33+
2634
/**
2735
* Max number of messages to poll at once
2836
*/
2937
maxChunk?: number;
38+
3039
/**
3140
* Unique consumer name.
3241
*
@@ -43,7 +52,7 @@ export const DefaultRedisBrokerOptions = {
4352
name: randomBytes(20).toString('hex'),
4453
maxChunk: 10,
4554
blockTimeout: 5_000,
46-
} as const satisfies Required<RedisBrokerOptions>;
55+
} as const satisfies Required<Omit<RedisBrokerOptions, 'group'>>;
4756

4857
/**
4958
* Helper class with shared Redis logic
@@ -93,32 +102,32 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
93102
/**
94103
* {@inheritDoc IBaseBroker.subscribe}
95104
*/
96-
public async subscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
105+
public async subscribe(events: (keyof TEvents)[]): Promise<void> {
97106
await Promise.all(
98107
// @ts-expect-error: Intended
99108
events.map(async (event) => {
100109
this.subscribedEvents.add(event as string);
101110
try {
102-
return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
111+
return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM');
103112
} catch (error) {
104113
if (!(error instanceof ReplyError)) {
105114
throw error;
106115
}
107116
}
108117
}),
109118
);
110-
void this.listen(group);
119+
void this.listen();
111120
}
112121

113122
/**
114123
* {@inheritDoc IBaseBroker.unsubscribe}
115124
*/
116-
public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
125+
public async unsubscribe(events: (keyof TEvents)[]): Promise<void> {
117126
const commands: unknown[][] = Array.from({ length: events.length * 2 });
118127
for (let idx = 0; idx < commands.length; idx += 2) {
119128
const event = events[idx / 2];
120-
commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name];
121-
commands[idx + 1] = ['xcleangroup', event as string, group];
129+
commands[idx] = ['xgroup', 'delconsumer', event as string, this.options.group, this.options.name];
130+
commands[idx + 1] = ['xcleangroup', event as string, this.options.group];
122131
}
123132

124133
await this.redisClient.pipeline(commands).exec();
@@ -131,18 +140,18 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
131140
/**
132141
* Begins polling for events, firing them to {@link BaseRedisBroker.listen}
133142
*/
134-
protected async listen(group: string): Promise<void> {
143+
protected async listen(): Promise<void> {
135144
if (this.listening) {
136145
return;
137146
}
138147

139148
this.listening = true;
140149

141-
while (true) {
150+
while (this.subscribedEvents.size > 0) {
142151
try {
143152
const data = await this.streamReadClient.xreadgroupBuffer(
144153
'GROUP',
145-
group,
154+
this.options.group,
146155
this.options.name,
147156
'COUNT',
148157
String(this.options.maxChunk),
@@ -169,7 +178,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
169178
continue;
170179
}
171180

172-
this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data));
181+
this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data));
173182
}
174183
}
175184
} catch (error) {
@@ -185,6 +194,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
185194
* Destroys the broker, closing all connections
186195
*/
187196
public async destroy() {
197+
await this.unsubscribe([...this.subscribedEvents]);
188198
this.streamReadClient.disconnect();
189199
this.redisClient.disconnect();
190200
}

packages/brokers/src/brokers/redis/RPCRedis.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
2424
export const DefaultRPCRedisBrokerOptions = {
2525
...DefaultRedisBrokerOptions,
2626
timeout: 5_000,
27-
} as const satisfies Required<RPCRedisBrokerOptions>;
27+
} as const satisfies Required<Omit<RPCRedisBrokerOptions, 'group'>>;
2828

2929
/**
3030
* RPC broker powered by Redis
@@ -114,11 +114,11 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte
114114
});
115115
}
116116

117-
protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
117+
protected emitEvent(id: Buffer, event: string, data: unknown) {
118118
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
119119
data,
120120
ack: async () => {
121-
await this.redisClient.xack(event, group, id);
121+
await this.redisClient.xack(event, this.options.group, id);
122122
},
123123
reply: async (data) => {
124124
await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));

0 commit comments

Comments
 (0)