Skip to content

Commit ce44d4d

Browse files
ckfngodunderfisk
andauthored
feat(rabbitmq): adds a message batching mechanism for RabbitMQ handlers (#781)
* feat(rabbitmq): adds a message batching mechanism for RabbitMQ handlers * refactor(rabbitmq): update error handling behaviour + clean up SubscriberHandler * refactor(rabbitmq): add more context for resumeConsumer error * refactor(rabbitmq): add batch option validations / update JSDoc + refactor logic for cog complexity * fix(rabbitmq): prevent setting batch size of 1 * test(rabbitmq): adds integration tests for message batching behaviour * test(rabbitmq): adds more integration tests for batching behaviour * refactor(rabbitmq): refactor ConsumerHandler type * refactor(rabbitmq): fall back on top-level `errorHandler` for batch message errors * docs(rabbitmq): adds documentation for the message batching behaviour * refactor(rabbitmq): remove `handleMessage` and `handleMessages` * refactor(rabbitmq): use common `setupChannel` func in `createRpc` * refactor(rabbitmq): improve tests + minor clean up * fix(rabbitmq): fix e2e test uri * refactor(rabbitmq): pr feedback * refactor(rabbitmq): remove precondition error code comment * test(rabbitmq): fix batching integration test flakiness --------- Co-authored-by: Rodrigo <monstawoodwow@gmail.com>
1 parent ef5a4f7 commit ce44d4d

File tree

7 files changed

+590
-95
lines changed

7 files changed

+590
-95
lines changed

integration/rabbitmq/e2e/subscribe.e2e-spec.ts

Lines changed: 262 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { INestApplication, Injectable, LoggerService } from '@nestjs/common';
77
import { Test } from '@nestjs/testing';
88
import { flatten, times } from 'lodash';
99
import { createMock } from '@golevelup/ts-jest';
10+
import { setTimeout } from 'node:timers/promises';
1011

1112
const testHandler = jest.fn();
1213

@@ -35,6 +36,15 @@ const deleteHandler = jest.fn();
3536
const FANOUT = 'fanout';
3637
const fanoutHandler = jest.fn();
3738

39+
const BATCH_SIZE = 10;
40+
const BATCH_TIMEOUT = 200;
41+
const batchHandler = jest.fn();
42+
const batchRoutingKey = 'testSubscribeBatch';
43+
const batchQueue = 'testSubscribeBatchQueue';
44+
const batchErrorHandler = jest.fn();
45+
const batchErrorRoutingKey = 'testSubscribeBatchError';
46+
const batchErrorQueue = 'testSubscribeBatchErrorQueue';
47+
3848
@Injectable()
3949
class SubscribeService {
4050
@RabbitSubscribe({
@@ -139,6 +149,33 @@ class SubscribeService {
139149
subscriberThatReturns() {
140150
return Promise.resolve(true);
141151
}
152+
153+
@RabbitSubscribe({
154+
exchange,
155+
routingKey: batchRoutingKey,
156+
queue: batchQueue,
157+
batchOptions: {
158+
size: BATCH_SIZE,
159+
timeout: BATCH_TIMEOUT,
160+
},
161+
})
162+
batchSubscriber(messages) {
163+
batchHandler(messages);
164+
}
165+
166+
@RabbitSubscribe({
167+
exchange,
168+
routingKey: batchErrorRoutingKey,
169+
queue: batchErrorQueue,
170+
batchOptions: {
171+
size: BATCH_SIZE,
172+
timeout: BATCH_TIMEOUT,
173+
errorHandler: batchErrorHandler,
174+
},
175+
})
176+
batchErrorSubscriber() {
177+
throw new Error();
178+
}
142179
}
143180

144181
describe('Rabbit Subscribe', () => {
@@ -211,7 +248,7 @@ describe('Rabbit Subscribe', () => {
211248
amqpConnection.publish(exchange, x, `testMessage-${i}`),
212249
);
213250

214-
await new Promise((resolve) => setTimeout(resolve, 50));
251+
await setTimeout(50);
215252

216253
expect(testHandler).toHaveBeenCalledTimes(3);
217254
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
@@ -222,7 +259,7 @@ describe('Rabbit Subscribe', () => {
222259
it('should receive messages when subscribed via handler name', async () => {
223260
await amqpConnection.publish(exchange, routingKey3, 'testMessage');
224261

225-
await new Promise((resolve) => setTimeout(resolve, 50));
262+
await setTimeout(50);
226263

227264
expect(testHandler).toHaveBeenCalledTimes(1);
228265
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
@@ -232,7 +269,7 @@ describe('Rabbit Subscribe', () => {
232269
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
233270
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');
234271

235-
await new Promise((resolve) => setTimeout(resolve, 50));
272+
await setTimeout(50);
236273

237274
expect(testHandler).toHaveBeenCalledTimes(2);
238275
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
@@ -249,7 +286,7 @@ describe('Rabbit Subscribe', () => {
249286
);
250287

251288
await Promise.all(promises);
252-
await new Promise((resolve) => setTimeout(resolve, 150));
289+
await setTimeout(150);
253290

254291
expect(createHandler).toHaveBeenCalledTimes(100);
255292
times(100).forEach((x) => expect(createHandler).toHaveBeenCalledWith(x));
@@ -268,7 +305,7 @@ describe('Rabbit Subscribe', () => {
268305
Buffer.from('{a:'),
269306
);
270307

271-
await new Promise((resolve) => setTimeout(resolve, 50));
308+
await setTimeout(50);
272309

273310
expect(testHandler).toHaveBeenCalledTimes(3);
274311
expect(testHandler).toHaveBeenNthCalledWith(1, '');
@@ -286,7 +323,7 @@ describe('Rabbit Subscribe', () => {
286323
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message2);
287324
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message3);
288325

289-
await new Promise((resolve) => setTimeout(resolve, 50));
326+
await setTimeout(50);
290327

291328
expect(testHandler).toHaveBeenCalledTimes(3);
292329
expect(testHandler).toHaveBeenCalledWith(message1);
@@ -299,7 +336,7 @@ describe('Rabbit Subscribe', () => {
299336
// publish to the default exchange, using the queue as routing key
300337
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);
301338

302-
await new Promise((resolve) => setTimeout(resolve, 50));
339+
await setTimeout(50);
303340

304341
expect(testHandler).toHaveBeenCalledTimes(1);
305342
expect(testHandler).toHaveBeenCalledWith(message);
@@ -309,7 +346,7 @@ describe('Rabbit Subscribe', () => {
309346
const message = '{"key2":"value2"}';
310347
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);
311348

312-
await new Promise((resolve) => setTimeout(resolve, 50));
349+
await setTimeout(50);
313350
expect(testHandler).toHaveBeenCalledTimes(1);
314351
const msg = testHandler.mock.calls[0][1];
315352
expect(msg.fields.consumerTag).toEqual(preDefinedConsumerTag);
@@ -319,7 +356,7 @@ describe('Rabbit Subscribe', () => {
319356
const message = { message: 'message' };
320357
amqpConnection.publish(FANOUT, '', message);
321358

322-
await new Promise((resolve) => setTimeout(resolve, 50));
359+
await setTimeout(50);
323360

324361
expect(fanoutHandler).toHaveBeenCalledTimes(1);
325362
expect(fanoutHandler).toHaveBeenCalledWith(message);
@@ -331,10 +368,225 @@ describe('Rabbit Subscribe', () => {
331368
// publish and expect to acknowledge but not throw
332369
const warnSpy = jest.spyOn(customLogger, 'warn');
333370
amqpConnection.publish(exchange, 'infinite-loop', message);
334-
await new Promise((resolve) => setTimeout(resolve, 50));
371+
await setTimeout(50);
335372

336373
expect(warnSpy).toHaveBeenCalledWith(
337374
expect.stringContaining('Subscribe handlers should only return void'),
338375
);
339376
});
377+
378+
describe('Message Batching', () => {
379+
const publishMessages = async (
380+
size: number,
381+
ex: string,
382+
rk: string,
383+
prefix = '',
384+
) => {
385+
const messages: string[] = [];
386+
for (let i = 0; i < size; i++) {
387+
const testMessage = `${prefix}testMessage${i}`;
388+
await amqpConnection.publish(ex, rk, testMessage);
389+
messages.push(testMessage);
390+
}
391+
return messages;
392+
};
393+
394+
const parseMessages = (messages) => {
395+
return messages.map((message) => JSON.parse(message.content.toString()));
396+
};
397+
398+
const mockErrorHandler = (channel, messages) => {
399+
for (const msg of messages) {
400+
channel.ack(msg);
401+
}
402+
};
403+
404+
const paddedBatchTimeout = BATCH_TIMEOUT + 10;
405+
406+
it('should return a full message batch immediately', async () => {
407+
const testMessages = await publishMessages(
408+
BATCH_SIZE,
409+
exchange,
410+
batchRoutingKey,
411+
);
412+
413+
expect(batchHandler).toHaveBeenCalledTimes(1);
414+
expect(batchHandler).toHaveBeenCalledWith(testMessages);
415+
});
416+
417+
it('should return a partial message batch after timeout', async () => {
418+
const testMessages = await publishMessages(1, exchange, batchRoutingKey);
419+
420+
await setTimeout(paddedBatchTimeout);
421+
expect(batchHandler).toHaveBeenCalledTimes(1);
422+
expect(batchHandler).toHaveBeenCalledWith(testMessages);
423+
});
424+
425+
it('should return multiple batches of differing sizes', async () => {
426+
const testMessageBatches: string[][] = [];
427+
428+
for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
429+
testMessageBatches.push(
430+
await publishMessages(
431+
size,
432+
exchange,
433+
batchRoutingKey,
434+
`batch${index}-`,
435+
),
436+
);
437+
}
438+
439+
// two full batches should be immediately handled
440+
expect(batchHandler).toHaveBeenCalledTimes(2);
441+
for (const index of [0, 1]) {
442+
expect(batchHandler).toHaveBeenNthCalledWith(
443+
index + 1,
444+
testMessageBatches[index],
445+
);
446+
}
447+
448+
await setTimeout(paddedBatchTimeout);
449+
expect(batchHandler).toHaveBeenCalledTimes(3);
450+
expect(batchHandler).toHaveBeenLastCalledWith(testMessageBatches[2]);
451+
});
452+
453+
it('should return a full batch to the custom error handler', async () => {
454+
// have to do this here because `resetAllMocks`
455+
batchErrorHandler.mockImplementation(mockErrorHandler);
456+
457+
const testMessages = await publishMessages(
458+
BATCH_SIZE,
459+
exchange,
460+
batchErrorRoutingKey,
461+
);
462+
463+
// should be enough to place this after async error handling on the call stack
464+
await setTimeout(1);
465+
466+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
467+
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
468+
testMessages,
469+
);
470+
});
471+
472+
it('should return a partial batch to the custom error handler', async () => {
473+
// have to do this here because `resetAllMocks`
474+
batchErrorHandler.mockImplementation(mockErrorHandler);
475+
476+
const testMessages = await publishMessages(
477+
1,
478+
exchange,
479+
batchErrorRoutingKey,
480+
);
481+
482+
await setTimeout(paddedBatchTimeout);
483+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
484+
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
485+
testMessages,
486+
);
487+
});
488+
489+
it('should return multiple batches of differing sizes to the custom error handler', async () => {
490+
// have to do this here because `resetAllMocks`
491+
batchErrorHandler.mockImplementation(mockErrorHandler);
492+
493+
const testMessageBatches: string[][] = [];
494+
495+
for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
496+
testMessageBatches.push(
497+
await publishMessages(
498+
size,
499+
exchange,
500+
batchErrorRoutingKey,
501+
`batch${index}-`,
502+
),
503+
);
504+
}
505+
506+
// should be enough to place this after async error handling on the call stack
507+
await setTimeout(1);
508+
509+
// two full batches should be immediately handled
510+
expect(batchErrorHandler).toHaveBeenCalledTimes(2);
511+
for (const index of [0, 1]) {
512+
expect(parseMessages(batchErrorHandler.mock.calls[index][1])).toEqual(
513+
testMessageBatches[index],
514+
);
515+
}
516+
517+
await setTimeout(paddedBatchTimeout);
518+
expect(batchErrorHandler).toHaveBeenCalledTimes(3);
519+
expect(parseMessages(batchErrorHandler.mock.calls[2][1])).toEqual(
520+
testMessageBatches[2],
521+
);
522+
});
523+
524+
it('should not return another full batch after batch timeout', async () => {
525+
await publishMessages(BATCH_SIZE, exchange, batchRoutingKey);
526+
expect(batchHandler).toHaveBeenCalledTimes(1);
527+
528+
await setTimeout(paddedBatchTimeout);
529+
expect(batchHandler).toHaveBeenCalledTimes(1);
530+
});
531+
532+
it('should not return a partial batch before batch timeout', async () => {
533+
await publishMessages(1, exchange, batchRoutingKey);
534+
535+
await setTimeout(paddedBatchTimeout * 0.9);
536+
expect(batchHandler).toHaveBeenCalledTimes(0);
537+
538+
await setTimeout(paddedBatchTimeout * 0.2);
539+
expect(batchHandler).toHaveBeenCalledTimes(1);
540+
});
541+
542+
it('should not return another partial batch after first batch timeout', async () => {
543+
await publishMessages(1, exchange, batchRoutingKey);
544+
545+
await setTimeout(paddedBatchTimeout);
546+
expect(batchHandler).toHaveBeenCalledTimes(1);
547+
548+
await setTimeout(paddedBatchTimeout);
549+
expect(batchHandler).toHaveBeenCalledTimes(1);
550+
});
551+
552+
it('should not return another full batch after batch timeout to the custom error handler', async () => {
553+
// have to do this here because `resetAllMocks`
554+
batchErrorHandler.mockImplementation(mockErrorHandler);
555+
556+
await publishMessages(BATCH_SIZE, exchange, batchErrorRoutingKey);
557+
558+
// should be enough to place this after async error handling on the call stack
559+
await setTimeout(1);
560+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
561+
562+
await setTimeout(paddedBatchTimeout);
563+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
564+
});
565+
566+
it('should not return a partial batch before batch timeout to the custom error handler', async () => {
567+
// have to do this here because `resetAllMocks`
568+
batchErrorHandler.mockImplementation(mockErrorHandler);
569+
570+
await publishMessages(1, exchange, batchErrorRoutingKey);
571+
572+
await setTimeout(paddedBatchTimeout * 0.9);
573+
expect(batchErrorHandler).toHaveBeenCalledTimes(0);
574+
575+
await setTimeout(paddedBatchTimeout * 0.2);
576+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
577+
});
578+
579+
it('should not return another partial batch after first batch timeout to the custom error handler', async () => {
580+
// have to do this here because `resetAllMocks`
581+
batchErrorHandler.mockImplementation(mockErrorHandler);
582+
583+
await publishMessages(1, exchange, batchErrorRoutingKey);
584+
585+
await setTimeout(paddedBatchTimeout);
586+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
587+
588+
await setTimeout(paddedBatchTimeout);
589+
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
590+
});
591+
});
340592
});

0 commit comments

Comments
 (0)