Skip to content

Commit 19f4f83

Browse files
author
Eivind Siqveland Larsen
committed
Refactor NSQCommand creation
1 parent c9dceaf commit 19f4f83

File tree

6 files changed

+125
-66
lines changed

6 files changed

+125
-66
lines changed

src/main/java/com/github/brainlag/nsq/Connection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public Connection(final ServerAddress serverAddress, final NSQConfig config) thr
6868
channel.write(buf);
6969
channel.flush();
7070

71-
//indentify
72-
final NSQCommand ident = NSQCommand.instance("IDENTIFY", config.toString().getBytes());
71+
//identify
72+
final NSQCommand ident = NSQCommand.identify(config.toString().getBytes());
73+
7374
try {
7475
final NSQFrame response = commandAndWait(ident);
7576
if (response != null) {
@@ -147,7 +148,7 @@ public void incoming(final NSQFrame frame) {
147148

148149
private void heartbeat() {
149150
LogManager.getLogger(this).trace("HEARTBEAT!");
150-
command(NSQCommand.instance("NOP"));
151+
command(NSQCommand.nop());
151152
lastHeartbeatSuccess.getAndSet(System.currentTimeMillis());
152153
}
153154

Lines changed: 111 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,121 @@
11
package com.github.brainlag.nsq;
22

3+
import java.nio.charset.Charset;
34
import java.util.ArrayList;
45
import java.util.List;
56

67
public class NSQCommand {
8+
String line;
9+
List<byte[]> data = new ArrayList<>();
710

8-
public static NSQCommand instance(String line) {
9-
NSQCommand n = new NSQCommand();
10-
n.setLine(line);
11-
return n;
12-
}
13-
14-
public static NSQCommand instance(String line, byte[] bytes) {
15-
NSQCommand n = instance(line);
16-
n.addBytes(bytes);
17-
return n;
18-
}
19-
20-
String line;
21-
List<byte[]> data = new ArrayList<>();
22-
23-
public void addBytes(byte[] bytes) {
24-
data.add(bytes);
25-
}
26-
27-
public String getLine() {
28-
return line;
29-
}
30-
public void setLine(String line) {
31-
if (!line.endsWith("\n")) {
32-
line = line +"\n";
33-
}
34-
35-
this.line = line;
36-
}
37-
public List<byte[]> getData() {
38-
return data;
39-
}
40-
public void setData(List<byte[]> data) {
41-
this.data = data;
42-
}
43-
44-
public String toString() {
11+
public void addBytes(byte[] bytes) {
12+
data.add(bytes);
13+
}
14+
15+
public String getLine() {
16+
return line;
17+
}
18+
19+
public void setLine(String line) {
20+
if (!line.endsWith("\n")) {
21+
line = line + "\n";
22+
}
23+
24+
this.line = line;
25+
}
26+
27+
public List<byte[]> getData() {
28+
return data;
29+
}
30+
31+
public void setData(List<byte[]> data) {
32+
this.data = data;
33+
}
34+
35+
public String toString() {
4536
return this.getLine().trim();
4637
}
38+
39+
// ASCII stores a reference to the charset needed for commands
40+
public static final Charset ASCII = Charset.forName("ascii");
41+
42+
// Identify creates a new Command to provide information about the client. After connecting,
43+
// it is generally the first message sent.
44+
//
45+
// The supplied body should be a map marshaled into JSON to provide some flexibility
46+
// for this command to evolve over time.
47+
//
48+
// See http://nsq.io/clients/tcp_protocol_spec.html#identify for information
49+
// on the supported options
50+
public static NSQCommand identify(byte[] body) {
51+
return NSQCommand.instance("IDENTIFY", body);
52+
}
53+
54+
// Touch creates a new Command to reset the timeout for
55+
// a given message (by id)
56+
public static NSQCommand touch(byte[] messageID) {
57+
return NSQCommand.instance("TOUCH " + new String(messageID, ASCII));
58+
}
59+
60+
// Finish creates a new Command to indiciate that
61+
// a given message (by id) has been processed successfully
62+
public static NSQCommand finish(byte[] messageID) {
63+
return NSQCommand.instance("FIN " + new String(messageID, ASCII));
64+
}
65+
66+
// Subscribe creates a new Command to subscribe to the given topic/channel
67+
public static NSQCommand subscribe(String topic, String channel) {
68+
return NSQCommand.instance("SUB " + topic + " " + channel);
69+
}
70+
71+
// StartClose creates a new Command to indicate that the
72+
// client would like to start a close cycle. nsqd will no longer
73+
// send messages to a client in this state and the client is expected
74+
// finish pending messages and close the connection
75+
public static NSQCommand startClose() {
76+
return NSQCommand.instance("CLS");
77+
}
78+
79+
public static NSQCommand requeue(byte[] messageID, int timeoutMillis) {
80+
return NSQCommand.instance("REQ " + new String(messageID, ASCII) + " " + timeoutMillis);
81+
}
82+
83+
// Nop creates a new Command that has no effect server side.
84+
// Commonly used to respond to heartbeats
85+
public static NSQCommand nop() {
86+
return NSQCommand.instance("NOP");
87+
}
88+
89+
// Ready creates a new Command to specify
90+
// the number of messages a client is willing to receive
91+
public static NSQCommand ready(int rdy) {
92+
return NSQCommand.instance("RDY " + rdy);
93+
}
94+
95+
// Publish creates a new Command to write a message to a given topic
96+
public static NSQCommand publish(String topic, byte[] message) {
97+
return NSQCommand.instance("PUB " + topic, message);
98+
}
99+
100+
// MultiPublish creates a new Command to write more than one message to a given topic
101+
// (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
102+
// Note: can only be used with more than 1 bodies!
103+
public static NSQCommand multiPublish(String topic, List<byte[]> bodies) {
104+
NSQCommand cmd = NSQCommand.instance("MPUB " + topic);
105+
cmd.setData(bodies);
106+
return cmd;
107+
}
108+
109+
public static NSQCommand instance(String line) {
110+
NSQCommand n = new NSQCommand();
111+
n.setLine(line);
112+
return n;
113+
}
114+
115+
public static NSQCommand instance(String line, byte[] bytes) {
116+
NSQCommand n = instance(line);
117+
n.addBytes(bytes);
118+
return n;
119+
}
120+
47121
}

src/main/java/com/github/brainlag/nsq/NSQConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ private Connection createConnection(final ServerAddress serverAddress) {
8383

8484
connection.setConsumer(this);
8585
connection.setErrorCallback(errorCallback);
86-
connection.command(NSQCommand.instance("SUB " + topic + " " + this.channel));
87-
connection.command(NSQCommand.instance("RDY " + messagesPerBatch));
86+
connection.command(NSQCommand.subscribe(topic, channel));
87+
connection.command(NSQCommand.ready(messagesPerBatch));
8888

8989
return connection;
9090
} catch (final NoConnectionsException e) {
@@ -131,7 +131,7 @@ private void updateTimeout(final NSQMessage message, long change) {
131131
}
132132

133133
private void rdy(final NSQMessage message, int size) {
134-
message.getConnection().command(NSQCommand.instance("RDY " + size));
134+
message.getConnection().command(NSQCommand.ready(size));
135135
}
136136

137137
private Date calculateTimeoutDate(final long i) {
@@ -150,7 +150,7 @@ public void shutdown() {
150150
}
151151

152152
private void cleanClose() {
153-
final NSQCommand command = NSQCommand.instance("CLS");
153+
final NSQCommand command = NSQCommand.startClose();
154154
try {
155155
for (final Connection connection : connections.values()) {
156156
final NSQFrame frame = connection.commandAndWait(command);

src/main/java/com/github/brainlag/nsq/NSQMessage.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package com.github.brainlag.nsq;
22

3-
import org.apache.logging.log4j.LogManager;
4-
5-
import java.io.UnsupportedEncodingException;
63
import java.util.Date;
74

85
public class NSQMessage {
@@ -17,30 +14,18 @@ public class NSQMessage {
1714
* Finished processing this message, let nsq know so it doesnt get reprocessed.
1815
*/
1916
public void finished() {
20-
try {
21-
connection.command(NSQCommand.instance("FIN " + new String(id, "ascii")));
22-
} catch (UnsupportedEncodingException e) {
23-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
24-
}
17+
connection.command(NSQCommand.finish(this.id));
2518
}
2619

2720
public void touch() {
28-
try {
29-
connection.command(NSQCommand.instance("TOUCH " + new String(id, "ascii")));
30-
} catch (UnsupportedEncodingException e) {
31-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
32-
}
21+
connection.command(NSQCommand.touch(this.id));
3322
}
3423

3524
/**
3625
* indicates a problem with processing, puts it back on the queue.
3726
*/
3827
public void requeue(int timeoutMillis) {
39-
try {
40-
connection.command(NSQCommand.instance("REQ " + new String(id, "ascii") + " " + timeoutMillis));
41-
} catch (UnsupportedEncodingException e) {
42-
LogManager.getLogger(this).error("ASCII charset is not supported by your JVM?", e);
43-
}
28+
connection.command(NSQCommand.requeue(this.id, timeoutMillis));
4429
}
4530

4631
public void requeue() {

src/main/java/com/github/brainlag/nsq/NSQProducer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ public void produceMulti(String topic, List<byte[]> messages) throws TimeoutExce
8585

8686
Connection c = this.getConnection();
8787
try {
88-
NSQCommand command = NSQCommand.instance("MPUB " + topic);
89-
command.setData(messages);
88+
NSQCommand command = NSQCommand.multiPublish(topic, messages);
9089

9190

9291
NSQFrame frame = c.commandAndWait(command);
@@ -110,7 +109,7 @@ public void produce(String topic, byte[] message) throws NSQException, TimeoutEx
110109
}
111110
Connection c = getConnection();
112111
try {
113-
NSQCommand command = NSQCommand.instance("PUB " + topic, message);
112+
NSQCommand command = NSQCommand.publish(topic, message);
114113
NSQFrame frame = c.commandAndWait(command);
115114
if (frame != null && frame instanceof ErrorFrame) {
116115
String err = ((ErrorFrame) frame).getErrorMessage();

src/main/java/com/github/brainlag/nsq/pool/ConnectionPoolFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public PooledObject<Connection> wrap(final Connection con) {
3030

3131
@Override
3232
public boolean validateObject(final ServerAddress key, final PooledObject<Connection> p) {
33-
ChannelFuture command = p.getObject().command(NSQCommand.instance("NOP"));
33+
ChannelFuture command = p.getObject().command(NSQCommand.nop());
3434
return command.awaitUninterruptibly().isSuccess();
3535
}
3636

0 commit comments

Comments
 (0)