16
16
import java .util .Date ;
17
17
import java .util .List ;
18
18
import java .util .Set ;
19
- import java .util .concurrent .*;
19
+ import java .util .concurrent .ExecutorService ;
20
+ import java .util .concurrent .LinkedBlockingQueue ;
21
+ import java .util .concurrent .ThreadPoolExecutor ;
22
+ import java .util .concurrent .TimeUnit ;
23
+ import java .util .concurrent .TimeoutException ;
20
24
import java .util .concurrent .atomic .AtomicInteger ;
21
25
22
26
import static org .junit .Assert .assertEquals ;
@@ -65,11 +69,11 @@ private NSQConfig getSslAndDeflateConfig() throws SSLException {
65
69
public void testProduceOneMsgSnappy () throws NSQException , TimeoutException , InterruptedException {
66
70
AtomicInteger counter = new AtomicInteger (0 );
67
71
NSQLookup lookup = new DefaultNSQLookup ();
68
- lookup .addLookupAddress ("localhost" , 4161 );
72
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
69
73
70
74
NSQProducer producer = new NSQProducer ();
71
75
producer .setConfig (getSnappyConfig ());
72
- producer .addAddress ("localhost" , 4150 );
76
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
73
77
producer .start ();
74
78
String msg = randomString ();
75
79
producer .produce ("test3" , msg .getBytes ());
@@ -93,11 +97,11 @@ public void testProduceOneMsgDeflate() throws NSQException, TimeoutException, In
93
97
System .setProperty ("io.netty.noJdkZlibDecoder" , "false" );
94
98
AtomicInteger counter = new AtomicInteger (0 );
95
99
NSQLookup lookup = new DefaultNSQLookup ();
96
- lookup .addLookupAddress ("localhost" , 4161 );
100
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
97
101
98
102
NSQProducer producer = new NSQProducer ();
99
103
producer .setConfig (getDeflateConfig ());
100
- producer .addAddress ("localhost" , 4150 );
104
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
101
105
producer .start ();
102
106
String msg = randomString ();
103
107
producer .produce ("test3" , msg .getBytes ());
@@ -120,11 +124,11 @@ public void testProduceOneMsgDeflate() throws NSQException, TimeoutException, In
120
124
public void testProduceOneMsgSsl () throws InterruptedException , NSQException , TimeoutException , SSLException {
121
125
AtomicInteger counter = new AtomicInteger (0 );
122
126
NSQLookup lookup = new DefaultNSQLookup ();
123
- lookup .addLookupAddress ("localhost" , 4161 );
127
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
124
128
125
129
NSQProducer producer = new NSQProducer ();
126
130
producer .setConfig (getSslConfig ());
127
- producer .addAddress ("localhost" , 4150 );
131
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
128
132
producer .start ();
129
133
String msg = randomString ();
130
134
producer .produce ("test3" , msg .getBytes ());
@@ -147,11 +151,11 @@ public void testProduceOneMsgSsl() throws InterruptedException, NSQException, Ti
147
151
public void testProduceOneMsgSslAndSnappy () throws InterruptedException , NSQException , TimeoutException , SSLException {
148
152
AtomicInteger counter = new AtomicInteger (0 );
149
153
NSQLookup lookup = new DefaultNSQLookup ();
150
- lookup .addLookupAddress ("localhost" , 4161 );
154
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
151
155
152
156
NSQProducer producer = new NSQProducer ();
153
157
producer .setConfig (getSslAndSnappyConfig ());
154
- producer .addAddress ("localhost" , 4150 );
158
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
155
159
producer .start ();
156
160
String msg = randomString ();
157
161
producer .produce ("test3" , msg .getBytes ());
@@ -175,11 +179,11 @@ public void testProduceOneMsgSslAndDeflat() throws InterruptedException, NSQExce
175
179
System .setProperty ("io.netty.noJdkZlibDecoder" , "false" );
176
180
AtomicInteger counter = new AtomicInteger (0 );
177
181
NSQLookup lookup = new DefaultNSQLookup ();
178
- lookup .addLookupAddress ("localhost" , 4161 );
182
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
179
183
180
184
NSQProducer producer = new NSQProducer ();
181
185
producer .setConfig (getSslAndDeflateConfig ());
182
- producer .addAddress ("localhost" , 4150 );
186
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
183
187
producer .start ();
184
188
String msg = randomString ();
185
189
producer .produce ("test3" , msg .getBytes ());
@@ -203,7 +207,7 @@ public void testProduceOneMsgSslAndDeflat() throws InterruptedException, NSQExce
203
207
public void testProduceMoreMsg () throws NSQException , TimeoutException , InterruptedException {
204
208
AtomicInteger counter = new AtomicInteger (0 );
205
209
NSQLookup lookup = new DefaultNSQLookup ();
206
- lookup .addLookupAddress ("localhost" , 4161 );
210
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
207
211
208
212
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
209
213
LogManager .getLogger (this ).info ("Processing message: " + new String (message .getMessage ()));
@@ -213,7 +217,7 @@ public void testProduceMoreMsg() throws NSQException, TimeoutException, Interrup
213
217
consumer .start ();
214
218
215
219
NSQProducer producer = new NSQProducer ();
216
- producer .addAddress ("localhost" , 4150 );
220
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
217
221
producer .start ();
218
222
for (int i = 0 ; i < 1000 ; i ++) {
219
223
String msg = randomString ();
@@ -232,7 +236,7 @@ public void testProduceMoreMsg() throws NSQException, TimeoutException, Interrup
232
236
public void testParallelProducer () throws NSQException , TimeoutException , InterruptedException {
233
237
AtomicInteger counter = new AtomicInteger (0 );
234
238
NSQLookup lookup = new DefaultNSQLookup ();
235
- lookup .addLookupAddress ("localhost" , 4161 );
239
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
236
240
237
241
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
238
242
LogManager .getLogger (this ).info ("Processing message: " + new String (message .getMessage ()));
@@ -242,7 +246,7 @@ public void testParallelProducer() throws NSQException, TimeoutException, Interr
242
246
consumer .start ();
243
247
244
248
NSQProducer producer = new NSQProducer ();
245
- producer .addAddress ("localhost" , 4150 );
249
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
246
250
producer .start ();
247
251
for (int n = 0 ; n < 5 ; n ++) {
248
252
new Thread (() -> {
@@ -268,7 +272,7 @@ public void testParallelProducer() throws NSQException, TimeoutException, Interr
268
272
public void testMultiMessage () throws NSQException , TimeoutException , InterruptedException {
269
273
AtomicInteger counter = new AtomicInteger (0 );
270
274
NSQLookup lookup = new DefaultNSQLookup ();
271
- lookup .addLookupAddress ("localhost" , 4161 );
275
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
272
276
273
277
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
274
278
LogManager .getLogger (this ).info ("Processing message: " + new String (message .getMessage ()));
@@ -278,7 +282,7 @@ public void testMultiMessage() throws NSQException, TimeoutException, Interrupte
278
282
consumer .start ();
279
283
280
284
NSQProducer producer = new NSQProducer ();
281
- producer .addAddress ("localhost" , 4150 );
285
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
282
286
producer .start ();
283
287
List <byte []> messages = Lists .newArrayList ();
284
288
for (int i = 0 ; i < 50 ; i ++) {
@@ -298,7 +302,7 @@ public void testMultiMessage() throws NSQException, TimeoutException, Interrupte
298
302
public void testBackoff () throws InterruptedException , NSQException , TimeoutException {
299
303
AtomicInteger counter = new AtomicInteger (0 );
300
304
NSQLookup lookup = new DefaultNSQLookup ();
301
- lookup .addLookupAddress ("localhost" , 4161 );
305
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
302
306
303
307
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {
304
308
LogManager .getLogger (this ).info ("Processing message: " + new String (message .getMessage ()));
@@ -313,7 +317,7 @@ public void testBackoff() throws InterruptedException, NSQException, TimeoutExce
313
317
consumer .start ();
314
318
315
319
NSQProducer producer = new NSQProducer ();
316
- producer .addAddress ("localhost" , 4150 );
320
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
317
321
producer .start ();
318
322
for (int i = 0 ; i < 20 ; i ++) {
319
323
String msg = randomString ();
@@ -332,7 +336,7 @@ public void testBackoff() throws InterruptedException, NSQException, TimeoutExce
332
336
public void testScheduledCallback () throws NSQException , TimeoutException , InterruptedException {
333
337
AtomicInteger counter = new AtomicInteger (0 );
334
338
NSQLookup lookup = new DefaultNSQLookup ();
335
- lookup .addLookupAddress ("localhost" , 4161 );
339
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
336
340
337
341
NSQConsumer consumer = new NSQConsumer (lookup , "test3" , "testconsumer" , (message ) -> {});
338
342
consumer .scheduleRun (() -> counter .incrementAndGet (), 1000 , 1000 , TimeUnit .MILLISECONDS );
@@ -346,11 +350,11 @@ public void testScheduledCallback() throws NSQException, TimeoutException, Inter
346
350
@ Test
347
351
public void testEphemeralTopic () throws InterruptedException , NSQException , TimeoutException {
348
352
NSQLookup lookup = new DefaultNSQLookup ();
349
- lookup .addLookupAddress ("localhost" , 4161 );
353
+ lookup .addLookupAddress (Nsq . getNsqLookupdHost () , 4161 );
350
354
351
355
NSQProducer producer = new NSQProducer ();
352
356
producer .setConfig (getDeflateConfig ());
353
- producer .addAddress ("localhost" , 4150 );
357
+ producer .addAddress (Nsq . getNsqdHost () , 4150 );
354
358
producer .start ();
355
359
String msg = randomString ();
356
360
producer .produce ("testephem#ephemeral" , msg .getBytes ());
0 commit comments