Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
611ea7b
feat(rabbitmq): adds a message batching mechanism for RabbitMQ handlers
ckfngod Sep 10, 2024
9119827
refactor(rabbitmq): update error handling behaviour + clean up Subscr…
ckfngod Sep 17, 2024
b89249f
refactor(rabbitmq): add more context for resumeConsumer error
ckfngod Sep 18, 2024
01f9dcc
refactor(rabbitmq): add batch option validations / update JSDoc + ref…
ckfngod Sep 18, 2024
e4e0f43
fix(rabbitmq): prevent setting batch size of 1
ckfngod Sep 18, 2024
7a8226b
test(rabbitmq): adds integration tests for message batching behaviour
ckfngod Sep 18, 2024
6e709a0
test(rabbitmq): adds more integration tests for batching behaviour
ckfngod Sep 19, 2024
774d512
refactor(rabbitmq): refactor ConsumerHandler type
ckfngod Sep 19, 2024
b8992ba
refactor(rabbitmq): fall back on top-level `errorHandler` for batch m…
ckfngod Sep 20, 2024
dfb1e13
docs(rabbitmq): adds documentation for the message batching behaviour
ckfngod Sep 20, 2024
1fe3daa
refactor(rabbitmq): remove `handleMessage` and `handleMessages`
ckfngod Sep 20, 2024
df41f5f
refactor(rabbitmq): use common `setupChannel` func in `createRpc`
ckfngod Sep 20, 2024
e7be878
refactor(rabbitmq): improve tests + minor clean up
ckfngod Sep 20, 2024
8898269
fix(rabbitmq): fix e2e test uri
ckfngod Sep 20, 2024
c81e1f4
refactor(rabbitmq): pr feedback
ckfngod Sep 23, 2024
3899070
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 23, 2024
6bed50c
refactor(rabbitmq): remove precondition error code comment
ckfngod Sep 23, 2024
26f271c
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 24, 2024
503c827
test(rabbitmq): fix batching integration test flakiness
ckfngod Sep 24, 2024
393eb17
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 260 additions & 10 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { INestApplication, Injectable, LoggerService } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { flatten, times } from 'lodash';
import { createMock } from '@golevelup/ts-jest';
import { setTimeout } from 'node:timers/promises';

const testHandler = jest.fn();

Expand Down Expand Up @@ -35,6 +36,15 @@ const deleteHandler = jest.fn();
const FANOUT = 'fanout';
const fanoutHandler = jest.fn();

const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 200;
const batchHandler = jest.fn();
const batchRoutingKey = 'testSubscribeBatch';
const batchQueue = 'testSubscribeBatchQueue';
const batchErrorHandler = jest.fn();
const batchErrorRoutingKey = 'testSubscribeBatchError';
const batchErrorQueue = 'testSubscribeBatchErrorQueue';

@Injectable()
class SubscribeService {
@RabbitSubscribe({
Expand Down Expand Up @@ -139,6 +149,33 @@ class SubscribeService {
subscriberThatReturns() {
return Promise.resolve(true);
}

@RabbitSubscribe({
exchange,
routingKey: batchRoutingKey,
queue: batchQueue,
batchOptions: {
size: BATCH_SIZE,
timeout: BATCH_TIMEOUT,
},
})
batchSubscriber(messages) {
batchHandler(messages);
}

@RabbitSubscribe({
exchange,
routingKey: batchErrorRoutingKey,
queue: batchErrorQueue,
batchOptions: {
size: BATCH_SIZE,
timeout: BATCH_TIMEOUT,
errorHandler: batchErrorHandler,
},
})
batchErrorSubscriber() {
throw new Error();
}
}

describe('Rabbit Subscribe', () => {
Expand Down Expand Up @@ -211,7 +248,7 @@ describe('Rabbit Subscribe', () => {
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
Expand All @@ -222,7 +259,7 @@ describe('Rabbit Subscribe', () => {
it('should receive messages when subscribed via handler name', async () => {
await amqpConnection.publish(exchange, routingKey3, 'testMessage');

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(1);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
Expand All @@ -232,7 +269,7 @@ describe('Rabbit Subscribe', () => {
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
Expand All @@ -249,7 +286,7 @@ describe('Rabbit Subscribe', () => {
);

await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, 150));
await setTimeout(150);

expect(createHandler).toHaveBeenCalledTimes(100);
times(100).forEach((x) => expect(createHandler).toHaveBeenCalledWith(x));
Expand All @@ -268,7 +305,7 @@ describe('Rabbit Subscribe', () => {
Buffer.from('{a:'),
);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenNthCalledWith(1, '');
Expand All @@ -286,7 +323,7 @@ describe('Rabbit Subscribe', () => {
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message2);
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message3);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(message1);
Expand All @@ -299,7 +336,7 @@ describe('Rabbit Subscribe', () => {
// publish to the default exchange, using the queue as routing key
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(1);
expect(testHandler).toHaveBeenCalledWith(message);
Expand All @@ -309,7 +346,7 @@ describe('Rabbit Subscribe', () => {
const message = '{"key2":"value2"}';
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);
expect(testHandler).toHaveBeenCalledTimes(1);
const msg = testHandler.mock.calls[0][1];
expect(msg.fields.consumerTag).toEqual(preDefinedConsumerTag);
Expand All @@ -319,7 +356,7 @@ describe('Rabbit Subscribe', () => {
const message = { message: 'message' };
amqpConnection.publish(FANOUT, '', message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(fanoutHandler).toHaveBeenCalledTimes(1);
expect(fanoutHandler).toHaveBeenCalledWith(message);
Expand All @@ -331,10 +368,223 @@ describe('Rabbit Subscribe', () => {
// publish and expect to acknowledge but not throw
const warnSpy = jest.spyOn(customLogger, 'warn');
amqpConnection.publish(exchange, 'infinite-loop', message);
await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('Subscribe handlers should only return void'),
);
});

describe('Message Batching', () => {
const publishMessages = async (
size: number,
ex: string,
rk: string,
prefix = '',
) => {
const messages: string[] = [];
for (let i = 0; i < size; i++) {
const testMessage = `${prefix}testMessage${i}`;
await amqpConnection.publish(ex, rk, testMessage);
messages.push(testMessage);
}
return messages;
};

const parseMessages = (messages) => {
return messages.map((message) => JSON.parse(message.content.toString()));
};

const mockErrorHandler = (channel, messages) => {
for (const msg of messages) {
channel.ack(msg);
}
};

it('should return a full message batch immediately', async () => {
const testMessages = await publishMessages(
BATCH_SIZE,
exchange,
batchRoutingKey,
);

expect(batchHandler).toHaveBeenCalledTimes(1);
expect(batchHandler).toHaveBeenCalledWith(testMessages);
});

it('should return a partial message batch after timeout', async () => {
const testMessages = await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(BATCH_TIMEOUT);
expect(batchHandler).toHaveBeenCalledTimes(1);
expect(batchHandler).toHaveBeenCalledWith(testMessages);
});

it('should return multiple batches of differing sizes', async () => {
const testMessageBatches: string[][] = [];

for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
testMessageBatches.push(
await publishMessages(
size,
exchange,
batchRoutingKey,
`batch${index}-`,
),
);
}

// two full batches should be immediately handled
expect(batchHandler).toHaveBeenCalledTimes(2);
for (const index of [0, 1]) {
expect(batchHandler).toHaveBeenNthCalledWith(
index + 1,
testMessageBatches[index],
);
}

await setTimeout(BATCH_TIMEOUT);
expect(batchHandler).toHaveBeenCalledTimes(3);
expect(batchHandler).toHaveBeenLastCalledWith(testMessageBatches[2]);
});

it('should return a full batch to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessages = await publishMessages(
BATCH_SIZE,
exchange,
batchErrorRoutingKey,
);

// should be enough to place this after async error handling on the call stack
await setTimeout(1);

expect(batchErrorHandler).toHaveBeenCalledTimes(1);
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
testMessages,
);
});

it('should return a partial batch to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessages = await publishMessages(
1,
exchange,
batchErrorRoutingKey,
);

await setTimeout(BATCH_TIMEOUT);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
testMessages,
);
});

it('should return multiple batches of differing sizes to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessageBatches: string[][] = [];

for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
testMessageBatches.push(
await publishMessages(
size,
exchange,
batchErrorRoutingKey,
`batch${index}-`,
),
);
}

// should be enough to place this after async error handling on the call stack
await setTimeout(1);

// two full batches should be immediately handled
expect(batchErrorHandler).toHaveBeenCalledTimes(2);
for (const index of [0, 1]) {
expect(parseMessages(batchErrorHandler.mock.calls[index][1])).toEqual(
testMessageBatches[index],
);
}

await setTimeout(BATCH_TIMEOUT);
expect(batchErrorHandler).toHaveBeenCalledTimes(3);
expect(parseMessages(batchErrorHandler.mock.calls[2][1])).toEqual(
testMessageBatches[2],
);
});

it('should not return another full batch after batch timeout', async () => {
await publishMessages(BATCH_SIZE, exchange, batchRoutingKey);
expect(batchHandler).toHaveBeenCalledTimes(1);

await setTimeout(BATCH_TIMEOUT);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return a partial batch before batch timeout', async () => {
await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(BATCH_TIMEOUT * 0.9);
expect(batchHandler).toHaveBeenCalledTimes(0);

await setTimeout(BATCH_TIMEOUT * 0.2);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return another partial batch after first batch timeout', async () => {
await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(BATCH_TIMEOUT);
expect(batchHandler).toHaveBeenCalledTimes(1);

await setTimeout(BATCH_TIMEOUT);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return another full batch after batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(BATCH_SIZE, exchange, batchErrorRoutingKey);

// should be enough to place this after async error handling on the call stack
await setTimeout(1);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);

await setTimeout(BATCH_TIMEOUT);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});

it('should not return a partial batch before batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(1, exchange, batchErrorRoutingKey);

await setTimeout(BATCH_TIMEOUT * 0.9);
expect(batchErrorHandler).toHaveBeenCalledTimes(0);

await setTimeout(BATCH_TIMEOUT * 0.2);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});

it('should not return another partial batch after first batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(1, exchange, batchErrorRoutingKey);

await setTimeout(BATCH_TIMEOUT);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);

await setTimeout(BATCH_TIMEOUT);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});
});
});
Loading
Loading