Skip to content

Commit 2d2be6a

Browse files
committed
set exhange broker type topic allways
1 parent 74f159e commit 2d2be6a

14 files changed

+115
-39
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "unoapi-cloud",
3-
"version": "2.0.6",
3+
"version": "2.0.7",
44
"description": "Unoapi Cloud",
55
"exports": "./dist/index.js",
66
"types": "./dist/index.d.ts",

src/amqp.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,7 @@ export const amqpPublish = async (
198198
dead: false,
199199
maxRetries: UNOAPI_MESSAGE_RETRY_LIMIT,
200200
countRetries: 0,
201-
priority: 0,
202-
type: 'topic'
201+
priority: 0
203202
},
204203
) => {
205204
validateRoutingKey(routingKey)
@@ -248,8 +247,7 @@ export const amqpConsume = async (
248247
options: Partial<CreateOption> = {
249248
delay: UNOAPI_MESSAGE_RETRY_DELAY,
250249
priority: 0,
251-
notifyFailedMessages: NOTIFY_FAILED_MESSAGES,
252-
type: 'topic'
250+
notifyFailedMessages: NOTIFY_FAILED_MESSAGES
253251
},
254252
) => {
255253
logger.debug('Configurate to consume exchange: %s, queue: %s, routing key: %s and type: %s', exchange, queue, routingKey, options.type)
@@ -297,14 +295,14 @@ export const amqpConsume = async (
297295
},
298296
},
299297
},
300-
{ maxRetries: 0 },
298+
{ maxRetries: 0, type: 'topic' },
301299
)
302300
logger.info('Sent error to whatsapp!')
303301
}
304-
await amqpPublish(exchange, queue, routingKey, data, { dead: true })
302+
await amqpPublish(exchange, queue, routingKey, data, { dead: true, type: options.type })
305303
} else {
306304
logger.info('Publish retry %s of %s', countRetries, maxRetries)
307-
await amqpPublish(exchange, queue, routingKey, data, { delay: 60000, maxRetries, countRetries })
305+
await amqpPublish(exchange, queue, routingKey, data, { delay: 60000, maxRetries, countRetries, type: options.type })
308306
}
309307
await channel.ack(payload)
310308
}

src/broker.ts

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,55 @@ const startBroker = async () => {
4646
logger.info('Unoapi Cloud version %s starting broker...', version)
4747

4848
logger.info('Starting reload consumer')
49-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD, '*', reloadJob.consume.bind(reloadJob))
49+
await amqpConsume(
50+
UNOAPI_EXCHANGE_BROKER_NAME,
51+
UNOAPI_QUEUE_RELOAD,
52+
'*',
53+
reloadJob.consume.bind(reloadJob),
54+
{ type: 'topic' }
55+
)
5056

5157
logger.info('Starting media consumer')
52-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_MEDIA, '*', mediaJob.consume.bind(mediaJob))
58+
await amqpConsume(
59+
UNOAPI_EXCHANGE_BROKER_NAME,
60+
UNOAPI_QUEUE_MEDIA,
61+
'*',
62+
mediaJob.consume.bind(mediaJob),
63+
{ type: 'topic' }
64+
)
5365

5466
logger.info('Binding queues consumer for server %s', UNOAPI_SERVER_NAME)
5567

5668
const notifyFailedMessages = NOTIFY_FAILED_MESSAGES
5769

5870
logger.info('Starting outgoing consumer %s', UNOAPI_SERVER_NAME)
59-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_OUTGOING, '*', outgingJob.consume.bind(outgingJob), { notifyFailedMessages, prefetch })
71+
await amqpConsume(
72+
UNOAPI_EXCHANGE_BROKER_NAME,
73+
UNOAPI_QUEUE_OUTGOING,
74+
'*',
75+
outgingJob.consume.bind(outgingJob),
76+
{ notifyFailedMessages, prefetch, type: 'topic' }
77+
)
6078

6179
if (notifyFailedMessages) {
6280
logger.debug('Starting notification consumer %s', UNOAPI_SERVER_NAME)
63-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_NOTIFICATION, '*', notificationJob.consume.bind(notificationJob), { notifyFailedMessages: false })
81+
await amqpConsume(
82+
UNOAPI_EXCHANGE_BROKER_NAME,
83+
UNOAPI_QUEUE_NOTIFICATION,
84+
'*',
85+
notificationJob.consume.bind(notificationJob),
86+
{ notifyFailedMessages: false, type: 'topic' }
87+
)
6488
}
6589

6690
logger.info('Starting blacklist add consumer %s', UNOAPI_SERVER_NAME)
67-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BLACKLIST_ADD, '*', addToBlacklist, { notifyFailedMessages, prefetch })
91+
await amqpConsume(
92+
UNOAPI_EXCHANGE_BROKER_NAME,
93+
UNOAPI_QUEUE_BLACKLIST_ADD,
94+
'*',
95+
addToBlacklist,
96+
{ notifyFailedMessages, prefetch, type: 'topic' }
97+
)
6898

6999
logger.info('Unoapi Cloud version %s started broker!', version)
70100
}

src/bulker.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,22 @@ const startBulker = async () => {
4040
logger.info('Unoapi Cloud version %s starting bulker...', version)
4141

4242
logger.info('Starting commander consumer')
43-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_COMMANDER, '*', commanderJob.consume.bind(commanderJob))
43+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_COMMANDER, '*', commanderJob.consume.bind(commanderJob), { type: 'topic' })
4444

4545
logger.info('Starting bulk parser consumer')
46-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_PARSER, '*', bulkParserJob.consume.bind(bulkParserJob))
46+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_PARSER, '*', bulkParserJob.consume.bind(bulkParserJob), { type: 'topic' })
4747

4848
logger.info('Starting bulk sender consumer')
49-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_SENDER, '*', bulkSenderJob.consume.bind(bulkSenderJob))
49+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_SENDER, '*', bulkSenderJob.consume.bind(bulkSenderJob), { type: 'topic' })
5050

5151
logger.info('Starting bulk status consumer')
52-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_STATUS, '*', bulkStatusJob.consume.bind(bulkStatusJob))
52+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_STATUS, '*', bulkStatusJob.consume.bind(bulkStatusJob), { type: 'topic' })
5353

5454
logger.info('Starting bulk report consumer')
55-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_REPORT, '*', bulkReportJob.consume.bind(bulkReportJob))
55+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_REPORT, '*', bulkReportJob.consume.bind(bulkReportJob), { type: 'topic' })
5656

5757
logger.info('Starting bulk webhook consumer')
58-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_WEBHOOK, '*', bulkWebhookJob.consume.bind(bulkWebhookJob))
58+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_WEBHOOK, '*', bulkWebhookJob.consume.bind(bulkWebhookJob), { type: 'topic' })
5959
}
6060
startBulker()
6161

src/jobs/bulk_parser.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ export class BulkParserJob {
237237
this.outgoing.formatAndSend(phone, phone, message)
238238
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, this.queueBulkSender, phone, {
239239
payload: { messages, id, length: messages.length },
240-
})
240+
}, { type: 'topic' })
241241
} catch (error) {
242242
logger.error(error, 'Error on parse bulk')
243243
const message = {

src/jobs/bulk_report.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ export class BulkReportJob {
3030
message = { body: `Bulk ${id} phone ${phone} with ${length}, has retried generate ${count} and not retried more` }
3131
} else {
3232
message = { body: `Bulk ${id} phone ${phone} with ${length}, some messages is already scheduled status, try again later, this is ${count} try...` }
33-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_REPORT, phone, { payload: { id, length, count } }, { delay: UNOAPI_BULK_DELAY * 1000 })
33+
await amqpPublish(
34+
UNOAPI_EXCHANGE_BROKER_NAME,
35+
UNOAPI_QUEUE_BULK_REPORT,
36+
phone,
37+
{ payload: { id, length, count } },
38+
{ delay: UNOAPI_BULK_DELAY * 1000, type: 'topic' }
39+
)
3440
}
3541
} else {
3642
const caption = `Bulk ${id} phone ${phone} with ${length} message(s) status -> ${JSON.stringify(status)}`

src/jobs/bulk_sender.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export class BulkSenderJob {
5757
{
5858
payload: { phone, messages: messagesToRenqueue, id, length },
5959
},
60-
{ delay: delayToResend },
60+
{ delay: delayToResend, type: 'topic' },
6161
)
6262
statusMessage = `Bulk ${id} phone ${phone} reenqueuing ${messagesToRenqueue.length} message(s) with delay ${delayToResend}...`
6363
} else {
@@ -66,7 +66,7 @@ export class BulkSenderJob {
6666
UNOAPI_EXCHANGE_BROKER_NAME,
6767
UNOAPI_QUEUE_BULK_REPORT, phone,
6868
{ payload: { id, length } },
69-
{ delay: UNOAPI_BULK_DELAY * 1000 }
69+
{ delay: UNOAPI_BULK_DELAY * 1000, type: 'topic' }
7070
)
7171
}
7272
const messageUpdate = {

src/jobs/commander.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class CommanderJob {
4848
template: 'sisodonto',
4949
url: payload?.document?.link,
5050
},
51-
}
51+
}, { type: 'topic' }
5252
)
5353
const message = {
5454
type: 'text',
@@ -73,7 +73,7 @@ export class CommanderJob {
7373
const config = { webhooks }
7474
logger.debug('Template webhooks %s', phone, JSON.stringify(webhooks))
7575
await setConfig(phone, config)
76-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, `${UNOAPI_QUEUE_RELOAD}.${currentConfig.server!}`, phone , { phone })
76+
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, `${UNOAPI_QUEUE_RELOAD}.${currentConfig.server!}`, phone , { phone }, { type: 'topic' })
7777
} else if (payload?.to && phone === payload?.to && payload?.template && payload?.template.name == 'unoapi-bulk-report') {
7878
logger.debug('Parsing bulk report template... %s', phone)
7979
const service = new Template(this.getConfig)
@@ -85,7 +85,7 @@ export class CommanderJob {
8585
throw new YamlParseError(doc.errors)
8686
}
8787
const { bulk } = doc.toJS()
88-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_REPORT, phone, { payload: { phone, id: bulk, unverified: true } })
88+
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BULK_REPORT, phone, { payload: { phone, id: bulk, unverified: true } }, { type: 'topic' })
8989
} else if (payload?.to && phone === payload?.to && payload?.template && payload?.template.name == 'unoapi-config') {
9090
logger.debug('Parsing config template... %s', phone)
9191
const service = new Template(this.getConfig)

src/services/blacklist.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,6 @@ export const addToBlacklistRedis: addToBlacklist = async (from: string, webhookI
6969
}
7070

7171
export const addToBlacklistJob: addToBlacklist = async (from: string, webhookId: string, to: string, ttl: number) => {
72-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BLACKLIST_ADD, from, { from, webhookId, to, ttl })
72+
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BLACKLIST_ADD, from, { from, webhookId, to, ttl }, { type: 'topic' })
7373
return true
7474
}

src/services/broadcast_amqp.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ import { Broadcast } from './broadcast'
55
export class BroadcastAmqp extends Broadcast {
66
public async send(phone: string, type: string, content: string) {
77
const payload = { phone, type, content }
8-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BROADCAST, phone, payload)
8+
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BROADCAST, phone, payload, { type: 'topic' })
99
}
1010
}

src/services/outgoing_amqp.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@ export class OutgoingAmqp implements Outgoing {
1818

1919
public async send(phone: string, payload: object) {
2020
const config = await this.getConfig(phone)
21-
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_OUTGOING, phone, { webhooks: config.webhooks, payload, split: true })
21+
await amqpPublish(
22+
UNOAPI_EXCHANGE_BROKER_NAME,
23+
UNOAPI_QUEUE_OUTGOING, phone,
24+
{ webhooks: config.webhooks, payload, split: true },
25+
{ type: 'topic' }
26+
)
2227
}
2328

2429
public async sendHttp(phone: string, webhook: Webhook, payload: object, options: Partial<PublishOption> = {}) {
30+
options.type = 'topic'
2531
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_OUTGOING, phone, { webhook, payload, split: false }, options)
2632
}
2733
}

src/services/reload_amqp.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ export class ReloadAmqp extends Reload {
1717
UNOAPI_EXCHANGE_BROKER_NAME,
1818
UNOAPI_QUEUE_RELOAD,
1919
phone,
20-
{ phone, ...params }
20+
{ phone, ...params },
21+
{ type: 'topic' }
2122
)
2223
await amqpPublish(
2324
UNOAPI_EXCHANGE_BRIDGE_NAME,

src/standalone.ts

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,29 +117,64 @@ if (process.env.AMQP_URL) {
117117
const bindBridgeJob = new BindBridgeJob()
118118
const logoutJob = new LogoutJob(logout)
119119
logger.info('Starting bind bridge consumer')
120-
amqpConsume(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_BIND}.${UNOAPI_SERVER_NAME}`, '*', bindBridgeJob.consume.bind(bindBridgeJob), { type: 'direct' })
120+
amqpConsume(
121+
UNOAPI_EXCHANGE_BRIDGE_NAME,
122+
`${UNOAPI_QUEUE_BIND}.${UNOAPI_SERVER_NAME}`,
123+
'*',
124+
bindBridgeJob.consume.bind(bindBridgeJob),
125+
{ type: 'direct' }
126+
)
121127
logger.info('Starting reload consumer')
122-
amqpConsume(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_RELOAD}.${UNOAPI_SERVER_NAME}`, '*', reloadJob.consume.bind(reloadJob), { type: 'direct' })
128+
amqpConsume(
129+
UNOAPI_EXCHANGE_BRIDGE_NAME,
130+
`${UNOAPI_QUEUE_RELOAD}.${UNOAPI_SERVER_NAME}`,
131+
'*',
132+
reloadJob.consume.bind(reloadJob),
133+
{ type: 'direct' }
134+
)
123135
logger.info('Starting logout consumer')
124-
amqpConsume(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_LOGOUT}.${UNOAPI_SERVER_NAME}`, '*', logoutJob.consume.bind(logoutJob), { type: 'direct' })
136+
amqpConsume(
137+
UNOAPI_EXCHANGE_BRIDGE_NAME,
138+
`${UNOAPI_QUEUE_LOGOUT}.${UNOAPI_SERVER_NAME}`,
139+
'*',
140+
logoutJob.consume.bind(logoutJob),
141+
{ type: 'direct' }
142+
)
125143
logger.info('Starting media consumer')
126144
const mediaJob = new MediaJob(getConfigVar)
127-
amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_MEDIA, '*', mediaJob.consume.bind(mediaJob))
145+
amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_MEDIA, '*', mediaJob.consume.bind(mediaJob), { type: 'topic' })
128146
const prefetch = UNOAPI_QUEUE_OUTGOING_PREFETCH
129147
logger.info('Binding queues consumer for server %s', UNOAPI_SERVER_NAME)
130148
const notifyFailedMessages = NOTIFY_FAILED_MESSAGES
131149
logger.info('Starting outgoing consumer %s', UNOAPI_SERVER_NAME)
132150
const outgoingCloudApi: Outgoing = new OutgoingCloudApi(getConfigRedis, isInBlacklistInRedis)
133151
const outgoinJob = new OutgoingJob(getConfigVar, outgoingCloudApi)
134-
amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_OUTGOING, '*', outgoinJob.consume.bind(outgoinJob), { notifyFailedMessages, prefetch })
152+
amqpConsume(
153+
UNOAPI_EXCHANGE_BROKER_NAME,
154+
UNOAPI_QUEUE_OUTGOING,
155+
'*',
156+
outgoinJob.consume.bind(outgoinJob),
157+
{ notifyFailedMessages, prefetch, type: 'topic' }
158+
)
135159
if (notifyFailedMessages) {
136160
logger.debug('Starting notification consumer %s', UNOAPI_SERVER_NAME)
137161
const notificationJob = new NotificationJob(incoming)
138-
amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_NOTIFICATION, '*', notificationJob.consume.bind(notificationJob), { notifyFailedMessages: false })
162+
amqpConsume(
163+
UNOAPI_EXCHANGE_BROKER_NAME,
164+
UNOAPI_QUEUE_NOTIFICATION,
165+
'*',
166+
notificationJob.consume.bind(notificationJob),
167+
{ notifyFailedMessages: false, type: 'topic' })
139168
}
140169

141170
logger.info('Starting blacklist add consumer %s', UNOAPI_SERVER_NAME)
142-
amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BLACKLIST_ADD, '*', atbl, { notifyFailedMessages, prefetch })
171+
amqpConsume(
172+
UNOAPI_EXCHANGE_BROKER_NAME,
173+
UNOAPI_QUEUE_BLACKLIST_ADD,
174+
'*',
175+
atbl,
176+
{ notifyFailedMessages, prefetch, type: 'topic' }
177+
)
143178
} else {
144179
logger.info('Starting standard mode')
145180
}

src/web.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ const broadcastJob = new BroacastJob(broadcast)
4848

4949
app.server.listen(PORT, '0.0.0.0', async () => {
5050
logger.info('Starting broadcast consumer')
51-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BROADCAST, '*', broadcastJob.consume.bind(broadcastJob))
52-
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD, '*', reload.run.bind(reloadJob))
51+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_BROADCAST, '*', broadcastJob.consume.bind(broadcastJob), { type: 'topic' })
52+
await amqpConsume(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD, '*', reload.run.bind(reloadJob), { type: 'topic' })
5353
logger.info('Unoapi Cloud version: %s, listening on port: %s | Linked Device: %s(%s)', version, PORT, CONFIG_SESSION_PHONE_CLIENT, CONFIG_SESSION_PHONE_NAME)
5454
})

0 commit comments

Comments
 (0)