1
1
package com .github .brainlag .nsq ;
2
2
3
+ import com .fasterxml .jackson .databind .ObjectMapper ;
3
4
import com .github .brainlag .nsq .exceptions .NSQException ;
4
5
import com .github .brainlag .nsq .lookup .DefaultNSQLookup ;
5
6
import com .github .brainlag .nsq .lookup .NSQLookup ;
27
28
import static org .junit .Assert .assertTrue ;
28
29
29
30
public class NSQProducerTest {
31
+ private ObjectMapper mapper = new ObjectMapper ();
30
32
31
33
private NSQConfig getSnappyConfig () {
32
34
final NSQConfig config = new NSQConfig ();
@@ -68,7 +70,7 @@ private NSQConfig getSslAndDeflateConfig() throws SSLException {
68
70
@ Test
69
71
public void testProduceOneMsgSnappy () throws NSQException , TimeoutException , InterruptedException {
70
72
AtomicInteger counter = new AtomicInteger (0 );
71
- NSQLookup lookup = new DefaultNSQLookup ();
73
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
72
74
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
73
75
74
76
NSQProducer producer = new NSQProducer ();
@@ -96,7 +98,7 @@ public void testProduceOneMsgSnappy() throws NSQException, TimeoutException, Int
96
98
public void testProduceOneMsgDeflate () throws NSQException , TimeoutException , InterruptedException {
97
99
System .setProperty ("io.netty.noJdkZlibDecoder" , "false" );
98
100
AtomicInteger counter = new AtomicInteger (0 );
99
- NSQLookup lookup = new DefaultNSQLookup ();
101
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
100
102
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
101
103
102
104
NSQProducer producer = new NSQProducer ();
@@ -123,7 +125,7 @@ public void testProduceOneMsgDeflate() throws NSQException, TimeoutException, In
123
125
@ Test
124
126
public void testProduceOneMsgSsl () throws InterruptedException , NSQException , TimeoutException , SSLException {
125
127
AtomicInteger counter = new AtomicInteger (0 );
126
- NSQLookup lookup = new DefaultNSQLookup ();
128
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
127
129
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
128
130
129
131
NSQProducer producer = new NSQProducer ();
@@ -150,7 +152,7 @@ public void testProduceOneMsgSsl() throws InterruptedException, NSQException, Ti
150
152
@ Test
151
153
public void testProduceOneMsgSslAndSnappy () throws InterruptedException , NSQException , TimeoutException , SSLException {
152
154
AtomicInteger counter = new AtomicInteger (0 );
153
- NSQLookup lookup = new DefaultNSQLookup ();
155
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
154
156
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
155
157
156
158
NSQProducer producer = new NSQProducer ();
@@ -178,7 +180,7 @@ public void testProduceOneMsgSslAndSnappy() throws InterruptedException, NSQExce
178
180
public void testProduceOneMsgSslAndDeflat () throws InterruptedException , NSQException , TimeoutException , SSLException {
179
181
System .setProperty ("io.netty.noJdkZlibDecoder" , "false" );
180
182
AtomicInteger counter = new AtomicInteger (0 );
181
- NSQLookup lookup = new DefaultNSQLookup ();
183
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
182
184
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
183
185
184
186
NSQProducer producer = new NSQProducer ();
@@ -206,7 +208,7 @@ public void testProduceOneMsgSslAndDeflat() throws InterruptedException, NSQExce
206
208
@ Test
207
209
public void testProduceMoreMsg () throws NSQException , TimeoutException , InterruptedException {
208
210
AtomicInteger counter = new AtomicInteger (0 );
209
- NSQLookup lookup = new DefaultNSQLookup ();
211
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
210
212
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
211
213
212
214
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
@@ -235,7 +237,7 @@ public void testProduceMoreMsg() throws NSQException, TimeoutException, Interrup
235
237
@ Test
236
238
public void testParallelProducer () throws NSQException , TimeoutException , InterruptedException {
237
239
AtomicInteger counter = new AtomicInteger (0 );
238
- NSQLookup lookup = new DefaultNSQLookup ();
240
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
239
241
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
240
242
241
243
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
@@ -271,7 +273,7 @@ public void testParallelProducer() throws NSQException, TimeoutException, Interr
271
273
@ Test
272
274
public void testMultiMessage () throws NSQException , TimeoutException , InterruptedException {
273
275
AtomicInteger counter = new AtomicInteger (0 );
274
- NSQLookup lookup = new DefaultNSQLookup ();
276
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
275
277
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
276
278
277
279
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
@@ -301,7 +303,7 @@ public void testMultiMessage() throws NSQException, TimeoutException, Interrupte
301
303
@ Test
302
304
public void testBackoff () throws InterruptedException , NSQException , TimeoutException {
303
305
AtomicInteger counter = new AtomicInteger (0 );
304
- NSQLookup lookup = new DefaultNSQLookup ();
306
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
305
307
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
306
308
307
309
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
@@ -335,7 +337,7 @@ public void testBackoff() throws InterruptedException, NSQException, TimeoutExce
335
337
@ Test
336
338
public void testScheduledCallback () throws NSQException , TimeoutException , InterruptedException {
337
339
AtomicInteger counter = new AtomicInteger (0 );
338
- NSQLookup lookup = new DefaultNSQLookup ();
340
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
339
341
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
340
342
341
343
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {});
@@ -349,7 +351,7 @@ public void testScheduledCallback() throws NSQException, TimeoutException, Inter
349
351
350
352
@ Test
351
353
public void testEphemeralTopic () throws InterruptedException , NSQException , TimeoutException {
352
- NSQLookup lookup = new DefaultNSQLookup ();
354
+ NSQLookup lookup = new DefaultNSQLookup (mapper );
353
355
lookup .addLookupAddress (Nsq .getNsqLookupdHost (), 4161 );
354
356
355
357
NSQProducer producer = new NSQProducer ();
0 commit comments