Skip to content

feat: many updates #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/event/event.decorators.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { applyDecorators } from "@nestjs/common";
import { OnEvent } from "@nestjs/event-emitter";
import { extendArrayMetadata } from "@nestjs/common/utils/extend-metadata.util";
import { OnEventOptions } from "@nestjs/event-emitter/dist/interfaces";

export const RABBITMQ_EVENT = "RABBITMQ_EVENT";
export const RABBITMQ_MESSAGE_EVENT = "RABBITMQ_MESSAGE_EVENT";

export interface EventMetadata {
event: string;
Expand All @@ -17,8 +18,8 @@ const AddMetadata = (metadataKey: string, metadata: any) => {
return factory;
};

export const Event = (event: string): MethodDecorator =>
export const OnMessageEvent = (event: string, options?: OnEventOptions): MethodDecorator =>
applyDecorators(
AddMetadata(RABBITMQ_EVENT, { event }),
OnEvent(event)
AddMetadata(RABBITMQ_MESSAGE_EVENT, { event }),
OnEvent(event, options)
);
2 changes: 1 addition & 1 deletion src/event/event.emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class EventEmitter {
}

public async emit<T extends object = any>(event: string, payload: T) {
const exchange = this.config.getOrThrow('rabbitmq.exchange');
const exchange = this.config.getOrThrow('rabbitmq.exchangeName');
await this.rmq.publish<T>(exchange, event, payload, {
headers: {
'X-Message-Type': 'event',
Expand Down
4 changes: 2 additions & 2 deletions src/event/event.explorer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable, Type } from "@nestjs/common";
import { DiscoveryService, MetadataScanner, Reflector } from "@nestjs/core";
import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper";
import { RABBITMQ_EVENT, EventMetadata } from "./event.decorators";
import { RABBITMQ_MESSAGE_EVENT, EventMetadata } from "./event.decorators";
import { castArray } from "lodash";

@Injectable()
Expand Down Expand Up @@ -47,7 +47,7 @@ export class EventExplorer {
return [];
}

const metadata = this.reflector.get(RABBITMQ_EVENT, target);
const metadata = this.reflector.get(RABBITMQ_MESSAGE_EVENT, target);
return castArray(metadata ?? []);
}
}
10 changes: 6 additions & 4 deletions src/event/event.listener.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable } from "@nestjs/common";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { MessageHandler } from "../rabbitmq.decorators";
import { MessageHandleEvent } from "../rabbitmq.service";
import { OnMessageReceived } from "../rabbitmq.decorators";
import { MessageReceivedEvent } from "../rabbitmq.service";

@Injectable()
export class EventListener {
Expand All @@ -12,14 +12,16 @@ export class EventListener {
//
}

@MessageHandler()
async onMessages({ message, headers }: MessageHandleEvent) {
@OnMessageReceived()
async onMessages({ message, headers, raw }: MessageReceivedEvent) {

// If the message type is not an event, ignore it
if (headers["X-Message-Type"] !== "event") {
return;
}

this.events.emit('rabbitmq:message:processing', { message, headers, raw });
this.events.emit(headers["X-Event-Name"], message);
this.events.emit('rabbitmq:message:processed', { message, headers, raw });
}
}
11 changes: 6 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
export { Event } from './event/event.decorators';
export { OnMessageEvent } from './event/event.decorators';
export { EventEmitter } from './event/event.emitter';
export { Method, InjectClient } from './rpc/rpc.decorators';
export { RemoteProcedure, InjectClient } from './rpc/rpc.decorators';
export { RPCClient as Client } from './rpc/rpc.client';
export { RabbitMQModule } from './rabbitmq.module';
export {
RabbitMQService, MessagePublishingEvent, MessageReceivedEvent, MessageHandleEvent,
export { RabbitMQService } from './rabbitmq.service';
export type {
MessagePublishingEvent, MessageReceivedEvent, MessageProcessingEvent, MessageProcessedEvent,
} from './rabbitmq.service';
export {
MessagePublishing, MessageReceived, MessageHandler,
OnMessagePublishing, OnMessageReceived, OnMessageProcessing, OnMessageProcessed,
} from './rabbitmq.decorators';
24 changes: 19 additions & 5 deletions src/rabbitmq.config.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import { registerAs } from "@nestjs/config";

export default registerAs("rabbitmq", () => ({
host: process.env.RABBITMQ_HOST,
port: process.env.RABBITMQ_PORT ?? "5672",
export interface RabbitMQConfig {
host: string;
port: number;
username?: string;
password?: string;
vhost: string;
exchangeName: string;
exchangeType: string;
queueName: string;
prefetchCount: number;
}

export default registerAs<RabbitMQConfig>("rabbitmq", () => ({
host: process.env.RABBITMQ_HOST ?? 'localhost',
port: parseInt(process.env.RABBITMQ_PORT ?? '5672'),
username: process.env.RABBITMQ_USERNAME,
password: process.env.RABBITMQ_PASSWORD,
vhost: process.env.RABBITMQ_VHOST ?? "/",
exchange: process.env.RABBITMQ_EXCHANGE,
queue: process.env.RABBITMQ_QUEUE
exchangeName: process.env.RABBITMQ_EXCHANGE_NAME ?? process.env.RABBITMQ_EXCHANGE ?? '',
exchangeType: process.env.RABBITMQ_EXCHANGE_TYPE ?? 'direct',
queueName: process.env.RABBITMQ_QUEUE_NAME ?? process.env.RABBITMQ_QUEUE ?? 'default',
prefetchCount: parseInt(process.env.RABBITMQ_PREFETCH_COUNT ?? '1', 10),
}));
15 changes: 9 additions & 6 deletions src/rabbitmq.decorators.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { OnEvent } from "@nestjs/event-emitter";

export const MessagePublishing = (): MethodDecorator =>
OnEvent('rabbitmq:publishing');
export const OnMessagePublishing = (): MethodDecorator =>
OnEvent('rabbitmq:message:publishing');

export const MessageReceived = (): MethodDecorator =>
OnEvent('rabbitmq:received');
export const OnMessageReceived = (): MethodDecorator =>
OnEvent('rabbitmq:message:received');

export const MessageHandler = (): MethodDecorator =>
OnEvent('rabbitmq:handle');
export const OnMessageProcessing = (): MethodDecorator =>
OnEvent('rabbitmq:message:processing');

export const OnMessageProcessed = (): MethodDecorator =>
OnEvent('rabbitmq:message:processed');
19 changes: 11 additions & 8 deletions src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,31 @@ export class RabbitMQModule {
const user = config.getOrThrow("rabbitmq.username");
const pass = config.getOrThrow("rabbitmq.password");

const queue = config.getOrThrow("rabbitmq.queue");
const exchange = config.getOrThrow("rabbitmq.exchange");
const queueName = config.getOrThrow("rabbitmq.queueName");
const exchangeName = config.getOrThrow("rabbitmq.exchangeName");
const exchangeType = config.getOrThrow("rabbitmq.exchangeType");

const prefetchCount = config.getOrThrow("rabbitmq.prefetchCount");

return {
uri: `amqp://${user}:${pass}@${host}:${port}`,
channels: {
"ch-1": {
prefetchCount: 1,
prefetchCount,
default: true,
}
},
exchanges: [
{
name: exchange,
type: "direct",
createExchangeIfNotExists: true
name: exchangeName,
type: exchangeType,
createExchangeIfNotExists: true,
}
],
queues: [
{
name: queue,
createQueueIfNotExists: true
name: queueName,
createQueueIfNotExists: true,
}
],
enableDirectReplyTo: true,
Expand Down
19 changes: 12 additions & 7 deletions src/rabbitmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ export interface MessageReceivedEvent<M = any, H = any> {
raw: ConsumeMessage;
}

export interface MessageHandleEvent<M = any, H = any> {
export interface MessageProcessingEvent<M = any, H = any> {
message: M;
headers: H;
raw: ConsumeMessage;
}

export interface MessageProcessedEvent<M = any, H = any> {
message: M;
headers: H;
raw: ConsumeMessage;
Expand All @@ -39,33 +45,32 @@ export class RabbitMQService implements OnModuleInit {
public async publish<M = any>(exchange: string, routingKey: string, message: M, options?: Options.Publish): Promise<boolean> {
const { headers } = options ?? {};
const event: MessagePublishingEvent = { exchange, routingKey, message, headers };
this.events.emit('rabbitmq:publishing', event);
this.events.emit('rabbitmq:message:publishing', event);

return this.connection.publish<M>(exchange, routingKey, message, options);
}

public async request<T>(options: RequestOptions): Promise<T> {
const { exchange, routingKey, payload: message, headers } = options;
const event: MessagePublishingEvent = { exchange, routingKey, message, headers };
this.events.emit('rabbitmq:publishing', event);
this.events.emit('rabbitmq:message:publishing', event);

return this.connection.request<T>(options);
}

async onModuleInit(): Promise<void> {
const events = this.explorer.getEvents();

const queue = this.config.getOrThrow("rabbitmq.queue");
const exchange = this.config.getOrThrow("rabbitmq.exchange");
const queue = this.config.getOrThrow("rabbitmq.queueName");
const exchange = this.config.getOrThrow("rabbitmq.exchangeName");

const channel = this.connection.channel;
for (const event of events) {
await channel.bindQueue(queue, exchange, event);
}

const handler = async (message: any, raw?: ConsumeMessage, headers?: any) => {
this.events.emit("rabbitmq:received", { message, headers, raw } as MessageReceivedEvent);
this.events.emit("rabbitmq:handle", { message, headers, raw } as MessageHandleEvent);
this.events.emit("rabbitmq:message:received", { message, headers, raw } as MessageReceivedEvent);
};

await this.connection.createSubscriber(handler, { queue }, "messageHandler");
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/rpc.decorators.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Inject, SetMetadata } from '@nestjs/common';
import { getClientToken } from "./rpc.utils";

export const RABBITMQ_RPC = 'RABBITMQ_RPC';
export const RABBITMQ_REMOTE_PROCEDURE = 'RABBITMQ_REMOTE_PROCEDURE';

export interface MethodMetadata {
name?: string;
}

export const Method = (name?: string): MethodDecorator =>
SetMetadata(RABBITMQ_RPC, { name });
export const RemoteProcedure = (name?: string): MethodDecorator =>
SetMetadata(RABBITMQ_REMOTE_PROCEDURE, { name });

export const InjectClient = (name: string): ParameterDecorator =>
Inject(getClientToken(name));
4 changes: 2 additions & 2 deletions src/rpc/rpc.explorer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable, Type } from "@nestjs/common";
import { DiscoveryService, MetadataScanner, Reflector } from "@nestjs/core";
import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper";
import { MethodMetadata, RABBITMQ_RPC } from "./rpc.decorators";
import { MethodMetadata, RABBITMQ_REMOTE_PROCEDURE } from "./rpc.decorators";
import { castArray } from "lodash";

export interface MethodMeta {
Expand Down Expand Up @@ -57,7 +57,7 @@ export class RPCExplorer {
return [];
}

const metadata = this.reflector.get(RABBITMQ_RPC, target);
const metadata = this.reflector.get(RABBITMQ_REMOTE_PROCEDURE, target);
return castArray(metadata ?? []);
}
}
12 changes: 8 additions & 4 deletions src/rpc/rpc.listener.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Injectable, OnModuleInit } from "@nestjs/common";
import { RPCExplorer } from "./rpc.explorer";
import { JSONRPCServer } from "json-rpc-2.0";
import { MessageHandleEvent, RabbitMQService } from "../rabbitmq.service";
import { MessageHandler } from "../rabbitmq.decorators";
import { MessageReceivedEvent, RabbitMQService } from "../rabbitmq.service";
import { OnMessageReceived } from "../rabbitmq.decorators";
import { EventEmitter2 } from "@nestjs/event-emitter";

@Injectable()
export class RPCListener implements OnModuleInit {
Expand All @@ -12,6 +13,7 @@ export class RPCListener implements OnModuleInit {
constructor(
private readonly explorer: RPCExplorer,
private readonly rmq: RabbitMQService,
private readonly events: EventEmitter2,
) {
this.server = new JSONRPCServer();
}
Expand All @@ -23,14 +25,16 @@ export class RPCListener implements OnModuleInit {
}
}

@MessageHandler()
async onMessage({ message, headers, raw }: MessageHandleEvent) {
@OnMessageReceived()
async onMessage({ message, headers, raw }: MessageReceivedEvent) {

if (headers['X-Message-Type'] !== 'request') {
return;
}

this.events.emit('rabbitmq:message:processing', { message, headers, raw });
const response = await this.server.receive(message);
this.events.emit('rabbitmq:message:processed', { message, headers, raw });

if (!response) {
return;
Expand Down