Skip to content

Commit 855a6e6

Browse files
pfeairhellerbrainlag
authored andcommitted
Added the TOUCH command to NSQMessage to allow long running consumers to notify nsqd that they are still working on a message so it won't be requeued.
(cherry picked from commit 8bb3bda)
1 parent 68217e0 commit 855a6e6

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/main/java/com/github/brainlag/nsq/NSQMessage.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ public void finished() {
2424
}
2525
}
2626

27+
public void touch() {
28+
try {
29+
connection.command(NSQCommand.instance("TOUCH " + new String(id, "ascii")));
30+
} catch (UnsupportedEncodingException e) {
31+
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
32+
}
33+
}
34+
2735
/**
2836
* indicates a problem with processing, puts it back on the queue.
2937
*/
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.github.brainlag.nsq;
2+
3+
import com.github.brainlag.nsq.exceptions.NSQException;
4+
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
5+
import com.github.brainlag.nsq.lookup.NSQLookup;
6+
import org.apache.logging.log4j.LogManager;
7+
import org.junit.Test;
8+
9+
import java.util.concurrent.TimeoutException;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
import static org.junit.Assert.assertTrue;
13+
14+
public class NSQConsumerTest {
15+
16+
//duration to wait before auto-requeing a message setting in nsqd, defined with -msg-timeout:
17+
//Set your timeout -msg-timeout="5s" command line when starting nsqd or changed this constant
18+
//to the default of 60000.
19+
private static final long NSQ_MSG_TIMEOUT = 5000;
20+
21+
@Test
22+
public void testLongRunningConsumer() throws NSQException, TimeoutException, InterruptedException {
23+
AtomicInteger counter = new AtomicInteger(0);
24+
NSQLookup lookup = new DefaultNSQLookup();
25+
lookup.addLookupAddress("localhost", 4161);
26+
27+
NSQConsumer consumer = new NSQConsumer(lookup, "test1", "testconsumer", (message) -> {
28+
LogManager.getLogger(this).info("Processing message: " + new String(message.getMessage()));
29+
counter.incrementAndGet();
30+
31+
long sleepTime = NSQ_MSG_TIMEOUT / 5;
32+
for (int i = 0; i < 5; i++) {
33+
try {
34+
Thread.sleep(sleepTime + 500);
35+
} catch (InterruptedException e) {
36+
e.printStackTrace();
37+
}
38+
message.touch();
39+
}
40+
message.finished();
41+
assertTrue(message.getAttempts() == 1);
42+
});
43+
consumer.start();
44+
45+
NSQProducer producer = new NSQProducer();
46+
producer.addAddress("localhost", 4150);
47+
producer.start();
48+
String msg = "test-one-message";
49+
producer.produce("test1", msg.getBytes());
50+
producer.shutdown();
51+
52+
Thread.sleep(NSQ_MSG_TIMEOUT * 2);
53+
assertTrue(counter.get() == 1);
54+
consumer.shutdown();
55+
}
56+
57+
58+
}

0 commit comments

Comments
 (0)