diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index 49b0b223751f..889e50c0483d 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -42,13 +42,13 @@ export type ToEventMap< export interface IBaseBroker> { /** - * Subscribes to the given events, grouping them by the given group name + * Subscribes to the given events */ - subscribe(group: string, events: (keyof TEvents)[]): Promise; + subscribe(events: (keyof TEvents)[]): Promise; /** - * Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup + * Unsubscribes from the given events */ - unsubscribe(group: string, events: (keyof TEvents)[]): Promise; + unsubscribe(events: (keyof TEvents)[]): Promise; } export interface IPubSubBroker> diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index e7c3bf51cef2..5ad4f04830bb 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -23,10 +23,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { * How long to block for messages when polling */ blockTimeout?: number; + + /** + * Consumer group name to use for this broker + * + * @see {@link https://redis.io/commands/xreadgroup/} + */ + group: string; + /** * Max number of messages to poll at once */ maxChunk?: number; + /** * Unique consumer name. * @@ -43,7 +52,7 @@ export const DefaultRedisBrokerOptions = { name: randomBytes(20).toString('hex'), maxChunk: 10, blockTimeout: 5_000, -} as const satisfies Required; +} as const satisfies Required>; /** * Helper class with shared Redis logic @@ -93,13 +102,13 @@ export abstract class BaseRedisBroker> /** * {@inheritDoc IBaseBroker.subscribe} */ - public async subscribe(group: string, events: (keyof TEvents)[]): Promise { + public async subscribe(events: (keyof TEvents)[]): Promise { await Promise.all( // @ts-expect-error: Intended events.map(async (event) => { this.subscribedEvents.add(event as string); try { - return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM'); + return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM'); } catch (error) { if (!(error instanceof ReplyError)) { throw error; @@ -107,18 +116,18 @@ export abstract class BaseRedisBroker> } }), ); - void this.listen(group); + void this.listen(); } /** * {@inheritDoc IBaseBroker.unsubscribe} */ - public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise { + public async unsubscribe(events: (keyof TEvents)[]): Promise { const commands: unknown[][] = Array.from({ length: events.length * 2 }); for (let idx = 0; idx < commands.length; idx += 2) { const event = events[idx / 2]; - commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name]; - commands[idx + 1] = ['xcleangroup', event as string, group]; + commands[idx] = ['xgroup', 'delconsumer', event as string, this.options.group, this.options.name]; + commands[idx + 1] = ['xcleangroup', event as string, this.options.group]; } await this.redisClient.pipeline(commands).exec(); @@ -131,18 +140,18 @@ export abstract class BaseRedisBroker> /** * Begins polling for events, firing them to {@link BaseRedisBroker.listen} */ - protected async listen(group: string): Promise { + protected async listen(): Promise { if (this.listening) { return; } this.listening = true; - while (true) { + while (this.subscribedEvents.size > 0) { try { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', - group, + this.options.group, this.options.name, 'COUNT', String(this.options.maxChunk), @@ -169,7 +178,7 @@ export abstract class BaseRedisBroker> continue; } - this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data)); + this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data)); } } } catch (error) { @@ -185,6 +194,7 @@ export abstract class BaseRedisBroker> * Destroys the broker, closing all connections */ public async destroy() { + await this.unsubscribe([...this.subscribedEvents]); this.streamReadClient.disconnect(); this.redisClient.disconnect(); } diff --git a/packages/brokers/src/brokers/redis/RPCRedis.ts b/packages/brokers/src/brokers/redis/RPCRedis.ts index 0e7662ac735c..5475654902a9 100644 --- a/packages/brokers/src/brokers/redis/RPCRedis.ts +++ b/packages/brokers/src/brokers/redis/RPCRedis.ts @@ -24,7 +24,7 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions { export const DefaultRPCRedisBrokerOptions = { ...DefaultRedisBrokerOptions, timeout: 5_000, -} as const satisfies Required; +} as const satisfies Required>; /** * RPC broker powered by Redis @@ -114,11 +114,11 @@ export class RPCRedisBroker, TResponses exte }); } - protected emitEvent(id: Buffer, group: string, event: string, data: unknown) { + protected emitEvent(id: Buffer, event: string, data: unknown) { const payload: { ack(): Promise; data: unknown; reply(data: unknown): Promise } = { data, ack: async () => { - await this.redisClient.xack(event, group, id); + await this.redisClient.xack(event, this.options.group, id); }, reply: async (data) => { await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));