Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
611ea7b
feat(rabbitmq): adds a message batching mechanism for RabbitMQ handlers
ckfngod Sep 10, 2024
9119827
refactor(rabbitmq): update error handling behaviour + clean up Subscr…
ckfngod Sep 17, 2024
b89249f
refactor(rabbitmq): add more context for resumeConsumer error
ckfngod Sep 18, 2024
01f9dcc
refactor(rabbitmq): add batch option validations / update JSDoc + ref…
ckfngod Sep 18, 2024
e4e0f43
fix(rabbitmq): prevent setting batch size of 1
ckfngod Sep 18, 2024
7a8226b
test(rabbitmq): adds integration tests for message batching behaviour
ckfngod Sep 18, 2024
6e709a0
test(rabbitmq): adds more integration tests for batching behaviour
ckfngod Sep 19, 2024
774d512
refactor(rabbitmq): refactor ConsumerHandler type
ckfngod Sep 19, 2024
b8992ba
refactor(rabbitmq): fall back on top-level `errorHandler` for batch m…
ckfngod Sep 20, 2024
dfb1e13
docs(rabbitmq): adds documentation for the message batching behaviour
ckfngod Sep 20, 2024
1fe3daa
refactor(rabbitmq): remove `handleMessage` and `handleMessages`
ckfngod Sep 20, 2024
df41f5f
refactor(rabbitmq): use common `setupChannel` func in `createRpc`
ckfngod Sep 20, 2024
e7be878
refactor(rabbitmq): improve tests + minor clean up
ckfngod Sep 20, 2024
8898269
fix(rabbitmq): fix e2e test uri
ckfngod Sep 20, 2024
c81e1f4
refactor(rabbitmq): pr feedback
ckfngod Sep 23, 2024
3899070
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 23, 2024
6bed50c
refactor(rabbitmq): remove precondition error code comment
ckfngod Sep 23, 2024
26f271c
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 24, 2024
503c827
test(rabbitmq): fix batching integration test flakiness
ckfngod Sep 24, 2024
393eb17
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 205 additions & 9 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,23 @@ const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';
export type ConsumerTag = string;

export type SubscriberHandler<T = unknown> = (
msg: (T | undefined) | (T | undefined)[],
rawMessage?: ConsumeMessage | ConsumeMessage[],
headers?: any | any[]
) => Promise<SubscribeResponse>;

export type SingleSubscriberHandler<T = unknown> = (
msg: T | undefined,
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<SubscribeResponse>;

export type BatchSubscriberHandler<T = unknown> = (
msg: (T | undefined)[],
rawMessage?: ConsumeMessage[],
headers?: any[]
) => Promise<SubscribeResponse>;

export interface CorrelationMessage {
correlationId: string;
requestId?: string;
Expand All @@ -75,6 +87,15 @@ export type ConsumerHandler<T, U> =
headers?: any
) => Promise<SubscribeResponse>;
})
| (BaseConsumerHandler & {
type: 'subscribe-batch';
msgOptions: MessageHandlerOptions;
handler: (
msg: (T | undefined)[],
rawMessage?: ConsumeMessage[],
headers?: any[]
) => Promise<SubscribeResponse>;
})
| (BaseConsumerHandler & {
type: 'rpc';
rpcOptions: MessageHandlerOptions;
Expand Down Expand Up @@ -460,6 +481,32 @@ export class AmqpConnection {
originalHandlerName = 'unknown',
consumeOptions?: ConsumeOptions
): Promise<ConsumerTag> {
if (msgOptions.batchOptions) {
return this.setupBatchSubscriberChannel<T>(
handler,
msgOptions,
channel,
originalHandlerName,
consumeOptions
);
} else {
return this.setupSingleSubscriberChannel<T>(
handler,
msgOptions,
channel,
originalHandlerName,
consumeOptions
);
}
}

private async setupSingleSubscriberChannel<T>(
handler: SingleSubscriberHandler<T>,
msgOptions: MessageHandlerOptions,
channel: ConfirmChannel,
originalHandlerName = 'unknown',
consumeOptions?: ConsumeOptions
) {
const queue = await this.setupQueue(msgOptions, channel);

const { consumerTag }: { consumerTag: ConsumerTag } = await channel.consume(
Expand Down Expand Up @@ -520,6 +567,112 @@ export class AmqpConnection {
return consumerTag;
}

private async setupBatchSubscriberChannel<T>(
handler: BatchSubscriberHandler<T>,
msgOptions: MessageHandlerOptions,
channel: ConfirmChannel,
originalHandlerName = 'unknown',
consumeOptions?: ConsumeOptions
): Promise<ConsumerTag> {
const queue = await this.setupQueue(msgOptions, channel);
const batchSize = msgOptions.batchOptions?.size ?? 10;
const batchTimeout = msgOptions.batchOptions?.timeout ?? 200;

let batchMsgs: ConsumeMessage[] = [];
let batchTimer: NodeJS.Timeout;
let inflightBatchHandler: () => Promise<void>;

const { consumerTag }: { consumerTag: ConsumerTag } = await channel.consume(
queue,
this.wrapConsumer(async (msg) => {
if (isNull(msg)) {
return;
}

const handleBatch = async () => {
const batchMsgsToProcess = batchMsgs;
batchMsgs = [];

try {
const response = await this.handleMessages(
handler,
batchMsgsToProcess,
{
allowNonJsonMessages: msgOptions.allowNonJsonMessages,
deserializer: msgOptions.deserializer,
}
);

if (response instanceof Nack) {
for (const msg of batchMsgsToProcess) {
channel.nack(msg, false, response.requeue);
}
return;
}

// developers should be responsible to avoid subscribers that return therefore
// the request will be acknowledged
if (response) {
this.logger.warn(
`Received response: [${this.config.serializer(
response
)}] from subscribe handler [${originalHandlerName}]. Subscribe handlers should only return void`
);
}

for (const msg of batchMsgsToProcess) {
channel.ack(msg);
}
} catch (e) {
const errorHandler =
msgOptions.errorHandler ||
getHandlerForLegacyBehavior(
msgOptions.errorBehavior ||
this.config.defaultSubscribeErrorBehavior
);

for (const msg of batchMsgsToProcess) {
await errorHandler(channel, msg, e);
}
}
};

batchMsgs.push(msg);

if (batchMsgs.length === 1) {
console.log('received first message, starting timer');
// Wrapped in a Promise to ensure outstanding message processing logic is aware.
await new Promise<void>((resolve) => {
const batchHandler = async () => {
await handleBatch();
resolve();
};
batchTimer = setTimeout(batchHandler, batchTimeout);
inflightBatchHandler = batchHandler;
});
} else if (batchMsgs.length === batchSize) {
console.log('processing full batch');
clearTimeout(batchTimer);
await inflightBatchHandler();
} else {
console.log('not yet full batch, refreshing timer');
batchTimer.refresh();
}
}),
consumeOptions
);

this.registerConsumerForQueue({
type: 'subscribe-batch',
consumerTag,
handler,
msgOptions,
channel,
});

return consumerTag;
}

public async createRpc<T, U>(
handler: (
msg: T | undefined,
Expand Down Expand Up @@ -643,12 +796,7 @@ export class AmqpConnection {
return this._managedChannel.publish(exchange, routingKey, buffer, options);
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<U>,
private deserializeMessage<T>(
msg: ConsumeMessage,
options: {
allowNonJsonMessages?: boolean;
Expand Down Expand Up @@ -676,7 +824,47 @@ export class AmqpConnection {
headers = msg.properties.headers;
}

return handler(message, msg, headers);
return { message, headers };
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<U>,
msg: ConsumeMessage,
options: {
allowNonJsonMessages?: boolean;
deserializer?: MessageDeserializer;
}
) {
const result = this.deserializeMessage<T>(msg, options);
return handler(result.message, msg, result.headers);
}

private handleMessages<T, U>(
handler: (
msg: (T | undefined)[],
rawMessage?: ConsumeMessage[],
headers?: any[]
) => Promise<U>,
msgs: ConsumeMessage[],
options: {
allowNonJsonMessages?: boolean;
deserializer?: MessageDeserializer;
}
) {
const messages: (T | undefined)[] = [];
const headers: any[] = [];

for (const msg of msgs) {
const result = this.deserializeMessage<T>(msg, options);
messages.push(result.message);
headers.push(result.headers);
}

return handler(messages, msgs, headers);
}

private async setupQueue(
Expand Down Expand Up @@ -823,12 +1011,20 @@ export class AmqpConnection {
consumer.rpcOptions,
consumer.channel
);
} else {
newConsumerTag = await this.setupSubscriberChannel<T>(
} else if (consumer.type === 'subscribe') {
newConsumerTag = await this.setupSingleSubscriberChannel<T>(
consumer.handler,
consumer.msgOptions,
consumer.channel
);
} else if (consumer.type === 'subscribe-batch') {
newConsumerTag = await this.setupBatchSubscriberChannel<T>(
consumer.handler,
consumer.msgOptions,
consumer.channel
);
} else {
throw new Error('Unable to resume consumer, unexpected consumer type.');
}
// A new consumerTag was created, remove old
this.unregisterConsumerForQueue(consumerTag);
Expand Down
19 changes: 19 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ export interface MessageHandlerOptions {
* If set, will override the module's default deserializer.
*/
deserializer?: MessageDeserializer;

/**
* Enables consumer-side batching.
*/
batchOptions?: BatchOptions;
}

export interface ConnectionInitOptions {
Expand Down Expand Up @@ -195,3 +200,17 @@ export interface RabbitMQChannelConfig {
*/
default?: boolean;
}

interface BatchOptions {
/**
* The number of messages to accumulate before calling the message handler.
*
* This should be smaller than the channel prefetch.
*/
size: number;

/**
* The time to wait, in milliseconds, for additional messages before returning a partial batch.
*/
timeout?: number;
}