Skip to content

Commit d4b795e

Browse files
Sunny  TyagiSunny  Tyagi
authored andcommitted
feat(pkg): refactor the implementation of package and add event bridge startegy
refactor the implementation of package and add event bridge startegy gh-3
1 parent 02bf3e5 commit d4b795e

File tree

110 files changed

+3017
-1079
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+3017
-1079
lines changed

README.md

Lines changed: 165 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,140 +1,220 @@
11
# Message bus queue connectors
2+
23
This is the package for the message bus queue connectors component for LoopBack 4 applications.
3-
It provides components to work with queues such as SQS, BullMQ
4+
It provides components to work with queues such as SQS, BullMQ and EventBridge
45

5-
[![LoopBack](https://github.com/loopbackio/loopback-next/raw/master/docs/site/imgs/branding/Powered-by-LoopBack-Badge-(blue)-@2x.png)](http://loopback.io/)
6+
[![LoopBack](<https://github.com/loopbackio/loopback-next/raw/master/docs/site/imgs/branding/Powered-by-LoopBack-Badge-(blue)-@2x.png>)](http://loopback.io/)
67

78
## Installation
89

9-
Install MessageBusQueueConnectorsComponent using `npm`;
10+
Install EventStreamConnectorComponent using `npm`;
1011

1112
```sh
12-
$ [npm install | yarn add] message-bus-queue-connectors
13+
$ [npm install | yarn add] @sourceloop/message-bus-queue-connectors
1314
```
15+
## Flow Diagram
16+
17+
<img width="659" alt="Screenshot 2025-06-06 at 10 53 06 AM" src="https://github.com/user-attachments/assets/baf1bcaa-5f67-44bb-a01a-b8d1c41644bc" />
1418

1519
## Basic Use
1620

17-
Configure and load MessageBusQueueConnectorsComponent in the application constructor
21+
Configure and load EventStreamConnectorComponent in the application constructor
1822
as shown below.
1923

20-
### SQS
2124
```ts
22-
import {SqsProducerProvider, SQSBindings, SQSConsumerObserver, SQSConsumerProvider} from 'message-bus-queue-connectors/sqs';
25+
import {
26+
EventStreamConnectorComponent
27+
} from '@sourceloop/message-bus-queue-connectors';
2328

2429
// ...
25-
export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestApplication))) {
30+
export class MyApplication extends BootMixin(
31+
ServiceMixin(RepositoryMixin(RestApplication)),
32+
) {
2633
constructor(options: ApplicationConfig = {}) {
2734
super();
28-
this.bind(SqsClientBindings.Config).to(
29-
options.sqsConfig
30-
);
31-
32-
this.bind(SQSBindings.SQSProducerProvider).toProvider(SqsProducerProvider);
35+
this.component(EventStreamConnectorComponent);
3336
// ...
37+
}
38+
// ...
39+
}
40+
```
3441

35-
// Add lifecycle observer
36-
this.lifeCycleObserver(SQSConsumerObserver);
42+
### SQS
43+
44+
To use SQS as their message queue, bind its required config and connector component in your application.
45+
46+
```ts
47+
import {
48+
SQSConnector,
49+
SQSBindings,
50+
EventStreamConnectorComponent
51+
} from '@sourceloop/message-bus-queue-connectors';
52+
53+
// ...
54+
export class MyApplication extends BootMixin(
55+
ServiceMixin(RepositoryMixin(RestApplication)),
56+
) {
57+
constructor(options: ApplicationConfig = {}) {
58+
super();
59+
60+
this.component(EventStreamConnectorComponent);
61+
// SQS Config and its connector
62+
this.bind(SQSBindings.Config).to({
63+
queueConfig: {
64+
QueueUrl: 'http://127.0.0.1:4566/000000000000/my-test-queue',
65+
MessageRetentionPeriod: 60, // at least 60 seconds
66+
MaximumMessageSize: 262144,
67+
ReceiveMessageWaitTimeSeconds: 20, // typical polling time
68+
VisibilityTimeout: 30, // 30 seconds
69+
},
70+
Credentials: {
71+
region: 'us-east-1',
72+
accessKeyId: 'test',
73+
secretAccessKey: 'test',
74+
},
75+
ConsumerConfig: {
76+
MaxNumberOfMessages: 10,
77+
WaitTimeSeconds: 20,
78+
maxConsumers: 2,
79+
},
80+
});
81+
82+
this.component(SQSConnector);
3783

3884
// ...
3985
}
4086
// ...
4187
}
4288
```
4389

44-
#### SQS Config
90+
to make the application as consumer, pass 'isConsumer' flag to be true in SQS config. like
91+
4592
```ts
4693
const config = {
47-
queueConfig: {
48-
QueueUrl: "sqs-queue-url",,
49-
MessageRetentionPeriod: 1,
50-
MaximumMessageSize: 262144 ,
51-
ReceiveMessageWaitTimeSeconds: 60,
52-
VisibilityTimeout: 300,
53-
},
54-
Credentials: {
55-
region: "aws-region",
56-
accessKeyId: "aws-access-key-id",
57-
secretAccessKey: "aws-secret-access-key",
58-
},
59-
ConsumerConfig: {
60-
MaxNumberOfMessages: 10,
61-
WaitTimeSeconds: 20,
62-
maxConsumers: 2,
63-
},
64-
}
65-
94+
// rest of ur config
95+
isConsumer: true,
96+
};
6697
```
67-
#### Consumer setup
68-
Below is consumer handler example.
69-
```ts
70-
this.bind(SQSBindings.SQSConsumerProvider).toProvider(SQSConsumerProvider);
71-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
72-
this.bind(SQSBindings.SQSConsumerHandler).to(async (message: string) => {
73-
console.log('Processing message SQS---------:', message);
74-
75-
});
7698

77-
```
7899
Please follow the [AWS SDK for JavaScript](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/sqs-examples-send-receive-messages.html) for more information on the configuration.
79100

80-
81101
### BullMQ
82102

103+
To use BullMq as their message queue, bind its required config and connector component in your application.
104+
83105
```ts
84-
import {BullMQProducerProvider, BullMQBindings, BullMQConsumerObserver} from 'message-bus-queue-connectors/bullmq';
106+
import {
107+
BullMQConnector,
108+
BullMQBindings,
109+
EventStreamConnectorComponent,
110+
} from '@sourceloop/message-bus-queue-connectors';
85111

86112
// ...
87-
export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestApplication))) {
113+
export class MyApplication extends BootMixin(
114+
ServiceMixin(RepositoryMixin(RestApplication)),
115+
) {
88116
constructor(options: ApplicationConfig = {}) {
89117
super();
90-
this.bind(BullMQBindings.Config).to(
91-
options.config
92-
);
93-
94-
this.bind(BullMQBindings.BullMQProducerProvider).toProvider(BullMQProducerProvider);
95-
// ...
96-
// Add lifecycle observer
97-
this.lifeCycleObserver(BullMQConsumerObserver);
118+
119+
this.component(EventStreamConnectorComponent);
120+
121+
// Bull Mq config and connector
122+
this.bind(BullMQBindings.Config).to({
123+
QueueName: process.env.QUEUE_NAME ?? 'default-queue',
124+
redisConfig: {
125+
host: process.env.REDIS_HOST ?? 'localhost',
126+
port: parseInt(process.env.REDIS_PORT ?? '6379'),
127+
password: process.env.REDIS_PASSWORD ?? undefined,
128+
},
129+
producerConfig: {
130+
defaultJobOptions: {
131+
attempts: 3,
132+
backoff: 5000,
133+
},
134+
},
135+
consumerConfig: {
136+
MinConsumers: 1,
137+
MaxConsumers: 5,
138+
QueuePollInterval: 2000,
139+
},
140+
});
141+
this.component(BullMQConnector);
98142
// ...
99143
}
100144
// ...
101145
}
102146
```
103147

104-
#### BullMQ config
148+
to make the application as consumer, pass 'isConsumer' flag to be true in Bull config. like
149+
105150
```ts
106151
const config = {
107-
queueConfig:{
108-
QueueName: 'BullMQ1',
109-
},
110-
QueueName: 'BullMQ1',
111-
producerConfig: {
112-
defaultJobOptions: {attempts: 3,
113-
backoff: {
114-
type: 'exponential',
115-
delay: 5000,
116-
},
117-
}
118-
},
119-
consumerConfig: {
120-
MaxConsumers: 1,
121-
MinConsumers: 1,
122-
},
123-
redisConfig: {
124-
host: process.env.REDIS_HOST ?? 'localhost',
125-
port: +(process.env.REDIS_PORT ?? 6379),
126-
}
127-
152+
// rest of ur config
153+
isConsumer: true,
154+
};
155+
```
156+
157+
## Integration
158+
159+
@sourceloop/message-bus-queue-connectors provides a decorator '@producer()' that can be used to access the producer of each msg queue. It expects one arguement defining the type of queue, of which producer u want to use. like
160+
161+
```ts
162+
@injectable({scope: BindingScope.TRANSIENT})
163+
export class EventConnector implements IEventConnector<PublishedEvents> {
164+
constructor(
165+
@producer(QueueType.EventBridge)
166+
private producer: Producer,
167+
@producer(QueueType.SQS)
168+
private sqsProducer: Producer,
169+
@producer(QueueType.BullMQ)
170+
private bullMqProducer: Producer,
171+
) {}
172+
173+
// rest of implementation
174+
128175
}
176+
```
177+
178+
Producer provider two ways of sending events - single event at a time and multiple event at a time.
179+
180+
```ts
181+
export type Producer<Stream extends AnyObject = AnyObject> = {
182+
send: <Event extends keyof Stream>(data: Stream[Event], topic?: Event) => Promise<void>;
183+
sendMultiple: <Event extends keyof Stream>(data: Stream[Event][], topic?: Event) => Promise<void>;
184+
};
185+
```
129186

187+
It provides '@consumer' decorator to make a service as consumer. consumer needs to follow an interface.
188+
189+
```ts
190+
export interface IConsumer<Stream extends AnyObject, Event extends keyof Stream> {
191+
event: Event;
192+
queue: QueueType;
193+
handle(data: Stream[Event]): Promise<void>;
194+
}
130195
```
131196

132-
#### Consumer setup
133-
Below is consumer handler example.
197+
and can be used as
198+
134199
```ts
135-
this.bind(BullMQBindings.BullMQConsumerProvider).toProvider(BullMQConsumerProvider);
136-
137-
this.bind(BullMQBindings.BullMQConsumerHandler).to(async (message: string) => {
138-
console.log('Processing message ---------:', message);
139-
});
200+
import {
201+
IConsumer,
202+
QueueType,
203+
consumer,
204+
} from '@sourceloop/message-bus-queue-connectors';
205+
import { OrchestratorStream, EventTypes, ProvisioningInputs } from '../../types';
206+
207+
@consumer
208+
export class TenantProvisioningConsumerForEventSQS
209+
implements IConsumer<OrchestratorStream, EventTypes.TENANT_PROVISIONING>
210+
{
211+
constructor(
212+
) {}
213+
event: EventTypes.TENANT_PROVISIONING = EventTypes.TENANT_PROVISIONING;
214+
queue: QueueType = QueueType.SQS;
215+
async handle(data: ProvisioningInputs): Promise<void> {
216+
console.log(`SQS: ${this.event} Event Recieved ` + JSON.stringify(data));
217+
return;
218+
}
219+
}
140220
```
32.2 KB
Binary file not shown.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"eslint": "eslint --report-unused-disable-directives .",
4444
"eslint:fix": "npm run eslint -- --fix",
4545
"pretest": "npm run rebuild",
46-
"test": "lb-mocha --exit --allow-console-logs \"dist/__tests__\"",
46+
"test": "lb-mocha --allow-console-logs \"dist/__tests__\"",
4747
"test:dev": "lb-mocha --allow-console-logs dist/__tests__/**/*.js",
4848
"clean": "lb-clean dist *.tsbuildinfo .eslintcache",
4949
"rebuild": "npm run clean && npm run build",
Binary file not shown.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import {expect, sinon} from '@loopback/testlab';
2+
import {ProducerApp} from './fixtures/producers-app';
3+
import {TestProducerService} from './fixtures/services/test-producer.service';
4+
import {setupProducerApplication} from './helpers/app-builder';
5+
import {QueueStub} from './stubs/bullmq-queue.stub';
6+
import {Events} from '../test-stream';
7+
import {DEFAULT_SOURCE} from '../../../constants';
8+
9+
[
10+
{
11+
type: 'Stubbed queue',
12+
queue: new QueueStub(),
13+
condition: () => true,
14+
},
15+
].forEach(({type, queue, condition}) => {
16+
describe(`BullMq Connector: With ${type}`, () => {
17+
let producerApp: ProducerApp | undefined;
18+
let producerService: TestProducerService;
19+
let listenerStub: sinon.SinonStub;
20+
before(async function () {
21+
if (!condition()) {
22+
// eslint-disable-next-line @typescript-eslint/no-invalid-this
23+
this.skip();
24+
}
25+
producerApp = await setupProducerApplication(queue);
26+
producerService = producerApp.getSync<TestProducerService>(
27+
`services.TestProducerService`,
28+
);
29+
listenerStub = sinon.stub().resolves();
30+
if (queue) {
31+
queue.register(listenerStub);
32+
}
33+
});
34+
beforeEach(() => {
35+
listenerStub.reset();
36+
});
37+
after(async () => {
38+
await producerApp?.stop();
39+
});
40+
describe('Producer', () => {
41+
it('should produce an event for a particular topic', async () => {
42+
await producerService.produceEventA('test string');
43+
sinon.assert.calledWithExactly(
44+
listenerStub,
45+
'Event A',
46+
'test string',
47+
{},
48+
);
49+
});
50+
it('should produce multiple events for a particular topic', async () => {
51+
const data = ['test string 1', 'test string 2'];
52+
await producerService.produceMultipleA(data);
53+
sinon.assert.calledWithExactly(
54+
listenerStub,
55+
JSON.stringify(
56+
data.map(d => ({name: 'Event A', data: d, type: Events.A})),
57+
),
58+
'',
59+
{},
60+
);
61+
});
62+
});
63+
});
64+
});

0 commit comments

Comments
 (0)