Skip to content

Commit 9af016c

Browse files
authored
Merge pull request #32 from esiqveland/refactor-nsqcommand
Refactor nsqcommand
2 parents c9dceaf + 3994e23 commit 9af016c

File tree

9 files changed

+150
-79
lines changed

9 files changed

+150
-79
lines changed

src/main/java/com/github/brainlag/nsq/Connection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public Connection(final ServerAddress serverAddress, final NSQConfig config) thr
6868
channel.write(buf);
6969
channel.flush();
7070

71-
//indentify
72-
final NSQCommand ident = NSQCommand.instance("IDENTIFY", config.toString().getBytes());
71+
//identify
72+
final NSQCommand ident = NSQCommand.identify(config.toString().getBytes());
73+
7374
try {
7475
final NSQFrame response = commandAndWait(ident);
7576
if (response != null) {
@@ -147,7 +148,7 @@ public void incoming(final NSQFrame frame) {
147148

148149
private void heartbeat() {
149150
LogManager.getLogger(this).trace("HEARTBEAT!");
150-
command(NSQCommand.instance("NOP"));
151+
command(NSQCommand.nop());
151152
lastHeartbeatSuccess.getAndSet(System.currentTimeMillis());
152153
}
153154

Lines changed: 113 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,123 @@
11
package com.github.brainlag.nsq;
22

3+
import java.nio.charset.Charset;
34
import java.util.ArrayList;
45
import java.util.List;
56

67
public class NSQCommand {
8+
String line;
9+
List<byte[]> data = new ArrayList<>();
710

8-
public static NSQCommand instance(String line) {
9-
NSQCommand n = new NSQCommand();
10-
n.setLine(line);
11-
return n;
12-
}
13-
14-
public static NSQCommand instance(String line, byte[] bytes) {
15-
NSQCommand n = instance(line);
16-
n.addBytes(bytes);
17-
return n;
18-
}
19-
20-
String line;
21-
List<byte[]> data = new ArrayList<>();
22-
23-
public void addBytes(byte[] bytes) {
24-
data.add(bytes);
25-
}
26-
27-
public String getLine() {
28-
return line;
29-
}
30-
public void setLine(String line) {
31-
if (!line.endsWith("\n")) {
32-
line = line +"\n";
33-
}
34-
35-
this.line = line;
36-
}
37-
public List<byte[]> getData() {
38-
return data;
39-
}
40-
public void setData(List<byte[]> data) {
41-
this.data = data;
42-
}
43-
44-
public String toString() {
11+
private NSQCommand() { /** no instances */ }
12+
13+
public void addBytes(byte[] bytes) {
14+
data.add(bytes);
15+
}
16+
17+
public String getLine() {
18+
return line;
19+
}
20+
21+
public void setLine(String line) {
22+
if (!line.endsWith("\n")) {
23+
line = line + "\n";
24+
}
25+
26+
this.line = line;
27+
}
28+
29+
public List<byte[]> getData() {
30+
return data;
31+
}
32+
33+
public void setData(List<byte[]> data) {
34+
this.data = data;
35+
}
36+
37+
public String toString() {
4538
return this.getLine().trim();
4639
}
40+
41+
// ASCII stores a reference to the charset needed for commands
42+
public static final Charset ASCII = Charset.forName("ascii");
43+
44+
// Identify creates a new Command to provide information about the client. After connecting,
45+
// it is generally the first message sent.
46+
//
47+
// The supplied body should be a map marshaled into JSON to provide some flexibility
48+
// for this command to evolve over time.
49+
//
50+
// See http://nsq.io/clients/tcp_protocol_spec.html#identify for information
51+
// on the supported options
52+
public static NSQCommand identify(byte[] body) {
53+
return NSQCommand.instance("IDENTIFY", body);
54+
}
55+
56+
// Touch creates a new Command to reset the timeout for
57+
// a given message (by id)
58+
public static NSQCommand touch(byte[] messageID) {
59+
return NSQCommand.instance("TOUCH " + new String(messageID, ASCII));
60+
}
61+
62+
// Finish creates a new Command to indiciate that
63+
// a given message (by id) has been processed successfully
64+
public static NSQCommand finish(byte[] messageID) {
65+
return NSQCommand.instance("FIN " + new String(messageID, ASCII));
66+
}
67+
68+
// Subscribe creates a new Command to subscribe to the given topic/channel
69+
public static NSQCommand subscribe(String topic, String channel) {
70+
return NSQCommand.instance("SUB " + topic + " " + channel);
71+
}
72+
73+
// StartClose creates a new Command to indicate that the
74+
// client would like to start a close cycle. nsqd will no longer
75+
// send messages to a client in this state and the client is expected
76+
// finish pending messages and close the connection
77+
public static NSQCommand startClose() {
78+
return NSQCommand.instance("CLS");
79+
}
80+
81+
public static NSQCommand requeue(byte[] messageID, int timeoutMillis) {
82+
return NSQCommand.instance("REQ " + new String(messageID, ASCII) + " " + timeoutMillis);
83+
}
84+
85+
// Nop creates a new Command that has no effect server side.
86+
// Commonly used to respond to heartbeats
87+
public static NSQCommand nop() {
88+
return NSQCommand.instance("NOP");
89+
}
90+
91+
// Ready creates a new Command to specify
92+
// the number of messages a client is willing to receive
93+
public static NSQCommand ready(int rdy) {
94+
return NSQCommand.instance("RDY " + rdy);
95+
}
96+
97+
// Publish creates a new Command to write a message to a given topic
98+
public static NSQCommand publish(String topic, byte[] message) {
99+
return NSQCommand.instance("PUB " + topic, message);
100+
}
101+
102+
// MultiPublish creates a new Command to write more than one message to a given topic
103+
// (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
104+
// Note: can only be used with more than 1 bodies!
105+
public static NSQCommand multiPublish(String topic, List<byte[]> bodies) {
106+
NSQCommand cmd = NSQCommand.instance("MPUB " + topic);
107+
cmd.setData(bodies);
108+
return cmd;
109+
}
110+
111+
private static NSQCommand instance(String line) {
112+
NSQCommand n = new NSQCommand();
113+
n.setLine(line);
114+
return n;
115+
}
116+
117+
private static NSQCommand instance(String line, byte[] bytes) {
118+
NSQCommand n = instance(line);
119+
n.addBytes(bytes);
120+
return n;
121+
}
122+
47123
}

src/main/java/com/github/brainlag/nsq/NSQConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ private Connection createConnection(final ServerAddress serverAddress) {
8383

8484
connection.setConsumer(this);
8585
connection.setErrorCallback(errorCallback);
86-
connection.command(NSQCommand.instance("SUB " + topic + " " + this.channel));
87-
connection.command(NSQCommand.instance("RDY " + messagesPerBatch));
86+
connection.command(NSQCommand.subscribe(topic, channel));
87+
connection.command(NSQCommand.ready(messagesPerBatch));
8888

8989
return connection;
9090
} catch (final NoConnectionsException e) {
@@ -131,7 +131,7 @@ private void updateTimeout(final NSQMessage message, long change) {
131131
}
132132

133133
private void rdy(final NSQMessage message, int size) {
134-
message.getConnection().command(NSQCommand.instance("RDY " + size));
134+
message.getConnection().command(NSQCommand.ready(size));
135135
}
136136

137137
private Date calculateTimeoutDate(final long i) {
@@ -150,7 +150,7 @@ public void shutdown() {
150150
}
151151

152152
private void cleanClose() {
153-
final NSQCommand command = NSQCommand.instance("CLS");
153+
final NSQCommand command = NSQCommand.startClose();
154154
try {
155155
for (final Connection connection : connections.values()) {
156156
final NSQFrame frame = connection.commandAndWait(command);

src/main/java/com/github/brainlag/nsq/NSQMessage.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package com.github.brainlag.nsq;
22

3-
import org.apache.logging.log4j.LogManager;
4-
5-
import java.io.UnsupportedEncodingException;
63
import java.util.Date;
74

85
public class NSQMessage {
@@ -17,30 +14,18 @@ public class NSQMessage {
1714
* Finished processing this message, let nsq know so it doesnt get reprocessed.
1815
*/
1916
public void finished() {
20-
try {
21-
connection.command(NSQCommand.instance("FIN " + new String(id, "ascii")));
22-
} catch (UnsupportedEncodingException e) {
23-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
24-
}
17+
connection.command(NSQCommand.finish(this.id));
2518
}
2619

2720
public void touch() {
28-
try {
29-
connection.command(NSQCommand.instance("TOUCH " + new String(id, "ascii")));
30-
} catch (UnsupportedEncodingException e) {
31-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
32-
}
21+
connection.command(NSQCommand.touch(this.id));
3322
}
3423

3524
/**
3625
* indicates a problem with processing, puts it back on the queue.
3726
*/
3827
public void requeue(int timeoutMillis) {
39-
try {
40-
connection.command(NSQCommand.instance("REQ " + new String(id, "ascii") + " " + timeoutMillis));
41-
} catch (UnsupportedEncodingException e) {
42-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
43-
}
28+
connection.command(NSQCommand.requeue(this.id, timeoutMillis));
4429
}
4530

4631
public void requeue() {

src/main/java/com/github/brainlag/nsq/NSQProducer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ public void produceMulti(String topic, List<byte[]> messages) throws TimeoutExce
8585

8686
Connection c = this.getConnection();
8787
try {
88-
NSQCommand command = NSQCommand.instance("MPUB " + topic);
89-
command.setData(messages);
88+
NSQCommand command = NSQCommand.multiPublish(topic, messages);
9089

9190

9291
NSQFrame frame = c.commandAndWait(command);
@@ -110,7 +109,7 @@ public void produce(String topic, byte[] message) throws NSQException, TimeoutEx
110109
}
111110
Connection c = getConnection();
112111
try {
113-
NSQCommand command = NSQCommand.instance("PUB " + topic, message);
112+
NSQCommand command = NSQCommand.publish(topic, message);
114113
NSQFrame frame = c.commandAndWait(command);
115114
if (frame != null && frame instanceof ErrorFrame) {
116115
String err = ((ErrorFrame) frame).getErrorMessage();

src/main/java/com/github/brainlag/nsq/lookup/DefaultNSQLookup.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414

1515
public class DefaultNSQLookup implements NSQLookup {
1616
Set<String> addresses = Sets.newHashSet();
17+
private final ObjectMapper mapper;
18+
19+
public DefaultNSQLookup() {
20+
this(new ObjectMapper());
21+
}
22+
23+
public DefaultNSQLookup(ObjectMapper mapper) {
24+
this.mapper = mapper;
25+
}
1726

1827
@Override
1928
public void addLookupAddress(String addr, int port) {
@@ -30,7 +39,6 @@ public Set<ServerAddress> lookup(String topic) {
3039

3140
for (String addr : getLookupAddresses()) {
3241
try {
33-
ObjectMapper mapper = new ObjectMapper();
3442
String topicEncoded = URLEncoder.encode(topic, Charsets.UTF_8.name());
3543
JsonNode jsonNode = mapper.readTree(new URL(addr + "/lookup?topic=" + topicEncoded));
3644
LogManager.getLogger(this).debug("Server connection information: {}", jsonNode);

src/main/java/com/github/brainlag/nsq/pool/ConnectionPoolFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public PooledObject<Connection> wrap(final Connection con) {
3030

3131
@Override
3232
public boolean validateObject(final ServerAddress key, final PooledObject<Connection> p) {
33-
ChannelFuture command = p.getObject().command(NSQCommand.instance("NOP"));
33+
ChannelFuture command = p.getObject().command(NSQCommand.nop());
3434
return command.awaitUninterruptibly().isSuccess();
3535
}
3636

src/test/java/com/github/brainlag/nsq/NSQConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class NSQConsumerTest {
2121
@Test
2222
public void testLongRunningConsumer() throws NSQException, TimeoutException, InterruptedException {
2323
AtomicInteger counter = new AtomicInteger(0);
24-
NSQLookup lookup = new DefaultNSQLookup();
24+
NSQLookup lookup = new DefaultNSQLookup(mapper);
2525
lookup.addLookupAddress(Nsq.getNsqLookupdHost(), 4161);
2626

2727
NSQConsumer consumer = new NSQConsumer(lookup, "test1", "testconsumer", (message) -> {

0 commit comments

Comments
 (0)