@@ -12,6 +12,16 @@ module.exports = function rabbitTopology() {
12
12
msqQueue = QueueModel . rabbit . exchange . queue ( { name, durable : true } )
13
13
}
14
14
15
+ function setupQueues ( Model ) {
16
+ QueueModel = Model
17
+
18
+ // Loop through all the defined queues
19
+ _ . forEach ( QueueModel . topology , ( handlers , queue ) => {
20
+ // Setup the actual queue on RabbitMQ
21
+ setupQueue ( queue )
22
+ } )
23
+ }
24
+
15
25
function setupQueueConsumer ( app , queue , definition ) {
16
26
debug ( 'setupQueueConsumer' )
17
27
const modelName = definition . model
@@ -31,7 +41,10 @@ module.exports = function rabbitTopology() {
31
41
}
32
42
33
43
// Start consuming the queue
34
- msqQueue . consume ( Method )
44
+ if ( msqQueue ) {
45
+ msqQueue . consume ( Method )
46
+ }
47
+
35
48
36
49
debug ( 'setupQueueConsumer: queue: %s, model: %s, method: %s' , queue , modelName , methodName )
37
50
}
@@ -49,7 +62,12 @@ module.exports = function rabbitTopology() {
49
62
50
63
Model [ methodName ] = function queueProducer ( params ) {
51
64
debug ( `${ modelName } .${ methodName } (%o)` , params )
52
- QueueModel . rabbit . exchange . publish ( params , { key : queue } )
65
+ if ( QueueModel . rabbit && QueueModel . rabbit . exchange ) {
66
+ QueueModel . rabbit . exchange . publish ( params , { key : queue } )
67
+ }
68
+ else {
69
+ debug ( 'setupQueueProducer: queue %s is not yet initialised' , queue )
70
+ }
53
71
}
54
72
}
55
73
@@ -59,9 +77,6 @@ module.exports = function rabbitTopology() {
59
77
// Loop through all the defined queues
60
78
_ . forEach ( QueueModel . topology , ( handlers , queue ) => {
61
79
62
- // Setup the actual queue on RabbitMQ
63
- setupQueue ( queue )
64
-
65
80
// Setup the consumer of this queue
66
81
if ( handlers . consumer ) {
67
82
setupQueueConsumer ( QueueModel . app , queue , handlers . consumer )
@@ -77,6 +92,7 @@ module.exports = function rabbitTopology() {
77
92
78
93
return {
79
94
setupQueue,
95
+ setupQueues,
80
96
setupTopology,
81
97
setupQueueProducer,
82
98
setupQueueConsumer,
0 commit comments