diff --git a/src/event/event.decorators.ts b/src/event/event.decorators.ts index 5559b79..0de0383 100644 --- a/src/event/event.decorators.ts +++ b/src/event/event.decorators.ts @@ -1,8 +1,9 @@ import { applyDecorators } from "@nestjs/common"; import { OnEvent } from "@nestjs/event-emitter"; import { extendArrayMetadata } from "@nestjs/common/utils/extend-metadata.util"; +import { OnEventOptions } from "@nestjs/event-emitter/dist/interfaces"; -export const RABBITMQ_EVENT = "RABBITMQ_EVENT"; +export const RABBITMQ_MESSAGE_EVENT = "RABBITMQ_MESSAGE_EVENT"; export interface EventMetadata { event: string; @@ -17,8 +18,8 @@ const AddMetadata = (metadataKey: string, metadata: any) => { return factory; }; -export const Event = (event: string): MethodDecorator => +export const OnMessageEvent = (event: string, options?: OnEventOptions): MethodDecorator => applyDecorators( - AddMetadata(RABBITMQ_EVENT, { event }), - OnEvent(event) + AddMetadata(RABBITMQ_MESSAGE_EVENT, { event }), + OnEvent(event, options) ); diff --git a/src/event/event.emitter.ts b/src/event/event.emitter.ts index 09395f5..3bc50db 100644 --- a/src/event/event.emitter.ts +++ b/src/event/event.emitter.ts @@ -13,7 +13,7 @@ export class EventEmitter { } public async emit(event: string, payload: T) { - const exchange = this.config.getOrThrow('rabbitmq.exchange'); + const exchange = this.config.getOrThrow('rabbitmq.exchangeName'); await this.rmq.publish(exchange, event, payload, { headers: { 'X-Message-Type': 'event', diff --git a/src/event/event.explorer.ts b/src/event/event.explorer.ts index 37d119e..0753652 100644 --- a/src/event/event.explorer.ts +++ b/src/event/event.explorer.ts @@ -1,7 +1,7 @@ import { Injectable, Type } from "@nestjs/common"; import { DiscoveryService, MetadataScanner, Reflector } from "@nestjs/core"; import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper"; -import { RABBITMQ_EVENT, EventMetadata } from "./event.decorators"; +import { RABBITMQ_MESSAGE_EVENT, EventMetadata } from "./event.decorators"; import { castArray } from "lodash"; @Injectable() @@ -47,7 +47,7 @@ export class EventExplorer { return []; } - const metadata = this.reflector.get(RABBITMQ_EVENT, target); + const metadata = this.reflector.get(RABBITMQ_MESSAGE_EVENT, target); return castArray(metadata ?? []); } } diff --git a/src/event/event.listener.ts b/src/event/event.listener.ts index 7a42004..dcf3976 100644 --- a/src/event/event.listener.ts +++ b/src/event/event.listener.ts @@ -1,7 +1,7 @@ import { Injectable } from "@nestjs/common"; import { EventEmitter2 } from "@nestjs/event-emitter"; -import { MessageHandler } from "../rabbitmq.decorators"; -import { MessageHandleEvent } from "../rabbitmq.service"; +import { OnMessageReceived } from "../rabbitmq.decorators"; +import { MessageReceivedEvent } from "../rabbitmq.service"; @Injectable() export class EventListener { @@ -12,14 +12,16 @@ export class EventListener { // } - @MessageHandler() - async onMessages({ message, headers }: MessageHandleEvent) { + @OnMessageReceived() + async onMessages({ message, headers, raw }: MessageReceivedEvent) { // If the message type is not an event, ignore it if (headers["X-Message-Type"] !== "event") { return; } + this.events.emit('rabbitmq:message:processing', { message, headers, raw }); this.events.emit(headers["X-Event-Name"], message); + this.events.emit('rabbitmq:message:processed', { message, headers, raw }); } } diff --git a/src/index.ts b/src/index.ts index 68437dd..c80c023 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,11 +1,12 @@ -export { Event } from './event/event.decorators'; +export { OnMessageEvent } from './event/event.decorators'; export { EventEmitter } from './event/event.emitter'; -export { Method, InjectClient } from './rpc/rpc.decorators'; +export { RemoteProcedure, InjectClient } from './rpc/rpc.decorators'; export { RPCClient as Client } from './rpc/rpc.client'; export { RabbitMQModule } from './rabbitmq.module'; -export { - RabbitMQService, MessagePublishingEvent, MessageReceivedEvent, MessageHandleEvent, +export { RabbitMQService } from './rabbitmq.service'; +export type { + MessagePublishingEvent, MessageReceivedEvent, MessageProcessingEvent, MessageProcessedEvent, } from './rabbitmq.service'; export { - MessagePublishing, MessageReceived, MessageHandler, + OnMessagePublishing, OnMessageReceived, OnMessageProcessing, OnMessageProcessed, } from './rabbitmq.decorators'; diff --git a/src/rabbitmq.config.ts b/src/rabbitmq.config.ts index 1c17404..e31513b 100644 --- a/src/rabbitmq.config.ts +++ b/src/rabbitmq.config.ts @@ -1,11 +1,25 @@ import { registerAs } from "@nestjs/config"; -export default registerAs("rabbitmq", () => ({ - host: process.env.RABBITMQ_HOST, - port: process.env.RABBITMQ_PORT ?? "5672", +export interface RabbitMQConfig { + host: string; + port: number; + username?: string; + password?: string; + vhost: string; + exchangeName: string; + exchangeType: string; + queueName: string; + prefetchCount: number; +} + +export default registerAs("rabbitmq", () => ({ + host: process.env.RABBITMQ_HOST ?? 'localhost', + port: parseInt(process.env.RABBITMQ_PORT ?? '5672'), username: process.env.RABBITMQ_USERNAME, password: process.env.RABBITMQ_PASSWORD, vhost: process.env.RABBITMQ_VHOST ?? "/", - exchange: process.env.RABBITMQ_EXCHANGE, - queue: process.env.RABBITMQ_QUEUE + exchangeName: process.env.RABBITMQ_EXCHANGE_NAME ?? process.env.RABBITMQ_EXCHANGE ?? '', + exchangeType: process.env.RABBITMQ_EXCHANGE_TYPE ?? 'direct', + queueName: process.env.RABBITMQ_QUEUE_NAME ?? process.env.RABBITMQ_QUEUE ?? 'default', + prefetchCount: parseInt(process.env.RABBITMQ_PREFETCH_COUNT ?? '1', 10), })); diff --git a/src/rabbitmq.decorators.ts b/src/rabbitmq.decorators.ts index be06de0..d40e507 100644 --- a/src/rabbitmq.decorators.ts +++ b/src/rabbitmq.decorators.ts @@ -1,10 +1,13 @@ import { OnEvent } from "@nestjs/event-emitter"; -export const MessagePublishing = (): MethodDecorator => - OnEvent('rabbitmq:publishing'); +export const OnMessagePublishing = (): MethodDecorator => + OnEvent('rabbitmq:message:publishing'); -export const MessageReceived = (): MethodDecorator => - OnEvent('rabbitmq:received'); +export const OnMessageReceived = (): MethodDecorator => + OnEvent('rabbitmq:message:received'); -export const MessageHandler = (): MethodDecorator => - OnEvent('rabbitmq:handle'); +export const OnMessageProcessing = (): MethodDecorator => + OnEvent('rabbitmq:message:processing'); + +export const OnMessageProcessed = (): MethodDecorator => + OnEvent('rabbitmq:message:processed'); diff --git a/src/rabbitmq.module.ts b/src/rabbitmq.module.ts index aabebc3..f5648ed 100644 --- a/src/rabbitmq.module.ts +++ b/src/rabbitmq.module.ts @@ -41,28 +41,31 @@ export class RabbitMQModule { const user = config.getOrThrow("rabbitmq.username"); const pass = config.getOrThrow("rabbitmq.password"); - const queue = config.getOrThrow("rabbitmq.queue"); - const exchange = config.getOrThrow("rabbitmq.exchange"); + const queueName = config.getOrThrow("rabbitmq.queueName"); + const exchangeName = config.getOrThrow("rabbitmq.exchangeName"); + const exchangeType = config.getOrThrow("rabbitmq.exchangeType"); + + const prefetchCount = config.getOrThrow("rabbitmq.prefetchCount"); return { uri: `amqp://${user}:${pass}@${host}:${port}`, channels: { "ch-1": { - prefetchCount: 1, + prefetchCount, default: true, } }, exchanges: [ { - name: exchange, - type: "direct", - createExchangeIfNotExists: true + name: exchangeName, + type: exchangeType, + createExchangeIfNotExists: true, } ], queues: [ { - name: queue, - createQueueIfNotExists: true + name: queueName, + createQueueIfNotExists: true, } ], enableDirectReplyTo: true, diff --git a/src/rabbitmq.service.ts b/src/rabbitmq.service.ts index 9a45202..5c7fb08 100644 --- a/src/rabbitmq.service.ts +++ b/src/rabbitmq.service.ts @@ -18,7 +18,13 @@ export interface MessageReceivedEvent { raw: ConsumeMessage; } -export interface MessageHandleEvent { +export interface MessageProcessingEvent { + message: M; + headers: H; + raw: ConsumeMessage; +} + +export interface MessageProcessedEvent { message: M; headers: H; raw: ConsumeMessage; @@ -39,7 +45,7 @@ export class RabbitMQService implements OnModuleInit { public async publish(exchange: string, routingKey: string, message: M, options?: Options.Publish): Promise { const { headers } = options ?? {}; const event: MessagePublishingEvent = { exchange, routingKey, message, headers }; - this.events.emit('rabbitmq:publishing', event); + this.events.emit('rabbitmq:message:publishing', event); return this.connection.publish(exchange, routingKey, message, options); } @@ -47,7 +53,7 @@ export class RabbitMQService implements OnModuleInit { public async request(options: RequestOptions): Promise { const { exchange, routingKey, payload: message, headers } = options; const event: MessagePublishingEvent = { exchange, routingKey, message, headers }; - this.events.emit('rabbitmq:publishing', event); + this.events.emit('rabbitmq:message:publishing', event); return this.connection.request(options); } @@ -55,8 +61,8 @@ export class RabbitMQService implements OnModuleInit { async onModuleInit(): Promise { const events = this.explorer.getEvents(); - const queue = this.config.getOrThrow("rabbitmq.queue"); - const exchange = this.config.getOrThrow("rabbitmq.exchange"); + const queue = this.config.getOrThrow("rabbitmq.queueName"); + const exchange = this.config.getOrThrow("rabbitmq.exchangeName"); const channel = this.connection.channel; for (const event of events) { @@ -64,8 +70,7 @@ export class RabbitMQService implements OnModuleInit { } const handler = async (message: any, raw?: ConsumeMessage, headers?: any) => { - this.events.emit("rabbitmq:received", { message, headers, raw } as MessageReceivedEvent); - this.events.emit("rabbitmq:handle", { message, headers, raw } as MessageHandleEvent); + this.events.emit("rabbitmq:message:received", { message, headers, raw } as MessageReceivedEvent); }; await this.connection.createSubscriber(handler, { queue }, "messageHandler"); diff --git a/src/rpc/rpc.decorators.ts b/src/rpc/rpc.decorators.ts index 6af5fc1..42d3fd5 100644 --- a/src/rpc/rpc.decorators.ts +++ b/src/rpc/rpc.decorators.ts @@ -1,14 +1,14 @@ import { Inject, SetMetadata } from '@nestjs/common'; import { getClientToken } from "./rpc.utils"; -export const RABBITMQ_RPC = 'RABBITMQ_RPC'; +export const RABBITMQ_REMOTE_PROCEDURE = 'RABBITMQ_REMOTE_PROCEDURE'; export interface MethodMetadata { name?: string; } -export const Method = (name?: string): MethodDecorator => - SetMetadata(RABBITMQ_RPC, { name }); +export const RemoteProcedure = (name?: string): MethodDecorator => + SetMetadata(RABBITMQ_REMOTE_PROCEDURE, { name }); export const InjectClient = (name: string): ParameterDecorator => Inject(getClientToken(name)); diff --git a/src/rpc/rpc.explorer.ts b/src/rpc/rpc.explorer.ts index 59f1f24..557be3b 100644 --- a/src/rpc/rpc.explorer.ts +++ b/src/rpc/rpc.explorer.ts @@ -1,7 +1,7 @@ import { Injectable, Type } from "@nestjs/common"; import { DiscoveryService, MetadataScanner, Reflector } from "@nestjs/core"; import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper"; -import { MethodMetadata, RABBITMQ_RPC } from "./rpc.decorators"; +import { MethodMetadata, RABBITMQ_REMOTE_PROCEDURE } from "./rpc.decorators"; import { castArray } from "lodash"; export interface MethodMeta { @@ -57,7 +57,7 @@ export class RPCExplorer { return []; } - const metadata = this.reflector.get(RABBITMQ_RPC, target); + const metadata = this.reflector.get(RABBITMQ_REMOTE_PROCEDURE, target); return castArray(metadata ?? []); } } diff --git a/src/rpc/rpc.listener.ts b/src/rpc/rpc.listener.ts index fc5106e..0271fd0 100644 --- a/src/rpc/rpc.listener.ts +++ b/src/rpc/rpc.listener.ts @@ -1,8 +1,9 @@ import { Injectable, OnModuleInit } from "@nestjs/common"; import { RPCExplorer } from "./rpc.explorer"; import { JSONRPCServer } from "json-rpc-2.0"; -import { MessageHandleEvent, RabbitMQService } from "../rabbitmq.service"; -import { MessageHandler } from "../rabbitmq.decorators"; +import { MessageReceivedEvent, RabbitMQService } from "../rabbitmq.service"; +import { OnMessageReceived } from "../rabbitmq.decorators"; +import { EventEmitter2 } from "@nestjs/event-emitter"; @Injectable() export class RPCListener implements OnModuleInit { @@ -12,6 +13,7 @@ export class RPCListener implements OnModuleInit { constructor( private readonly explorer: RPCExplorer, private readonly rmq: RabbitMQService, + private readonly events: EventEmitter2, ) { this.server = new JSONRPCServer(); } @@ -23,14 +25,16 @@ export class RPCListener implements OnModuleInit { } } - @MessageHandler() - async onMessage({ message, headers, raw }: MessageHandleEvent) { + @OnMessageReceived() + async onMessage({ message, headers, raw }: MessageReceivedEvent) { if (headers['X-Message-Type'] !== 'request') { return; } + this.events.emit('rabbitmq:message:processing', { message, headers, raw }); const response = await this.server.receive(message); + this.events.emit('rabbitmq:message:processed', { message, headers, raw }); if (!response) { return;