From 8bdfe1133656a0d634165cb0f801c22debd20d34 Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Thu, 23 May 2024 23:41:12 +0300 Subject: [PATCH 1/9] fix(BaseRedis): remove listeners on destroy and stop pooling when no subscription --- packages/brokers/src/brokers/redis/BaseRedis.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index e7c3bf51cef2..320c3b6151ab 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -138,7 +138,7 @@ export abstract class BaseRedisBroker> this.listening = true; - while (true) { + while (this.subscribedEvents.size > 0) { try { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', @@ -187,6 +187,7 @@ export abstract class BaseRedisBroker> public async destroy() { this.streamReadClient.disconnect(); this.redisClient.disconnect(); + this.removeAllListeners(); } /** From 818829b500b28bd1a61e318f4ca58e37d6352b01 Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Wed, 29 May 2024 00:58:56 +0300 Subject: [PATCH 2/9] refactor(BaseRedis): group as constructor param and cleanup subscribers --- packages/brokers/src/brokers/Broker.ts | 4 +-- .../brokers/src/brokers/redis/BaseRedis.ts | 30 ++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index 49b0b223751f..caed1a705450 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -44,11 +44,11 @@ export interface IBaseBroker> { /** * Subscribes to the given events, grouping them by the given group name */ - subscribe(group: string, events: (keyof TEvents)[]): Promise; + subscribe(events: (keyof TEvents)[]): Promise; /** * Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup */ - unsubscribe(group: string, events: (keyof TEvents)[]): Promise; + unsubscribe(events: (keyof TEvents)[]): Promise; } export interface IPubSubBroker> diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index 320c3b6151ab..bbef541ace59 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -23,10 +23,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { * How long to block for messages when polling */ blockTimeout?: number; + + /** + * Consumer group name to use for this broker + * + * @see {@link https://redis.io/commands/xreadgroup/} + */ + group: string; + /** * Max number of messages to poll at once */ maxChunk?: number; + /** * Unique consumer name. * @@ -43,7 +52,7 @@ export const DefaultRedisBrokerOptions = { name: randomBytes(20).toString('hex'), maxChunk: 10, blockTimeout: 5_000, -} as const satisfies Required; +} as const satisfies Required>; /** * Helper class with shared Redis logic @@ -93,13 +102,13 @@ export abstract class BaseRedisBroker> /** * {@inheritDoc IBaseBroker.subscribe} */ - public async subscribe(group: string, events: (keyof TEvents)[]): Promise { + public async subscribe(events: (keyof TEvents)[]): Promise { await Promise.all( // @ts-expect-error: Intended events.map(async (event) => { this.subscribedEvents.add(event as string); try { - return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM'); + return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM'); } catch (error) { if (!(error instanceof ReplyError)) { throw error; @@ -107,18 +116,18 @@ export abstract class BaseRedisBroker> } }), ); - void this.listen(group); + void this.listen(); } /** * {@inheritDoc IBaseBroker.unsubscribe} */ - public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise { + public async unsubscribe(events: (keyof TEvents)[]): Promise { const commands: unknown[][] = Array.from({ length: events.length * 2 }); for (let idx = 0; idx < commands.length; idx += 2) { const event = events[idx / 2]; - commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name]; - commands[idx + 1] = ['xcleangroup', event as string, group]; + commands[idx] = ['xgroup', 'delconsumer', event as string, this.options.group, this.options.name]; + commands[idx + 1] = ['xcleangroup', event as string, this.options.group]; } await this.redisClient.pipeline(commands).exec(); @@ -131,7 +140,7 @@ export abstract class BaseRedisBroker> /** * Begins polling for events, firing them to {@link BaseRedisBroker.listen} */ - protected async listen(group: string): Promise { + protected async listen(): Promise { if (this.listening) { return; } @@ -142,7 +151,7 @@ export abstract class BaseRedisBroker> try { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', - group, + this.options.group, this.options.name, 'COUNT', String(this.options.maxChunk), @@ -169,7 +178,7 @@ export abstract class BaseRedisBroker> continue; } - this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data)); + this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data)); } } } catch (error) { @@ -185,6 +194,7 @@ export abstract class BaseRedisBroker> * Destroys the broker, closing all connections */ public async destroy() { + await this.unsubscribe(Array.from(this.subscribedEvents)); this.streamReadClient.disconnect(); this.redisClient.disconnect(); this.removeAllListeners(); From fe017cb760599ad3ad5810940b4d9f947862aaf9 Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Thu, 23 May 2024 23:41:12 +0300 Subject: [PATCH 3/9] fix(BaseRedis): remove listeners on destroy and stop pooling when no subscription --- packages/brokers/src/brokers/redis/BaseRedis.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index e7c3bf51cef2..320c3b6151ab 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -138,7 +138,7 @@ export abstract class BaseRedisBroker> this.listening = true; - while (true) { + while (this.subscribedEvents.size > 0) { try { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', @@ -187,6 +187,7 @@ export abstract class BaseRedisBroker> public async destroy() { this.streamReadClient.disconnect(); this.redisClient.disconnect(); + this.removeAllListeners(); } /** From f89b286d0384459bf091a35d0243e2e8cdf070d9 Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Wed, 29 May 2024 00:58:56 +0300 Subject: [PATCH 4/9] refactor(BaseRedis): group as constructor param and cleanup subscribers --- packages/brokers/src/brokers/Broker.ts | 4 +-- .../brokers/src/brokers/redis/BaseRedis.ts | 30 ++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index 49b0b223751f..caed1a705450 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -44,11 +44,11 @@ export interface IBaseBroker> { /** * Subscribes to the given events, grouping them by the given group name */ - subscribe(group: string, events: (keyof TEvents)[]): Promise; + subscribe(events: (keyof TEvents)[]): Promise; /** * Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup */ - unsubscribe(group: string, events: (keyof TEvents)[]): Promise; + unsubscribe(events: (keyof TEvents)[]): Promise; } export interface IPubSubBroker> diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index 320c3b6151ab..bbef541ace59 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -23,10 +23,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { * How long to block for messages when polling */ blockTimeout?: number; + + /** + * Consumer group name to use for this broker + * + * @see {@link https://redis.io/commands/xreadgroup/} + */ + group: string; + /** * Max number of messages to poll at once */ maxChunk?: number; + /** * Unique consumer name. * @@ -43,7 +52,7 @@ export const DefaultRedisBrokerOptions = { name: randomBytes(20).toString('hex'), maxChunk: 10, blockTimeout: 5_000, -} as const satisfies Required; +} as const satisfies Required>; /** * Helper class with shared Redis logic @@ -93,13 +102,13 @@ export abstract class BaseRedisBroker> /** * {@inheritDoc IBaseBroker.subscribe} */ - public async subscribe(group: string, events: (keyof TEvents)[]): Promise { + public async subscribe(events: (keyof TEvents)[]): Promise { await Promise.all( // @ts-expect-error: Intended events.map(async (event) => { this.subscribedEvents.add(event as string); try { - return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM'); + return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM'); } catch (error) { if (!(error instanceof ReplyError)) { throw error; @@ -107,18 +116,18 @@ export abstract class BaseRedisBroker> } }), ); - void this.listen(group); + void this.listen(); } /** * {@inheritDoc IBaseBroker.unsubscribe} */ - public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise { + public async unsubscribe(events: (keyof TEvents)[]): Promise { const commands: unknown[][] = Array.from({ length: events.length * 2 }); for (let idx = 0; idx < commands.length; idx += 2) { const event = events[idx / 2]; - commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name]; - commands[idx + 1] = ['xcleangroup', event as string, group]; + commands[idx] = ['xgroup', 'delconsumer', event as string, this.options.group, this.options.name]; + commands[idx + 1] = ['xcleangroup', event as string, this.options.group]; } await this.redisClient.pipeline(commands).exec(); @@ -131,7 +140,7 @@ export abstract class BaseRedisBroker> /** * Begins polling for events, firing them to {@link BaseRedisBroker.listen} */ - protected async listen(group: string): Promise { + protected async listen(): Promise { if (this.listening) { return; } @@ -142,7 +151,7 @@ export abstract class BaseRedisBroker> try { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', - group, + this.options.group, this.options.name, 'COUNT', String(this.options.maxChunk), @@ -169,7 +178,7 @@ export abstract class BaseRedisBroker> continue; } - this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data)); + this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data)); } } } catch (error) { @@ -185,6 +194,7 @@ export abstract class BaseRedisBroker> * Destroys the broker, closing all connections */ public async destroy() { + await this.unsubscribe(Array.from(this.subscribedEvents)); this.streamReadClient.disconnect(); this.redisClient.disconnect(); this.removeAllListeners(); From 4524ae9d341aaab5e5300b1635d13f0c55378f69 Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Wed, 29 May 2024 01:08:58 +0300 Subject: [PATCH 5/9] chore(RPCRedis): group --- packages/brokers/src/brokers/redis/RPCRedis.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/brokers/src/brokers/redis/RPCRedis.ts b/packages/brokers/src/brokers/redis/RPCRedis.ts index 0e7662ac735c..5475654902a9 100644 --- a/packages/brokers/src/brokers/redis/RPCRedis.ts +++ b/packages/brokers/src/brokers/redis/RPCRedis.ts @@ -24,7 +24,7 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions { export const DefaultRPCRedisBrokerOptions = { ...DefaultRedisBrokerOptions, timeout: 5_000, -} as const satisfies Required; +} as const satisfies Required>; /** * RPC broker powered by Redis @@ -114,11 +114,11 @@ export class RPCRedisBroker, TResponses exte }); } - protected emitEvent(id: Buffer, group: string, event: string, data: unknown) { + protected emitEvent(id: Buffer, event: string, data: unknown) { const payload: { ack(): Promise; data: unknown; reply(data: unknown): Promise } = { data, ack: async () => { - await this.redisClient.xack(event, group, id); + await this.redisClient.xack(event, this.options.group, id); }, reply: async (data) => { await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data)); From 98778995448048bf0da0cfd1f37eb057e7688875 Mon Sep 17 00:00:00 2001 From: Nitzan Savion <59604278+nizans@users.noreply.github.com> Date: Wed, 29 May 2024 10:29:47 +0300 Subject: [PATCH 6/9] Update packages/brokers/src/brokers/Broker.ts --- packages/brokers/src/brokers/Broker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index caed1a705450..2ba8d7173d34 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -46,7 +46,7 @@ export interface IBaseBroker> { */ subscribe(events: (keyof TEvents)[]): Promise; /** - * Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup + * Unsubscribes from the given events */ unsubscribe(events: (keyof TEvents)[]): Promise; } From 4f4db17a8408379c7645daf1fd7d077f7d2488fd Mon Sep 17 00:00:00 2001 From: Nitzan Savion <59604278+nizans@users.noreply.github.com> Date: Wed, 29 May 2024 10:30:43 +0300 Subject: [PATCH 7/9] Update packages/brokers/src/brokers/Broker.ts --- packages/brokers/src/brokers/Broker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index 2ba8d7173d34..889e50c0483d 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -42,7 +42,7 @@ export type ToEventMap< export interface IBaseBroker> { /** - * Subscribes to the given events, grouping them by the given group name + * Subscribes to the given events */ subscribe(events: (keyof TEvents)[]): Promise; /** From b1b1a6f915d821401f732a4a010b59238993bf0a Mon Sep 17 00:00:00 2001 From: Nitzan Savion <59604278+nizans@users.noreply.github.com> Date: Wed, 29 May 2024 10:35:45 +0300 Subject: [PATCH 8/9] Update packages/brokers/src/brokers/redis/BaseRedis.ts Removed `removeAllListeners` from destroy --- packages/brokers/src/brokers/redis/BaseRedis.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index bbef541ace59..62ee8ef79792 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -197,7 +197,6 @@ export abstract class BaseRedisBroker> await this.unsubscribe(Array.from(this.subscribedEvents)); this.streamReadClient.disconnect(); this.redisClient.disconnect(); - this.removeAllListeners(); } /** From 3fa9baa87567a0eacfe17d6bebafd2c9534609ca Mon Sep 17 00:00:00 2001 From: Nitzan Savion Date: Wed, 29 May 2024 16:08:22 +0300 Subject: [PATCH 9/9] chore(BaseRedis): destroy unsubscribe spread array --- packages/brokers/src/brokers/redis/BaseRedis.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index 62ee8ef79792..5ad4f04830bb 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -194,7 +194,7 @@ export abstract class BaseRedisBroker> * Destroys the broker, closing all connections */ public async destroy() { - await this.unsubscribe(Array.from(this.subscribedEvents)); + await this.unsubscribe([...this.subscribedEvents]); this.streamReadClient.disconnect(); this.redisClient.disconnect(); }